1 module hunt.io.channel.iocp.AbstractStream; 2 3 // dfmt off 4 version (HAVE_IOCP) : 5 // dfmt on 6 7 import hunt.event.selector.Selector; 8 import hunt.io.ByteBuffer; 9 import hunt.io.BufferUtils; 10 import hunt.io.channel.AbstractSocketChannel; 11 import hunt.io.channel.Common; 12 import hunt.io.channel.iocp.Common; 13 import hunt.logging.ConsoleLogger; 14 import hunt.Functions; 15 import hunt.event.selector.IOCP; 16 import hunt.system.Error; 17 import hunt.util.ThreadHelper; 18 19 import core.atomic; 20 import core.sys.windows.windows; 21 import core.sys.windows.winsock2; 22 import core.sys.windows.mswsock; 23 import std.format; 24 import std.socket; 25 import core.stdc.string; 26 27 /** 28 TCP Peer 29 */ 30 abstract class AbstractStream : AbstractSocketChannel { 31 32 // data event handlers 33 34 /** 35 * Warning: The received data is stored a inner buffer. For a data safe, 36 * you would make a copy of it. 37 */ 38 protected DataReceivedHandler dataReceivedHandler; 39 protected SimpleActionHandler dataWriteDoneHandler; 40 41 protected ByteBuffer _bufferForRead; 42 protected AddressFamily _family; 43 44 45 this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) { 46 super(loop, ChannelType.TCP); 47 // setFlag(ChannelFlag.Read, true); 48 // setFlag(ChannelFlag.Write, true); 49 50 // version (HUNT_IO_DEBUG) 51 // trace("Buffer size: ", bufferSize); 52 // _readBuffer = new ubyte[bufferSize]; 53 _bufferForRead = BufferUtils.allocate(bufferSize); 54 _bufferForRead.clear(); 55 _readBuffer = cast(ubyte[])_bufferForRead.array(); 56 // _writeQueue = new WritingBufferQueue(); 57 // this.socket = new TcpSocket(family); 58 59 loadWinsockExtension(this.handle); 60 } 61 62 mixin CheckIocpError; 63 64 abstract bool isClient(); 65 66 override void onRead() { 67 version (HUNT_IO_DEBUG) 68 trace("ready to read"); 69 super.onRead(); 70 } 71 72 /** 73 * Should be thread-safe. 74 */ 75 override void onWrite() { 76 version (HUNT_IO_DEBUG) 77 tracef("checking write status, isWritting: %s, writeBuffer: %s", _isWritting, writeBuffer is null); 78 79 //if(!_isWritting){ 80 // version (HUNT_IO_DEBUG) infof("No data to write out. fd=%d", this.handle); 81 // return; 82 //} 83 84 if(isClosing() && _isWriteCancelling) { 85 version (HUNT_IO_DEBUG) infof("Write cancelled, fd=%d", this.handle); 86 resetWriteStatus(); 87 return; 88 } 89 tryNextBufferWrite(); 90 } 91 92 protected override void onClose() { 93 _isWritting = false; 94 resetWriteStatus(); 95 if(this._socket is null) { 96 import core.sys.windows.winsock2; 97 .closesocket(this.handle); 98 } else { 99 // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 1:20:27 pm 100 // 101 //while(!_isSingleWriteBusy) 102 //{ 103 this._socket.shutdown(SocketShutdown.BOTH); 104 this._socket.close(); 105 //} 106 } 107 super.onClose(); 108 } 109 110 void beginRead() { 111 // https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsarecv 112 113 /// _isSingleWriteBusy = true; 114 115 WSABUF _dataReadBuffer; 116 _dataReadBuffer.len = cast(uint) _readBuffer.length; 117 _dataReadBuffer.buf = cast(char*) _readBuffer.ptr; 118 memset(&_iocpread.overlapped , 0 ,_iocpread.overlapped.sizeof ); 119 _iocpread.channel = this; 120 _iocpread.operation = IocpOperation.read; 121 DWORD dwReceived = 0; 122 DWORD dwFlags = 0; 123 version (HUNT_IO_DEBUG) 124 tracef("start receiving [fd=%d] ", this.socket.handle); 125 // _isSingleWriteBusy = true; 126 int nRet = WSARecv(cast(SOCKET) this.socket.handle, &_dataReadBuffer, 1u, 127 &dwReceived, &dwFlags, &_iocpread.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null); 128 129 if (nRet == SOCKET_ERROR && (GetLastError() != ERROR_IO_PENDING)) { 130 _isSingleWriteBusy = false; 131 close(); 132 } 133 //checkErro(nRet, SOCKET_ERROR); 134 } 135 136 protected bool doConnect(Address addr) { 137 Address binded = createAddress(this.socket.addressFamily); 138 _isSingleWriteBusy = true; 139 this.socket.bind(binded); 140 _iocpread.channel = this; 141 _iocpread.operation = IocpOperation.connect; 142 143 import std.datetime.stopwatch; 144 auto sw = StopWatch(AutoStart.yes); 145 sw.start(); 146 scope(exit) { 147 sw.stop(); 148 } 149 150 // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex 151 int nRet = ConnectEx(cast(SOCKET) this.socket.handle(), cast(SOCKADDR*) addr.name(), 152 addr.nameLen(), null, 0, null, &_iocpread.overlapped); 153 checkErro(nRet, SOCKET_ERROR); 154 155 if(this._error) 156 return false; 157 158 // https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt 159 int seconds = 0; 160 int bytes = seconds.sizeof; 161 int iResult = 0; 162 163 CHECK: 164 iResult = getsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, SO_CONNECT_TIME, 165 cast(void*)&seconds, cast(PINT)&bytes); 166 167 bool result = false; 168 if ( iResult != NO_ERROR ) { 169 DWORD dwLastError = WSAGetLastError(); 170 warningf("getsockopt(SO_CONNECT_TIME) failed with error: code=%d, message=%s", 171 dwLastError, getErrorMessage(dwLastError)); 172 } else { 173 if (seconds == 0xFFFFFFFF) { 174 version(HUNT_IO_DEBUG) warningf("Connection not established yet (destination: %s).", addr); 175 // so to check again 176 goto CHECK; 177 } else { 178 result = true; 179 version(HUNT_IO_DEBUG) { 180 // 181 infof("Connection has been established in %d msecs, destination: %s", sw.peek.total!"msecs", addr); 182 } 183 // https://docs.microsoft.com/en-us/windows/win32/winsock/sol-socket-socket-options 184 enum SO_UPDATE_CONNECT_CONTEXT = 0x7010; 185 iResult = setsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, 186 SO_UPDATE_CONNECT_CONTEXT, NULL, 0 ); 187 } 188 } 189 190 return result; 191 } 192 193 private uint doWrite(const(ubyte)[] data) { 194 DWORD dwSent = 0;//cast(DWORD)data.length; 195 DWORD dwFlags = 0; 196 197 memset(&_iocpwrite.overlapped , 0 ,_iocpwrite.overlapped.sizeof ); 198 _iocpwrite.channel = this; 199 _iocpwrite.operation = IocpOperation.write; 200 // tracef("To write %d bytes, fd=%d", data.length, this.socket.handle()); 201 version (HUNT_IO_DEBUG) { 202 size_t bufferLength = data.length; 203 tracef("To write %d bytes", bufferLength); 204 if (bufferLength > 32) 205 tracef("%(%02X %) ...", data[0 .. 32]); 206 else 207 tracef("%s", data); 208 } 209 // size_t bufferLength = data.length; 210 // tracef("To write %d bytes", bufferLength); 211 // tracef("%s", data); 212 WSABUF _dataWriteBuffer; 213 214 //char[] bf = new char[data.length]; 215 //memcpy(bf.ptr,data.ptr,data.length); 216 //_dataWriteBuffer.buf = bf.ptr; 217 _dataWriteBuffer.buf = cast(char*) data.ptr; 218 _dataWriteBuffer.len = cast(uint) data.length; 219 // _isSingleWriteBusy = true; 220 int nRet = WSASend( cast(SOCKET) this.socket.handle(), &_dataWriteBuffer, 1, &dwSent, 221 dwFlags, &_iocpwrite.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null); 222 // if (nRet != NO_ERROR && (GetLastError() != ERROR_IO_PENDING)) 223 // { 224 // _isSingleWriteBusy = false; 225 // // close(); 226 // } 227 228 checkErro( nRet, SOCKET_ERROR); 229 230 // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 12:18:20 pm 231 // Keep this to prevent the buffer corrupted. Why? 232 version (HUNT_IO_DEBUG) { 233 tracef("sent: %d / %d bytes, fd=%d", dwSent, bufferLength, this.handle); 234 } 235 236 if (this.isError) { 237 errorf("Socket error on write: fd=%d, message=%s", this.handle, this.errorMessage); 238 this.close(); 239 } 240 241 return dwSent; 242 } 243 244 protected void doRead() { 245 //_isSingleWriteBusy = false; 246 this.clearError(); 247 version (HUNT_IO_DEBUG) 248 tracef("start reading: %d nbytes", this.readLen); 249 250 if (readLen > 0) { 251 // import std.stdio; 252 // writefln("length=%d, data: %(%02X %)", readLen, _readBuffer[0 .. readLen]); 253 version (HUNT_IO_DEBUG) 254 tracef("reading done: %d nbytes", readLen); 255 256 auto ss = this.socket; 257 if (dataReceivedHandler !is null){ 258 _bufferForRead.limit(cast(int)readLen); 259 _bufferForRead.position(0); 260 dataReceivedHandler(_bufferForRead); 261 } 262 263 if (isClient()) 264 { 265 this.beginRead(); 266 } 267 268 269 } else if (readLen == 0) { 270 version (HUNT_IO_DEBUG) { 271 if (_remoteAddress !is null) 272 warningf("connection broken: %s", _remoteAddress.toString()); 273 } 274 onDisconnected(); 275 // if (_isClosed) 276 // this.close(); 277 } else { 278 version (HUNT_IO_DEBUG) { 279 warningf("undefined behavior on thread %d", getTid()); 280 } else { 281 this._error = true; 282 this._errorMessage = "undefined behavior on thread"; 283 } 284 } 285 } 286 287 // try to write a block of data directly 288 protected size_t tryWrite(const ubyte[] data) { 289 version (HUNT_IO_DEBUG) 290 tracef("start to write, total=%d bytes, fd=%d", data.length, this.handle); 291 clearError(); 292 size_t nBytes; 293 //scope(exit) { 294 // _isSingleWriteBusy = false; 295 //} 296 if (!_isSingleWriteBusy) 297 { 298 nBytes = doWrite(data); 299 } 300 301 return nBytes; 302 } 303 304 // try to write a block of data from the write queue 305 private void tryNextBufferWrite() { 306 if(checkAllWriteDone()){ 307 _isSingleWriteBusy = false; 308 if (!isClient()) 309 { 310 this.beginRead(); 311 } 312 return; 313 } 314 315 // keep thread-safe here 316 //if(!cas(&_isSingleWriteBusy, false, true)) { 317 // version (HUNT_IO_DEBUG) warningf("busy writing. fd=%d", this.handle); 318 // return; 319 //} 320 321 //scope(exit) { 322 // _isSingleWriteBusy = false; 323 //} 324 325 clearError(); 326 327 bool haveBuffer = _writeQueue.tryDequeue(writeBuffer); 328 if(haveBuffer) { 329 writeBufferRemaining(); 330 } else { 331 version (HUNT_IO_DEBUG) 332 warning("No buffer in queue"); 333 } 334 } 335 336 private void writeBufferRemaining() { 337 if (writeBuffer is null ) 338 { 339 return; 340 } 341 const(ubyte)[] data = cast(const(ubyte)[])writeBuffer.getRemaining(); 342 343 size_t nBytes = doWrite(data); 344 345 version (HUNT_IO_DEBUG) 346 tracef("written data: %d bytes, fd=%d", nBytes, this.handle); 347 if(nBytes == data.length) { 348 writeBuffer = null; 349 } else if (nBytes > 0) { 350 writeBuffer.nextGetIndex(cast(int)nBytes); 351 version (HUNT_IO_DEBUG) 352 warningf("remaining data: %d / %d, fd=%d", data.length - nBytes, data.length, this.handle); 353 } else { 354 version (HUNT_IO_DEBUG) 355 warningf("I/O busy: writing. fd=%d", this.handle); 356 } 357 } 358 359 protected bool checkAllWriteDone() { 360 if(_writeQueue is null || (_writeQueue.isEmpty() && writeBuffer is null)) { 361 resetWriteStatus(); 362 version (HUNT_IO_DEBUG) 363 tracef("All data are written out. fd=%d", this.handle); 364 if(dataWriteDoneHandler !is null) 365 dataWriteDoneHandler(this); 366 return true; 367 } 368 369 return false; 370 } 371 372 void resetWriteStatus() { 373 if(_writeQueue !is null) 374 _writeQueue.clear(); 375 _isWritting = false; 376 _isWriteCancelling = false; 377 sendDataBuffer = null; 378 sendDataBackupBuffer = null; 379 writeBuffer = null; 380 _isSingleWriteBusy = false; 381 } 382 383 /** 384 * Called by selector after data sent 385 * Note: It's only for IOCP selector: 386 */ 387 void onWriteDone(size_t nBytes) { 388 version (HUNT_IO_DEBUG) { 389 tracef("write done once: %d bytes, isWritting: %s, writeBuffer: %s, fd=%d", 390 nBytes, _isWritting, writeBuffer is null, this.handle); 391 } 392 //if (_isWriteCancelling) { 393 // version (HUNT_IO_DEBUG) tracef("write cancelled."); 394 // resetWriteStatus(); 395 // return; 396 //} 397 398 399 //while(_isSingleWriteBusy) { 400 // version(HUNT_IO_DEBUG) 401 // info("waiting for last writting get finished..."); 402 //} 403 404 version (HUNT_IO_DEBUG) { 405 tracef("write done once: %d bytes, isWritting: %s, writeBuffer: %s, fd=%d", 406 nBytes, _isWritting, writeBuffer is null, this.handle); 407 } 408 409 if (writeBuffer !is null && writeBuffer.hasRemaining()) { 410 version (HUNT_IO_DEBUG) tracef("try to write the remaining in buffer."); 411 writeBufferRemaining(); 412 } else { 413 version (HUNT_IO_DEBUG) tracef("try to write next buffer."); 414 tryNextBufferWrite(); 415 } 416 } 417 418 private void notifyDataWrittenDone() { 419 if(dataWriteDoneHandler !is null && _writeQueue.isEmpty()) { 420 dataWriteDoneHandler(this); 421 } 422 } 423 424 DataReceivedHandler getDataReceivedHandler() { 425 return dataReceivedHandler; 426 } 427 428 void cancelWrite() { 429 _isWriteCancelling = true; 430 } 431 432 abstract bool isConnected() nothrow; 433 abstract protected void onDisconnected(); 434 435 protected void initializeWriteQueue() { 436 if (_writeQueue is null) { 437 _writeQueue = new WritingBufferQueue(); 438 } 439 } 440 441 SimpleEventHandler disconnectionHandler; 442 443 protected WritingBufferQueue _writeQueue; 444 protected bool _isWriteCancelling = false; 445 private bool _isSingleWriteBusy = false; // keep a single I/O write operation atomic 446 private const(ubyte)[] _readBuffer; 447 private const(ubyte)[] sendDataBuffer; 448 private const(ubyte)[] sendDataBackupBuffer; 449 private ByteBuffer writeBuffer; 450 451 private IocpContext _iocpread; 452 private IocpContext _iocpwrite; 453 }