/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.TcpStream;

import hunt.io.channel.Common;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;

import hunt.collection.ByteBuffer;
import hunt.collection.BufferUtils;
import hunt.event.selector.Selector;
import hunt.concurrency.SimpleQueue;
import hunt.event;
import hunt.logging.ConsoleLogger;
import hunt.Functions;

import std.exception;
import std.format;
import std.socket;
import std.string;

import core.atomic;
import core.stdc.errno;
import core.thread;
import core.time;

version (HAVE_EPOLL) {
    import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
}

/**
 * A TCP stream bound to a `Selector`.
 *
 * An instance is either a client stream (created unconnected, then connected
 * with one of the `connect` overloads) or a server-side stream wrapping an
 * already established `Socket`. Event callbacks are installed through the
 * fluent setters (`connected`, `received`, `writed`, `closed`,
 * `disconnected`, `error`).
 */
class TcpStream : AbstractStream {
    /// Invoked from `onClose` when a stream that was connected gets closed.
    SimpleEventHandler closeHandler;

    protected shared bool _isConnected; // It's always true for server.

    private TcpStreamOptions _tcpOption;
    /// Number of `reconnect` attempts made so far.
    private int retryCount = 0;

    /**
     * Creates a client stream. The socket is created here but not connected.
     *
     * Params:
     *   loop   = selector the stream is registered with once started
     *   option = stream options; `TcpStreamOptions.create()` is used when null
     *   family = address family of the socket to create
     */
    this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
        _isClient = true;
        _isConnected = false;

        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        super(loop, family, _tcpOption.bufferSize);
        version(HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
    }

    /**
     * Creates a server-side stream around an existing socket.
     *
     * The stream is marked as connected immediately, the local/remote
     * addresses are read from the socket, and keepalive is applied.
     *
     * Params:
     *   loop   = selector the stream is registered with once started
     *   socket = the socket to wrap
     *   option = stream options; `TcpStreamOptions.create()` is used when null
     */
    this(Selector loop, Socket socket, TcpStreamOptions option = null) {
        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = socket;
        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        setKeepalive();
    }

    /// Replaces the stream options. `option` must not be null.
    void options(TcpStreamOptions option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the current stream options.
    TcpStreamOptions options() @property {
        return this._tcpOption;
    }

    /// Returns the current value of the `_isWritting` flag.
    override bool isBusy() {
        return _isWritting;
    }

    /// True when this stream was created through the client constructor.
    override bool isClient() {
        return _isClient;
    }

    /**
     * Resolves `hostname` and connects to one of the resulting addresses.
     *
     * The first IPv4 address is preferred; when none is present the first
     * resolved address is used and a warning is logged.
     *
     * Throws: SocketException when the lookup yields no address.
     */
    void connect(string hostname, ushort port) {
        Address[] addresses = getAddress(hostname, port);
        // Test the length rather than `is null`: an empty but non-null array
        // must not be allowed to reach `addresses[0]` below.
        if (addresses.length == 0) {
            throw new SocketException("Can't resolve hostname: " ~ hostname);
        }

        Address selectedAddress;
        foreach (Address addr; addresses) {
            // Decide by address family instead of guessing from the textual
            // form: short IPv6 literals such as "2001:db8::1" neither start
            // with "::" nor exceed 16 characters.
            if (addr.addressFamily == AddressFamily.INET) {
                selectedAddress = addr;
                break;
            }
        }

        if (selectedAddress is null) {
            warning("No IPV4 avaliable");
            selectedAddress = addresses[0];
        }

        version(HUNT_IO_DEBUG) {
            infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
        }
        connect(selectedAddress); // the preferred IPv4 address, or the first resolved one
    }

    /**
     * Starts connecting to `addr` without blocking the caller.
     *
     * Does nothing when the stream is already connected. Otherwise the
     * address is remembered as the remote address and `doConnect` is queued
     * on the `std.parallelism` task pool.
     */
    void connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;
        import std.parallelism;

        auto connectionTask = task(&doConnect, addr);
        taskPool.put(connectionTask);
        // doConnect(addr);
    }

