1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.TcpStream;
13 
14 import hunt.io.channel.Common;
15 import hunt.io.TcpStreamOptions;
16 import hunt.io.IoError;
17 
18 import hunt.collection.ByteBuffer;
19 import hunt.collection.BufferUtils;
20 import hunt.event.selector.Selector;
21 import hunt.concurrency.SimpleQueue;
22 import hunt.event;
23 import hunt.logging.ConsoleLogger;
24 import hunt.Functions;
25 
26 import std.exception;
27 import std.format;
28 import std.socket;
29 import std.string;
30 
31 import core.atomic;
32 import core.stdc.errno;
33 import core.thread;
34 import core.time;
35 
36 version (HAVE_EPOLL) {
37     import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
38 }
39 
40 
41 
42 /**
43  *
44  */
45 class TcpStream : AbstractStream {
46     SimpleEventHandler closeHandler;
47     protected shared bool _isConnected; // It's always true for server.
48 
49     private TcpStreamOptions _tcpOption;
50     private int retryCount = 0;
51 
52     // for client
53     this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
54         _isClient = true;
55         _isConnected = false;
56 
57         if (option is null)
58             _tcpOption = TcpStreamOptions.create();
59         else
60             _tcpOption = option;
61         this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
62         super(loop, family, _tcpOption.bufferSize);
63         version(HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
64         
65 
66     }
67 
68     // for server
69     this(Selector loop, Socket socket, TcpStreamOptions option = null) {
70         if (option is null)
71             _tcpOption = TcpStreamOptions.create();
72         else
73             _tcpOption = option;
74         this.socket = socket;
75         super(loop, socket.addressFamily, _tcpOption.bufferSize);
76         _remoteAddress = socket.remoteAddress();
77         _localAddress = socket.localAddress();
78 
79         _isClient = false;
80         _isConnected = true;
81         setKeepalive();
82     }
83 
84     void options(TcpStreamOptions option) @property {
85         assert(option !is null);
86         this._tcpOption = option;
87     }
88 
89     TcpStreamOptions options() @property {
90         return this._tcpOption;
91     }
92 
93     override bool isBusy() {
94         return _isWritting;
95     }
96 
97     
98     override bool isClient() {
99         return _isClient;
100     }
101 
102     void connect(string hostname, ushort port) {
103         Address[] addresses = getAddress(hostname, port);
104         if(addresses is null) {
105             throw new SocketException("Can't resolve hostname: " ~ hostname);
106         }
107         Address selectedAddress;
108         foreach(Address addr; addresses) {
109             string ip = addr.toAddrString();
110             if(ip.startsWith("::")) // skip IPV6
111                 continue;
112             if(ip.length <= 16) {
113                 selectedAddress = addr;
114                 break;
115             }
116         }
117 
118         if(selectedAddress is null) {
119             warning("No IPV4 avaliable");
120             selectedAddress = addresses[0];
121         }
122         version(HUNT_IO_DEBUG) {
123             infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
124         }
125         connect(selectedAddress); // always select the first one.
126     }
127 
128     void connect(Address addr) {
129         if (_isConnected)
130             return;
131 
132         _remoteAddress = addr;
133         import std.parallelism;
134 
135         auto connectionTask = task(&doConnect, addr);
136         taskPool.put(connectionTask);
137         // doConnect(addr);
138     }
139 
140     void reconnect() {
141         if (!_isClient) {
142             throw new Exception("Only client can call this method.");
143         }
144 
145         if (_isConnected || retryCount >= _tcpOption.retryTimes)
146             return;
147 
148         retryCount++;
149         _isConnected = false;
150         this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);
151 
152         version (HUNT_DEBUG)
153             tracef("reconnecting %d...", retryCount);
154         connect(_remoteAddress);
155     }
156 
157     protected override bool doConnect(Address addr)  {
158         try {
159             version (HUNT_DEBUG)
160                 tracef("Connecting to %s...", addr);
161             // Address binded = createAddress(this.socket.addressFamily);
162             // this.socket.bind(binded);
163             this.socket.blocking = false;
164             version (HAVE_IOCP) {
165                 start();
166                 if(super.doConnect(addr)) {
167                     this.socket.blocking = false;
168                     setKeepalive();
169                     _localAddress = this.socket.localAddress();
170                     _isConnected = true;
171                 } else {
172                     errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
173                     _isConnected = false;
174                 }
175             } else {
176                 if(super.doConnect(addr)) {
177                     this.socket.blocking = false;
178                     setKeepalive();
179                     _localAddress = this.socket.localAddress();
180                     start();
181                     _isConnected = true;
182                 } else {
183                     errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
184                     _isConnected = false;
185                 }
186             }
187         } catch (Throwable ex) {
188             // Must try the best to catch all the exceptions, because it will be executed in another thread.
189             debug warning(ex.msg);
190             version(HUNT_DEBUG) warning(ex);
191             errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
192             _isConnected = false;
193         } 
194 
195         if (_connectionHandler !is null) {
196             try {
197                 _connectionHandler(_isConnected);
198             } catch(Throwable ex) {
199                 debug warning(ex.msg);
200                 version(HUNT_DEBUG) warning(ex);
201             }
202         }
203         return true;
204     }
205 
206     // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
207     // http://www.importnew.com/27624.html
208     private void setKeepalive() {
209         version (HAVE_EPOLL) {
210             if (_tcpOption.isKeepalive) {
211                 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
212                 this.setOption(SocketOptionLevel.TCP,
213                         cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
214                 // version (HUNT_DEBUG) checkKeepAlive();
215             }
216         } else version (HAVE_IOCP) {
217             if (_tcpOption.isKeepalive) {
218                 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
219                 // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
220                 //     _tcpOption.keepaliveProbes);
221                 // version (HUNT_DEBUG) checkKeepAlive();
222             }
223         }
224     }
225 
226     version (HUNT_DEBUG) private void checkKeepAlive() {
227         version (HAVE_EPOLL) {
228             int time;
229             int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
230             tracef("ret=%d, time=%d", ret1, time);
231 
232             int interval;
233             int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
234             tracef("ret=%d, interval=%d", ret2, interval);
235 
236             int isKeep;
237             int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
238             tracef("ret=%d, keepalive=%s", ret3, isKeep == 1);
239 
240             int probe;
241             int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
242             tracef("ret=%d, interval=%d", ret4, probe);
243         }
244     }
245 
246     TcpStream connected(ConnectionHandler handler) {
247         _connectionHandler = handler;
248         return this;
249     }
250 
251     TcpStream received(DataReceivedHandler handler) {
252         dataReceivedHandler = handler;
253         return this;
254     }
255 
256     TcpStream writed(SimpleActionHandler handler) {
257         dataWriteDoneHandler = handler;
258         return this;
259     }
260     alias onWritten = writed;
261 
262     TcpStream closed(SimpleEventHandler handler) {
263         closeHandler = handler;
264         return this;
265     }
266 
267     TcpStream disconnected(SimpleEventHandler handler) {
268         disconnectionHandler = handler;
269         return this;
270     }
271 
272     TcpStream error(ErrorEventHandler handler) {
273         errorHandler = handler;
274         return this;
275     }
276 
277     override bool isConnected() nothrow {
278         return _isConnected;
279     }
280 
281     override void start() {
282         if (_isRegistered)
283             return;
284         _inLoop.register(this);
285         _isRegistered = true;
286         version (HAVE_IOCP)
287         {
288         //    this.beginRead();
289         }
290     }
291 
292     void write(ByteBuffer buffer) {
293         assert(buffer !is null);
294 
295         if (!_isConnected) {
296             throw new Exception(format("The connection is down! remote: %s",
297                 this.remoteAddress.toString()));
298         }
299 
300         version (HUNT_IO_DEBUG)
301             infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
302         _isWritting = true;
303         initializeWriteQueue();
304         _writeQueue.enqueue(buffer);
305         onWrite();
306     }
307 
308     /**
309     */
310     void write(const(ubyte)[] data) {
311         if (data.length == 0 || !_isConnected)
312             return;
313 
314         if (!_isConnected) {
315             throw new Exception("The connection is down!");
316         }
317         version (HAVE_IOCP)
318         {
319             return write(BufferUtils.toBuffer(cast(byte[])data));
320         } else
321         {
322             version (HUNT_IO_DEBUG_MORE) {
323                 infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
324             } else  version (HUNT_IO_DEBUG) {
325                 if (data.length <= 32)
326                     infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
327                 else
328                     infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
329             }
330 
331             if (_writeQueue is null || (_writeQueue.isEmpty()) && !_isWritting) {
332                 _isWritting = true;
333                 const(ubyte)[] d = data;
334 
335                 while (!isClosing() && !isWriteCancelling && d.length > 0) {
336                     version (HUNT_IO_DEBUG)
337                         infof("to write directly %d bytes, fd=%d", d.length, this.handle);
338                     size_t nBytes = tryWrite(d);
339 
340                     if (nBytes == d.length) {
341                         version (HUNT_IO_DEBUG)
342                             tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
343                         checkAllWriteDone();
344                         break;
345                     } else if (nBytes > 0) {
346                         version (HUNT_IO_DEBUG)
347                             tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
348                         d = d[nBytes .. $];
349                     } else {
350                         version (HUNT_IO_DEBUG)
351                             warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
352                         initializeWriteQueue();
353                         _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
354                         break;
355                     }
356                 }
357             } else {
358                 write(BufferUtils.toBuffer(cast(byte[]) data));
359             }
360         }
361     }
362 
363     void shutdownInput() {
364         this.socket.shutdown(SocketShutdown.RECEIVE);
365     }
366 
367     void shutdownOutput() {
368         this.socket.shutdown(SocketShutdown.SEND);
369     }
370 
371     override protected void onDisconnected() {
372         version(HUNT_DEBUG) {
373             infof("peer disconnected: fd=%d", this.handle);
374         }
375         if (disconnectionHandler !is null)
376             disconnectionHandler();
377 
378         this.close();
379     }
380 
381 protected:
382     bool _isClient;
383     ConnectionHandler _connectionHandler;
384 
385     override void onRead() {
386         version (HUNT_IO_DEBUG)
387             trace("start to read");
388 
389         version (Posix) {
390             while (!_isClosed && !tryRead()) {
391                 version (HUNT_IO_DEBUG)
392                     trace("continue reading...");
393             }
394         } else {
395             if (!_isClosed)
396             {
397                 doRead();
398             }
399 
400         }
401 
402         //if (this.isError) {
403         //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
404         //            this.handle, errno, this.errorMessage);
405         //    debug errorf(msg);
406         //    if (!isClosed())
407         //        errorOccurred(msg);
408         //}
409     }
410 
411     override void onClose() {
412         bool lastConnectStatus = _isConnected;
413         super.onClose();
414         if(lastConnectStatus) {
415             version (HUNT_IO_DEBUG) {
416                 if (_writeQueue !is null && !_writeQueue.isEmpty) {
417                     warningf("Some data has not been sent yet: fd=%d", this.handle);
418                 }
419                 infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
420             }
421 
422             resetWriteStatus();
423             _isConnected = false;
424             version (HUNT_IO_DEBUG)
425                 infof("notifying TCP stream down: fd=%d", this.handle);
426             if (closeHandler !is null)
427                 closeHandler();
428         }
429 
430     }
431 
432 }