/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.TcpStream;

import hunt.io.channel.Common;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.event.selector.Selector;
import hunt.io.SimpleQueue;
import hunt.event;
import hunt.logging;
import hunt.Functions;

import std.exception;
import std.format;
import std.socket;
import std.string;

import core.atomic;
import core.stdc.errno;
import core.thread;
import core.time;

version (HAVE_EPOLL) {
    import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
}



/**
 * A TCP stream layered on AbstractStream.
 *
 * It is used in two ways:
 * $(UL
 *   $(LI as a client: the stream creates its own socket and is connected
 *        later through one of the `connect` overloads;)
 *   $(LI as the server side of a connection: it wraps a socket that is
 *        already connected and is marked as connected immediately.)
 * )
 *
 * Event callbacks are installed through the fluent setters (`connected`,
 * `received`, `writed`, `closed`, `disconnected`, `error`).
 */
class TcpStream : AbstractStream {
    /// Invoked from onClose() when a stream that was connected gets closed.
    SimpleEventHandler closeHandler;
    protected shared bool _isConnected; // It's always true for server.

    private TcpStreamOptions _tcpOption;
    /// Number of reconnect() attempts made so far.
    private int retryCount = 0;

    /**
     * Creates a client-side stream with a fresh, not yet connected socket.
     *
     * Params:
     *   loop   = selector the stream is registered with on start()
     *   option = stream options; defaults from TcpStreamOptions.create()
     *            are used when null
     *   family = address family of the socket to create
     */
    this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
        _isClient = true;
        _isConnected = false;

        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        super(loop, family, _tcpOption.bufferSize);
        version(HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
    }

    /**
     * Creates a server-side stream around an existing socket.
     *
     * The stream is marked as connected right away, the local and remote
     * addresses are read from the socket, and the keep-alive options are
     * applied.
     *
     * Params:
     *   loop   = selector the stream is registered with on start()
     *   socket = the socket to wrap
     *   option = stream options; defaults from TcpStreamOptions.create()
     *            are used when null
     */
    this(Selector loop, Socket socket, TcpStreamOptions option = null) {
        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = socket;
        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        setKeepalive();
    }

    /// Replaces the stream options. The new value must not be null.
    void options(TcpStreamOptions option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the options currently in use.
    TcpStreamOptions options() @property {
        return this._tcpOption;
    }

    /// Returns the current value of the `_isWritting` flag.
    override bool isBusy() {
        return _isWritting;
    }


    /// Returns true when the stream was created through the client constructor.
    override bool isClient() {
        return _isClient;
    }

    /**
     * Resolves `hostname` and connects to one of the resulting addresses.
     *
     * The first IPv4 address is preferred. When the host resolves to no IPv4
     * address a warning is logged and the first resolved address is used.
     *
     * Params:
     *   hostname = host name or address literal to resolve
     *   port     = remote port
     *
     * Throws: SocketException when the host name resolves to no address.
     */
    void connect(string hostname, ushort port) {
        Address[] addresses = getAddress(hostname, port);
        // A length test covers both a null result and an empty, non-null array,
        // which would otherwise make addresses[0] below go out of range.
        if (addresses.length == 0) {
            throw new SocketException("Can't resolve hostname: " ~ hostname);
        }

        Address selectedAddress;
        foreach (Address addr; addresses) {
            // Decide by address family rather than by the textual form: short
            // IPv6 literals such as "fe80::1" neither start with "::" nor are
            // longer than an IPv4 literal, so a text heuristic accepts them.
            if (addr.addressFamily == AddressFamily.INET) {
                selectedAddress = addr;
                break;
            }
        }

        if (selectedAddress is null) {
            warning("No IPV4 avaliable");
            selectedAddress = addresses[0];
        }
        version(HUNT_IO_DEBUG) {
            infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
        }
        connect(selectedAddress); // first IPv4 address, or the first address as a fallback
    }

    /**
     * Connects to `addr`. Does nothing when the stream is already connected.
     *
     * The connection attempt is made synchronously through doConnect().
     *
     * Params:
     *   addr = the remote address; it is remembered for reconnect()
     */
    void connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;
        import std.parallelism;

        version(HUNT_DEBUG) tracef("Try to connect to %s", addr);
        // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-12T10:02:52+08:00
        // The task will not run when the concurrent number is over 15;
        // auto connectionTask = task(&doConnect, addr);
        // taskPool.put(connectionTask);
        doConnect(addr);
    }

