/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.TcpStream;

import hunt.io.channel.Common;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.event.selector.Selector;
import hunt.io.SimpleQueue;
import hunt.event;
import hunt.logging.ConsoleLogger;
import hunt.Functions;

import std.exception;
import std.format;
import std.socket;
import std.string;

import core.atomic;
import core.stdc.errno;
import core.thread;
import core.time;

version (HAVE_EPOLL) {
    import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
}

/**
 * A TCP stream bound to a `Selector`.
 *
 * An instance is created either as a client (it owns a fresh socket and must
 * be connected with `connect`) or as a server-side stream wrapping an already
 * established socket. Event callbacks are installed through the fluent
 * setters (`connected`, `received`, `writed`, `closed`, `disconnected`,
 * `error`).
 */
class TcpStream : AbstractStream {
    /// Invoked from `onClose` when a previously connected stream goes down.
    SimpleEventHandler closeHandler;

    /// Connection flag. It's always true for server-side streams.
    protected shared bool _isConnected;

    private TcpStreamOptions _tcpOption;
    /// Number of `reconnect` attempts made so far.
    private int retryCount = 0;

    /**
     * Creates a client stream with a new, unconnected TCP socket.
     *
     * Params:
     *   loop   = selector the stream is handed to the base class with
     *   option = stream options; `TcpStreamOptions.create()` is used when null
     *   family = address family of the socket to create
     */
    this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
        _isClient = true;
        _isConnected = false;

        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        super(loop, family, _tcpOption.bufferSize);
        version (HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
    }

    /**
     * Creates a server-side stream around an existing socket.
     *
     * The stream is marked as connected immediately and the keep-alive
     * settings from the options are applied.
     *
     * Params:
     *   loop   = selector the stream is handed to the base class with
     *   socket = the socket to wrap
     *   option = stream options; `TcpStreamOptions.create()` is used when null
     */
    this(Selector loop, Socket socket, TcpStreamOptions option = null) {
        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = socket;
        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        setKeepalive();
    }

    /// Replaces the stream options. `option` must not be null.
    void options(TcpStreamOptions option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the current stream options.
    TcpStreamOptions options() @property {
        return this._tcpOption;
    }

    /// Returns the value of the `_isWritting` flag.
    override bool isBusy() {
        return _isWritting;
    }

    /// Returns true when the stream was created through the client constructor.
    override bool isClient() {
        return _isClient;
    }

    /**
     * Resolves `hostname` and starts connecting to one of its addresses.
     *
     * The first IPv4 address in the resolver result is preferred. When there
     * is none, a warning is logged and the first resolved address is used.
     *
     * Params:
     *   hostname = host name or textual address to resolve
     *   port     = remote TCP port
     *
     * Throws: SocketException when the resolver returns no address.
     */
    void connect(string hostname, ushort port) {
        Address[] addresses = getAddress(hostname, port);
        // A length check covers both a null array and an empty non-null one,
        // so the addresses[0] fallback below can never go out of range.
        if (addresses.length == 0) {
            throw new SocketException("Can't resolve hostname: " ~ hostname);
        }

        Address selectedAddress;
        foreach (Address addr; addresses) {
            // Decide by address family rather than by the textual form: short
            // IPv6 addresses such as "fe80::1" neither start with "::" nor
            // exceed 16 characters, so a string-based test mistakes them for
            // IPv4.
            if (addr.addressFamily == AddressFamily.INET) {
                selectedAddress = addr;
                break;
            }
        }

        if (selectedAddress is null) {
            warning("No IPV4 avaliable");
            selectedAddress = addresses[0];
        }

        version (HUNT_IO_DEBUG) {
            infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname,
                    selectedAddress.toAddrString(), port);
        }
        connect(selectedAddress);
    }

    /**
     * Starts connecting to `addr`.
     *
     * Does nothing when the stream is already connected. Otherwise `doConnect`
     * is submitted as a task to `std.parallelism.taskPool`, so this call
     * returns without waiting for the connection attempt.
     */
    void connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;
        import std.parallelism;

