/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.TcpStream;

import hunt.io.channel.Common;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.event.selector.Selector;
import hunt.io.SimpleQueue;
import hunt.event;
import hunt.logging.ConsoleLogger;
import hunt.Functions;

import std.exception;
import std.format;
import std.socket;
import std.string;

import core.atomic;
import core.stdc.errno;
import core.thread;
import core.time;

version (HAVE_EPOLL) {
    import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
}



/**
 * A TCP stream built on top of AbstractStream.
 *
 * An instance is either a client stream (created without a socket; it must be
 * connected with one of the `connect` overloads) or a server-side stream
 * (created around an already accepted socket and marked connected at once).
 * Event callbacks are installed through the fluent setters `connected`,
 * `received`, `writed`, `closed`, `disconnected` and `error`.
 */
class TcpStream : AbstractStream {
    /// Invoked from onClose() when a connected stream gets closed.
    SimpleEventHandler closeHandler;
    protected shared bool _isConnected; // It's always true for server.

    private TcpStreamOptions _tcpOption;
    private int retryCount = 0; // number of reconnect() attempts made so far

    /**
     * Creates a client stream. The stream is not connected yet.
     *
     * Params:
     *   loop   = selector the stream is registered with on start()
     *   option = TCP options; TcpStreamOptions.create() is used when null
     *   family = address family of the socket to create
     */
    this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
        _isClient = true;
        _isConnected = false;

        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        super(loop, family, _tcpOption.bufferSize);
        version(HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
    }

    /**
     * Creates a server-side stream around an existing socket.
     * The stream is marked as connected and keepalive options are applied.
     *
     * Params:
     *   loop   = selector the stream is registered with on start()
     *   socket = the socket to wrap; its remote/local addresses are cached
     *   option = TCP options; TcpStreamOptions.create() is used when null
     */
    this(Selector loop, Socket socket, TcpStreamOptions option = null) {
        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = socket;
        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        setKeepalive();
    }

    /// Replaces the TCP options. The option must not be null.
    void options(TcpStreamOptions option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the TCP options currently in use.
    TcpStreamOptions options() @property {
        return this._tcpOption;
    }

    /// Returns true while a write is flagged as in progress.
    override bool isBusy() {
        return _isWritting;
    }


    /// Returns true if this stream was created with the client constructor.
    override bool isClient() {
        return _isClient;
    }

    /**
     * Resolves the hostname and starts connecting to one of its addresses.
     *
     * The first IPv4 address is preferred. If the hostname has no IPv4
     * address, a warning is logged and the first resolved address is used.
     *
     * Params:
     *   hostname = host name or address string to resolve
     *   port     = remote port
     *
     * Throws: SocketException if no address could be resolved.
     */
    void connect(string hostname, ushort port) {
        Address[] addresses = getAddress(hostname, port);
        // A length test covers both a null and an empty result, so the
        // fallback to addresses[0] below can never go out of bounds.
        if (addresses.length == 0) {
            throw new SocketException("Can't resolve hostname: " ~ hostname);
        }

        // Pick by address family rather than by the textual form of the
        // address: a short IPv6 address such as "fe80::1" would otherwise be
        // mistaken for IPv4.
        Address selectedAddress;
        foreach (Address addr; addresses) {
            if (addr.addressFamily == AddressFamily.INET) {
                selectedAddress = addr;
                break;
            }
        }

        if (selectedAddress is null) {
            warning("No IPV4 avaliable");
            selectedAddress = addresses[0];
        }

        version(HUNT_IO_DEBUG) {
            infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname,
                    selectedAddress.toAddrString(), port);
        }
        connect(selectedAddress); // first IPv4 address, or the first address as fallback
    }

    /**
     * Starts connecting to the given address.
     *
     * Does nothing when the stream is already connected. Otherwise the
     * address is remembered as the remote address and doConnect() is queued
     * on the std.parallelism task pool, so this call returns before the
     * connection attempt has finished.
     */
    void connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;
        import std.parallelism;