    /**
     * Tries to connect again to the last remote address using a new socket.
     *
     * Nothing happens when the stream is already connected or when the number
     * of attempts has reached `retryTimes` from the options.
     *
     * Throws: Exception when called on a stream that is not a client.
     */
    void reconnect() {
        if (!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        retryCount++;
        _isConnected = false;
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (HUNT_DEBUG)
            tracef("reconnecting %d...", retryCount);
        connect(_remoteAddress);
    }

    /**
     * Performs the connection attempt and reports the outcome.
     *
     * On success the keep-alive options are applied, the local address is
     * recorded and the stream is registered with the selector. On failure,
     * including any exception thrown along the way, errorOccurred() is
     * called. The connection handler, when set, is then invoked with the
     * outcome.
     *
     * Params:
     *   addr = the remote address to connect to
     *
     * Returns: true when the stream ended up connected. false when the socket
     *          is not alive (in which case no handler is invoked) or when the
     *          attempt failed.
     */
    protected override bool doConnect(Address addr) {
        if(!this.socket.isAlive) {
            warning("socket is not ready.");
            return false;
        }

        try {
            version (HUNT_DEBUG)
                tracef("Connecting to %s...", addr);
            version (HAVE_IOCP) {
                // IOCP: register with the selector before connecting, non-blocking.
                this.socket.blocking = false;
                start();
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            } else {
                // Otherwise: connect in blocking mode, then switch to
                // non-blocking and register with the selector.
                this.socket.blocking = true;
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    start();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            }
        } catch (Throwable ex) {
            // Must try the best to catch all the exceptions, because it will be executed in another thread.
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
            errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
            _isConnected = false;
        }

        if (_connectionHandler !is null) {
            try {
                _connectionHandler(_isConnected);
            } catch(Throwable ex) {
                // A failing user callback must not escape from here.
                debug warning(ex.msg);
                version(HUNT_DEBUG) warning(ex);
            }
        }

        // Report the real outcome instead of an unconditional `true`.
        return _isConnected;
    }

    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /**
     * Applies the keep-alive settings from the options to the socket.
     *
     * Settings are applied only when `isKeepalive` is set and one of the
     * HAVE_EPOLL / HAVE_IOCP versions is defined. The probe count is set
     * only in the HAVE_EPOLL build.
     */
    private void setKeepalive() {
        version(HUNT_DEBUG) {
            infof("isKeepalive: %s, keepaliveTime: %d seconds, Interval: %d seconds",
                _tcpOption.isKeepalive, _tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
        }

        version (HAVE_EPOLL) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP,
                    cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        } else version (HAVE_IOCP) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        }
    }

    /**
     * Debug helper: reads the keep-alive related socket options back and
     * traces them. Only does anything in a HAVE_EPOLL build.
     */
    version (HUNT_DEBUG) private void checkKeepAlive() {
        version (HAVE_EPOLL) {
            // The file-level selective import only brings in TCP_KEEPCNT;
            // import the other two constants here so this helper does not
            // depend on them leaking in through another module.
            import core.sys.linux.netinet.tcp : TCP_KEEPIDLE, TCP_KEEPINTVL;

            int time;
            int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
            tracef("ret=%d, time=%d", ret1, time);

            int interval;
            int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
            tracef("ret=%d, interval=%d", ret2, interval);

            int isKeep;
            int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
            tracef("ret=%d, keepalive=%s", ret3, isKeep == 1);

            int probe;
            int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
            // This value is the probe count, not the interval.
            tracef("ret=%d, probes=%d", ret4, probe);
        }
    }

    /// Sets the handler that doConnect() calls with the connection outcome. Returns this stream.
    TcpStream connected(ConnectionHandler handler) {
        _connectionHandler = handler;
        return this;
    }