        auto connectionTask = task(&doConnect, addr);
        taskPool.put(connectionTask);
        // doConnect(addr);
    }

    /**
     * Retries the connection to the last remote address with a fresh socket.
     *
     * Returns silently when the stream is already connected or when
     * `retryCount` has reached `_tcpOption.retryTimes`.
     *
     * Throws: Exception when called on a server-side stream.
     */
    void reconnect() {
        if (!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        retryCount++;
        _isConnected = false;
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (HUNT_DEBUG)
            tracef("reconnecting %d...", retryCount);
        connect(_remoteAddress);
    }

    /**
     * Performs the connection attempt and reports the outcome.
     *
     * On success the socket is made non-blocking, keep-alive is configured,
     * the local address is recorded and the stream is started. On failure
     * `errorOccurred` is called. In both cases the connection handler, if
     * any, receives the resulting connection state.
     *
     * Returns: always `true`; the actual outcome is exposed through
     *          `_isConnected` and the connection handler.
     */
    protected override bool doConnect(Address addr) {
        try {
            version (HUNT_DEBUG)
                tracef("Connecting to %s...", addr);
            // Address binded = createAddress(this.socket.addressFamily);
            // this.socket.bind(binded);
            version (HAVE_IOCP) {
                this.socket.blocking = false;
                start();
                if (super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            } else {
                // Connect in blocking mode, then switch to non-blocking
                // before the stream is started.
                this.socket.blocking = true;
                if (super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    start();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            }
        } catch (Throwable ex) {
            // Catch everything: this method is executed as a task on another
            // thread (see connect(Address)).
            debug warning(ex.msg);
            version (HUNT_DEBUG) warning(ex);
            errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
            _isConnected = false;
        }

        if (_connectionHandler !is null) {
            try {
                _connectionHandler(_isConnected);
            } catch (Throwable ex) {
                // A failing user callback is only logged.
                debug warning(ex.msg);
                version (HUNT_DEBUG) warning(ex);
            }
        }
        return true;
    }

    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /**
     * Applies the keep-alive settings from the options to the socket.
     *
     * With HAVE_EPOLL the probe count (TCP_KEEPCNT) is set as well; with
     * HAVE_IOCP only the keep-alive time and interval are set. Nothing is
     * changed when `_tcpOption.isKeepalive` is false.
     */
    private void setKeepalive() {
        version (HUNT_DEBUG) {
            infof("isKeepalive: %s, keepaliveTime: %d seconds, Interval: %d seconds",
                    _tcpOption.isKeepalive, _tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
        }
        version (HAVE_EPOLL) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP,
                        cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        } else version (HAVE_IOCP) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        }
    }

    /**
     * Debug helper: reads the keep-alive related socket options back and
     * traces them.
     *
     * NOTE(review): TCP_KEEPIDLE and TCP_KEEPINTVL are not part of the
     * selective import at the top of this file; presumably they come in
     * through another import - confirm before enabling the commented-out
     * calls in setKeepalive.
     */
    version (HUNT_DEBUG) private void checkKeepAlive() {
        version (HAVE_EPOLL) {
            int time;
            int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
            tracef("ret=%d, time=%d", ret1, time);

            int interval;
            int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
            tracef("ret=%d, interval=%d", ret2, interval);

            int isKeep;
            int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
            tracef("ret=%d, keepalive=%s", ret3, isKeep == 1);

            int probe;
            int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
            // Labelled "probes": this value is the TCP_KEEPCNT probe count,
            // not the interval that was traced above.
            tracef("ret=%d, probes=%d", ret4, probe);
        }
    }

    /// Sets the handler that `doConnect` calls with the connection result.
    TcpStream connected(ConnectionHandler handler) {
        _connectionHandler = handler;
        return this;
    }

    /// Sets `dataReceivedHandler`.
    TcpStream received(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    /// Sets `dataWriteDoneHandler`.
    TcpStream writed(SimpleActionHandler handler) {
        dataWriteDoneHandler = handler;
        return this;
    }
    /// Alternative name for `writed`.
    alias onWritten = writed;

    /// Sets the handler invoked from `onClose`.
    TcpStream closed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Sets the handler invoked from `onDisconnected`.
    TcpStream disconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets `errorHandler`.
    TcpStream error(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Returns the current connection flag.
    override bool isConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector; a no-op when already registered.
    override void start() {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
        {
            // this.beginRead();
        }
    }

    /**
     * Queues `buffer` for sending and triggers `onWrite`.
     *
     * Params:
     *   buffer = data to send; must not be null
     *
     * Throws: Exception when the stream is not connected.
     */
    void write(ByteBuffer buffer) {
        assert(buffer !is null);

        if (!_isConnected) {
            throw new Exception(format("The connection %s closed!",
                    this.remoteAddress.toString()));
        }

        version (HUNT_IO_DEBUG)
            infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
        _isWritting = true;
        initializeWriteQueue();
        _writeQueue.enqueue(buffer);
        onWrite();
    }

    /**
     * Sends `data`.
     *
     * A null slice is ignored. With HAVE_IOCP the data is always wrapped in a
     * buffer and queued. Otherwise, when nothing is pending, the bytes are
     * written directly with `tryWrite` until everything is out; whatever
     * cannot be written immediately is placed on the write queue. When
     * writes are already pending, the data is queued behind them.
     *
     * Throws: Exception when the stream is not connected, when the write is
     *         being cancelled, or when the stream is closing or closed.
     */
    void write(const(ubyte)[] data) {

        version (HUNT_IO_DEBUG_MORE) {
            infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
        } else version (HUNT_IO_DEBUG) {
            if (data.length <= 32)
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
            else
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
        }

        if (data is null) {
            version (HUNT_DEBUG) {
                // warningf, not warning: the message carries a format specifier.
                warningf("Writting a empty data on connection %s.", this.remoteAddress.toString());
            }
            return;
        }

        if (!_isConnected) {
            string msg = format("The connection %s is closed!", this.remoteAddress.toString());
            throw new Exception(msg);
        }

        version (HAVE_IOCP) {
            return write(BufferUtils.toBuffer(cast(byte[])data));
        } else {
            // NOTE(review): && binds tighter than ||, so this is
            // "queue is null, or (queue is empty and not writing)". The
            // parentheses only make the existing grouping explicit; confirm
            // that "(null or empty) and not writing" was not intended.
            if (_writeQueue is null || (_writeQueue.isEmpty() && !_isWritting)) {
                _isWritting = true;
                const(ubyte)[] d = data;

                // while (!isClosing() && !_isWriteCancelling && d.length > 0) {
                while (d !is null) {
                    if (isWriteCancelling()) {
                        _errorMessage = format("The connection %s is cancelled!", this.remoteAddress.toString());
                        _error = true;
                        // Log as a plain message: the text contains the peer
                        // address and must not be used as a format string.
                        warning(_errorMessage);
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    if (isClosing() || isClosed()) {
                        _errorMessage= format("The connection %s is closing or closed!", this.remoteAddress.toString());
                        _error = true;
                        warningf("%s, %s", isClosing(), isClosed());
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    version (HUNT_IO_DEBUG)
                        infof("to write directly %d bytes, fd=%d", d.length, this.handle);
                    size_t nBytes = tryWrite(d);

                    if (nBytes == d.length) {
                        version (HUNT_IO_DEBUG)
                            tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        checkAllWriteDone();
                        break;
                    } else if (nBytes > 0) {
                        version (HUNT_IO_DEBUG)
                            tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        // Keep going with the unsent remainder.
                        d = d[nBytes .. $];
                    } else {
                        // Nothing could be written right now: park the
                        // remainder on the write queue.
                        version (HUNT_IO_DEBUG)
                            warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
                        initializeWriteQueue();
                        _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
                        break;
                    }
                }
            } else {
                write(BufferUtils.toBuffer(cast(byte[]) data));
            }
        }
    }

    /// Shuts down the receiving side of the socket.
    void shutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending side of the socket.
    void shutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

    /// Notifies the disconnection handler, if any, and closes the stream.
    override protected void onDisconnected() {
        version (HUNT_DEBUG) {
            infof("peer disconnected: fd=%d", this.handle);
        }
        if (disconnectionHandler !is null)
            disconnectionHandler();

        this.close();
    }

protected:
    /// True for streams created through the client constructor.
    bool _isClient;
    /// Callback that receives the result of a connection attempt.
    ConnectionHandler _connectionHandler;

    /**
     * Read-event callback.
     *
     * On Posix `tryRead` is called repeatedly until it returns true or the
     * stream is closed; elsewhere a single `doRead` is issued.
     */
    override void onRead() {
        version (HUNT_IO_DEBUG)
            trace("start to read");

        version (Posix) {
            while (!_isClosed && !tryRead()) {
                version (HUNT_IO_DEBUG)
                    trace("continue reading...");
            }
        } else {
            if (!_isClosed)
            {
                doRead();
            }

        }

        //if (this.isError) {
        //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
        //        this.handle, errno, this.errorMessage);
        //    debug errorf(msg);
        //    if (!isClosed())
        //        errorOccurred(msg);
        //}
    }

    /**
     * Close callback.
     *
     * After the base class has closed the stream, and only when the stream
     * was connected before the call, the write status is reset, the
     * connection flag is cleared and `closeHandler` is invoked.
     */
    override void onClose() {
        // Remember the state first: the handler below must only fire for a
        // stream that was connected when the close started.
        bool lastConnectStatus = _isConnected;
        super.onClose();
        if (lastConnectStatus) {
            version (HUNT_IO_DEBUG) {
                if (_writeQueue !is null && !_writeQueue.isEmpty) {
                    warningf("Some data has not been sent yet: fd=%d", this.handle);
                }
            }
            version (HUNT_DEBUG) {
                infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
            }

            resetWriteStatus();
            _isConnected = false;
            version (HUNT_IO_DEBUG) {
                infof("Raising a event on a TCP stream [%s] is down: fd=%d",
                        this.remoteAddress.toString(), this.handle);
            }

            if (closeHandler !is null)
                closeHandler();
        }
    }

}