        auto connectionTask = task(&doConnect, addr);
        taskPool.put(connectionTask);
        // doConnect(addr);
    }

    /**
     * Tries to connect again to the last remote address using a new socket.
     *
     * Does nothing when the stream is already connected or when the number of
     * attempts has reached the `retryTimes` option.
     *
     * Throws: Exception if this is not a client stream.
     */
    void reconnect() {
        if (!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        retryCount++;
        _isConnected = false;
        // NOTE(review): the previous socket is replaced without being closed
        // here; confirm it is released elsewhere.
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (HUNT_DEBUG)
            tracef("reconnecting %d...", retryCount);
        connect(_remoteAddress);
    }

    /**
     * Performs the connection attempt and reports the outcome.
     *
     * On success the socket is switched to non-blocking mode, keepalive is
     * configured, the local address is cached and the stream is registered
     * through start(). On failure errorOccurred() is raised. In both cases the
     * connection handler, if any, is then called with the connection state.
     *
     * Returns: always true.
     * NOTE(review): the result does not reflect whether the connection
     * succeeded; confirm no caller relies on it before changing this.
     */
    protected override bool doConnect(Address addr) {
        try {
            version (HUNT_DEBUG)
                tracef("Connecting to %s...", addr);
            // Address binded = createAddress(this.socket.addressFamily);
            // this.socket.bind(binded);
            version (HAVE_IOCP) {
                // Here the stream is registered before connecting.
                this.socket.blocking = false;
                start();
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            } else {
                // Connect in blocking mode, then switch to non-blocking and
                // register the stream once the connection is established.
                this.socket.blocking = true;
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    start();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            }
        } catch (Throwable ex) {
            // Must try the best to catch all the exceptions, because it will be executed in another thread.
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
            errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
            _isConnected = false;
        }

        if (_connectionHandler !is null) {
            try {
                _connectionHandler(_isConnected);
            } catch(Throwable ex) {
                // A failing user callback must not escape the worker thread.
                debug warning(ex.msg);
                version(HUNT_DEBUG) warning(ex);
            }
        }
        return true;
    }

    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /**
     * Applies the keepalive settings from the TCP options to the socket.
     * Nothing is done unless `isKeepalive` is set, or when neither HAVE_EPOLL
     * nor HAVE_IOCP is defined. The probe count is only set for HAVE_EPOLL.
     */
    private void setKeepalive() {
        version (HAVE_EPOLL) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP,
                        cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        } else version (HAVE_IOCP) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        }
    }

    /// Debug helper: reads back the keepalive socket options and traces them.
    version (HUNT_DEBUG) private void checkKeepAlive() {
        version (HAVE_EPOLL) {
            // Only TCP_KEEPCNT is imported at module level; bring the other
            // option names into scope locally so this helper compiles.
            import core.sys.linux.netinet.tcp : TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT;

            int time;
            int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
            tracef("ret=%d, time=%d", ret1, time);

            int interval;
            int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
            tracef("ret=%d, interval=%d", ret2, interval);

            int isKeep;
            int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
            tracef("ret=%d, keepalive=%s", ret3, isKeep == 1);

            int probe;
            int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
            tracef("ret=%d, probes=%d", ret4, probe);
        }
    }

    /// Sets the handler called by doConnect() with the connection result.
    TcpStream connected(ConnectionHandler handler) {
        _connectionHandler = handler;
        return this;
    }

    /// Sets the data-received handler.
    TcpStream received(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    /// Sets the write-done handler.
    TcpStream writed(SimpleActionHandler handler) {
        dataWriteDoneHandler = handler;
        return this;
    }
    alias onWritten = writed;

    /// Sets the handler called by onClose() when a connected stream closes.
    TcpStream closed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Sets the handler called by onDisconnected() before the stream is closed.
    TcpStream disconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets the error handler.
    TcpStream error(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Returns the current connection flag.
    override bool isConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector; later calls are no-ops.
    override void start() {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
        {
        //    this.beginRead();
        }
    }

    /**
     * Queues a buffer for writing and triggers onWrite().
     *
     * Params:
     *   buffer = data to send; must not be null
     *
     * Throws: Exception if the stream is not connected.
     */
    void write(ByteBuffer buffer) {
        assert(buffer !is null);

        if (!_isConnected) {
            throw new Exception(format("The connection %s closed!",
                    this.remoteAddress.toString()));
        }

        version (HUNT_IO_DEBUG)
            infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
        _isWritting = true;
        initializeWriteQueue();
        _writeQueue.enqueue(buffer);
        onWrite();
    }

    /**
     * Writes raw bytes to the stream.
     *
     * Null or empty data is ignored. With HAVE_IOCP the data is always wrapped
     * in a buffer and queued. Otherwise, when nothing is pending, the data is
     * written directly through tryWrite(); whatever cannot be written at once
     * is put on the write queue. If a write is already pending, the data is
     * queued through write(ByteBuffer).
     *
     * Params:
     *   data = bytes to send
     *
     * Throws: Exception if the stream is not connected, if the write is being
     *         cancelled, or if the stream is closing or closed.
     */
    void write(const(ubyte)[] data) {

        version (HUNT_IO_DEBUG_MORE) {
            infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
        } else version (HUNT_IO_DEBUG) {
            if (data.length <= 32)
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
            else
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
        }

        // The length test also covers null, and keeps an empty non-null slice
        // from being handed to the socket or the write queue.
        if (data.length == 0) {
            version(HUNT_DEBUG) {
                warningf("Writting a empty data on connection %s.", this.remoteAddress.toString());
            }
            return;
        }

        if (!_isConnected) {
            string msg = format("The connection %s is closed!", this.remoteAddress.toString());
            throw new Exception(msg);
        }

        version (HAVE_IOCP) {
            return write(BufferUtils.toBuffer(cast(byte[])data));
        } else {

            // NOTE(review): && binds tighter than ||, so a null queue takes
            // the direct path even while _isWritting is set. The parentheses
            // only make the existing behaviour explicit; confirm it is the
            // intended one.
            if (_writeQueue is null || (_writeQueue.isEmpty() && !_isWritting)) {
                _isWritting = true;
                const(ubyte)[] d = data;

                // while (!isClosing() && !_isWriteCancelling && d.length > 0) {
                while (d.length > 0) {
                    if(isWriteCancelling()) {
                        _errorMessage = format("The connection %s is cancelled!", this.remoteAddress.toString());
                        _error = true;
                        // Log the text as-is; it is not a format string.
                        warning(_errorMessage);
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    if(isClosing() || isClosed()) {
                        _errorMessage= format("The connection %s is closing or closed!", this.remoteAddress.toString());
                        _error = true;
                        warningf("%s, %s", isClosing(), isClosed());
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    version (HUNT_IO_DEBUG)
                        infof("to write directly %d bytes, fd=%d", d.length, this.handle);
                    size_t nBytes = tryWrite(d);

                    if (nBytes == d.length) {
                        version (HUNT_IO_DEBUG)
                            tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        checkAllWriteDone();
                        break;
                    } else if (nBytes > 0) {
                        // Partial write: retry with the remaining bytes.
                        version (HUNT_IO_DEBUG)
                            tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        d = d[nBytes .. $];
                    } else {
                        // Nothing could be written now: queue the rest.
                        version (HUNT_IO_DEBUG)
                            warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
                        initializeWriteQueue();
                        _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
                        break;
                    }
                }
            } else {
                write(BufferUtils.toBuffer(cast(byte[]) data));
            }
        }
    }

    /// Shuts down the receiving side of the socket.
    void shutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending side of the socket.
    void shutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

    /// Calls the disconnection handler, if any, and then closes the stream.
    override protected void onDisconnected() {
        version(HUNT_DEBUG) {
            infof("peer disconnected: fd=%d", this.handle);
        }
        if (disconnectionHandler !is null)
            disconnectionHandler();

        this.close();
    }

protected:
    bool _isClient;
    ConnectionHandler _connectionHandler;

    /**
     * Read callback. On Posix tryRead() is repeated until it returns true or
     * the stream is closed; elsewhere a single doRead() is issued.
     */
    override void onRead() {
        version (HUNT_IO_DEBUG)
            trace("start to read");

        version (Posix) {
            while (!_isClosed && !tryRead()) {
                version (HUNT_IO_DEBUG)
                    trace("continue reading...");
            }
        } else {
            if (!_isClosed)
            {
                doRead();
            }

        }

        //if (this.isError) {
        //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
        //            this.handle, errno, this.errorMessage);
        //    debug errorf(msg);
        //    if (!isClosed())
        //        errorOccurred(msg);
        //}
    }

    /**
     * Close callback. If the stream was connected before the base class
     * handled the close, the write status is reset, the connection flag is
     * cleared and the close handler, if any, is called.
     */
    override void onClose() {
        // Remember the state first: it decides whether the close handler runs.
        bool lastConnectStatus = _isConnected;
        super.onClose();
        if(lastConnectStatus) {
            version (HUNT_IO_DEBUG) {
                if (_writeQueue !is null && !_writeQueue.isEmpty) {
                    warningf("Some data has not been sent yet: fd=%d", this.handle);
                }
            }
            version(HUNT_DEBUG) {
                infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
            }

            resetWriteStatus();
            _isConnected = false;
            version (HUNT_IO_DEBUG) {
                infof("Raising a event on a TCP stream [%s] is down: fd=%d",
                        this.remoteAddress.toString(), this.handle);
            }

            if (closeHandler !is null)
                closeHandler();
        }
    }

}