1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.io.socket.Posix; 13 14 // dfmt off 15 version(Posix): 16 17 // dfmt on 18 19 import hunt.concurrency.thread.Helper; 20 import hunt.Functions; 21 import hunt.io.socket.Common; 22 import hunt.logging; 23 import hunt.system.Error; 24 25 import std.conv; 26 import std.exception; 27 import std.format; 28 import std.process; 29 import std.socket; 30 import std.string; 31 32 import core.stdc.errno; 33 import core.stdc.string; 34 import core.sys.posix.sys.socket : accept; 35 36 // extern (C) nothrow @nogc { 37 // int accept4(int, sockaddr*, socklen_t*, int); 38 // } 39 40 enum int SOCK_CLOEXEC = 0x02000000; /* Atomically set close-on-exec flag for the 41 new descriptor(s). */ 42 enum int SOCK_NONBLOCK = 0x00004000; /* Atomically mark descriptor(s) as 43 non-blocking. */ 44 45 /** 46 TCP Server 47 */ 48 abstract class AbstractListener : AbstractSocketChannel { 49 this(Selector loop, AddressFamily family = AddressFamily.INET) { 50 super(loop, ChannelType.Accept); 51 setFlag(ChannelFlag.Read, true); 52 this.socket = new TcpSocket(family); 53 } 54 55 protected bool onAccept(scope AcceptHandler handler) { 56 version (HUNT_DEBUG) 57 trace("new connection coming..."); 58 this.clearError(); 59 // http://man7.org/linux/man-pages/man2/accept.2.html 60 version(HAVE_EPOLL) { 61 // socket_t clientFd = cast(socket_t)(accept4(this.handle, null, null, SOCK_NONBLOCK | SOCK_CLOEXEC)); 62 socket_t clientFd = cast(socket_t)(accept(this.handle, null, null)); 63 } else { 64 socket_t clientFd = cast(socket_t)(accept(this.handle, null, null)); 65 } 66 if (clientFd == socket_t.init) 67 return false; 68 69 version (HUNT_DEBUG) 70 tracef("Listener fd=%d, client fd=%d", this.handle, clientFd); 71 72 if (handler !is null) 73 handler(new Socket(clientFd, this.localAddress.addressFamily)); 74 return true; 75 } 76 77 override void onWriteDone() { 78 version (HUNT_DEBUG) 79 tracef("a new connection created"); 80 } 81 } 82 83 /** 84 TCP Client 85 */ 86 abstract class AbstractStream : AbstractSocketChannel, Stream { 87 SimpleEventHandler disconnectionHandler; 88 89 protected bool _isConnected; // It's always true for server. 90 protected AddressFamily _family; 91 92 this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) { 93 this._family = family; 94 _readBuffer = new ubyte[bufferSize]; 95 super(loop, ChannelType.TCP); 96 setFlag(ChannelFlag.Read, true); 97 setFlag(ChannelFlag.Write, true); 98 setFlag(ChannelFlag.ETMode, true); 99 } 100 101 /** 102 */ 103 protected bool tryRead() { 104 bool isDone = true; 105 this.clearError(); 106 ptrdiff_t len = this.socket.receive(cast(void[]) this._readBuffer); 107 version (HUNT_DEBUG) 108 tracef("reading[fd=%d]: %d nbytes", this.handle, len); 109 110 if (len > 0) { 111 if (dataReceivedHandler !is null) 112 dataReceivedHandler(this._readBuffer[0 .. len]); 113 114 // It's prossible that more data are wainting for read in inner buffer. 115 if (len == _readBuffer.length) 116 isDone = false; 117 } else if (len == Socket.ERROR) { 118 // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call 119 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13 120 // check more error status 121 this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK; 122 if (_error) { 123 this._erroString = getErrorMessage(errno); 124 } else { 125 debug warningf("write warning: fd=%s, errno=%d, message=%s", this.handle, 126 errno, getErrorMessage(errno)); 127 } 128 129 if(errno == ECONNRESET) { 130 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 131 onDisconnected(); 132 this.close(); 133 } 134 } 135 else { 136 version (HUNT_DEBUG) 137 infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle); 138 onDisconnected(); 139 this.close(); 140 } 141 142 return isDone; 143 } 144 145 protected void onDisconnected() { 146 _isConnected = false; 147 if (disconnectionHandler !is null) 148 disconnectionHandler(); 149 } 150 151 protected bool canWriteAgain = true; 152 int writeRetryLimit = 5; 153 private int writeRetries = 0; 154 155 /** 156 Warning: It will try the best to write all the data. 157 TODO: create a test 158 */ 159 protected void tryWriteAll(in ubyte[] data) { 160 const nBytes = this.socket.send(data); 161 version (HUNT_DEBUG) 162 tracef("actually sent bytes: %d / %d", nBytes, data.length); 163 164 if (nBytes > 0) { 165 if (canWriteAgain && nBytes < data.length) { // && writeRetries < writeRetryLimit 166 // version (HUNT_DEBUG) 167 writeRetries++; 168 tracef("[%d] rewrite: written %d, remaining: %d, total: %d", 169 writeRetries, nBytes, data.length - nBytes, data.length); 170 if (writeRetries > writeRetryLimit) 171 warning("You are writting a big block of data!!!"); 172 173 tryWriteAll(data[nBytes .. $]); 174 } else 175 writeRetries = 0; 176 } else if (nBytes == Socket.ERROR) { 177 this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK; 178 if (this._error) { 179 this._erroString = lastSocketError(); 180 181 warningf("write error: fd=%s, errno=%d, message=%s", this.handle, 182 errno, this._erroString); 183 184 if(errno == ECONNRESET) { 185 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 186 onDisconnected(); 187 this.close(); 188 } 189 } else { 190 debug warningf("write error: fd=%s, errno=%d, message=%s", this.handle, 191 errno, lastSocketError()); 192 193 if (canWriteAgain && !_isClosed) { 194 import core.thread; 195 import core.time; 196 197 writeRetries++; 198 tracef("[%d] rewrite: written %d, remaining: %d, total: %d", 199 writeRetries, nBytes, data.length - nBytes, data.length); 200 if (writeRetries > writeRetryLimit) 201 warning("You are writting a Big block of data!!!"); 202 warning("Wait for a 100 msecs to try again"); 203 Thread.sleep(100.msecs); 204 tryWriteAll(data); 205 } 206 } 207 } else { 208 version (HUNT_DEBUG) { 209 warningf("nBytes=%d, message: %s", nBytes, lastSocketError()); 210 assert(false, "Undefined behavior!"); 211 } 212 else { 213 this._error = true; 214 this._erroString = lastSocketError(); 215 } 216 } 217 } 218 219 /** 220 Try to write a block of data. 221 */ 222 protected ptrdiff_t tryWrite(const ubyte[] data) { 223 const nBytes = this.socket.send(data); 224 version (HUNT_DEBUG) 225 tracef("actually sent : %d / %d bytes, fd=%d", nBytes, data.length, this.handle); 226 227 if (nBytes > 0) { 228 return nBytes; 229 } else if (nBytes == Socket.ERROR) { 230 // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38 231 // check more error status 232 // EPIPE/Broken pipe: 233 // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe 234 this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK; 235 if (_error) { 236 this._erroString = getErrorMessage(errno); 237 } else { 238 debug warningf("warning for write: fd=%d, errno=%d, message=%s", this.handle, 239 errno, getErrorMessage(errno)); 240 } 241 242 if(errno == ECONNRESET) { 243 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean 244 onDisconnected(); 245 this.close(); 246 } 247 } else { 248 version (HUNT_DEBUG) { 249 warningf("nBytes=%d, message: %s", nBytes, lastSocketError()); 250 assert(false, "Undefined behavior!"); 251 } 252 else { 253 this._error = true; 254 this._erroString = getErrorMessage(errno); 255 } 256 } 257 return 0; 258 } 259 260 protected void doConnect(Address addr) { 261 this.socket.connect(addr); 262 } 263 264 void cancelWrite() { 265 isWriteCancelling = true; 266 } 267 268 override void onWriteDone() { 269 // notified by kqueue selector when data writing done 270 version (HUNT_DEBUG) 271 tracef("done with data writing"); 272 } 273 274 // protected UbyteArrayObject _readBuffer; 275 private const(ubyte)[] _readBuffer; 276 protected WriteBufferQueue _writeQueue; 277 protected bool isWriteCancelling = false; 278 279 /** 280 * Warning: The received data is stored a inner buffer. For a data safe, 281 * you would make a copy of it. 282 */ 283 DataReceivedHandler dataReceivedHandler; 284 285 } 286 287 /** 288 UDP Socket 289 */ 290 abstract class AbstractDatagramSocket : AbstractSocketChannel { 291 this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2) { 292 super(loop, ChannelType.UDP); 293 setFlag(ChannelFlag.Read, true); 294 setFlag(ChannelFlag.ETMode, false); 295 296 this.socket = new UdpSocket(family); 297 // _socket.blocking = false; 298 _readBuffer = new UdpDataObject(); 299 _readBuffer.data = new ubyte[bufferSize]; 300 301 if (family == AddressFamily.INET) 302 _bindAddress = new InternetAddress(InternetAddress.PORT_ANY); 303 else if (family == AddressFamily.INET6) 304 _bindAddress = new Internet6Address(Internet6Address.PORT_ANY); 305 else 306 _bindAddress = new UnknownAddress(); 307 } 308 309 final void bind(Address addr) { 310 if (_binded) 311 return; 312 _bindAddress = addr; 313 socket.bind(_bindAddress); 314 _binded = true; 315 } 316 317 final bool isBind() { 318 return _binded; 319 } 320 321 Address bindAddr() { 322 return _bindAddress; 323 } 324 325 protected UdpDataObject _readBuffer; 326 protected bool _binded = false; 327 protected Address _bindAddress; 328 329 protected bool tryRead(scope ReadCallBack read) { 330 this._readBuffer.addr = createAddress(this.socket.addressFamily, 0); 331 auto data = this._readBuffer.data; 332 scope (exit) 333 this._readBuffer.data = data; 334 auto len = this.socket.receiveFrom(this._readBuffer.data, this._readBuffer.addr); 335 if (len > 0) { 336 this._readBuffer.data = this._readBuffer.data[0 .. len]; 337 read(this._readBuffer); 338 } 339 return false; 340 } 341 342 override void onWriteDone() { 343 // notified by kqueue selector when data writing done 344 version (HUNT_DEBUG) 345 tracef("done with data writing"); 346 } 347 }