1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.TcpStream;
13 
14 import hunt.io.channel.Common;
15 import hunt.io.TcpStreamOptions;
16 import hunt.io.IoError;
17 
18 import hunt.io.ByteBuffer;
19 import hunt.io.BufferUtils;
20 import hunt.event.selector.Selector;
21 import hunt.io.SimpleQueue;
22 import hunt.event;
23 import hunt.logging;
24 import hunt.Functions;
25 
26 import std.exception;
27 import std.format;
28 import std.socket;
29 import std.string;
30 
31 import core.atomic;
32 import core.stdc.errno;
33 import core.thread;
34 import core.time;
35 
36 version (HAVE_EPOLL) {
37     import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
38 }
39 
40 
41 
/**
 * A TCP stream driven by a Selector event loop.
 *
 * An instance is either a client stream, which creates its own socket and has
 * to be connected with connect(), or a server-side stream, which wraps an
 * existing socket and is flagged as connected on construction.
 */
class TcpStream : AbstractStream {
    /// Called from onClose() when a stream that was connected gets closed.
    SimpleEventHandler closeHandler;
    protected shared bool _isConnected; // It's always true for server.

    private TcpStreamOptions _tcpOption;
    /// Number of reconnect() attempts so far; compared with _tcpOption.retryTimes.
    private int retryCount = 0;

    /**
     * Creates a client stream with a fresh, unconnected TCP socket.
     *
     * Params:
     *   loop   = the selector passed on to the base class
     *   option = stream options; TcpStreamOptions.create() is used when null
     *   family = address family of the socket to create
     */
    this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
        _isClient = true;
        _isConnected = false;

        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        super(loop, family, _tcpOption.bufferSize);
        version(HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
    }

    /**
     * Creates a server-side stream around an existing socket.
     *
     * The remote and local addresses are read from the socket, the stream is
     * flagged as connected and the keepalive options are applied.
     *
     * Params:
     *   loop   = the selector passed on to the base class
     *   socket = the socket to wrap
     *   option = stream options; TcpStreamOptions.create() is used when null
     */
    this(Selector loop, Socket socket, TcpStreamOptions option = null) {
        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = socket;
        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        setKeepalive();
    }

    /// Replaces the stream options. The argument must not be null (checked by assert).
    void options(TcpStreamOptions option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the stream options currently in use.
    TcpStreamOptions options() @property {
        return this._tcpOption;
    }

    /// Returns the value of the `_isWritting` flag.
    override bool isBusy() {
        return _isWritting;
    }

    /// Returns true when this stream was created with the client constructor.
    override bool isClient() {
        return _isClient;
    }

    /**
     * Resolves `hostname` and connects to one of the resulting addresses.
     *
     * The first IPv4 address is preferred; when there is none, a warning is
     * logged and the first resolved address is used.
     *
     * Params:
     *   hostname = host name or address literal to resolve
     *   port     = remote port
     *
     * Throws: SocketException when no address is resolved.
     */
    void connect(string hostname, ushort port) {
        Address[] addresses = getAddress(hostname, port);
        // Check the length instead of `is null`: an empty but non-null slice
        // would otherwise reach the `addresses[0]` fallback below.
        if (addresses.length == 0) {
            throw new SocketException("Can't resolve hostname: " ~ hostname);
        }

        // Decide by address family, not by the textual form: an IPv6 literal
        // such as "fe80::1" is short and does not start with "::".
        Address selectedAddress;
        foreach (Address addr; addresses) {
            if (addr.addressFamily == AddressFamily.INET) {
                selectedAddress = addr;
                break;
            }
        }

        if (selectedAddress is null) {
            warning("No IPV4 avaliable");
            selectedAddress = addresses[0];
        }
        version(HUNT_IO_DEBUG) {
            infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
        }
        connect(selectedAddress); // first IPv4 address, or the first result as fallback
    }

    /**
     * Connects to `addr` unless the stream is already connected.
     *
     * The address is remembered as the remote address (it is reused by
     * reconnect()) and the connection attempt runs synchronously through
     * doConnect().
     */
    void connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;

        version(HUNT_DEBUG) tracef("Try to connect to %s", addr);
        // FIXME: Needing refactor or cleanup -@zhangxueping at 2021-10-12T10:02:52+08:00
        // The task will not run when the concurrent number is over 15;
        // auto connectionTask = task(&doConnect, addr);
        // taskPool.put(connectionTask);
        doConnect(addr);
    }

