1 module hunt.io.channel.posix.AbstractStream; 2 3 // dfmt off 4 version(Posix): 5 // dfmt on 6 7 import hunt.collection.BufferUtils; 8 import hunt.collection.ByteBuffer; 9 import hunt.event.selector.Selector; 10 import hunt.Functions; 11 import hunt.io.channel.AbstractSocketChannel; 12 import hunt.io.channel.Common; 13 import hunt.io.IoError; 14 import hunt.logging.ConsoleLogger; 15 import hunt.system.Error; 16 17 import std.format; 18 import std.socket; 19 20 import core.atomic; 21 import core.stdc.errno; 22 import core.stdc.string; 23 import core.sys.posix.sys.socket : accept; 24 import core.sys.posix.unistd; 25 26 enum string ResponseData = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nConnection: Keep-Alive\r\nContent-Type: text/plain\r\nServer: Hunt/1.0\r\nDate: Wed, 17 Apr 2013 12:00:00 GMT\r\n\r\nHello, World!"; 27 28 /** 29 TCP Peer 30 */ 31 abstract class AbstractStream : AbstractSocketChannel { 32 enum BufferSize = 4096; 33 private const(ubyte)[] _readBuffer; 34 private ByteBuffer writeBuffer; 35 36 /** 37 * Warning: The received data is stored a inner buffer. For a data safe, 38 * you would make a copy of it. 39 */ 40 protected DataReceivedHandler dataReceivedHandler; 41 protected SimpleEventHandler disconnectionHandler; 42 protected SimpleActionHandler dataWriteDoneHandler; 43 44 protected AddressFamily _family; 45 protected ByteBuffer _bufferForRead; 46 protected WritingBufferQueue _writeQueue; 47 protected bool isWriteCancelling = false; 48 49 this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) { 50 this._family = family; 51 _bufferForRead = BufferUtils.allocate(bufferSize); 52 _bufferForRead.limit(cast(int)bufferSize); 53 _readBuffer = cast(ubyte[])_bufferForRead.array(); 54 // _writeQueue = new WritingBufferQueue(); 55 super(loop, ChannelType.TCP); 56 setFlag(ChannelFlag.Read, true); 57 setFlag(ChannelFlag.Write, true); 58 setFlag(ChannelFlag.ETMode, true); 59 } 60 61 abstract bool isClient(); 62 abstract bool isConnected() nothrow; 63 abstract protected void onDisconnected(); 64 65 /** 66 * 67 */ 68 protected bool tryRead() { 69 bool isDone = true; 70 this.clearError(); 71 ptrdiff_t len = read(this.handle, cast(void*) _readBuffer.ptr, _readBuffer.length); 72 73 // ubyte[] rb = new ubyte[BufferSize]; 74 // ptrdiff_t len = read(this.handle, cast(void*) rb.ptr, rb.length); 75 version (HUNT_IO_DEBUG) { 76 tracef("reading[fd=%d]: %d bytes", this.handle, len); 77 } 78 79 if (len > 0) { 80 version(HUNT_IO_DEBUG) { 81 if (len <= 32) 82 infof("fd: %d, %d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. len]); 83 else 84 infof("fd: %d, 32/%d bytes: %(%02X %)", this.handle, len, _readBuffer[0 .. 32]); 85 } 86 87 if (dataReceivedHandler !is null) { 88 _bufferForRead.limit(cast(int)len); 89 _bufferForRead.position(0); 90 dataReceivedHandler(_bufferForRead); 91 92 // ByteBuffer bb = BufferUtils.wrap(cast(byte[])rb[0..len]); 93 // dataReceivedHandler(bb); 94 } 95 96 // size_t nBytes = tryWrite(cast(ubyte[])ResponseData); 97 98 // if(nBytes < ResponseData.length) { 99 // warning("data lost"); 100 // } 101 102 // It's prossible that there are more data waitting for read in the read I/O space. 103 if (len == _readBuffer.length) { 104 version (HUNT_IO_DEBUG) infof("Read buffer is full read %d bytes. Need to read again.", len); 105 isDone = false; 106 } 107 } else if (len == Socket.ERROR) { 108 // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call 109 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13 110 // check more error status 111 this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK; 112 if (_error) { 113 this._errorMessage = getErrorMessage(errno); 114 errorOccurred(ErrorCode.INTERRUPTED , "read buffer error"); 115 } else { 116 debug warningf("warning on read: fd=%d, errno=%d, message=%s", this.handle, 117 errno, getErrorMessage(errno)); 118 } 119 120 if(errno == ECONNRESET) { 121 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 122 onDisconnected(); 123 errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 124 } 125 } else { 126 version (HUNT_DEBUG) 127 infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle); 128 onDisconnected(); 129 } 130 131 return isDone; 132 } 133 134 override protected void doClose() { 135 version (HUNT_IO_DEBUG) { 136 infof("peer socket closing: fd=%d", this.handle); 137 } 138 if(this.socket is null) { 139 import core.sys.posix.unistd; 140 core.sys.posix.unistd.close(this.handle); 141 } else { 142 this.socket.shutdown(SocketShutdown.BOTH); 143 this.socket.close(); 144 } 145 146 version (HUNT_IO_DEBUG) { 147 infof("peer socket closed: fd=%d", this.handle); 148 } 149 } 150 151 152 /** 153 * Try to write a block of data. 154 */ 155 protected ptrdiff_t tryWrite(const ubyte[] data) { 156 clearError(); 157 // const nBytes = this.socket.send(data); 158 version (HUNT_IO_DEBUG) 159 tracef("try to write: %d bytes, fd=%d", data.length, this.handle); 160 const nBytes = write(this.handle, data.ptr, data.length); 161 // version (HUNT_IO_DEBUG) 162 // tracef("actually written: %d / %d bytes, fd=%d", nBytes, data.length, this.handle); 163 164 if (nBytes > 0) { 165 return nBytes; 166 } 167 168 if (nBytes == Socket.ERROR) { 169 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38 170 // check more error status 171 // EPIPE/Broken pipe: 172 // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe 173 // https://github.com/angrave/SystemProgramming/wiki/Networking%2C-Part-7%3A-Nonblocking-I-O%2C-select%28%29%2C-and-epoll 174 175 if(errno == EAGAIN) { 176 version (HUNT_IO_DEBUG) { 177 warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle, 178 errno, getErrorMessage(errno)); 179 } 180 } else if(errno == EINTR || errno == EWOULDBLOCK) { 181 // https://stackoverflow.com/questions/38964745/can-a-socket-become-writeable-after-an-ewouldblock-but-before-an-epoll-wait 182 debug warningf("warning on write: fd=%d, errno=%d, message=%s", this.handle, 183 errno, getErrorMessage(errno)); 184 // eventLoop.update(this); 185 } else { 186 this._error = true; 187 this._errorMessage = getErrorMessage(errno); 188 if(errno == ECONNRESET) { 189 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 190 onDisconnected(); 191 errorOccurred(ErrorCode.CONNECTIONEESET , "connection reset by peer"); 192 } 193 } 194 } else { 195 version (HUNT_DEBUG) { 196 warningf("nBytes=%d, message: %s", nBytes, lastSocketError()); 197 assert(false, "Undefined behavior!"); 198 } else { 199 this._error = true; 200 } 201 } 202 203 //if (this._error) { 204 // this._errorMessage = getErrorMessage(errno); 205 // string msg = format("Socket error on write: fd=%d, code: %d, message=%s", 206 // this.handle, errno, this.errorMessage); 207 // debug errorf(msg); 208 // errorOccurred(msg); 209 //} 210 211 return 0; 212 } 213 214 private bool tryNextWrite(ByteBuffer buffer) { 215 const(ubyte)[] data = cast(const(ubyte)[])buffer.getRemaining(); 216 version (HUNT_IO_DEBUG) { 217 tracef("writting from a buffer [fd=%d], %d bytes, buffer: %s", 218 this.handle, data.length, buffer.toString()); 219 } 220 221 ptrdiff_t remaining = data.length; 222 if(data.length == 0) 223 return true; 224 225 while(remaining > 0 && !_error && !isClosing() && !isWriteCancelling) { 226 ptrdiff_t nBytes = tryWrite(data); 227 version (HUNT_IO_DEBUG) 228 { 229 tracef("write out once: fd=%d, %d / %d bytes, remaining: %d buffer: %s", 230 this.handle, nBytes, data.length, remaining, buffer.toString()); 231 } 232 233 if (nBytes > 0) { 234 remaining -= nBytes; 235 data = data[nBytes .. $]; 236 } 237 } 238 239 version (HUNT_IO_DEBUG) { 240 if(remaining == 0) { 241 tracef("A buffer is written out. fd=%d", this.handle); 242 return true; 243 } else { 244 warningf("Writing cancelled or an error ocurred. fd=%d", this.handle); 245 return false; 246 } 247 } else { 248 return remaining == 0; 249 } 250 } 251 252 void resetWriteStatus() { 253 if(_writeQueue !is null) 254 _writeQueue.clear(); 255 atomicStore(_isWritting, false); 256 isWriteCancelling = false; 257 } 258 259 /** 260 * Should be thread-safe. 261 */ 262 override void onWrite() { 263 version (HUNT_IO_DEBUG) 264 { 265 tracef("checking status, isWritting: %s, writeBuffer: %s", 266 _isWritting, writeBuffer is null ? "null" : writeBuffer.toString()); 267 } 268 269 if(!_isWritting) { 270 version (HUNT_IO_DEBUG) 271 infof("No data needs to be written out. fd=%d", this.handle); 272 return; 273 } 274 275 if(isClosing() && isWriteCancelling) { 276 version (HUNT_DEBUG) infof("Write cancelled or closed, fd=%d", this.handle); 277 resetWriteStatus(); 278 return; 279 } 280 281 // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-04-24T14:26:45+08:00 282 // More tests are needed 283 // keep thread-safe here 284 if(!cas(&_isBusyWritting, false, true)) { 285 // version (HUNT_IO_DEBUG) 286 version(HUNT_DEBUG) warningf("busy writing. fd=%d", this.handle); 287 return; 288 } 289 290 scope(exit) { 291 _isBusyWritting = false; 292 } 293 294 if(writeBuffer !is null) { 295 if(tryNextWrite(writeBuffer)) { 296 writeBuffer = null; 297 } else { 298 version (HUNT_IO_DEBUG) 299 { 300 infof("waiting to try again... fd=%d, writeBuffer: %s", 301 this.handle, writeBuffer.toString()); 302 } 303 // eventLoop.update(this); 304 return; 305 } 306 version (HUNT_IO_DEBUG) 307 tracef("running here, fd=%d", this.handle); 308 } 309 310 if(checkAllWriteDone()) { 311 return; 312 } 313 314 version (HUNT_IO_DEBUG) { 315 tracef("start to write [fd=%d], writeBuffer %s empty", this.handle, writeBuffer is null ? "is" : "is not"); 316 } 317 318 if(_writeQueue.tryDequeue(writeBuffer)) { 319 if(tryNextWrite(writeBuffer)) { 320 writeBuffer = null; 321 checkAllWriteDone(); 322 } else { 323 version (HUNT_IO_DEBUG) 324 infof("waiting to try again: fd=%d, writeBuffer: %s", this.handle, writeBuffer.toString()); 325 326 // eventLoop.update(this); 327 } 328 version (HUNT_IO_DEBUG) { 329 warningf("running here, fd=%d", this.handle); 330 } 331 } 332 } 333 private shared bool _isBusyWritting = false; 334 335 protected bool checkAllWriteDone() { 336 version (HUNT_IO_DEBUG) { 337 import std.conv; 338 tracef("checking remaining: fd=%d, writeQueue empty: %s", this.handle, 339 _writeQueue is null || _writeQueue.isEmpty().to!string()); 340 } 341 342 if(_writeQueue is null || _writeQueue.isEmpty()) { 343 resetWriteStatus(); 344 version (HUNT_IO_DEBUG) 345 infof("All data are written out: fd=%d", this.handle); 346 if(dataWriteDoneHandler !is null) 347 dataWriteDoneHandler(this); 348 return true; 349 } 350 351 return false; 352 } 353 354 protected void initializeWriteQueue() { 355 if (_writeQueue is null) { 356 _writeQueue = new WritingBufferQueue(); 357 } 358 } 359 360 protected bool doConnect(Address addr) { 361 try { 362 this.socket.connect(addr); 363 } catch (SocketOSException e) { 364 error(e.msg); 365 version(HUNT_DEBUG) error(e); 366 return false; 367 } 368 return true; 369 } 370 371 void cancelWrite() { 372 isWriteCancelling = true; 373 } 374 }