/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */
11 
module hunt.io.TcpStream;

import hunt.io.channel.Common;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.event.selector.Selector;
import hunt.io.SimpleQueue;
import hunt.event;
import hunt.logging.ConsoleLogger;
import hunt.Functions;

import std.exception;
import std.format;
import std.socket;
import std.string;

import core.atomic;
import core.stdc.errno;
import core.thread;
import core.time;

version (HAVE_EPOLL) {
    import core.sys.linux.netinet.tcp : TCP_KEEPCNT, TCP_KEEPIDLE, TCP_KEEPINTVL;
}
39 
40 
41 
/**
 * A TCP stream registered with a `Selector` event loop.
 *
 * An instance is created either as a client (it owns a fresh socket and must
 * be connected with `connect`) or as a server-side stream wrapping an already
 * accepted socket (it is considered connected from construction).
 *
 * Event callbacks are installed through the fluent setters `connected`,
 * `received`, `writed`/`onWritten`, `closed`, `disconnected` and `error`.
 */
class TcpStream : AbstractStream {
    /// Called from `onClose` when a stream that was connected gets closed.
    SimpleEventHandler closeHandler;
    protected shared bool _isConnected; // It's always true for server.

    private TcpStreamOptions _tcpOption;
    private int retryCount = 0; // number of reconnect() attempts made so far

    /**
     * Creates a client stream.
     *
     * Params:
     *   loop   = the selector the stream will be registered with
     *   option = stream options; `TcpStreamOptions.create()` is used when null
     *   family = address family of the socket to create
     */
    this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
        _isClient = true;
        _isConnected = false;

        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
        super(loop, family, _tcpOption.bufferSize);
        version(HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
    }

    /**
     * Creates a server-side stream around an accepted socket.
     *
     * The stream is marked as connected immediately, the local and remote
     * addresses are taken from `socket`, and keepalive settings are applied.
     *
     * Params:
     *   loop   = the selector the stream will be registered with
     *   socket = the accepted socket
     *   option = stream options; `TcpStreamOptions.create()` is used when null
     */
    this(Selector loop, Socket socket, TcpStreamOptions option = null) {
        if (option is null)
            _tcpOption = TcpStreamOptions.create();
        else
            _tcpOption = option;
        this.socket = socket;
        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        setKeepalive();
    }

    /// Replaces the stream options; `option` must not be null.
    void options(TcpStreamOptions option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the stream options currently in use.
    TcpStreamOptions options() @property {
        return this._tcpOption;
    }

    /// Returns the current value of the write-in-progress flag `_isWritting`.
    override bool isBusy() {
        return _isWritting;
    }

    /// Returns true when the stream was created with the client constructor.
    override bool isClient() {
        return _isClient;
    }

    /**
     * Resolves `hostname` and starts connecting to it.
     *
     * The first resolved IPv4 address is preferred. When the host has no
     * IPv4 address, a warning is logged and the first resolved address is
     * used instead.
     *
     * Params:
     *   hostname = host name or address literal to resolve
     *   port     = remote TCP port
     *
     * Throws: SocketException when the host name yields no address.
     */
    void connect(string hostname, ushort port) {
        Address[] addresses = getAddress(hostname, port);
        if (addresses.length == 0) {
            throw new SocketException("Can't resolve hostname: " ~ hostname);
        }

        // Decide by address family rather than by the textual form of the
        // address: short IPv6 literals such as "fe80::1" neither start with
        // "::" nor exceed the length of an IPv4 literal.
        Address selectedAddress;
        foreach (Address addr; addresses) {
            if (addr.addressFamily == AddressFamily.INET) {
                selectedAddress = addr;
                break;
            }
        }

        if (selectedAddress is null) {
            warning("No IPV4 avaliable");
            selectedAddress = addresses[0];
        }
        version(HUNT_IO_DEBUG) {
            infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
        }
        connect(selectedAddress);
    }

    /**
     * Starts connecting to `addr` on a `std.parallelism.taskPool` worker.
     *
     * Does nothing when the stream is already connected. The outcome is
     * reported through the handler installed with `connected`.
     */
    void connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;
        import std.parallelism;

        // doConnect may block, so it is run as a task-pool task.
        auto connectionTask = task(&doConnect, addr);
        taskPool.put(connectionTask);
        // doConnect(addr);
    }

