module hunt.io.channel.posix.AbstractStream;

// dfmt off
version(Posix):
// dfmt on

import hunt.event.selector.Selector;
import hunt.Functions;
import hunt.io.BufferUtils;
import hunt.io.ByteBuffer;
import hunt.io.channel.AbstractSocketChannel;
import hunt.io.channel.ChannelTask;
import hunt.io.channel.Common;
import hunt.io.IoError;
import hunt.io.SimpleQueue;
import hunt.logging;
import hunt.system.Error;
import hunt.util.worker;


import std.format;
import std.socket;

import core.atomic;
import core.stdc.errno;
import core.stdc.string;
import core.sys.posix.sys.socket : accept;
import core.sys.posix.unistd;


/**
 * TCP peer.
 *
 * Base class for a stream-oriented socket channel on POSIX systems. It
 * reads from and writes to the raw file descriptor (`this.handle`) using
 * `read`/`write`, forwards received data to `dataReceivedHandler` (directly
 * or through a `ChannelTask` when a task worker is present), and drains a
 * queue of outgoing buffers in `onWrite`.
 */
abstract class AbstractStream : AbstractSocketChannel {
    /// Size in bytes of the buffer allocated for each read attempt.
    private size_t _bufferSize = 4096;
    /// Not used by any method in this class; kept for compatibility.
    private const(ubyte)[] _readBuffer;
    /// The buffer currently being written out, or null when there is none.
    private ByteBuffer writeBuffer;
    /// The task most recently created to deliver received data to a worker.
    private ChannelTask _task = null;
    /// Guard ensuring that only one caller at a time runs the body of onWrite.
    private shared bool _isBusyWritting = false;

    /**
     * Warning: the buffer handed to this handler is the very buffer the data
     * was read into; it is not copied by this class. Make a copy if the data
     * has to be kept safely.
     */
    protected DataReceivedHandler dataReceivedHandler;
    protected SimpleEventHandler disconnectionHandler;
    protected SimpleActionHandler dataWriteDoneHandler;

    protected AddressFamily _family;
    /// Outgoing buffers waiting to be written; created by initializeWriteQueue().
    protected WritingBufferQueue _writeQueue;
    /// Set by cancelWrite(); cleared by resetWriteStatus().
    protected bool _isWriteCancelling = false;

    /**
     * Creates a TCP channel registered for read and write events in
     * edge-triggered mode.
     *
     * Params:
     *   loop       = the selector this channel belongs to
     *   family     = the address family of the socket
     *   bufferSize = number of bytes allocated for each read attempt
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
        this._family = family;
        _bufferSize = bufferSize;
        // The write queue is not created here; see initializeWriteQueue().
        super(loop, ChannelType.TCP);
        setFlag(ChannelFlag.Read, true);
        setFlag(ChannelFlag.Write, true);
        setFlag(ChannelFlag.ETMode, true);
    }

    abstract bool isClient();
    abstract bool isConnected() nothrow;
    abstract protected void onDisconnected();

    /**
     * Delivers a freshly read buffer to the data handler.
     *
     * Without a task worker the handler is invoked synchronously. With a
     * task worker the buffer is appended to the current ChannelTask; a new
     * task is created when there is none or the current one is finishing.
     */
    private void onDataReceived(ByteBuffer buffer) {

        if (dataReceivedHandler is null)
            return;

        if(taskWorker is null) {
            // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
            // Using memory pool
            dataReceivedHandler(buffer);
        } else {
            ChannelTask task = _task;

            // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00
            // More tests needed
            if(task is null || task.isFinishing()) {
                task = createChannelTask();
                _task = task;

            } else {
                version(HUNT_METRIC) {
                    warningf("Request peeding... Task status: %s", task.status);
                }
            }

            task.put(buffer);
        }
    }

    /**
     * Creates a ChannelTask bound to the current data handler and submits
     * it to the task worker.
     */
    private ChannelTask createChannelTask() {
        ChannelTask task = new ChannelTask();
        task.dataReceivedHandler = dataReceivedHandler;
        taskWorker.put(task);
        return task;
    }

    /**
     * Performs one read() on the descriptor and dispatches the result.
     *
     * Returns: false when the read filled the whole buffer, meaning more
     *          data may be waiting and the caller should read again;
     *          true otherwise.
     */
    protected bool tryRead() {
        bool isDone = true;
        this.clearError();

        // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00
        // Using memory pool
        // A new buffer is allocated for every read attempt.
        ByteBuffer readBuffer = BufferUtils.allocate(_bufferSize);
        readBuffer.limit(cast(int)_bufferSize);
        ubyte[] rawBytes = cast(ubyte[])readBuffer.array();

        ptrdiff_t len = read(this.handle, cast(void*) rawBytes.ptr, rawBytes.length);
        // Capture errno right away: the logging and helper calls below may
        // overwrite it before it is inspected.
        const int err = errno;

        version (HUNT_IO_DEBUG) {
            tracef("reading[fd=%d]: %d bytes", this.handle, len);
        }

        if (len > 0) {
            version(HUNT_IO_DEBUG) {
                if (len <= 32)
                    infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, rawBytes[0 .. len]);
                else
                    infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, rawBytes[0 .. 32]);
            }

