module hunt.io.channel.iocp.AbstractStream;

// dfmt off
version (HAVE_IOCP) :
// dfmt on

import hunt.event.selector.Selector;
import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.io.channel.AbstractSocketChannel;
import hunt.io.channel.ChannelTask;
import hunt.io.channel.Common;
import hunt.io.channel.iocp.Common;
import hunt.logging;
import hunt.Functions;
import hunt.event.selector.IOCP;
import hunt.system.Error;
import hunt.util.ThreadHelper;
import hunt.util.worker;

import core.atomic;
import core.sys.windows.windows;
import core.sys.windows.winsock2;
import core.sys.windows.mswsock;
import std.format;
import std.socket;
import core.stdc.string;

/**
 * TCP peer built on overlapped Winsock calls (`ConnectEx`, `WSARecv`,
 * `WSASend`).
 *
 * Reads land in an internal buffer and are handed to `dataReceivedHandler`
 * as a copy; writes are taken from `_writeQueue` one buffer at a time.
 */
abstract class AbstractStream : AbstractSocketChannel {

    // data event handlers

    /**
     * Invoked with a copy of the bytes that were received.
     *
     * Warning: the received data is first stored in an inner buffer that is
     * reused for the next read; `handleReceivedData` therefore clones it
     * before dispatching.
     */
    protected DataReceivedHandler dataReceivedHandler;

    /// Invoked by `checkAllWriteDone` once nothing is left to write.
    protected SimpleActionHandler dataWriteDoneHandler;

    /// Backing storage for incoming data; `_readBuffer` is a view of its array.
    protected ByteBuffer _bufferForRead;
    protected AddressFamily _family;

    private size_t _bufferSize = 4096;
    private ChannelTask _task = null;

    /**
     * Creates the channel and allocates the receive buffer.
     *
     * Params:
     *   loop       = selector passed to the base channel
     *   family     = address family (not used by this constructor itself)
     *   bufferSize = capacity, in bytes, of the receive buffer
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
        _bufferSize = bufferSize;
        super(loop, ChannelType.TCP);

        version (HUNT_IO_DEBUG)
            trace("Buffer size: ", bufferSize);

        _bufferForRead = BufferUtils.allocate(bufferSize);
        _bufferForRead.clear();
        // WSARecv writes straight into the ByteBuffer's backing array.
        _readBuffer = cast(ubyte[])_bufferForRead.array();

        loadWinsockExtension(this.handle);
    }

    mixin CheckIocpError;

    /// Returns: true when this stream is the client side of the connection.
    abstract bool isClient();

    /// Read notification; delegates to the base class.
    override void onRead() {
        version (HUNT_IO_DEBUG)
            trace("ready to read");
        super.onRead();
    }

    /**
     * Write notification: starts writing the next queued buffer, or resets
     * the write state when the channel is closing and the write was cancelled.
     *
     * Should be thread-safe.
     */
    override void onWrite() {
        version (HUNT_IO_DEBUG)
            tracef("checking write status, isWritting: %s, writeBuffer: %s", _isWritting, writeBuffer is null);

        if(isClosing() && _isWriteCancelling) {
            version (HUNT_IO_DEBUG) infof("Write cancelled, fd=%d", this.handle);
            resetWriteStatus();
            return;
        }
        tryNextBufferWrite();
    }

    /**
     * Drops all pending write state, closes the underlying socket and then
     * runs the base-class close logic.
     */
    protected override void onClose() {
        _isWritting = false;
        resetWriteStatus();
        if(this._socket is null) {
            // No std.socket wrapper available: close the raw handle.
            import core.sys.windows.winsock2;
            .closesocket(this.handle);
        } else {
            // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 1:20:27 pm
            this._socket.shutdown(SocketShutdown.BOTH);
            this._socket.close();
        }
        super.onClose();
    }

    /**
     * Posts an overlapped `WSARecv` into `_readBuffer`.
     *
     * The channel is closed when the call fails with anything other than
     * `ERROR_IO_PENDING`.
     *
     * See_Also:
     *   https://docs.microsoft.com/en-us/windows/desktop/api/winsock2/nf-winsock2-wsarecv
     */
    void beginRead() {
        WSABUF _dataReadBuffer;
        _dataReadBuffer.len = cast(uint) _readBuffer.length;
        _dataReadBuffer.buf = cast(char*) _readBuffer.ptr;
        memset(&_iocpread.overlapped , 0 ,_iocpread.overlapped.sizeof );
        _iocpread.channel = this;
        _iocpread.operation = IocpOperation.read;
        DWORD dwReceived = 0;
        DWORD dwFlags = 0;
        version (HUNT_IO_DEBUG)
            tracef("start receiving [fd=%d] ", this.socket.handle);

        int nRet = WSARecv(cast(SOCKET) this.socket.handle, &_dataReadBuffer, 1u,
                &dwReceived, &dwFlags, &_iocpread.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);

        if (nRet == SOCKET_ERROR && (GetLastError() != ERROR_IO_PENDING)) {
            _isSingleWriteBusy = false;
            close();
        }
    }