    /**
     * Tries to connect again to the last remote address with a new socket.
     *
     * Does nothing when the stream is connected or when `retryCount` has
     * reached `retryTimes` from the options.
     *
     * Throws: Exception when called on a server-side stream.
     */
    void reconnect() {
        if (!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        retryCount++;
        _isConnected = false;
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (HUNT_DEBUG)
            tracef("reconnecting %d...", retryCount);
        connect(_remoteAddress);
    }

    /**
     * Performs the connection attempt and notifies the connection handler.
     *
     * Any `Throwable` raised during the attempt is caught and reported via
     * `errorOccurred`, because this method is run from a task-pool task.
     *
     * Returns: true when the stream ended up connected, false otherwise.
     */
    protected override bool doConnect(Address addr)  {
        try {
            version (HUNT_DEBUG)
                tracef("Connecting to %s...", addr);
            // Address binded = createAddress(this.socket.addressFamily);
            // this.socket.bind(binded);
            version (HAVE_IOCP) {
                this.socket.blocking = false;
                start();
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            } else {
                // Connect in blocking mode, then switch to non-blocking
                // before registering with the selector.
                this.socket.blocking = true;
                if(super.doConnect(addr)) {
                    this.socket.blocking = false;
                    setKeepalive();
                    _localAddress = this.socket.localAddress();
                    start();
                    _isConnected = true;
                } else {
                    errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
                    _isConnected = false;
                }
            }
        } catch (Throwable ex) {
            // Must try the best to catch all the exceptions, because it will be executed in another thread.
            debug warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
            errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
            _isConnected = false;
        }

        if (_connectionHandler !is null) {
            try {
                _connectionHandler(_isConnected);
            } catch(Throwable ex) {
                // A failing user callback must not escape the worker task.
                debug warning(ex.msg);
                version(HUNT_DEBUG) warning(ex);
            }
        }

        // Report the real outcome instead of an unconditional true.
        return _isConnected;
    }

    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /// Applies the keepalive settings from the options to the socket, if enabled.
    private void setKeepalive() {
        version(HUNT_DEBUG) {
            infof("isKeepalive: %s, keepaliveTime: %d seconds, Interval: %d seconds",
                _tcpOption.isKeepalive, _tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
        }
        version (HAVE_EPOLL) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP,
                        cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        } else version (HAVE_IOCP) {
            if (_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        }
    }

    /// Debug helper: reads the keepalive socket options back and traces them.
    version (HUNT_DEBUG) private void checkKeepAlive() {
        version (HAVE_EPOLL) {
            int time;
            int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
            tracef("ret=%d, time=%d", ret1, time);

            int interval;
            int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
            tracef("ret=%d, interval=%d", ret2, interval);

            int isKeep;
            int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
            // Any non-zero value means the option is enabled.
            tracef("ret=%d, keepalive=%s", ret3, isKeep != 0);

            int probe;
            int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
            tracef("ret=%d, probes=%d", ret4, probe);
        }
    }

    /// Sets the handler that `doConnect` calls with the connection result.
    TcpStream connected(ConnectionHandler handler) {
        _connectionHandler = handler;
        return this;
    }

    /// Sets `dataReceivedHandler`.
    TcpStream received(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    /// Sets `dataWriteDoneHandler`.
    TcpStream writed(SimpleActionHandler handler) {
        dataWriteDoneHandler = handler;
        return this;
    }
    alias onWritten = writed;

    /// Sets `closeHandler`, which `onClose` calls for a previously connected stream.
    TcpStream closed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Sets `disconnectionHandler`, which `onDisconnected` calls before closing.
    TcpStream disconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Sets `errorHandler`.
    TcpStream error(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Returns true while the stream is marked as connected.
    override bool isConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector; repeated calls are no-ops.
    override void start() {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
        {
        //    this.beginRead();
        }
    }

    /**
     * Queues `buffer` for writing and triggers `onWrite`.
     *
     * Throws: Exception when the stream is not connected.
     */
    void write(ByteBuffer buffer) {
        assert(buffer !is null);

        if (!_isConnected) {
            throw new Exception(format("The connection %s closed!",
                describeRemoteAddress()));
        }

        version (HUNT_IO_DEBUG)
            infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
        _isWritting = true;
        initializeWriteQueue();
        _writeQueue.enqueue(buffer);
        onWrite();
    }

    /**
     * Writes raw bytes to the stream.
     *
     * Empty input is ignored. Under `HAVE_IOCP` the data is always wrapped
     * in a buffer and queued. Otherwise the data is written directly with
     * `tryWrite` when the direct-write condition below holds; whatever
     * cannot be written at once is queued. In all other cases the data is
     * queued through `write(ByteBuffer)`.
     *
     * Throws: Exception when the stream is not connected, or when a direct
     *         write finds the stream cancelling, closing or closed.
     */
    void write(const(ubyte)[] data) {

        version (HUNT_IO_DEBUG_MORE) {
            infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
        } else  version (HUNT_IO_DEBUG) {
            if (data.length <= 32)
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
            else
                infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
        }

        // Covers both a null slice and an empty non-null slice.
        if (data.length == 0) {
            version(HUNT_DEBUG) {
                warningf("Writting a empty data on connection %s.", describeRemoteAddress());
            }
            return;
        }

        if (!_isConnected) {
            string msg = format("The connection %s is closed!", describeRemoteAddress());
            throw new Exception(msg);
        }

        version (HAVE_IOCP) {
            return write(BufferUtils.toBuffer(cast(byte[])data));
        } else {

            // NOTE(review): `&&` binds tighter than `||`, so a null queue
            // selects the direct path even while _isWritting is set. The
            // original grouping is kept and only made explicit here; confirm
            // whether `(queue is null || queue.isEmpty()) && !_isWritting`
            // was intended.
            if (_writeQueue is null || (_writeQueue.isEmpty() && !_isWritting)) {
                _isWritting = true;
                const(ubyte)[] d = data;

                // while (!isClosing() && !_isWriteCancelling && d.length > 0) {
                while (d.length > 0) {
                    if(isWriteCancelling()) {
                        _errorMessage = format("The connection %s is cancelled!", describeRemoteAddress());
                        _error = true;
                        // Log as plain text: the message embeds an address
                        // and must not be interpreted as a format string.
                        warning(_errorMessage);
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    if(isClosing() || isClosed()) {
                        _errorMessage= format("The connection %s is closing or closed!", describeRemoteAddress());
                        _error = true;
                        warningf("%s, %s", isClosing(), isClosed());
                        throw new Exception(_errorMessage);
                        // break;
                    }

                    version (HUNT_IO_DEBUG)
                        infof("to write directly %d bytes, fd=%d", d.length, this.handle);
                    size_t nBytes = tryWrite(d);

                    if (nBytes == d.length) {
                        version (HUNT_IO_DEBUG)
                            tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        checkAllWriteDone();
                        break;
                    } else if (nBytes > 0) {
                        version (HUNT_IO_DEBUG)
                            tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
                        // Keep only the unsent tail and try again.
                        d = d[nBytes .. $];
                    } else {
                        version (HUNT_IO_DEBUG)
                            warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
                        // Nothing could be written now: queue the rest.
                        initializeWriteQueue();
                        _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
                        break;
                    }
                }
            } else {
                write(BufferUtils.toBuffer(cast(byte[]) data));
            }
        }
    }

    /// Shuts down the receiving side of the socket.
    void shutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending side of the socket.
    void shutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

    /// Calls the disconnection handler, if any, and then closes the stream.
    override protected void onDisconnected() {
        version(HUNT_DEBUG) {
            infof("peer disconnected: fd=%d", this.handle);
        }
        if (disconnectionHandler !is null)
            disconnectionHandler();

        this.close();
    }

    /**
     * Returns the remote address as text for use in messages.
     *
     * A client stream has no remote address until `connect` has been called,
     * so a placeholder is returned instead of dereferencing null.
     */
    private string describeRemoteAddress() {
        auto addr = this.remoteAddress;
        return addr is null ? "(unknown)" : addr.toString();
    }

protected:
    bool _isClient;
    ConnectionHandler _connectionHandler;

    /// Reads from the socket: repeatedly via `tryRead` on Posix, once via `doRead` elsewhere.
    override void onRead() {
        version (HUNT_IO_DEBUG)
            trace("start to read");

        version (Posix) {
            // Loop until the stream is closed or tryRead() returns true.
            while (!_isClosed && !tryRead()) {
                version (HUNT_IO_DEBUG)
                    trace("continue reading...");
            }
        } else {
            if (!_isClosed)
            {
                doRead();
            }

        }

        //if (this.isError) {
        //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
        //            this.handle, errno, this.errorMessage);
        //    debug errorf(msg);
        //    if (!isClosed())
        //        errorOccurred(msg);
        //}
    }

    /**
     * Runs the base-class close logic and, when the stream was connected
     * beforehand, resets the write status, marks the stream as disconnected
     * and calls `closeHandler`.
     */
    override void onClose() {
        // Capture the state first so it is unaffected by super.onClose().
        bool lastConnectStatus = _isConnected;
        super.onClose();
        if(lastConnectStatus) {
            version (HUNT_IO_DEBUG) {
                if (_writeQueue !is null && !_writeQueue.isEmpty) {
                    warningf("Some data has not been sent yet: fd=%d", this.handle);
                }
            }
            version(HUNT_DEBUG) {
                infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
            }

            resetWriteStatus();
            _isConnected = false;
            version (HUNT_IO_DEBUG) {
                infof("Raising a event on a TCP stream [%s] is down: fd=%d",
                    this.remoteAddress.toString(), this.handle);
            }

            if (closeHandler !is null)
                closeHandler();
        }
    }

}