            readBuffer.limit(cast(int)len);
            readBuffer.position(0);
            onDataReceived(readBuffer);

            // It's possible that more data is waiting to be read.
            if (len == rawBytes.length) {
                version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len);
                isDone = false;
            }
        } else if (len == Socket.ERROR) {
            // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
            // check more error status
            // EINTR, EAGAIN and EWOULDBLOCK are not treated as errors.
            this._error = err != EINTR && err != EAGAIN && err != EWOULDBLOCK;
            if (_error) {
                this._errorMessage = getErrorMessage(err);

                version(HUNT_NET_DEBUG) {
                    warningf("Error occurred on read, code: %d, msg: %s, isClosing: %s, isClosed: %s",
                        err, _errorMessage, isClosing(), isClosed());
                }

                if(err == ECONNRESET) {
                    // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                    onDisconnected();
                    errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
                } else {
                    if(!isClosed()) {
                        errorOccurred(ErrorCode.INTERRUPTED , format("Error occurred on read, code: %d", err));
                    }
                }
            } else {
                debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle,
                    err, getErrorMessage(err));
            }

        } else {
            // read() returned 0.
            version (HUNT_DEBUG)
                infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle);
            onDisconnected();
        }

        return isDone;
    }

    /**
     * Closes the underlying descriptor or socket, then stops the pending
     * channel task, if any.
     */
    override protected void doClose() {
        version (HUNT_IO_DEBUG) {
            infof("peer socket %s closing: fd=%d", this.remoteAddress.toString(), this.handle);
        }
        if(this.socket is null) {
            import core.sys.posix.unistd;
            core.sys.posix.unistd.close(this.handle);
        } else {
            this.socket.shutdown(SocketShutdown.BOTH);
            this.socket.close();
        }

        version (HUNT_IO_DEBUG) {
            infof("peer socket %s closed: fd=%d", this.remoteAddress.toString, this.handle);
        }

        Task task = _task;
        if(task !is null) {
            task.stop();
        }
    }


    /**
     * Tries to write a block of data with a single write() call.
     *
     * Returns: the number of bytes written when positive; 0 when nothing
     *          was written. On 0 the caller has to check the error state:
     *          EAGAIN, EINTR and EWOULDBLOCK leave it clear, any other
     *          failure sets it.
     */
    protected ptrdiff_t tryWrite(const(ubyte)[] data) {
        clearError();
        version (HUNT_IO_DEBUG)
            tracef("try to write: %d bytes, fd=%d", data.length, this.handle);
        const nBytes = write(this.handle, data.ptr, data.length);
        // Capture errno right away: the logging and helper calls below may
        // overwrite it before it is inspected.
        const int err = errno;
        version (HUNT_IO_DEBUG)
            tracef("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle);

        if (nBytes > 0) {
            return nBytes;
        }

        if (nBytes == Socket.ERROR) {
            // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
            // check more error status
            // EPIPE/Broken pipe:
            // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll

            if(err == EAGAIN) {
                version (HUNT_IO_DEBUG) {
                    warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                        err, getErrorMessage(err));
                }
            } else if(err == EINTR || err == EWOULDBLOCK) {
                // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait
                debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle,
                    err, getErrorMessage(err));
            } else {
                this._error = true;
                this._errorMessage = getErrorMessage(err);
                if(err == ECONNRESET) {
                    // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
                    onDisconnected();
                    errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer");
                } else if(err == EPIPE) {
                    // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
                    // Handle SIGPIPE signal
                    onDisconnected();
                    errorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!");
                }
                // NOTE(review): other errno values only set the error flag
                // and message; errorOccurred() is not called for them.
            }
        } else {
            // write() returned 0.
            version (HUNT_DEBUG) {
                warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
                assert(false, "Undefined behavior!");
            } else {
                this._error = true;
            }
        }

        return 0;
    }

    /**
     * Writes out the remaining content of a buffer by calling tryWrite()
     * repeatedly until everything is written, an error is flagged, the
     * channel is closing, or writing is being cancelled.
     *
     * Returns: true when the whole content was written (or it was empty).
     *
     * NOTE(review): when tryWrite() returns 0 without flagging an error
     * (EAGAIN/EINTR/EWOULDBLOCK) the loop retries immediately. Also, the
     * position of `buffer` is never advanced here, so a later retry with
     * the same buffer starts from the beginning again - confirm that this
     * is intended.
     */
    private bool tryNextWrite(ByteBuffer buffer) {
        const(ubyte)[] data = cast(const(ubyte)[])buffer.peekRemaining();
        version (HUNT_IO_DEBUG) {
            tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s",
                this.handle, data.length, buffer.toString());
        }

        ptrdiff_t remaining = data.length;
        if(data.length == 0)
            return true;

        while(remaining > 0 && !_error && !isClosing() && !_isWriteCancelling) {
            ptrdiff_t nBytes = tryWrite(data);
            version (HUNT_IO_DEBUG)
            {
                tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s",
                    this.handle, nBytes, data.length, remaining, buffer.toString());
            }

            if (nBytes > 0) {
                remaining -= nBytes;
                data = data[nBytes .. $];
            }
        }

        version (HUNT_IO_DEBUG) {
            if(remaining == 0) {
                tracef("A buffer is written out. fd=%d", this.handle);
                return true;
            } else {
                warningf("Writing cancelled or an error ocurred. fd=%d", this.handle);
                return false;
            }
        } else {
            return remaining == 0;
        }
    }

    /**
     * Clears the write queue (when present) and resets the writing and
     * cancelling flags.
     */
    void resetWriteStatus() {
        if(_writeQueue !is null)
            _writeQueue.clear();
        atomicStore(_isWritting, false);
        _isWriteCancelling = false;
    }

    /**
     * Writes out the pending buffer and then the next queued buffer.
     *
     * Should be thread-safe: the body is guarded by `_isBusyWritting`, so a
     * concurrent caller returns immediately instead of writing.
     */
    override void onWrite() {
        version (HUNT_IO_DEBUG)
        {
            tracef("checking status, isWritting: %s, writeBuffer: %s",
                _isWritting, writeBuffer is null ? "null" : writeBuffer.toString());
        }

        if(!_isWritting) {
            version (HUNT_IO_DEBUG)
            infof("No data needs to be written out. fd=%d", this.handle);
            return;
        }

        // NOTE(review): the message says "cancelled or closed" but the
        // condition requires both; confirm which one is intended.
        if(isClosing() && _isWriteCancelling) {
            version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle);
            resetWriteStatus();
            return;
        }

        // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00
        // More tests are needed
        // keep thread-safe here
        if(!cas(&_isBusyWritting, false, true)) {
            version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle);
            return;
        }

        // Release the guard atomically, matching the cas() that acquired it.
        scope(exit) {
            atomicStore(_isBusyWritting, false);
        }

        // First finish the buffer left over from a previous attempt.
        if(writeBuffer !is null) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
            } else {
                version (HUNT_IO_DEBUG)
                {
                    infof("waiting to try again... fd=%d, writeBuffer: %s",
                        this.handle, writeBuffer.toString());
                }
                return;
            }
            version (HUNT_IO_DEBUG)
                tracef("running here, fd=%d", this.handle);
        }

        if(checkAllWriteDone()) {
            return;
        }

        version (HUNT_IO_DEBUG) {
            tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not");
        }

        // Then take the next buffer from the queue.
        if(_writeQueue.tryDequeue(writeBuffer)) {
            if(tryNextWrite(writeBuffer)) {
                writeBuffer = null;
                checkAllWriteDone();
            } else {
                version (HUNT_IO_DEBUG)
                infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString());
            }
            version (HUNT_IO_DEBUG) {
                warningf("running here, fd=%d", this.handle);
            }
        }
    }

    /**
     * Checks whether the write queue is absent or empty. If so, resets the
     * write status and invokes `dataWriteDoneHandler` when one is set.
     *
     * Returns: true when there is nothing left in the queue.
     */
    protected bool checkAllWriteDone() {
        version (HUNT_IO_DEBUG) {
            tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle,
                _writeQueue is null || _writeQueue.isEmpty());
        }

        if(_writeQueue is null || _writeQueue.isEmpty()) {
            resetWriteStatus();
            version (HUNT_IO_DEBUG)
                infof("All data are written out: fd=%d", this.handle);
            if(dataWriteDoneHandler !is null)
                dataWriteDoneHandler(this);
            return true;
        }

        return false;
    }

    /// Creates the write queue if it does not exist yet.
    protected void initializeWriteQueue() {
        if (_writeQueue is null) {
            _writeQueue = new WritingBufferQueue();
        }
    }

    /**
     * Connects the socket to the given address.
     *
     * Returns: false when a SocketOSException is thrown (it is logged),
     *          true otherwise.
     */
    protected bool doConnect(Address addr) {
        try {
            this.socket.connect(addr);
        } catch (SocketOSException e) {
            error(e.msg);
            version(HUNT_DEBUG) error(e);
            return false;
        }
        return true;
    }

    /// Requests that writing be cancelled; tryNextWrite() stops looping once this is set.
    void cancelWrite() {
        _isWriteCancelling = true;
    }

    /// Returns: whether cancelWrite() was called and not yet reset.
    bool isWriteCancelling() {
        return _isWriteCancelling;
    }

    /// Returns: the handler currently receiving incoming data.
    DataReceivedHandler getDataReceivedHandler() {
        return dataReceivedHandler;
    }

}