    /**
     * Retries the connection to the remembered remote address on a fresh socket.
     *
     * Returns without doing anything when the stream is already connected or
     * when `retryCount` has reached `retryTimes` from the options.
     *
     * Throws: Exception when called on a server-side stream.
     */
    void reconnect() {
        if (!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        retryCount++;
        _isConnected = false;
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (HUNT_DEBUG)
            tracef("reconnecting %d...", retryCount);
        connect(_remoteAddress);
    }

    /**
     * Performs the connection attempt and reports the outcome.
     *
     * The outcome is stored in `_isConnected` and passed to the connection
     * handler, if one is installed. A failed attempt, or anything thrown
     * during it, is reported through `errorOccurred`.
     *
     * Returns: always `true`; the actual outcome is only available through
     *          `isConnected()` and the connection handler.
     */
    protected override bool doConnect(Address addr) {
        try {
            version (HUNT_DEBUG)
                tracef("Connecting to %s...", addr);
            // Address binded = createAddress(this.socket.addressFamily);
            // this.socket.bind(binded);
            this.socket.blocking = false;
            version (HAVE_IOCP) {
                // With IOCP the stream is registered before connecting.
                start();
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            } else {
                // Otherwise the stream is registered only after a successful connect.
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    start();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            }
        } catch (Throwable ex) {
            // Must try the best to catch all the exceptions, because it will be executed in another thread.
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
            errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
            _isConnected = false;
        }

        if (_connectionHandler !is null) {
            try {
                _connectionHandler(_isConnected);
            } catch(Throwable ex) {
                // A failing user callback is only logged.
                debug warning(ex.msg);
                version(HUNT_DEBUG) warning(ex);
            }
        }
        return true;
    }

    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /**
     * Applies the keepalive settings from the options to the socket when
     * `isKeepalive` is set. The probe count (`TCP_KEEPCNT`) is only set in
     * HAVE_EPOLL builds; builds with neither HAVE_EPOLL nor HAVE_IOCP do
     * nothing here.
     */
    private void setKeepalive() {
        version (HAVE_EPOLL) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP,
                        cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        } else version (HAVE_IOCP) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        }
    }

    /// Debug helper: reads the keepalive related socket options back and traces them.
    version (HUNT_DEBUG) private void checkKeepAlive() {
        version (HAVE_EPOLL) {
            // Imported locally so these constants resolve without relying on
            // another module re-exporting them (the file-level import only
            // selects TCP_KEEPCNT).
            import core.sys.linux.netinet.tcp : TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT;

            int time;
            int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
            tracef("ret=%d, time=%d", ret1, time);

            int interval;
            int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
            tracef("ret=%d, interval=%d", ret2, interval);

            int isKeep;
            int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
            tracef("ret=%d, keepalive=%s", ret3, isKeep == 1);

            int probe;
            int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
            // This value is the probe count, not the interval.
            tracef("ret=%d, probes=%d", ret4, probe);
        }
    }

    /// Installs the handler called by `doConnect` with the connection result.
    TcpStream connected(ConnectionHandler handler) {
        _connectionHandler = handler;
        return this;
    }

    /// Installs the data-received handler.
    TcpStream received(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    /// Installs the write-done handler.
    TcpStream writed(SimpleActionHandler handler) {
        dataWriteDoneHandler = handler;
        return this;
    }
    alias onWritten = writed;

    /// Installs the handler called when a connected stream is closed.
    TcpStream closed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Installs the handler called by `onDisconnected`.
    TcpStream disconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Installs the error handler.
    TcpStream error(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// True while the stream is considered connected.
    override bool isConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector; repeated calls are no-ops.
    override void start() {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
        {
            // this.beginRead();
        }
    }

    /**
     * Queues `buffer` for sending and triggers `onWrite()`.
     *
     * Throws: Exception when the stream is not connected.
     */
    void write(ByteBuffer buffer) {
        assert(buffer !is null);

        if (!_isConnected) {
            throw new Exception(format("The connection is down! remote: %s",
                this.remoteAddress.toString()));
        }

        version (HUNT_IO_DEBUG)
            infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
        _isWritting = true;
        initializeWriteQueue();
        _writeQueue.enqueue(buffer);
        onWrite();
    }

    /**
     * Sends `data`.
     *
     * Empty data, or a call on a stream that is not connected, is silently
     * ignored. Unlike the `ByteBuffer` overload, this overload does not throw
     * for a closed connection.
     *
     * In HAVE_IOCP builds the data is always wrapped in a buffer and queued.
     * Otherwise, when nothing is pending and no write is in progress, the
     * data is written directly and only the unsent remainder is queued; in
     * every other case the data is queued behind what is already pending.
     */
    void write(const(ubyte)[] data) {
        // The original code had an unreachable `throw` for !_isConnected
        // right after this return; the effective behaviour (silent drop)
        // is kept and the dead branch is removed.
        if (data.length == 0 || !_isConnected)
            return;

        version (HAVE_IOCP)
        {
            return write(BufferUtils.toBuffer(cast(byte[])data));
        } else
        {
            version (HUNT_IO_DEBUG_MORE) {
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
            } else version (HUNT_IO_DEBUG) {
                if (data.length <= 32)
                    infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
                else
                    infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
            }

            // `&&` binds tighter than `||`, so the grouping has to be
            // explicit: the direct path is allowed only when nothing is
            // queued AND no other write is in flight. Without the parentheses
            // a null queue bypassed the `_isWritting` guard.
            if ((_writeQueue is null || _writeQueue.isEmpty()) && !_isWritting) {
                _isWritting = true;
                const(ubyte)[] d = data;

                while (!isClosing() && !isWriteCancelling && d.length > 0) {
                    version (HUNT_IO_DEBUG)
                        infof("to write directly %d bytes, fd=%d", d.length, this.handle);
                    size_t nBytes = tryWrite(d);

                    if (nBytes == d.length) {
                        version (HUNT_IO_DEBUG)
                            tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        checkAllWriteDone();
                        break;
                    } else if (nBytes > 0) {
                        version (HUNT_IO_DEBUG)
                            tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        // Retry with the part that has not been sent yet.
                        d = d[nBytes .. $];
                    } else {
                        // Nothing could be written: queue the remainder.
                        version (HUNT_IO_DEBUG)
                            warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
                        initializeWriteQueue();
                        _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
                        break;
                    }
                }
            } else {
                write(BufferUtils.toBuffer(cast(byte[]) data));
            }
        }
    }

    /// Shuts down the receiving side of the socket.
    void shutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending side of the socket.
    void shutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

    /// Notifies the disconnection handler, if any, and then closes the stream.
    override protected void onDisconnected() {
        version(HUNT_DEBUG) {
            infof("peer disconnected: fd=%d", this.handle);
        }
        if (disconnectionHandler !is null)
            disconnectionHandler();

        this.close();
    }

protected:
    bool _isClient;
    ConnectionHandler _connectionHandler;

    /**
     * Read-event callback. On Posix `tryRead()` is called repeatedly until it
     * returns true or the stream is closed; elsewhere a single `doRead()` is
     * issued unless the stream is closed.
     */
    override void onRead() {
        version (HUNT_IO_DEBUG)
            trace("start to read");

        version (Posix) {
            while (!_isClosed && !tryRead()) {
                version (HUNT_IO_DEBUG)
                    trace("continue reading...");
            }
        } else {
            if (!_isClosed)
            {
                doRead();
            }
        }

        //if (this.isError) {
        //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
        //        this.handle, errno, this.errorMessage);
        //    debug errorf(msg);
        //    if (!isClosed())
        //        errorOccurred(msg);
        //}
    }

    /**
     * Close callback. After the base-class handling, a stream that was
     * connected resets its write status, is marked disconnected and notifies
     * `closeHandler`. A stream that was never connected notifies nobody.
     */
    override void onClose() {
        // Snapshot taken before delegating to the base class.
        bool lastConnectStatus = _isConnected;
        super.onClose();
        if(lastConnectStatus) {
            version (HUNT_IO_DEBUG) {
                if (_writeQueue !is null && !_writeQueue.isEmpty) {
                    warningf("Some data has not been sent yet: fd=%d", this.handle);
                }
                infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
            }

            resetWriteStatus();
            _isConnected = false;
            version (HUNT_IO_DEBUG)
                infof("notifying TCP stream down: fd=%d", this.handle);
            if (closeHandler !is null)
                closeHandler();
        }
    }
}