    /**
     * Tries to connect again to the last remote address using a new socket.
     *
     * Does nothing when the stream is connected or when the number of
     * attempts has reached `_tcpOption.retryTimes`.
     *
     * Throws: Exception when called on a server-side stream.
     */
    void reconnect() {
        if (!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        // NOTE(review): retryCount is never reset inside this class, so the
        // retry budget is not restored after a successful connection - confirm
        // whether that is intended.
        retryCount++;
        _isConnected = false;
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (HUNT_DEBUG)
            tracef("reconnecting %d...", retryCount);
        connect(_remoteAddress);
    }

    /**
     * Performs the connection attempt and reports the outcome.
     *
     * On success the socket is switched to non-blocking mode, keepalive is
     * configured, the local address is recorded and the stream is registered
     * with the selector. On failure errorOccurred() is called. In both cases
     * the connection handler, if any, receives the resulting state.
     *
     * Returns: false when the socket is not alive (no handler is called in
     *          that case), otherwise whether the connection was established.
     */
    protected override bool doConnect(Address addr)  {
        if(!this.socket.isAlive) {
            warning("socket is not ready.");
            return false;
        }

        try {
            version (HUNT_DEBUG)
                tracef("Connecting to %s...", addr);
            version (HAVE_IOCP) {
                // With IOCP the stream is registered before connecting.
                this.socket.blocking = false;
                start();
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            } else {
                // Connect in blocking mode, then switch to non-blocking I/O.
                this.socket.blocking = true;
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    start();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            }
        } catch (Throwable ex) {
            // Must try the best to catch all the exceptions, because it will be executed in another thread.
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
            errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
            _isConnected = false;
        }

        if (_connectionHandler !is null) {
            try {
                _connectionHandler(_isConnected);
            } catch(Throwable ex) {
                // A failing user handler must not escape from here.
                debug warning(ex.msg);
                version(HUNT_DEBUG) warning(ex);
            }
        }

        // Report the real outcome instead of an unconditional `true`.
        return _isConnected;
    }

    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /**
     * Applies the keepalive settings of the stream options to the socket.
     *
     * Only the HAVE_EPOLL and HAVE_IOCP builds configure anything; the probe
     * count is set in the HAVE_EPOLL build only.
     */
    private void setKeepalive() {
        version(HUNT_DEBUG) {
            infof("isKeepalive: %s, keepaliveTime: %d seconds, Interval: %d seconds",
                _tcpOption.isKeepalive, _tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
        }

        version (HAVE_EPOLL) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP,
                        cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        } else version (HAVE_IOCP) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        }
    }

    /// Debug helper: reads the keepalive socket options back and traces them.
    version (HUNT_DEBUG) private void checkKeepAlive() {
        version (HAVE_EPOLL) {
            // The module-level import visible in this file selects only
            // TCP_KEEPCNT, so bring the other two constants into scope here.
            import core.sys.linux.netinet.tcp : TCP_KEEPIDLE, TCP_KEEPINTVL;

            int time;
            int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
            tracef("ret=%d, time=%d", ret1, time);

            int interval;
            int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
            tracef("ret=%d, interval=%d", ret2, interval);

            int isKeep;
            int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
            tracef("ret=%d, keepalive=%s", ret3, isKeep == 1);

            int probe;
            int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
            // This value is the probe count, not an interval.
            tracef("ret=%d, probes=%d", ret4, probe);
        }
    }

    /// Sets the handler that receives the result of a connection attempt. Returns this stream.
    TcpStream connected(ConnectionHandler handler) {
        _connectionHandler = handler;
        return this;
    }

