1 module hunt.io.channel.iocp.AbstractStream;
2 
3 // dfmt off
4 version (HAVE_IOCP) : 
5 // dfmt on
6 
7 import hunt.event.selector.Selector;
8 import hunt.io.ByteBuffer;
9 import hunt.io.BufferUtils;
10 import hunt.io.channel.AbstractSocketChannel;
11 import hunt.io.channel.Common;
12 import hunt.io.channel.iocp.Common;
13 import hunt.logging.ConsoleLogger;
14 import hunt.Functions;
15 import hunt.event.selector.IOCP;
16 import hunt.system.Error;
17 import hunt.util.ThreadHelper;
18 
19 import core.atomic;
20 import core.sys.windows.windows;
21 import core.sys.windows.winsock2;
22 import core.sys.windows.mswsock;
23 import std.format;
24 import std.socket;
25 import core.stdc.string;
26 
27 /**
28 TCP Peer
29 */
30 abstract class AbstractStream : AbstractSocketChannel {
31 
32     // data event handlers
33     
    /**
    * Warning: The received data is stored in an internal buffer that is reused
    * for later reads. To keep the data safely, make a copy of it.
    */
38     protected DataReceivedHandler dataReceivedHandler;
39     protected SimpleActionHandler dataWriteDoneHandler;
40 
41     protected ByteBuffer _bufferForRead;
42     protected AddressFamily _family;
43     
44 
45     this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
46         super(loop, ChannelType.TCP);
47         // setFlag(ChannelFlag.Read, true);
48         // setFlag(ChannelFlag.Write, true);
49 
50         // version (HUNT_IO_DEBUG)
51         //     trace("Buffer size: ", bufferSize);
52         // _readBuffer = new ubyte[bufferSize];
53         _bufferForRead = BufferUtils.allocate(bufferSize);
54         _bufferForRead.clear();
55         _readBuffer = cast(ubyte[])_bufferForRead.array();
56         // _writeQueue = new WritingBufferQueue();
57         // this.socket = new TcpSocket(family);
58 
59         loadWinsockExtension(this.handle);
60     }
61 
62     mixin CheckIocpError;
63 
64     abstract bool isClient();
65 
66     override void onRead() {
67         version (HUNT_IO_DEBUG)
68             trace("ready to read");
69         super.onRead();
70     }
71 
72     /**
73      * Should be thread-safe.
74      */
75     override void onWrite() {  
76         version (HUNT_IO_DEBUG)
77             tracef("checking write status, isWritting: %s, writeBuffer: %s", _isWritting, writeBuffer is null);
78 
79         //if(!_isWritting){
80         //    version (HUNT_IO_DEBUG) infof("No data to write out. fd=%d", this.handle);
81         //    return;
82         //}
83 
84         if(isClosing() && _isWriteCancelling) {
85             version (HUNT_IO_DEBUG) infof("Write cancelled, fd=%d", this.handle);
86             resetWriteStatus();
87             return;
88         }
89         tryNextBufferWrite();
90     }
91     
92     protected override void onClose() {
93         _isWritting = false;
94         resetWriteStatus();
95         if(this._socket is null) {
96             import core.sys.windows.winsock2;
97             .closesocket(this.handle);
98         } else {
99             // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 1:20:27 pm
100             //
101             //while(!_isSingleWriteBusy)
102             //{
103                 this._socket.shutdown(SocketShutdown.BOTH);
104                 this._socket.close();
105             //}
106         }
107         super.onClose();
108     }
109 
110     void beginRead() {
111         // https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsarecv
112 
113         ///  _isSingleWriteBusy = true;
114 
115         WSABUF _dataReadBuffer;
116         _dataReadBuffer.len = cast(uint) _readBuffer.length;
117         _dataReadBuffer.buf = cast(char*) _readBuffer.ptr;
118         memset(&_iocpread.overlapped , 0 ,_iocpread.overlapped.sizeof );
119         _iocpread.channel = this;
120         _iocpread.operation = IocpOperation.read;
121         DWORD dwReceived = 0;
122         DWORD dwFlags = 0;
123         version (HUNT_IO_DEBUG)
124             tracef("start receiving [fd=%d] ", this.socket.handle);
125         // _isSingleWriteBusy = true;
126         int nRet = WSARecv(cast(SOCKET) this.socket.handle, &_dataReadBuffer, 1u, 
127             &dwReceived, &dwFlags, &_iocpread.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
128 
129         if (nRet == SOCKET_ERROR && (GetLastError() != ERROR_IO_PENDING)) {
130             _isSingleWriteBusy = false;
131             close();
132         }
133         //checkErro(nRet, SOCKET_ERROR);
134     }
135 
136     protected bool doConnect(Address addr) {
137         Address binded = createAddress(this.socket.addressFamily);
138         _isSingleWriteBusy = true;
139         this.socket.bind(binded);
140         _iocpread.channel = this;
141         _iocpread.operation = IocpOperation.connect;
142 
143         import std.datetime.stopwatch;
144         auto sw = StopWatch(AutoStart.yes);
145         sw.start();
146         scope(exit) {
147             sw.stop();
148         }
149 
150         // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex
151         int nRet = ConnectEx(cast(SOCKET) this.socket.handle(), cast(SOCKADDR*) addr.name(), 
152             addr.nameLen(), null, 0, null, &_iocpread.overlapped);
153         checkErro(nRet, SOCKET_ERROR);
154 
155         if(this._error) 
156             return false;
157 
158         // https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt
159         int seconds = 0;
160         int bytes = seconds.sizeof;
161         int iResult = 0;
162 
163         CHECK: 
164         iResult = getsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, SO_CONNECT_TIME,
165                             cast(void*)&seconds, cast(PINT)&bytes);
166 
167         bool result = false;
168         if ( iResult != NO_ERROR ) {
169             DWORD dwLastError = WSAGetLastError();
170             warningf("getsockopt(SO_CONNECT_TIME) failed with error: code=%d, message=%s", 
171                 dwLastError, getErrorMessage(dwLastError));
172         } else {
173             if (seconds == 0xFFFFFFFF) {
174                 version(HUNT_IO_DEBUG) warningf("Connection not established yet (destination: %s).", addr);
175                 // so to check again
176                 goto CHECK;
177             } else {
178                 result = true;
179                 version(HUNT_IO_DEBUG) {
180                     //
181                     infof("Connection has been established in %d msecs, destination: %s", sw.peek.total!"msecs", addr);
182                 }
183                 // https://docs.microsoft.com/en-us/windows/win32/winsock/sol-socket-socket-options
184                 enum SO_UPDATE_CONNECT_CONTEXT = 0x7010;
185                 iResult = setsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, 
186                     SO_UPDATE_CONNECT_CONTEXT, NULL, 0 );
187             }
188         }
189         
190         return result;
191     }
192 
193     private uint doWrite(const(ubyte)[] data) {
194         DWORD dwSent = 0;//cast(DWORD)data.length;
195         DWORD dwFlags = 0;
196 
197         memset(&_iocpwrite.overlapped , 0 ,_iocpwrite.overlapped.sizeof );
198         _iocpwrite.channel = this;
199         _iocpwrite.operation = IocpOperation.write;
200         // tracef("To write %d bytes, fd=%d", data.length, this.socket.handle());
201         version (HUNT_IO_DEBUG) {
202             size_t bufferLength = data.length;
203             tracef("To write %d bytes", bufferLength);
204             if (bufferLength > 32)
205                 tracef("%(%02X %) ...", data[0 .. 32]);
206             else
207                 tracef("%s", data);
208         }
209         // size_t bufferLength = data.length;
210         //     tracef("To write %d bytes", bufferLength);
211         //     tracef("%s", data);
212         WSABUF _dataWriteBuffer;
213 
214         //char[] bf = new char[data.length];
215         //memcpy(bf.ptr,data.ptr,data.length);
216         //_dataWriteBuffer.buf =  bf.ptr;
217         _dataWriteBuffer.buf = cast(char*) data.ptr;
218         _dataWriteBuffer.len = cast(uint) data.length;
219         // _isSingleWriteBusy = true;
220         int nRet = WSASend( cast(SOCKET) this.socket.handle(), &_dataWriteBuffer, 1, &dwSent,
221         dwFlags, &_iocpwrite.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);
222         // if (nRet != NO_ERROR && (GetLastError() != ERROR_IO_PENDING))
223         // {
224         //     _isSingleWriteBusy = false;
225         //     // close();
226         // }
227 
228         checkErro( nRet, SOCKET_ERROR);
229 
230         // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 12:18:20 pm
231         // Keep this to prevent the buffer corrupted. Why?
232         version (HUNT_IO_DEBUG) {
233             tracef("sent: %d / %d bytes, fd=%d", dwSent, bufferLength, this.handle);
234         }
235 
236         if (this.isError) {
237             errorf("Socket error on write: fd=%d, message=%s", this.handle, this.errorMessage);
238             this.close();
239         }
240 
241         return dwSent;
242     }
243 
244     protected void doRead() {
245         //_isSingleWriteBusy = false;
246         this.clearError();
247         version (HUNT_IO_DEBUG)
248             tracef("start reading: %d nbytes", this.readLen);
249 
250         if (readLen > 0) {
251             // import std.stdio;
252             // writefln("length=%d, data: %(%02X %)", readLen, _readBuffer[0 .. readLen]);
253             version (HUNT_IO_DEBUG)
254                 tracef("reading done: %d nbytes", readLen);
255 
256             auto ss = this.socket;
257             if (dataReceivedHandler !is null){
258                 _bufferForRead.limit(cast(int)readLen);
259                 _bufferForRead.position(0);
260                 dataReceivedHandler(_bufferForRead);
261             }
262 
263             if (isClient())
264             {
265                 this.beginRead();
266             }
267 
268 
269         } else if (readLen == 0) {
270             version (HUNT_IO_DEBUG) {
271                 if (_remoteAddress !is null)
272                     warningf("connection broken: %s", _remoteAddress.toString());
273             }
274             onDisconnected();
275             // if (_isClosed)
276             //     this.close();
277         } else {
278             version (HUNT_IO_DEBUG) {
279                 warningf("undefined behavior on thread %d", getTid());
280             } else {
281                 this._error = true;
282                 this._errorMessage = "undefined behavior on thread";
283             }
284         }
285     }
286 
287     // try to write a block of data directly
288     protected size_t tryWrite(const ubyte[] data) {        
289         version (HUNT_IO_DEBUG)
290             tracef("start to write, total=%d bytes, fd=%d", data.length, this.handle);
291         clearError();
292         size_t nBytes;
293         //scope(exit) {
294         //    _isSingleWriteBusy = false;
295         //}
296         if (!_isSingleWriteBusy)
297         {
298              nBytes = doWrite(data);
299         }
300 
301         return nBytes;
302     }
303 
304     // try to write a block of data from the write queue
305     private void tryNextBufferWrite() {
306         if(checkAllWriteDone()){
307             _isSingleWriteBusy = false;
308             if (!isClient())
309             {
310                 this.beginRead();
311             }
312             return;
313         } 
314         
315         // keep thread-safe here
316         //if(!cas(&_isSingleWriteBusy, false, true)) {
317         //    version (HUNT_IO_DEBUG) warningf("busy writing. fd=%d", this.handle);
318         //    return;
319         //}
320 
321         //scope(exit) {
322         //    _isSingleWriteBusy = false;
323         //}
324 
325         clearError();
326 
327         bool haveBuffer = _writeQueue.tryDequeue(writeBuffer);
328         if(haveBuffer) {
329             writeBufferRemaining();
330         } else {
331             version (HUNT_IO_DEBUG)
332                 warning("No buffer in queue");
333         }
334     }
335 
336     private void writeBufferRemaining() {
337         if (writeBuffer is null )
338         {
339             return;
340         }
341         const(ubyte)[] data = cast(const(ubyte)[])writeBuffer.getRemaining();
342 
343         size_t nBytes = doWrite(data);
344 
345         version (HUNT_IO_DEBUG)
346             tracef("written data: %d bytes, fd=%d", nBytes, this.handle);
347         if(nBytes == data.length) {
348             writeBuffer = null;
349         } else if (nBytes > 0) { 
350             writeBuffer.nextGetIndex(cast(int)nBytes);
351             version (HUNT_IO_DEBUG)
352                 warningf("remaining data: %d / %d, fd=%d", data.length - nBytes, data.length, this.handle);
353         } else { 
354             version (HUNT_IO_DEBUG)
355             warningf("I/O busy: writing. fd=%d", this.handle);
356         }   
357     }
358     
359     protected bool checkAllWriteDone() {
360         if(_writeQueue is null || (_writeQueue.isEmpty() && writeBuffer is null)) {
361             resetWriteStatus();        
362             version (HUNT_IO_DEBUG)
363                 tracef("All data are written out. fd=%d", this.handle);
364             if(dataWriteDoneHandler !is null)
365                 dataWriteDoneHandler(this);
366             return true;
367         }
368 
369         return false;
370     }
371     
372     void resetWriteStatus() {
373         if(_writeQueue !is null)
374             _writeQueue.clear();
375         _isWritting = false;
376         _isWriteCancelling = false;
377         sendDataBuffer = null;
378         sendDataBackupBuffer = null;
379         writeBuffer = null;
380         _isSingleWriteBusy = false;
381     }
382 
383     /**
384      * Called by selector after data sent
385      * Note: It's only for IOCP selector: 
386     */
387     void onWriteDone(size_t nBytes) {
388         version (HUNT_IO_DEBUG) {
389             tracef("write done once: %d bytes, isWritting: %s, writeBuffer: %s, fd=%d",
390                  nBytes, _isWritting, writeBuffer is null, this.handle);
391         }
392         //if (_isWriteCancelling) {
393         //    version (HUNT_IO_DEBUG) tracef("write cancelled.");
394         //    resetWriteStatus();
395         //    return;
396         //}
397 
398 
399         //while(_isSingleWriteBusy) {
400         //    version(HUNT_IO_DEBUG)
401         //    info("waiting for last writting get finished...");
402         //}
403 
404         version (HUNT_IO_DEBUG) {
405             tracef("write done once: %d bytes, isWritting: %s, writeBuffer: %s, fd=%d",
406                  nBytes, _isWritting, writeBuffer is null, this.handle);
407         }
408 
409         if (writeBuffer !is null && writeBuffer.hasRemaining()) {
410             version (HUNT_IO_DEBUG) tracef("try to write the remaining in buffer.");
411             writeBufferRemaining();
412         }  else {
413             version (HUNT_IO_DEBUG) tracef("try to write next buffer.");
414             tryNextBufferWrite();
415         }
416     }
417 
418     private void notifyDataWrittenDone() {
419         if(dataWriteDoneHandler !is null && _writeQueue.isEmpty()) {
420             dataWriteDoneHandler(this);
421         }
422     }
423     
424     DataReceivedHandler getDataReceivedHandler() {
425         return dataReceivedHandler;
426     }
427 
428     void cancelWrite() {
429         _isWriteCancelling = true;
430     }
431 
432     abstract bool isConnected() nothrow;
433     abstract protected void onDisconnected();
434 
435     protected void initializeWriteQueue() {
436         if (_writeQueue is null) {
437             _writeQueue = new WritingBufferQueue();
438         }
439     }
440 
441     SimpleEventHandler disconnectionHandler;
442     
443     protected WritingBufferQueue _writeQueue;
444     protected bool _isWriteCancelling = false;
445     private  bool _isSingleWriteBusy = false; // keep a single I/O write operation atomic
446     private const(ubyte)[] _readBuffer;
447     private const(ubyte)[] sendDataBuffer;
448     private const(ubyte)[] sendDataBackupBuffer;
449     private ByteBuffer writeBuffer; 
450 
451     private IocpContext _iocpread;
452     private IocpContext _iocpwrite;
453 }