    /**
     * Connects to `addr` with `ConnectEx` and polls `SO_CONNECT_TIME` until
     * the connection is reported as established.
     *
     * Params:
     *   addr = remote address to connect to
     *
     * Returns: true once the socket is connected; false when `checkErro`
     *          flagged an error or `getsockopt` failed.
     */
    protected bool doConnect(Address addr) {
        Address binded = createAddress(this.socket.addressFamily);
        _isSingleWriteBusy = true;
        // ConnectEx requires a bound socket.
        this.socket.bind(binded);
        // Start from a clean OVERLAPPED, as beginRead()/doWrite() do.
        memset(&_iocpread.overlapped, 0, _iocpread.overlapped.sizeof);
        _iocpread.channel = this;
        _iocpread.operation = IocpOperation.connect;

        import std.datetime.stopwatch;
        // AutoStart.yes already starts the watch; calling start() again would
        // violate StopWatch.start()'s "not running" precondition.
        auto sw = StopWatch(AutoStart.yes);
        scope(exit) {
            sw.stop();
        }

        // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex
        // NOTE(review): ConnectEx is documented to return a BOOL (FALSE on
        // failure), yet the result is checked against SOCKET_ERROR here --
        // confirm that checkErro covers the failure case.
        int nRet = ConnectEx(cast(SOCKET) this.socket.handle(), cast(SOCKADDR*) addr.name(),
                addr.nameLen(), null, 0, null, &_iocpread.overlapped);
        checkErro(nRet, SOCKET_ERROR);

        if(this._error)
            return false;

        // https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-getsockopt
        int seconds = 0;
        int bytes = seconds.sizeof;
        int iResult = 0;

        // NOTE(review): this is an unbounded spin; it only ends when
        // getsockopt fails or reports a connect time other than 0xFFFFFFFF.
        while (true) {
            iResult = getsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET, SO_CONNECT_TIME,
                    cast(void*)&seconds, cast(PINT)&bytes);

            if ( iResult != NO_ERROR ) {
                DWORD dwLastError = WSAGetLastError();
                warningf("getsockopt(SO_CONNECT_TIME) failed with error: code=%d, message=%s",
                    dwLastError, getErrorMessage(dwLastError));
                return false;
            }

            // 0xFFFFFFFF means "not connected yet": check again.
            if (seconds != 0xFFFFFFFF)
                break;

            version(HUNT_IO_DEBUG) warningf("Connection not established yet (destination: %s).", addr);
        }

        version(HUNT_IO_DEBUG) {
            infof("Connection has been established in %d msecs, destination: %s", sw.peek.total!"msecs", addr);
        }

        // https://docs.microsoft.com/en-us/windows/win32/winsock/sol-socket-socket-options
        enum SO_UPDATE_CONNECT_CONTEXT = 0x7010;
        iResult = setsockopt(cast(SOCKET) this.socket.handle(), SOL_SOCKET,
                SO_UPDATE_CONNECT_CONTEXT, NULL, 0 );
        if (iResult != NO_ERROR) {
            // Previously ignored silently; the connection result is unchanged.
            DWORD dwLastError = WSAGetLastError();
            warningf("setsockopt(SO_UPDATE_CONNECT_CONTEXT) failed with error: code=%d, message=%s",
                dwLastError, getErrorMessage(dwLastError));
        }

