1 module hunt.io.channel.iocp.AbstractStream;
2 
3 // dfmt off
4 version (HAVE_IOCP) : 
5 // dfmt on
6 
7 import hunt.event.selector.Selector;
8 import hunt.io.ByteBuffer;
9 import hunt.io.BufferUtils;
10 import hunt.io.channel.AbstractSocketChannel;
11 import hunt.io.channel.ChannelTask;
12 import hunt.io.channel.Common;
13 import hunt.io.channel.iocp.Common;
14 import hunt.logging;
15 import hunt.Functions;
16 import hunt.event.selector.IOCP;
17 import hunt.system.Error;
18 import hunt.util.ThreadHelper;
19 import hunt.util.worker;
20 
21 import core.atomic;
22 import core.sys.windows.windows;
23 import core.sys.windows.winsock2;
24 import core.sys.windows.mswsock;
25 import std.format;
26 import std.socket;
27 import core.stdc.string;
28 
29 /**
30 TCP Peer
31 */
32 abstract class AbstractStream : AbstractSocketChannel {
33 
34     // data event handlers
35     
36     /**
37     * Warning: The received data is stored a inner buffer. For a data safe, 
38     * you would make a copy of it. 
39     */
40     protected DataReceivedHandler dataReceivedHandler;
41     protected SimpleActionHandler dataWriteDoneHandler;
42 
43     protected ByteBuffer _bufferForRead;
44     protected AddressFamily _family;
45 
46     private size_t _bufferSize = 4096;
47     private ChannelTask _task = null;
48     
49 
50     this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
51         _bufferSize = bufferSize;
52         super(loop, ChannelType.TCP);
53         // setFlag(ChannelFlag.Read, true);
54         // setFlag(ChannelFlag.Write, true);
55 
56         // version (HUNT_IO_DEBUG)
57         //     trace("Buffer size: ", bufferSize);
58         // _readBuffer = new ubyte[bufferSize];
59         _bufferForRead = BufferUtils.allocate(bufferSize);
60         _bufferForRead.clear();
61         _readBuffer = cast(ubyte[])_bufferForRead.array();
62         // _writeQueue = new WritingBufferQueue();
63         // this.socket = new TcpSocket(family);
64 
65         loadWinsockExtension(this.handle);
66     }
67 
68     mixin CheckIocpError;
69 
70     abstract bool isClient();
71 
72     override void onRead() {
73         version (HUNT_IO_DEBUG)
74             trace("ready to read");
75         super.onRead();
76     }
77 
78     /**
79      * Should be thread-safe.
80      */
81     override void onWrite() {  
82         version (HUNT_IO_DEBUG)
83             tracef("checking write status, isWritting: %s, writeBuffer: %s", _isWritting, writeBuffer is null);
84 
85         //if(!_isWritting){
86         //    version (HUNT_IO_DEBUG) infof("No data to write out. fd=%d", this.handle);
87         //    return;
88         //}
89 
90         if(isClosing() && _isWriteCancelling) {
91             version (HUNT_IO_DEBUG) infof("Write cancelled, fd=%d", this.handle);
92             resetWriteStatus();
93             return;
94         }
95         tryNextBufferWrite();
96     }
97     
98     protected override void onClose() {
99         _isWritting = false;
100         resetWriteStatus();
101         if(this._socket is null) {
102             import core.sys.windows.winsock2;
103             .closesocket(this.handle);
104         } else {
105             // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 1:20:27 pm
106             //
107             //while(!_isSingleWriteBusy)
108             //{
109                 this._socket.shutdown(SocketShutdown.BOTH);
110                 this._socket.close();
111             //}
112         }
113         super.onClose();
114     }
115 
116     void beginRead() {
117         // https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsarecv
118 
119         ///  _isSingleWriteBusy = true;
120 
121         WSABUF _dataReadBuffer;
122         _dataReadBuffer.len = cast(uint) _readBuffer.length;
123         _dataReadBuffer.buf = cast(char*) _readBuffer.ptr;
124         memset(&_iocpread.overlapped , 0 ,_iocpread.overlapped.sizeof );
125         _iocpread.channel = this;
126         _iocpread.operation = IocpOperation.read;
127         DWORD dwReceived = 0;
128         DWORD dwFlags = 0;
129         version (HUNT_IO_DEBUG)
130             tracef("start receiving [fd=%d] ", this.socket.handle);
131         // _isSingleWriteBusy = true;
132         int nRet = WSARecv(cast(SOCKET) this.socket.handle, &_dataReadBuffer, 1u, 
133             &dwReceived, &dwFlags, &_iocpread.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
134 
135         if (nRet == SOCKET_ERROR && (GetLastError() != ERROR_IO_PENDING)) {
136             _isSingleWriteBusy = false;
137             close();
138         }
139         //checkErro(nRet, SOCKET_ERROR);
140     }
141 
142     protected bool doConnect(Address addr) {
143         Address binded = createAddress(this.socket.addressFamily);
144         _isSingleWriteBusy = true;
145         this.socket.bind(binded);
146         _iocpread.channel = this;
147         _iocpread.operation = IocpOperation.connect;
148 
149         import std.datetime.stopwatch;
150         auto sw = StopWatch(AutoStart.yes);
151         sw.start();
152         scope(exit) {
153             sw.stop();
154         }
155 
156         // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex
157         int nRet = ConnectEx(cast(SOCKET) this.socket.handle(), cast(SOCKADDR*) addr.name(), 
158             addr.nameLen(), null, 0, null, &_iocpread.overlapped);
159         checkErro(nRet, SOCKET_ERROR);
160 
161         if(this._error) 
162             return false;
163 
164         // https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt
165         int seconds = 0;
166         int bytes = seconds.sizeof;
167         int iResult = 0;
168 
169         CHECK: 
170         iResult = getsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, SO_CONNECT_TIME,
171                             cast(void*)&seconds, cast(PINT)&bytes);
172 
173         bool result = false;
174         if ( iResult != NO_ERROR ) {
175             DWORD dwLastError = WSAGetLastError();
176             warningf("getsockopt(SO_CONNECT_TIME) failed with error: code=%d, message=%s", 
177                 dwLastError, getErrorMessage(dwLastError));
178         } else {
179             if (seconds == 0xFFFFFFFF) {
180                 version(HUNT_IO_DEBUG) warningf("Connection not established yet (destination: %s).", addr);
181                 // so to check again
182                 goto CHECK;
183             } else {
184                 result = true;
185                 version(HUNT_IO_DEBUG) {
186                     //
187                     infof("Connection has been established in %d msecs, destination: %s", sw.peek.total!"msecs", addr);
188                 }
189                 // https://docs.microsoft.com/en-us/windows/win32/winsock/sol-socket-socket-options
190                 enum SO_UPDATE_CONNECT_CONTEXT = 0x7010;
191                 iResult = setsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, 
192                     SO_UPDATE_CONNECT_CONTEXT, NULL, 0 );
193             }
194         }
195         
196         return result;
197     }
198 
199     private uint doWrite(const(ubyte)[] data) {
200         DWORD dwSent = 0;//cast(DWORD)data.length;
201         DWORD dwFlags = 0;
202 
203         memset(&_iocpwrite.overlapped , 0 ,_iocpwrite.overlapped.sizeof );
204         _iocpwrite.channel = this;
205         _iocpwrite.operation = IocpOperation.write;
206         // tracef("To write %d bytes, fd=%d", data.length, this.socket.handle());
207         version (HUNT_IO_DEBUG) {
208             size_t bufferLength = data.length;
209             tracef("To write %d bytes", bufferLength);
210             if (bufferLength > 32)
211                 tracef("%(%02X %) ...", data[0 .. 32]);
212             else
213                 tracef("%s", data);
214         }
215         // size_t bufferLength = data.length;
216         //     tracef("To write %d bytes", bufferLength);
217         //     tracef("%s", data);
218         WSABUF _dataWriteBuffer;
219 
220         //char[] bf = new char[data.length];
221         //memcpy(bf.ptr,data.ptr,data.length);
222         //_dataWriteBuffer.buf =  bf.ptr;
223         _dataWriteBuffer.buf = cast(char*) data.ptr;
224         _dataWriteBuffer.len = cast(uint) data.length;
225         // _isSingleWriteBusy = true;
226         int nRet = WSASend( cast(SOCKET) this.socket.handle(), &_dataWriteBuffer, 1, &dwSent,
227         dwFlags, &_iocpwrite.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
228         // if (nRet != NO_ERROR && (GetLastError() != ERROR_IO_PENDING))
229         // {
230         //     _isSingleWriteBusy = false;
231         //     // close();
232         // }
233 
234         checkErro( nRet, SOCKET_ERROR);
235 
236         // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 12:18:20 pm
237         // Keep this to prevent the buffer corrupted. Why?
238         version (HUNT_IO_DEBUG) {
239             tracef("sent: %d / %d bytes, fd=%d", dwSent, bufferLength, this.handle);
240         }
241 
242         if (this.isError) {
243             errorf("Socket error on write: fd=%d, message=%s", this.handle, this.errorMessage);
244             this.close();
245         }
246 
247         return dwSent;
248     }
249 
250     protected void doRead() {
251         //_isSingleWriteBusy = false;
252         this.clearError();
253         version (HUNT_IO_DEBUG)
254             tracef("start reading: %d nbytes", this.readLen);
255 
256         if (readLen > 0) {
257             // import std.stdio;
258             // writefln("length=%d, data: %(%02X %)", readLen, _readBuffer[0 .. readLen]);
259             handleReceivedData(readLen);
260 
261             if (isClient()) {
262                 this.beginRead();
263             }
264 
265         } else if (readLen == 0) {
266             version (HUNT_IO_DEBUG) {
267                 if (_remoteAddress !is null)
268                     warningf("connection broken: %s", _remoteAddress.toString());
269             }
270             onDisconnected();
271             // if (_isClosed)
272             //     this.close();
273         } else {
274             version (HUNT_IO_DEBUG) {
275                 warningf("undefined behavior on thread %d", getTid());
276             } else {
277                 this._error = true;
278                 this._errorMessage = "undefined behavior on thread";
279             }
280         }
281     }
282 
283     private void handleReceivedData(ptrdiff_t len) {
284         version (HUNT_IO_DEBUG)
285             tracef("reading done: %d nbytes", readLen);
286 
287         if (dataReceivedHandler is null) 
288             return;
289 
290         _bufferForRead.limit(cast(int)readLen);
291         _bufferForRead.position(0);
292         // dataReceivedHandler(_bufferForRead);
293 
294         ByteBuffer bufferCopy = BufferUtils.clone(_bufferForRead);
295         if(taskWorker is null) {
296             dataReceivedHandler(bufferCopy);
297         } else {
298             ChannelTask task = _task;
299 
300             // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
301             // More tests needed
302             if(task is null || task.isFinishing()) {
303                 task = createChannelTask();
304                 _task = task;
305 
306             } else {
307                 version(HUNT_METRIC) {
308                     warningf("Request peeding... Task status: %s", task.status);
309                 }
310             }
311 
312             task.put(bufferCopy);
313         }        
314 
315     }
316 
317     private ChannelTask createChannelTask() {
318         ChannelTask task = new ChannelTask();
319         task.dataReceivedHandler = dataReceivedHandler;
320         taskWorker.put(task);
321         return task;
322     }
323 
324     // try to write a block of data directly
325     protected size_t tryWrite(const ubyte[] data) {        
326         version (HUNT_IO_DEBUG)
327             tracef("start to write, total=%d bytes, fd=%d", data.length, this.handle);
328         clearError();
329         size_t nBytes;
330         //scope(exit) {
331         //    _isSingleWriteBusy = false;
332         //}
333         if (!_isSingleWriteBusy)
334         {
335              nBytes = doWrite(data);
336         }
337 
338         return nBytes;
339     }
340 
341     // try to write a block of data from the write queue
342     private void tryNextBufferWrite() {
343         if(checkAllWriteDone()){
344             _isSingleWriteBusy = false;
345             if (!isClient())
346             {
347                 this.beginRead();
348             }
349             return;
350         } 
351         
352         // keep thread-safe here
353         //if(!cas(&_isSingleWriteBusy, false, true)) {
354         //    version (HUNT_IO_DEBUG) warningf("busy writing. fd=%d", this.handle);
355         //    return;
356         //}
357 
358         //scope(exit) {
359         //    _isSingleWriteBusy = false;
360         //}
361 
362         clearError();
363 
364         bool haveBuffer = _writeQueue.tryDequeue(writeBuffer);
365         if(haveBuffer) {
366             writeBufferRemaining();
367         } else {
368             version (HUNT_IO_DEBUG)
369                 warning("No buffer in queue");
370         }
371     }
372 
373     private void writeBufferRemaining() {
374         if (writeBuffer is null )
375         {
376             return;
377         }
378         const(ubyte)[] data = cast(const(ubyte)[])writeBuffer.peekRemaining();
379 
380         size_t nBytes = doWrite(data);
381 
382         version (HUNT_IO_DEBUG)
383             tracef("written data: %d bytes, fd=%d", nBytes, this.handle);
384         if(nBytes == data.length) {
385             writeBuffer = null;
386         } else if (nBytes > 0) { 
387             writeBuffer.nextGetIndex(cast(int)nBytes);
388             version (HUNT_IO_DEBUG)
389                 warningf("remaining data: %d / %d, fd=%d", data.length - nBytes, data.length, this.handle);
390         } else { 
391             version (HUNT_IO_DEBUG)
392             warningf("I/O busy: writing. fd=%d", this.handle);
393         }   
394     }
395     
396     protected bool checkAllWriteDone() {
397         if(_writeQueue is null || (_writeQueue.isEmpty() && writeBuffer is null)) {
398             resetWriteStatus();        
399             version (HUNT_IO_DEBUG)
400                 tracef("All data are written out. fd=%d", this.handle);
401             if(dataWriteDoneHandler !is null)
402                 dataWriteDoneHandler(this);
403             return true;
404         }
405 
406         return false;
407     }
408     
409     void resetWriteStatus() {
410         if(_writeQueue !is null)
411             _writeQueue.clear();
412         _isWritting = false;
413         _isWriteCancelling = false;
414         sendDataBuffer = null;
415         sendDataBackupBuffer = null;
416         writeBuffer = null;
417         _isSingleWriteBusy = false;
418     }
419 
420     /**
421      * Called by selector after data sent
422      * Note: It's only for IOCP selector: 
423     */
424     void onWriteDone(size_t nBytes) {
425         version (HUNT_IO_DEBUG) {
426             tracef("write done once: %d bytes, isWritting: %s, writeBuffer: %s, fd=%d",
427                  nBytes, _isWritting, writeBuffer is null, this.handle);
428         }
429         //if (_isWriteCancelling) {
430         //    version (HUNT_IO_DEBUG) tracef("write cancelled.");
431         //    resetWriteStatus();
432         //    return;
433         //}
434 
435 
436         //while(_isSingleWriteBusy) {
437         //    version(HUNT_IO_DEBUG)
438         //    info("waiting for last writting get finished...");
439         //}
440 
441         version (HUNT_IO_DEBUG) {
442             tracef("write done once: %d bytes, isWritting: %s, writeBuffer: %s, fd=%d",
443                  nBytes, _isWritting, writeBuffer is null, this.handle);
444         }
445 
446         if (writeBuffer !is null && writeBuffer.hasRemaining()) {
447             version (HUNT_IO_DEBUG) tracef("try to write the remaining in buffer.");
448             writeBufferRemaining();
449         }  else {
450             version (HUNT_IO_DEBUG) tracef("try to write next buffer.");
451             tryNextBufferWrite();
452         }
453     }
454 
455     private void notifyDataWrittenDone() {
456         if(dataWriteDoneHandler !is null && _writeQueue.isEmpty()) {
457             dataWriteDoneHandler(this);
458         }
459     }
460  
461     
462     DataReceivedHandler getDataReceivedHandler() {
463         return dataReceivedHandler;
464     }
465 
466     void cancelWrite() {
467         _isWriteCancelling = true;
468     }
469 
470     abstract bool isConnected() nothrow;
471     abstract protected void onDisconnected();
472 
473     protected void initializeWriteQueue() {
474         if (_writeQueue is null) {
475             _writeQueue = new WritingBufferQueue();
476         }
477     }
478 
479     SimpleEventHandler disconnectionHandler;
480     
481     protected WritingBufferQueue _writeQueue;
482     protected bool _isWriteCancelling = false;
483     private  bool _isSingleWriteBusy = false; // keep a single I/O write operation atomic
484     private const(ubyte)[] _readBuffer;
485     private const(ubyte)[] sendDataBuffer;
486     private const(ubyte)[] sendDataBackupBuffer;
487     private ByteBuffer writeBuffer; 
488 
489     private IocpContext _iocpread;
490     private IocpContext _iocpwrite;
491 }