    /// Sets `dataReceivedHandler`. Returns this stream.
    TcpStream received(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    /// Sets `dataWriteDoneHandler`. Returns this stream.
    TcpStream writed(SimpleActionHandler handler) {
        dataWriteDoneHandler = handler;
        return this;
    }
    alias onWritten = writed;

    /// Sets the handler called from onClose(). Returns this stream.
    TcpStream closed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Sets the handler called from onDisconnected(). Returns this stream.
    TcpStream disconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets `errorHandler`. Returns this stream.
    TcpStream error(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Returns the current value of the connected flag.
    override bool isConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector; later calls are no-ops.
    override void start() {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
        {
        //    this.beginRead();
        }
    }

    /**
     * Queues `buffer` for writing and triggers onWrite().
     *
     * Params:
     *   buffer = the data to send; must not be null
     *
     * Throws: Exception when the stream is not connected.
     */
    void write(ByteBuffer buffer) {
        assert(buffer !is null);

        if (!_isConnected) {
            throw new Exception(format("The connection %s closed!",
                this.remoteAddress.toString()));
        }

        version (HUNT_IO_DEBUG)
            infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
        _isWritting = true;
        initializeWriteQueue();
        _writeQueue.enqueue(buffer);
        onWrite();
    }

    /**
     * Writes raw bytes to the stream.
     *
     * In a HAVE_IOCP build the data is always wrapped in a buffer and handed
     * to write(ByteBuffer). Otherwise, when nothing is pending, the data is
     * written to the socket directly and whatever could not be written is
     * queued; when something is pending the data is queued behind it.
     *
     * Params:
     *   data = bytes to send; a null slice is ignored
     *
     * Throws: Exception when the stream is not connected, when a write
     *         cancellation is in progress, or when the stream is closing or
     *         closed.
     */
    void write(const(ubyte)[] data) {

        version (HUNT_IO_DEBUG_MORE) {
            infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
        } else version (HUNT_IO_DEBUG) {
            if (data.length <= 32)
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
            else
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
        }

        if(data is null) {
            version(HUNT_DEBUG) {
                // The message carries a format placeholder, so the formatting
                // variant is needed for the address to be substituted.
                warningf("Writting a empty data on connection %s.", this.remoteAddress.toString());
            }
            return;
        }

        if (!_isConnected) {
            string msg = format("The connection %s is closed! Writting cancelled.", this.remoteAddress.toString());
            throw new Exception(msg);
        }

        version (HAVE_IOCP) {
            return write(BufferUtils.toBuffer(cast(byte[])data));
        } else {

            if (_writeQueue is null || (_writeQueue.isEmpty()) && !_isWritting) {
                _isWritting = true;
                const(ubyte)[] d = data;

                // while (!isClosing() && !_isWriteCancelling && d.length > 0) {
                while(d !is null) {
                    if(isWriteCancelling()) {
                        _errorMessage = format("The connection %s is cancelled!", this.remoteAddress.toString());
                        _error = true;
                        // Log the finished message as plain text: it embeds the
                        // remote address, which must not be parsed as a format
                        // string (an IPv6 zone id contains '%').
                        warning(_errorMessage);
                        throw new Exception(_errorMessage);
                        // throw new IoError(ErrorCode.INTERRUPTED, _errorMessage);
                        // break;
                    }

                    if(isClosing() || isClosed()) {
                        _errorMessage= format("The connection %s is closing or closed!", this.remoteAddress.toString());
                        _error = true;
                        warningf("%s, %s", isClosing(), isClosed());
                        throw new Exception(_errorMessage);
                        // throw new IoError(ErrorCode.CONNECTIONABORTED, _errorMessage);
                        // break;
                    }

                    version (HUNT_IO_DEBUG)
                        infof("to write directly %d bytes, fd=%d", d.length, this.handle);
                    size_t nBytes = tryWrite(d);

                    if (nBytes == d.length) {
                        version (HUNT_IO_DEBUG)
                            tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        checkAllWriteDone();
                        break;
                    } else if (nBytes > 0) {
                        // Partial write: retry with the remaining bytes.
                        version (HUNT_IO_DEBUG)
                            tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        d = d[nBytes .. $];
                    } else {
                        // Nothing could be written: queue the remainder.
                        // NOTE(review): presumably flushed by a later write
                        // event - confirm in AbstractStream.
                        version (HUNT_IO_DEBUG)
                            warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
                        initializeWriteQueue();
                        _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
                        break;
                    }
                }
            } else {
                write(BufferUtils.toBuffer(cast(byte[]) data));
            }
        }
    }

    /// Shuts down the receiving side of the socket.
    void shutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending side of the socket.
    void shutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

    /// Calls the disconnection handler, when set, and then closes the stream.
    override protected void onDisconnected() {
        version(HUNT_DEBUG) {
            infof("peer disconnected: fd=%d", this.handle);
        }
        if (disconnectionHandler !is null)
            disconnectionHandler();

        this.close();
    }

protected:
    bool _isClient;
    ConnectionHandler _connectionHandler;

    /**
     * Read event entry point.
     *
     * On Posix, tryRead() is called repeatedly until it returns true or the
     * stream is closed. Elsewhere a single doRead() is issued unless the
     * stream is closed.
     */
    override void onRead() {
        version (HUNT_IO_DEBUG)
            trace("start to read");

        version (Posix) {
            while (!_isClosed && !tryRead()) {
                version (HUNT_IO_DEBUG)
                    trace("continue reading...");
            }
        } else {
            if (!_isClosed)
            {
                doRead();
            }

        }

        //if (this.isError) {
        //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
        //            this.handle, errno, this.errorMessage);
        //    debug errorf(msg);
        //    if (!isClosed())
        //        errorOccurred(msg);
        //}
    }

    /**
     * Close hook.
     *
     * Runs the base class close logic first. When the stream was connected
     * before the call, the write status is reset, the connected flag is
     * cleared and the close handler, when set, is invoked.
     */
    override void onClose() {
        // Captured before super.onClose() runs.
        bool lastConnectStatus = _isConnected;
        super.onClose();
        if(lastConnectStatus) {
            version (HUNT_IO_DEBUG) {
                if (_writeQueue !is null && !_writeQueue.isEmpty) {
                    warningf("Some data has not been sent yet: fd=%d", this.handle);
                }
            }
            version(HUNT_NET_DEBUG) {
                infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
            }

            resetWriteStatus();
            _isConnected = false;
            version (HUNT_IO_DEBUG) {
                infof("Raising a event on a TCP stream [%s] is down: fd=%d",
                    this.remoteAddress.toString(), this.handle);
            }

            if (closeHandler !is null)
                closeHandler();
        }
    }

}