        return true;
    }

    /**
     * Posts an overlapped `WSASend` for `data`.
     *
     * The channel is closed when `checkErro` flags an error.
     *
     * Params:
     *   data = bytes to send
     *
     * Returns: the byte count `WSASend` stored in its "bytes sent" output.
     */
    private uint doWrite(const(ubyte)[] data) {
        DWORD dwSent = 0;
        DWORD dwFlags = 0;

        memset(&_iocpwrite.overlapped , 0 ,_iocpwrite.overlapped.sizeof );
        _iocpwrite.channel = this;
        _iocpwrite.operation = IocpOperation.write;

        version (HUNT_IO_DEBUG) {
            size_t bufferLength = data.length;
            tracef("To write %d bytes", bufferLength);
            if (bufferLength > 32)
                tracef("%(%02X %) ...", data[0 .. 32]);
            else
                tracef("%s", data);
        }

        // The buffer handed to an overlapped WSASend has to stay valid until
        // the operation completes, so keep a reference to it on the channel
        // (cleared again by resetWriteStatus).
        sendDataBuffer = data;

        WSABUF _dataWriteBuffer;
        _dataWriteBuffer.buf = cast(char*) data.ptr;
        _dataWriteBuffer.len = cast(uint) data.length;

        int nRet = WSASend( cast(SOCKET) this.socket.handle(), &_dataWriteBuffer, 1, &dwSent,
                dwFlags, &_iocpwrite.overlapped, cast(LPWSAOVERLAPPED_COMPLETION_ROUTINE) null);

        checkErro( nRet, SOCKET_ERROR);

        // FIXME: Needing refactor or cleanup -@Administrator at 2019/8/9 12:18:20 pm
        // Keep this to prevent the buffer corrupted. Why?
        version (HUNT_IO_DEBUG) {
            tracef("sent: %d / %d bytes, fd=%d", dwSent, bufferLength, this.handle);
        }

        if (this.isError) {
            errorf("Socket error on write: fd=%d, message=%s", this.handle, this.errorMessage);
            this.close();
        }

        return dwSent;
    }

    /**
     * Processes the outcome of a read, based on `readLen`:
     * a positive length dispatches the data (and, for clients, posts the next
     * read), zero is treated as a disconnect, and a negative length puts the
     * channel into the error state.
     */
    protected void doRead() {
        this.clearError();
        version (HUNT_IO_DEBUG)
            tracef("start reading: %d nbytes", this.readLen);

        if (readLen > 0) {
            handleReceivedData(readLen);

            // Clients re-arm the read right away; non-client streams do so in
            // tryNextBufferWrite() once all pending writes are done.
            if (isClient()) {
                this.beginRead();
            }

        } else if (readLen == 0) {
            version (HUNT_IO_DEBUG) {
                if (_remoteAddress !is null)
                    warningf("connection broken: %s", _remoteAddress.toString());
            }
            onDisconnected();
        } else {
            version (HUNT_IO_DEBUG) {
                warningf("undefined behavior on thread %d", getTid());
            }
            // Flag the error in every build; it used to be skipped whenever
            // HUNT_IO_DEBUG was enabled.
            this._error = true;
            this._errorMessage = "undefined behavior on thread";
        }
    }

    /**
     * Hands a copy of the first `len` received bytes to `dataReceivedHandler`,
     * either directly or through a `ChannelTask` when `taskWorker` is set.
     *
     * Params:
     *   len = number of valid bytes at the start of the read buffer
     */
    private void handleReceivedData(ptrdiff_t len) {
        version (HUNT_IO_DEBUG)
            tracef("reading done: %d nbytes", len);

        if (dataReceivedHandler is null)
            return;

        _bufferForRead.limit(cast(int)len);
        _bufferForRead.position(0);

        // The read buffer is reused by the next WSARecv, so dispatch a copy.
        ByteBuffer bufferCopy = BufferUtils.clone(_bufferForRead);
        if(taskWorker is null) {
            dataReceivedHandler(bufferCopy);
        } else {
            ChannelTask task = _task;

            // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
            // More tests needed
            if(task is null || task.isFinishing()) {
                task = createChannelTask();
                _task = task;

            } else {
                version(HUNT_METRIC) {
                    warningf("Request peeding... Task status: %s", task.status);
                }
            }

            task.put(bufferCopy);
        }

    }

    /// Creates a task bound to `dataReceivedHandler` and submits it to `taskWorker`.
    private ChannelTask createChannelTask() {
        ChannelTask task = new ChannelTask();
        task.dataReceivedHandler = dataReceivedHandler;
        taskWorker.put(task);
        return task;
    }

    /**
     * Tries to write a block of data directly, bypassing the write queue.
     *
     * Params:
     *   data = bytes to send
     *
     * Returns: the count reported by `doWrite`, or 0 when
     *          `_isSingleWriteBusy` is set and nothing was attempted.
     */
    protected size_t tryWrite(const ubyte[] data) {
        version (HUNT_IO_DEBUG)
            tracef("start to write, total=%d bytes, fd=%d", data.length, this.handle);
        clearError();
        size_t nBytes;

        if (!_isSingleWriteBusy)
        {
            nBytes = doWrite(data);
        }

        return nBytes;
    }

    /**
     * Tries to write the next buffer from the write queue.
     *
     * When everything has been written, the busy flag is cleared and
     * non-client streams post a new read.
     */
    private void tryNextBufferWrite() {
        if(checkAllWriteDone()){
            _isSingleWriteBusy = false;
            if (!isClient())
            {
                this.beginRead();
            }
            return;
        }

        clearError();

        bool haveBuffer = _writeQueue.tryDequeue(writeBuffer);
        if(haveBuffer) {
            writeBufferRemaining();
        } else {
            version (HUNT_IO_DEBUG)
                warning("No buffer in queue");
        }
    }

    /**
     * Sends whatever is left in `writeBuffer` and updates it according to
     * the count `doWrite` returned: fully sent buffers are released,
     * partially sent ones are advanced.
     */
    private void writeBufferRemaining() {
        if (writeBuffer is null )
        {
            return;
        }
        const(ubyte)[] data = cast(const(ubyte)[])writeBuffer.peekRemaining();

        size_t nBytes = doWrite(data);

        version (HUNT_IO_DEBUG)
            tracef("written data: %d bytes, fd=%d", nBytes, this.handle);
        if(nBytes == data.length) {
            writeBuffer = null;
        } else if (nBytes > 0) {
            writeBuffer.nextGetIndex(cast(int)nBytes);
            version (HUNT_IO_DEBUG)
                warningf("remaining data: %d / %d, fd=%d", data.length - nBytes, data.length, this.handle);
        } else {
            version (HUNT_IO_DEBUG)
                warningf("I/O busy: writing. fd=%d", this.handle);
        }
    }

    /**
     * Checks whether there is nothing left to write.
     *
     * When that is the case the write state is reset and
     * `dataWriteDoneHandler` (if any) is invoked.
     *
     * Returns: true when the queue is missing, or empty with no buffer in
     *          progress.
     */
    protected bool checkAllWriteDone() {
        if(_writeQueue is null || (_writeQueue.isEmpty() && writeBuffer is null)) {
            resetWriteStatus();
            version (HUNT_IO_DEBUG)
                tracef("All data are written out. fd=%d", this.handle);
            if(dataWriteDoneHandler !is null)
                dataWriteDoneHandler(this);
            return true;
        }

        return false;
    }

    /// Clears the write queue, all write flags and every retained send buffer.
    void resetWriteStatus() {
        if(_writeQueue !is null)
            _writeQueue.clear();
        _isWritting = false;
        _isWriteCancelling = false;
        sendDataBuffer = null;
        sendDataBackupBuffer = null;
        writeBuffer = null;
        _isSingleWriteBusy = false;
    }

    /**
     * Called by selector after data sent.
     * Note: It's only for IOCP selector.
     *
     * Continues with the rest of the current buffer, or moves on to the next
     * queued one.
     *
     * NOTE(review): `nBytes` is only used for tracing here; the buffer
     * position is advanced in `writeBufferRemaining` from the count
     * `WSASend` returned -- confirm that sends completing asynchronously are
     * accounted for.
     *
     * Params:
     *   nBytes = number of bytes reported for the completed write
     */
    void onWriteDone(size_t nBytes) {
        version (HUNT_IO_DEBUG) {
            tracef("write done once: %d bytes, isWritting: %s, writeBuffer: %s, fd=%d",
                    nBytes, _isWritting, writeBuffer is null, this.handle);
        }

        if (writeBuffer !is null && writeBuffer.hasRemaining()) {
            version (HUNT_IO_DEBUG) tracef("try to write the remaining in buffer.");
            writeBufferRemaining();
        } else {
            version (HUNT_IO_DEBUG) tracef("try to write next buffer.");
            tryNextBufferWrite();
        }
    }

    /// Invokes `dataWriteDoneHandler` when the write queue is missing or empty.
    private void notifyDataWrittenDone() {
        // A missing queue counts as "nothing pending", as in checkAllWriteDone().
        if(dataWriteDoneHandler !is null && (_writeQueue is null || _writeQueue.isEmpty())) {
            dataWriteDoneHandler(this);
        }
    }


    /// Returns: the currently installed data-received handler (may be null).
    DataReceivedHandler getDataReceivedHandler() {
        return dataReceivedHandler;
    }

    /// Requests cancellation of pending writes; honoured by `onWrite` while closing.
    void cancelWrite() {
        _isWriteCancelling = true;
    }

    abstract bool isConnected() nothrow;
    abstract protected void onDisconnected();

    /// Lazily creates the write queue.
    protected void initializeWriteQueue() {
        if (_writeQueue is null) {
            _writeQueue = new WritingBufferQueue();
        }
    }

    SimpleEventHandler disconnectionHandler;

    protected WritingBufferQueue _writeQueue;
    protected bool _isWriteCancelling = false;
    private bool _isSingleWriteBusy = false; // keep a single I/O write operation atomic
    private const(ubyte)[] _readBuffer;           // view of _bufferForRead's array, target of WSARecv
    private const(ubyte)[] sendDataBuffer;        // buffer of the most recent WSASend
    private const(ubyte)[] sendDataBackupBuffer;
    private ByteBuffer writeBuffer;               // buffer currently being written

    private IocpContext _iocpread;   // overlapped context for connect/read
    private IocpContext _iocpwrite;  // overlapped context for write
}