    /// Sets the data-received handler. Returns this stream.
    TcpStream received(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    /// Sets the write-done handler. Returns this stream.
    TcpStream writed(SimpleActionHandler handler) {
        dataWriteDoneHandler = handler;
        return this;
    }
    alias onWritten = writed;

    /// Sets the handler called from onClose(). Returns this stream.
    TcpStream closed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Sets the handler called from onDisconnected(). Returns this stream.
    TcpStream disconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets the error handler. Returns this stream.
    TcpStream error(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Returns the current connection flag.
    override bool isConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector; does nothing when already registered.
    override void start() {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
        {
        //    this.beginRead();
        }
    }

    /**
     * Queues `buffer` for sending and triggers onWrite().
     *
     * Params:
     *   buffer = data to send; must not be null (checked by assert)
     *
     * Throws: Exception when the stream is not connected.
     */
    void write(ByteBuffer buffer) {
        assert(buffer !is null);

        if (!_isConnected) {
            throw new Exception(format("The connection %s closed!",
                this.remoteAddress.toString()));
        }

        version (HUNT_IO_DEBUG)
            infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
        _isWritting = true;
        initializeWriteQueue();
        _writeQueue.enqueue(buffer);
        onWrite();
    }

    /**
     * Sends `data`.
     *
     * A null slice is ignored. In the HAVE_IOCP build the data is always
     * wrapped in a buffer and queued. Otherwise, when nothing is pending, the
     * data is written directly with tryWrite() and whatever cannot be written
     * is queued; when something is already pending, the data is queued.
     *
     * Throws: Exception when the stream is not connected, or when the write
     *         is cancelled or the stream is closing/closed during a direct
     *         write.
     */
    void write(const(ubyte)[] data) {

        version (HUNT_IO_DEBUG_MORE) {
            infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
        } else  version (HUNT_IO_DEBUG) {
            if (data.length <= 32)
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
            else
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
        }

        if(data is null) {
            version(HUNT_DEBUG) {
                // warningf, not warning: the message contains a format specifier.
                warningf("Writting a empty data on connection %s.", this.remoteAddress.toString());
            }
            return;
        }

        if (!_isConnected) {
            string msg = format("The connection %s is closed! Writting cancelled.", this.remoteAddress.toString());
            throw new Exception(msg);
        }

        version (HAVE_IOCP) {
            return write(BufferUtils.toBuffer(cast(byte[])data));
        } else {

            // `&&` binds tighter than `||`; the parentheses only make the
            // existing evaluation order explicit.
            // NOTE(review): with a null queue the direct path is taken even
            // while _isWritting is set - confirm that this is intended.
            if (_writeQueue is null || (_writeQueue.isEmpty() && !_isWritting)) {
                _isWritting = true;
                const(ubyte)[] d = data;

                // while (!isClosing() && !_isWriteCancelling && d.length > 0) {
                while(d !is null) {
                    if(isWriteCancelling()) {
                        _errorMessage = format("The connection %s is cancelled!", this.remoteAddress.toString());
                        _error = true;
                        // Log the text as-is; it must not be used as a format string.
                        warning(_errorMessage);
                        throw new Exception(_errorMessage);
                        // throw new IoError(ErrorCode.INTERRUPTED, _errorMessage);
                        // break;
                    }

                    if(isClosing() || isClosed()) {
                        _errorMessage= format("The connection %s is closing or closed!", this.remoteAddress.toString());
                        _error = true;
                        warningf("%s, %s", isClosing(), isClosed());
                        throw new Exception(_errorMessage);
                        // throw new IoError(ErrorCode.CONNECTIONABORTED, _errorMessage);
                        // break;
                    }

                    version (HUNT_IO_DEBUG)
                        infof("to write directly %d bytes, fd=%d", d.length, this.handle);
                    size_t nBytes = tryWrite(d);

                    if (nBytes == d.length) {
                        version (HUNT_IO_DEBUG)
                            tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        checkAllWriteDone();
                        break;
                    } else if (nBytes > 0) {
                        version (HUNT_IO_DEBUG)
                            tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        // Keep trying with the part that has not been written yet.
                        d = d[nBytes .. $];
                    } else {
                        // Nothing could be written: queue the remaining data.
                        version (HUNT_IO_DEBUG)
                            warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
                        initializeWriteQueue();
                        _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
                        break;
                    }
                }
            } else {
                write(BufferUtils.toBuffer(cast(byte[]) data));
            }
        }
    }

    /// Shuts down the receiving side of the socket.
    void shutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending side of the socket.
    void shutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

    /// Calls the disconnection handler, if any, and then closes the stream.
    override protected void onDisconnected() {
        version(HUNT_DEBUG) {
            infof("peer disconnected: fd=%d", this.handle);
        }
        if (disconnectionHandler !is null)
            disconnectionHandler();

        this.close();
    }

protected:
    /// True for streams created with the client constructor.
    bool _isClient;
    /// Receives the result of each connection attempt made by doConnect().
    ConnectionHandler _connectionHandler;

    /**
     * Read-event callback.
     *
     * On Posix, tryRead() is called repeatedly until it returns true or the
     * stream is closed; elsewhere a single doRead() is issued while the stream
     * is not closed.
     */
    override void onRead() {
        version (HUNT_IO_DEBUG)
            trace("start to read");

        version (Posix) {
            while (!_isClosed && !tryRead()) {
                version (HUNT_IO_DEBUG)
                    trace("continue reading...");
            }
        } else {
            if (!_isClosed)
            {
                doRead();
            }

        }

        //if (this.isError) {
        //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
        //            this.handle, errno, this.errorMessage);
        //    debug errorf(msg);
        //    if (!isClosed())
        //        errorOccurred(msg);
        //}
    }

    /**
     * Close callback.
     *
     * The write status is reset, the connection flag is cleared and the close
     * handler is called only when the stream was connected before the close.
     */
    override void onClose() {
        // Capture the state before delegating to the base class.
        bool lastConnectStatus = _isConnected;
        super.onClose();
        if(lastConnectStatus) {
            version (HUNT_IO_DEBUG) {
                if (_writeQueue !is null && !_writeQueue.isEmpty) {
                    warningf("Some data has not been sent yet: fd=%d", this.handle);
                }
            }
            version(HUNT_NET_DEBUG) {
                infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
            }

            resetWriteStatus();
            _isConnected = false;
            version (HUNT_IO_DEBUG) {
                infof("Raising a event on a TCP stream [%s] is down: fd=%d",
                    this.remoteAddress.toString(), this.handle);
            }

            if (closeHandler !is null)
                closeHandler();
        }
    }

}