1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.io.TcpStream; 13 14 import hunt.event; 15 16 import hunt.logging; 17 import hunt.Functions; 18 19 import std.format; 20 import std.socket; 21 import std.exception; 22 import std.socket; 23 import core.thread; 24 import core.time; 25 26 version(HAVE_EPOLL) { 27 import core.sys.linux.netinet.tcp : TCP_KEEPCNT; 28 } 29 30 class TcpStreamOption { 31 string ip = "127.0.0.1"; 32 ushort port = 8080; 33 34 // http://www.tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html 35 /// the interval between the last data packet sent (simple ACKs are not considered data) and the first keepalive probe; 36 /// after the connection is marked to need keepalive, this counter is not used any further 37 int keepaliveTime = 7200; // in seconds 38 39 /// the interval between subsequential keepalive probes, regardless of what the connection has exchanged in the meantime 40 int keepaliveInterval = 75; // in seconds 41 42 /// the number of unacknowledged probes to send before considering the connection dead and notifying the application layer 43 int keepaliveProbes = 9; // times 44 45 bool isKeepalive = false; 46 47 size_t bufferSize = 1024*8; 48 49 int retryTimes = 5; 50 Duration retryInterval = 2.seconds; 51 52 53 static TcpStreamOption createOption() { 54 TcpStreamOption option = new TcpStreamOption(); 55 option.isKeepalive = true; 56 option.keepaliveTime = 15; 57 option.keepaliveInterval = 3; 58 option.keepaliveProbes = 5; 59 option.bufferSize = 1024*8; 60 return option; 61 } 62 63 this() { 64 65 } 66 } 67 68 /** 69 */ 70 class TcpStream : AbstractStream { 71 SimpleEventHandler closeHandler; 72 73 private TcpStreamOption _tcpOption; 74 private int retryCount = 0; 75 76 // for client 77 this(Selector loop, AddressFamily family = AddressFamily.INET, TcpStreamOption option = null) { 78 if(option is null) 79 _tcpOption = TcpStreamOption.createOption(); 80 else 81 _tcpOption = option; 82 super(loop, family, _tcpOption.bufferSize); 83 this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP); 84 85 _isClient = true; 86 _isConnected = false; 87 } 88 89 // for server 90 this(Selector loop, Socket socket, TcpStreamOption option = null) { 91 if(option is null) 92 _tcpOption = TcpStreamOption.createOption(); 93 else 94 _tcpOption = option; 95 super(loop, socket.addressFamily, _tcpOption.bufferSize); 96 this.socket = socket; 97 _remoteAddress = socket.remoteAddress(); 98 _localAddress = socket.localAddress(); 99 100 _isClient = false; 101 _isConnected = true; 102 setKeepalive(); 103 } 104 105 void options(TcpStreamOption option) @property { 106 assert(option !is null); 107 this._tcpOption = option; 108 } 109 110 TcpStreamOption options() @property { 111 return this._tcpOption; 112 } 113 114 override bool isBusy() { 115 return _isWritting; 116 } 117 118 void connect(string ip, ushort port) { 119 connect(parseAddress(ip, port)); 120 } 121 122 void connect(Address addr) { 123 if (_isConnected) 124 return; 125 126 _remoteAddress = addr; 127 import std.parallelism; 128 auto connectionTask = task(&doConnect, addr); 129 taskPool.put(connectionTask); 130 // doConnect(addr); 131 } 132 133 void reconnect() { 134 if(!_isClient) { 135 throw new Exception("Only client can call this method."); 136 } 137 138 if (_isConnected || retryCount >= _tcpOption.retryTimes) 139 return; 140 141 retryCount++; 142 _isConnected = false; 143 this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP); 144 145 version (HUNT_DEBUG) tracef("reconnecting %d...", retryCount); 146 connect(_remoteAddress); 147 } 148 149 protected override void doConnect(Address addr) { 150 try { 151 version (HUNT_DEBUG) tracef("connecting to %s...", addr); 152 // Address binded = createAddress(this.socket.addressFamily); 153 // this.socket.bind(binded); 154 this.socket.blocking = true; 155 super.doConnect(addr); 156 this.socket.blocking = false; 157 _isConnected = true; 158 setKeepalive(); 159 _localAddress = this.socket.localAddress(); 160 start(); 161 } catch (Exception ex) { 162 warning(ex.message); 163 } 164 165 if (_connectionHandler !is null) 166 _connectionHandler(_isConnected); 167 168 } 169 170 171 // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/ 172 // http://www.importnew.com/27624.html 173 private void setKeepalive() { 174 version(HAVE_EPOLL) { 175 if(_tcpOption.isKeepalive) { 176 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval); 177 this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, 178 _tcpOption.keepaliveProbes); 179 // version (HUNT_DEBUG) checkKeepAlive(); 180 } 181 } else version(HAVE_IOCP) { 182 if(_tcpOption.isKeepalive) { 183 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval); 184 // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, 185 // _tcpOption.keepaliveProbes); 186 version (HUNT_DEBUG) checkKeepAlive(); 187 } 188 } 189 } 190 191 version (HUNT_DEBUG) 192 private void checkKeepAlive() { 193 version(HAVE_EPOLL) { 194 int time ; 195 int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time); 196 tracef("ret=%d, time=%d", ret1, time); 197 198 int interval; 199 int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval); 200 tracef("ret=%d, interval=%d", ret2, interval); 201 202 int isKeep; 203 int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep); 204 tracef("ret=%d, keepalive=%s", ret3, isKeep==1); 205 206 int probe; 207 int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe); 208 tracef("ret=%d, interval=%d", ret4, probe); 209 } 210 } 211 212 TcpStream onConnected(ConnectionHandler cback) { 213 _connectionHandler = cback; 214 return this; 215 } 216 217 TcpStream onDataReceived(DataReceivedHandler handler) { 218 dataReceivedHandler = handler; 219 return this; 220 } 221 222 // TcpStream onDataWritten(DataWrittenHandler handler) 223 // { 224 // sentHandler = handler; 225 // return this; 226 // } 227 228 TcpStream onClosed(SimpleEventHandler handler) { 229 closeHandler = handler; 230 return this; 231 } 232 233 TcpStream onDisconnected(SimpleEventHandler handler) { 234 disconnectionHandler = handler; 235 return this; 236 } 237 238 TcpStream onError(ErrorEventHandler handler) { 239 errorHandler = handler; 240 return this; 241 } 242 243 bool isConnected() nothrow { 244 return _isConnected; 245 } 246 247 override void start() { 248 if (_isRegistered) 249 return; 250 _inLoop.register(this); 251 _isRegistered = true; 252 version (HAVE_IOCP) 253 this.beginRead(); 254 } 255 256 void write(StreamWriteBuffer buffer) { 257 assert(buffer !is null); 258 259 if (!_isConnected) { 260 debug warningf("The connection (fd=%d) has been closed!", this.handle); 261 return; 262 } 263 264 _writeQueue.enQueue(buffer); 265 266 version (HAVE_IOCP) { 267 if (_isWritting) { 268 version (HUNT_DEBUG) 269 infof("Busy in writting, data buffered (%d bytes)", buffer.capacity); 270 } else 271 tryWrite(); 272 } else { 273 onWrite(); 274 } 275 } 276 277 /// safe for big data sending 278 void write(const ubyte[] data, DataWrittenHandler handler = null) { 279 if (data.length == 0) 280 return; 281 282 write(new SocketStreamBuffer(data, handler)); 283 } 284 285 void shutdownInput() { 286 this.socket.shutdown(SocketShutdown.RECEIVE); 287 } 288 289 void shutdownOutput() { 290 this.socket.shutdown(SocketShutdown.SEND); 291 } 292 293 protected: 294 bool _isClient; 295 ConnectionHandler _connectionHandler; 296 297 override void onRead() { 298 version (HUNT_DEBUG) 299 trace("start to read"); 300 301 version (Posix) { 302 while (!_isClosed && !tryRead()) { 303 version (HUNT_DEBUG) 304 trace("continue reading..."); 305 } 306 } else { 307 doRead(); 308 } 309 310 if (this.isError) { 311 string msg = format("Socket error on read: fd=%d, message: %s", 312 this.handle, this.erroString); 313 // version (HUNT_DEBUG) 314 debug errorf(msg); 315 errorOccurred(msg); 316 } 317 } 318 319 override void onClose() { 320 version (HUNT_DEBUG) { 321 if (!_writeQueue.empty) { 322 warning("Some data has not been sent yet."); 323 } 324 325 infof("connection closed with: %s", this.remoteAddress); 326 } 327 328 _writeQueue.clear(); 329 _isConnected = false; 330 this.socket.shutdown(SocketShutdown.BOTH); 331 this.socket.close(); 332 super.onClose(); 333 334 if (closeHandler) 335 closeHandler(); 336 } 337 338 override void onWrite() { 339 if (!_isConnected) { 340 _isConnected = true; 341 _remoteAddress = socket.remoteAddress(); 342 343 if (_connectionHandler) 344 _connectionHandler(true); 345 return; 346 } 347 348 // bool canWrite = true; 349 version (HUNT_DEBUG) 350 tracef("start to write [fd=%d]", this.handle); 351 352 while (_isRegistered && !isWriteCancelling && !_writeQueue.empty) { 353 version (HUNT_DEBUG) 354 tracef("writting [fd=%d]...", this.handle); 355 356 StreamWriteBuffer writeBuffer = _writeQueue.front(); 357 const(ubyte[]) data = writeBuffer.remaining(); 358 if (data.length == 0) { 359 auto q = _writeQueue.deQueue(); 360 if (q is null) 361 warning("StreamWriteBuffer is null"); 362 else 363 q.finish(); 364 // _writeQueue.deQueue().finish(); 365 continue; 366 } 367 368 this.clearError(); 369 size_t nBytes = tryWrite(data); 370 if (nBytes > 0 && writeBuffer.pop(nBytes)) { 371 version (HUNT_DEBUG) 372 tracef("writing done: %d bytes, fd: %d", nBytes, this.handle); 373 auto q = _writeQueue.deQueue(); 374 if (q is null) 375 warning("StreamWriteBuffer is null"); 376 else 377 q.finish(); 378 } 379 380 if (this.isError) { 381 string msg = format("Socket error on write: fd=%d, message=%s", 382 this.handle, this.erroString); 383 debug errorf(msg); 384 errorOccurred(msg); 385 break; 386 } 387 } 388 } 389 }