1 module hunt.io.channel.posix.AbstractStream; 2 3 // dfmt off 4 version(Posix): 5 // dfmt on 6 7 import hunt.io.BufferUtils; 8 import hunt.io.ByteBuffer; 9 import hunt.event.selector.Selector; 10 import hunt.Functions; 11 import hunt.io.channel.AbstractSocketChannel; 12 import hunt.io.channel.Common; 13 import hunt.io.IoError; 14 import hunt.io.worker.WorkerGroupObject; 15 import hunt.logging.ConsoleLogger; 16 import hunt.system.Error; 17 18 19 import std.format; 20 import std.socket; 21 22 import core.atomic; 23 import core.stdc.errno; 24 import core.stdc.string; 25 import core.sys.posix.sys.socket : accept; 26 import core.sys.posix.unistd; 27 28 enum string ResponseData = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nConnection: Keep-Alive\r\nContent-Type: text/plain\r\nServer: Hunt/1.0\r\nDate: Wed, 17 Apr 2013 12:00:00 GMT\r\n\r\nHello, World!"; 29 30 /** 31 TCP Peer 32 */ 33 abstract class AbstractStream : AbstractSocketChannel { 34 enum BufferSize = 4096; 35 private const(ubyte)[] _readBuffer; 36 private ByteBuffer writeBuffer; 37 38 /** 39 * Warning: The received data is stored a inner buffer. For a data safe, 40 * you would make a copy of it. 41 */ 42 protected DataReceivedHandler dataReceivedHandler; 43 protected SimpleEventHandler disconnectionHandler; 44 protected SimpleActionHandler dataWriteDoneHandler; 45 46 protected AddressFamily _family; 47 protected ByteBuffer _bufferForRead; 48 protected WritingBufferQueue _writeQueue; 49 protected bool _isWriteCancelling = false; 50 51 this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) { 52 this._family = family; 53 _bufferForRead = BufferUtils.allocate(bufferSize); 54 _bufferForRead.limit(cast(int)bufferSize); 55 _readBuffer = cast(ubyte[])_bufferForRead.array(); 56 // _writeQueue = new WritingBufferQueue(); 57 super(loop, ChannelType.TCP); 58 setFlag(ChannelFlag.Read, true); 59 setFlag(ChannelFlag.Write, true); 60 setFlag(ChannelFlag.ETMode, true); 61 } 62 63 abstract bool isClient(); 64 abstract bool isConnected() nothrow; 65 abstract protected void onDisconnected(); 66 67 /** 68 * 69 */ 70 protected bool tryRead() { 71 bool isDone = true; 72 this.clearError(); 73 ptrdiff_t len = read(this.handle, cast(void*) _readBuffer.ptr, _readBuffer.length); 74 75 // ubyte[] rb = new ubyte[BufferSize]; 76 // ptrdiff_t len = read(this.handle, cast(void*) rb.ptr, rb.length); 77 version (HUNT_IO_DEBUG) { 78 tracef("reading[fd=%d]: %d bytes", this.handle, len); 79 } 80 81 if (len > 0) { 82 version(HUNT_IO_DEBUG) { 83 if (len <= 32) 84 infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. len]); 85 else 86 infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. 32]); 87 } 88 if (dataReceivedHandler !is null) { 89 _bufferForRead.limit(cast(int)len); 90 _bufferForRead.position(0); 91 92 if (gWorkerGroup !is null) 93 { 94 gWorkerGroup.put(this, BufferUtils.clone(_bufferForRead)); 95 }else 96 { 97 dataReceivedHandler(_bufferForRead); 98 } 99 // ByteBuffer bb = BufferUtils.wrap(cast(byte[])rb[0..len]); 100 // dataReceivedHandler(bb); 101 } 102 103 // size_t nBytes = tryWrite(cast(ubyte[])ResponseData); 104 105 // if(nBytes < ResponseData.length) { 106 // warning("data lost"); 107 // } 108 109 // It's prossible that there are more data waitting for read in the read I/O space. 110 if (len == _readBuffer.length) { 111 version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len); 112 isDone = false; 113 } 114 } else if (len == Socket.ERROR) { 115 // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call 116 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13 117 // check more error status 118 this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK; 119 if (_error) { 120 this._errorMessage = getErrorMessage(errno); 121 122 if(errno == ECONNRESET) { 123 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 124 onDisconnected(); 125 errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 126 } else { 127 errorOccurred(ErrorCode.INTERRUPTED , "Error occurred on read"); 128 } 129 } else { 130 debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle, 131 errno, getErrorMessage(errno)); 132 } 133 134 } else { 135 version (HUNT_DEBUG) 136 infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle); 137 onDisconnected(); 138 } 139 140 return isDone; 141 } 142 143 override protected void doClose() { 144 version (HUNT_IO_DEBUG) { 145 infof("peer socket %s closing: fd=%d", this.remoteAddress.toString(), this.handle); 146 } 147 if(this.socket is null) { 148 import core.sys.posix.unistd; 149 core.sys.posix.unistd.close(this.handle); 150 } else { 151 this.socket.shutdown(SocketShutdown.BOTH); 152 this.socket.close(); 153 } 154 155 version (HUNT_IO_DEBUG) { 156 infof("peer socket %s closed: fd=%d", this.remoteAddress.toString, this.handle); 157 } 158 } 159 160 161 /** 162 * Try to write a block of data. 163 */ 164 protected ptrdiff_t tryWrite(const(ubyte)[] data) { 165 clearError(); 166 // const nBytes = this.socket.send(data); 167 version (HUNT_IO_DEBUG) 168 tracef("try to write: %d bytes, fd=%d", data.length, this.handle); 169 const nBytes = write(this.handle, data.ptr, data.length); 170 version (HUNT_IO_DEBUG) 171 tracef("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle); 172 173 if (nBytes > 0) { 174 return nBytes; 175 } 176 177 if (nBytes == Socket.ERROR) { 178 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38 179 // check more error status 180 // EPIPE/Broken pipe: 181 // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll 182 183 if(errno == EAGAIN) { 184 version (HUNT_IO_DEBUG) { 185 warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle, 186 errno, getErrorMessage(errno)); 187 } 188 } else if(errno == EINTR || errno == EWOULDBLOCK) { 189 // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait 190 debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle, 191 errno, getErrorMessage(errno)); 192 // eventLoop.update(this); 193 } else { 194 this._error = true; 195 this._errorMessage = getErrorMessage(errno); 196 if(errno == ECONNRESET) { 197 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 198 onDisconnected(); 199 errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 200 } else if(errno == EPIPE) { 201 // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe 202 // Handle SIGPIPE signal 203 onDisconnected(); 204 errorOccurred(ErrorCode.BROKENPIPE , "Broken pipe detected!"); 205 } 206 207 } 208 } else { 209 version (HUNT_DEBUG) { 210 warningf("nBytes=%d, message: %s", nBytes, lastSocketError()); 211 assert(false, "Undefined behavior!"); 212 } else { 213 this._error = true; 214 } 215 } 216 217 return 0; 218 } 219 220 private bool tryNextWrite(ByteBuffer buffer) { 221 const(ubyte)[] data = cast(const(ubyte)[])buffer.getRemaining(); 222 version (HUNT_IO_DEBUG) { 223 tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s", 224 this.handle, data.length, buffer.toString()); 225 } 226 227 ptrdiff_t remaining = data.length; 228 if(data.length == 0) 229 return true; 230 231 while(remaining > 0 && !_error && !isClosing() && !_isWriteCancelling) { 232 ptrdiff_t nBytes = tryWrite(data); 233 version (HUNT_IO_DEBUG) 234 { 235 tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s", 236 this.handle, nBytes, data.length, remaining, buffer.toString()); 237 } 238 239 if (nBytes > 0) { 240 remaining -= nBytes; 241 data = data[nBytes .. $]; 242 } 243 } 244 245 version (HUNT_IO_DEBUG) { 246 if(remaining == 0) { 247 tracef("A buffer is written out. fd=%d", this.handle); 248 return true; 249 } else { 250 warningf("Writing cancelled or an error ocurred. fd=%d", this.handle); 251 return false; 252 } 253 } else { 254 return remaining == 0; 255 } 256 } 257 258 void resetWriteStatus() { 259 if(_writeQueue !is null) 260 _writeQueue.clear(); 261 atomicStore(_isWritting, false); 262 _isWriteCancelling = false; 263 } 264 265 /** 266 * Should be thread-safe. 267 */ 268 override void onWrite() { 269 version (HUNT_IO_DEBUG) 270 { 271 tracef("checking status, isWritting: %s, writeBuffer: %s", 272 _isWritting, writeBuffer is null ? "null" : writeBuffer.toString()); 273 } 274 275 if(!_isWritting) { 276 version (HUNT_IO_DEBUG) 277 infof("No data needs to be written out. fd=%d", this.handle); 278 return; 279 } 280 281 if(isClosing() && _isWriteCancelling) { 282 version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle); 283 resetWriteStatus(); 284 return; 285 } 286 287 // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00 288 // More tests are needed 289 // keep thread-safe here 290 if(!cas(&_isBusyWritting, false, true)) { 291 // version (HUNT_IO_DEBUG) 292 version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle); 293 return; 294 } 295 296 scope(exit) { 297 _isBusyWritting = false; 298 } 299 300 if(writeBuffer !is null) { 301 if(tryNextWrite(writeBuffer)) { 302 writeBuffer = null; 303 } else { 304 version (HUNT_IO_DEBUG) 305 { 306 infof("waiting to try again... fd=%d, writeBuffer: %s", 307 this.handle, writeBuffer.toString()); 308 } 309 // eventLoop.update(this); 310 return; 311 } 312 version (HUNT_IO_DEBUG) 313 tracef("running here, fd=%d", this.handle); 314 } 315 316 if(checkAllWriteDone()) { 317 return; 318 } 319 320 version (HUNT_IO_DEBUG) { 321 tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not"); 322 } 323 324 if(_writeQueue.tryDequeue(writeBuffer)) { 325 if(tryNextWrite(writeBuffer)) { 326 writeBuffer = null; 327 checkAllWriteDone(); 328 } else { 329 version (HUNT_IO_DEBUG) 330 infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString()); 331 332 // eventLoop.update(this); 333 } 334 version (HUNT_IO_DEBUG) { 335 warningf("running here, fd=%d", this.handle); 336 } 337 } 338 } 339 private shared bool _isBusyWritting = false; 340 341 protected bool checkAllWriteDone() { 342 version (HUNT_IO_DEBUG) { 343 import std.conv; 344 tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle, 345 _writeQueue is null || _writeQueue.isEmpty().to!string()); 346 } 347 348 if(_writeQueue is null || _writeQueue.isEmpty()) { 349 resetWriteStatus(); 350 version (HUNT_IO_DEBUG) 351 infof("All data are written out: fd=%d", this.handle); 352 if(dataWriteDoneHandler !is null) 353 dataWriteDoneHandler(this); 354 return true; 355 } 356 357 return false; 358 } 359 360 protected void initializeWriteQueue() { 361 if (_writeQueue is null) { 362 _writeQueue = new WritingBufferQueue(); 363 } 364 } 365 366 protected bool doConnect(Address addr) { 367 try { 368 this.socket.connect(addr); 369 } catch (SocketOSException e) { 370 error(e.msg); 371 version(HUNT_DEBUG) error(e); 372 return false; 373 } 374 return true; 375 } 376 377 void cancelWrite() { 378 _isWriteCancelling = true; 379 } 380 381 bool isWriteCancelling() { 382 return _isWriteCancelling; 383 } 384 385 DataReceivedHandler getDataReceivedHandler() { 386 return dataReceivedHandler; 387 } 388 389 }