1 module hunt.io.channel.posix.AbstractStream; 2 3 // dfmt off 4 version(Posix): 5 // dfmt on 6 7 import hunt.event.selector.Selector; 8 import hunt.Functions; 9 import hunt.io.BufferUtils; 10 import hunt.io.ByteBuffer; 11 import hunt.io.channel.AbstractSocketChannel; 12 import hunt.io.channel.ChannelTask; 13 import hunt.io.channel.Common; 14 import hunt.io.IoError; 15 import hunt.io.SimpleQueue; 16 import hunt.logging.ConsoleLogger; 17 import hunt.system.Error; 18 import hunt.util.worker; 19 20 21 import std.format; 22 import std.socket; 23 24 import core.atomic; 25 import core.stdc.errno; 26 import core.stdc.string; 27 import core.sys.posix.sys.socket : accept; 28 import core.sys.posix.unistd; 29 30 31 /** 32 TCP Peer 33 */ 34 abstract class AbstractStream : AbstractSocketChannel { 35 private size_t _bufferSize = 4096; 36 private const(ubyte)[] _readBuffer; 37 private ByteBuffer writeBuffer; 38 private ChannelTask _task = null; 39 40 /** 41 * Warning: The received data is stored a inner buffer. For a data safe, 42 * you would make a copy of it. 43 */ 44 protected DataReceivedHandler dataReceivedHandler; 45 protected SimpleEventHandler disconnectionHandler; 46 protected SimpleActionHandler dataWriteDoneHandler; 47 48 protected AddressFamily _family; 49 // protected ByteBuffer _bufferForRead; 50 protected WritingBufferQueue _writeQueue; 51 protected bool _isWriteCancelling = false; 52 53 this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) { 54 this._family = family; 55 _bufferSize = bufferSize; 56 // _bufferForRead = BufferUtils.allocate(bufferSize); 57 // _bufferForRead.limit(cast(int)bufferSize); 58 // _readBuffer = cast(ubyte[])_bufferForRead.array(); 59 // _writeQueue = new WritingBufferQueue(); 60 super(loop, ChannelType.TCP); 61 setFlag(ChannelFlag.Read, true); 62 setFlag(ChannelFlag.Write, true); 63 setFlag(ChannelFlag.ETMode, true); 64 } 65 66 abstract bool isClient(); 67 abstract bool isConnected() nothrow; 68 abstract protected void onDisconnected(); 69 70 private void onDataReceived(ByteBuffer buffer) { 71 72 if (dataReceivedHandler is null) 73 return; 74 75 // _bufferForRead.limit(cast(int)len); 76 // _bufferForRead.position(0); 77 78 if(taskWorker is null) { 79 // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00 80 // Using memory pool 81 // ByteBuffer bufferCopy = BufferUtils.clone(_bufferForRead); 82 // dataReceivedHandler(bufferCopy); 83 dataReceivedHandler(buffer); 84 } else { 85 ChannelTask task = _task; 86 87 // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-02-05T09:18:02+08:00 88 // More tests needed 89 if(task is null || task.isFinishing()) { 90 task = createChannelTask(); 91 _task = task; 92 93 } else { 94 version(HUNT_METRIC) { 95 warningf("Request peeding... Task status: %s", task.status); 96 } 97 } 98 99 task.put(buffer); 100 } 101 } 102 103 private ChannelTask createChannelTask() { 104 ChannelTask task = new ChannelTask(); 105 task.dataReceivedHandler = dataReceivedHandler; 106 taskWorker.put(task); 107 return task; 108 } 109 110 /** 111 * 112 */ 113 protected bool tryRead() { 114 bool isDone = true; 115 this.clearError(); 116 117 // TODO: Tasks pending completion -@zhangxueping at 2021-03-09T09:59:00+08:00 118 // Using memory pool 119 // if(taskWorker !is null) { 120 ByteBuffer _bufferForRead = BufferUtils.allocate(_bufferSize); 121 _bufferForRead.limit(cast(int)_bufferSize); 122 ubyte[] _readBuffer = cast(ubyte[])_bufferForRead.array(); 123 // } 124 ptrdiff_t len = read(this.handle, cast(void*) _readBuffer.ptr, _readBuffer.length); 125 126 // ubyte[] rb = new ubyte[BufferSize]; 127 // ptrdiff_t len = read(this.handle, cast(void*) rb.ptr, rb.length); 128 version (HUNT_IO_DEBUG) { 129 tracef("reading[fd=%d]: %d bytes", this.handle, len); 130 } 131 132 if (len > 0) { 133 version(HUNT_IO_DEBUG) { 134 if (len <= 32) 135 infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. len]); 136 else 137 infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. 32]); 138 } 139 140 _bufferForRead.limit(cast(int)len); 141 _bufferForRead.position(0); 142 onDataReceived(_bufferForRead); 143 144 // It's prossible that there are more data waitting for read in the read I/O space. 145 if (len == _readBuffer.length) { 146 version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len); 147 isDone = false; 148 } 149 } else if (len == Socket.ERROR) { 150 // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call 151 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13 152 // check more error status 153 this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK; 154 if (_error) { 155 this._errorMessage = getErrorMessage(errno); 156 157 if(errno == ECONNRESET) { 158 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 159 onDisconnected(); 160 errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 161 } else { 162 errorOccurred(ErrorCode.INTERRUPTED , "Error occurred on read"); 163 } 164 } else { 165 debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle, 166 errno, getErrorMessage(errno)); 167 } 168 169 } else { 170 version (HUNT_DEBUG) 171 infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle); 172 onDisconnected(); 173 } 174 175 return isDone; 176 } 177 178 override protected void doClose() { 179 version (HUNT_IO_DEBUG) { 180 infof("peer socket %s closing: fd=%d", this.remoteAddress.toString(), this.handle); 181 } 182 if(this.socket is null) { 183 import core.sys.posix.unistd; 184 core.sys.posix.unistd.close(this.handle); 185 } else { 186 this.socket.shutdown(SocketShutdown.BOTH); 187 this.socket.close(); 188 } 189 190 version (HUNT_IO_DEBUG) { 191 infof("peer socket %s closed: fd=%d", this.remoteAddress.toString, this.handle); 192 } 193 194 Task task = _task; 195 if(task !is null) { 196 task.stop(); 197 } 198 } 199 200 201 /** 202 * Try to write a block of data. 203 */ 204 protected ptrdiff_t tryWrite(const(ubyte)[] data) { 205 clearError(); 206 // const nBytes = this.socket.send(data); 207 version (HUNT_IO_DEBUG) 208 tracef("try to write: %d bytes, fd=%d", data.length, this.handle); 209 const nBytes = write(this.handle, data.ptr, data.length); 210 version (HUNT_IO_DEBUG) 211 tracef("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle); 212 213 if (nBytes > 0) { 214 return nBytes; 215 } 216 217 if (nBytes == Socket.ERROR) { 218 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38 219 // check more error status 220 // EPIPE/Broken pipe: 221 // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll 222 223 if(errno == EAGAIN) { 224 version (HUNT_IO_DEBUG) { 225 warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle, 226 errno, getErrorMessage(errno)); 227 } 228 } else if(errno == EINTR || errno == EWOULDBLOCK) { 229 // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait 230 debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle, 231 errno, getErrorMessage(errno)); 232 // eventLoop.update(this); 233 } else { 234 this._error = true; 235 this._errorMessage = getErrorMessage(errno); 236 if(errno == ECONNRESET) { 237 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 238 onDisconnected(); 239 errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 240 } else if(errno == EPIPE) { 241 // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe 242 // Handle SIGPIPE signal 243 onDisconnected(); 244 errorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!"); 245 } 246 247 } 248 } else { 249 version (HUNT_DEBUG) { 250 warningf("nBytes=%d, message: %s", nBytes, lastSocketError()); 251 assert(false, "Undefined behavior!"); 252 } else { 253 this._error = true; 254 } 255 } 256 257 return 0; 258 } 259 260 private bool tryNextWrite(ByteBuffer buffer) { 261 const(ubyte)[] data = cast(const(ubyte)[])buffer.peekRemaining(); 262 version (HUNT_IO_DEBUG) { 263 tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s", 264 this.handle, data.length, buffer.toString()); 265 } 266 267 ptrdiff_t remaining = data.length; 268 if(data.length == 0) 269 return true; 270 271 while(remaining > 0 && !_error && !isClosing() && !_isWriteCancelling) { 272 ptrdiff_t nBytes = tryWrite(data); 273 version (HUNT_IO_DEBUG) 274 { 275 tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s", 276 this.handle, nBytes, data.length, remaining, buffer.toString()); 277 } 278 279 if (nBytes > 0) { 280 remaining -= nBytes; 281 data = data[nBytes .. $]; 282 } 283 } 284 285 version (HUNT_IO_DEBUG) { 286 if(remaining == 0) { 287 tracef("A buffer is written out. fd=%d", this.handle); 288 return true; 289 } else { 290 warningf("Writing cancelled or an error ocurred. fd=%d", this.handle); 291 return false; 292 } 293 } else { 294 return remaining == 0; 295 } 296 } 297 298 void resetWriteStatus() { 299 if(_writeQueue !is null) 300 _writeQueue.clear(); 301 atomicStore(_isWritting, false); 302 _isWriteCancelling = false; 303 } 304 305 /** 306 * Should be thread-safe. 307 */ 308 override void onWrite() { 309 version (HUNT_IO_DEBUG) 310 { 311 tracef("checking status, isWritting: %s, writeBuffer: %s", 312 _isWritting, writeBuffer is null ? "null" : writeBuffer.toString()); 313 } 314 315 if(!_isWritting) { 316 version (HUNT_IO_DEBUG) 317 infof("No data needs to be written out. fd=%d", this.handle); 318 return; 319 } 320 321 if(isClosing() && _isWriteCancelling) { 322 version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle); 323 resetWriteStatus(); 324 return; 325 } 326 327 // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00 328 // More tests are needed 329 // keep thread-safe here 330 if(!cas(&_isBusyWritting, false, true)) { 331 // version (HUNT_IO_DEBUG) 332 version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle); 333 return; 334 } 335 336 scope(exit) { 337 _isBusyWritting = false; 338 } 339 340 if(writeBuffer !is null) { 341 if(tryNextWrite(writeBuffer)) { 342 writeBuffer = null; 343 } else { 344 version (HUNT_IO_DEBUG) 345 { 346 infof("waiting to try again... fd=%d, writeBuffer: %s", 347 this.handle, writeBuffer.toString()); 348 } 349 // eventLoop.update(this); 350 return; 351 } 352 version (HUNT_IO_DEBUG) 353 tracef("running here, fd=%d", this.handle); 354 } 355 356 if(checkAllWriteDone()) { 357 return; 358 } 359 360 version (HUNT_IO_DEBUG) { 361 tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not"); 362 } 363 364 if(_writeQueue.tryDequeue(writeBuffer)) { 365 if(tryNextWrite(writeBuffer)) { 366 writeBuffer = null; 367 checkAllWriteDone(); 368 } else { 369 version (HUNT_IO_DEBUG) 370 infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString()); 371 372 // eventLoop.update(this); 373 } 374 version (HUNT_IO_DEBUG) { 375 warningf("running here, fd=%d", this.handle); 376 } 377 } 378 } 379 private shared bool _isBusyWritting = false; 380 381 protected bool checkAllWriteDone() { 382 version (HUNT_IO_DEBUG) { 383 import std.conv; 384 tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle, 385 _writeQueue is null || _writeQueue.isEmpty().to!string()); 386 } 387 388 if(_writeQueue is null || _writeQueue.isEmpty()) { 389 resetWriteStatus(); 390 version (HUNT_IO_DEBUG) 391 infof("All data are written out: fd=%d", this.handle); 392 if(dataWriteDoneHandler !is null) 393 dataWriteDoneHandler(this); 394 return true; 395 } 396 397 return false; 398 } 399 400 protected void initializeWriteQueue() { 401 if (_writeQueue is null) { 402 _writeQueue = new WritingBufferQueue(); 403 } 404 } 405 406 protected bool doConnect(Address addr) { 407 try { 408 this.socket.connect(addr); 409 } catch (SocketOSException e) { 410 error(e.msg); 411 version(HUNT_DEBUG) error(e); 412 return false; 413 } 414 return true; 415 } 416 417 void cancelWrite() { 418 _isWriteCancelling = true; 419 } 420 421 bool isWriteCancelling() { 422 return _isWriteCancelling; 423 } 424 425 DataReceivedHandler getDataReceivedHandler() { 426 return dataReceivedHandler; 427 } 428 429 }