1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.TcpStream;
13 
14 import hunt.io.channel.Common;
15 import hunt.io.TcpStreamOptions;
16 import hunt.io.IoError;
17 
18 import hunt.io.ByteBuffer;
19 import hunt.io.BufferUtils;
20 import hunt.event.selector.Selector;
21 import hunt.io.SimpleQueue;
22 import hunt.event;
23 import hunt.logging.ConsoleLogger;
24 import hunt.Functions;
25 
26 import std.exception;
27 import std.format;
28 import std.socket;
29 import std.string;
30 
31 import core.atomic;
32 import core.stdc.errno;
33 import core.thread;
34 import core.time;
35 
36 version (HAVE_EPOLL) {
37     import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
38 }
39 
40 
41 
42 /**
43  *
44  */
45 class TcpStream : AbstractStream {
46     SimpleEventHandler closeHandler;
47     protected shared bool _isConnected; // It's always true for server.
48 
49     private TcpStreamOptions _tcpOption;
50     private int retryCount = 0;
51 
52     // for client
53     this(Selector loop, TcpStreamOptions option = null, AddressFamily family = AddressFamily.INET) {
54         _isClient = true;
55         _isConnected = false;
56 
57         if (option is null)
58             _tcpOption = TcpStreamOptions.create();
59         else
60             _tcpOption = option;
61         this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);
62         super(loop, family, _tcpOption.bufferSize);
63         version(HUNT_IO_DEBUG) tracef("buffer size: %d bytes", _tcpOption.bufferSize);
64         
65 
66     }
67 
68     // for server
69     this(Selector loop, Socket socket, TcpStreamOptions option = null) {
70         if (option is null)
71             _tcpOption = TcpStreamOptions.create();
72         else
73             _tcpOption = option;
74         this.socket = socket;
75         super(loop, socket.addressFamily, _tcpOption.bufferSize);
76         _remoteAddress = socket.remoteAddress();
77         _localAddress = socket.localAddress();
78 
79         _isClient = false;
80         _isConnected = true;
81         setKeepalive();
82     }
83 
84     void options(TcpStreamOptions option) @property {
85         assert(option !is null);
86         this._tcpOption = option;
87     }
88 
89     TcpStreamOptions options() @property {
90         return this._tcpOption;
91     }
92 
93     override bool isBusy() {
94         return _isWritting;
95     }
96 
97     
98     override bool isClient() {
99         return _isClient;
100     }
101 
102     void connect(string hostname, ushort port) {
103         Address[] addresses = getAddress(hostname, port);
104         if(addresses is null) {
105             throw new SocketException("Can't resolve hostname: " ~ hostname);
106         }
107         Address selectedAddress;
108         foreach(Address addr; addresses) {
109             string ip = addr.toAddrString();
110             if(ip.startsWith("::")) // skip IPV6
111                 continue;
112             if(ip.length <= 16) {
113                 selectedAddress = addr;
114                 break;
115             }
116         }
117 
118         if(selectedAddress is null) {
119             warning("No IPV4 avaliable");
120             selectedAddress = addresses[0];
121         }
122         version(HUNT_IO_DEBUG) {
123             infof("connecting with: hostname=%s, ip=%s, port=%d ", hostname, selectedAddress.toAddrString(), port);
124         }
125         connect(selectedAddress); // always select the first one.
126     }
127 
128     void connect(Address addr) {
129         if (_isConnected)
130             return;
131 
132         _remoteAddress = addr;
133         import std.parallelism;
134 
135         auto connectionTask = task(&doConnect, addr);
136         taskPool.put(connectionTask);
137         // doConnect(addr);
138     }
139 
140     void reconnect() {
141         if (!_isClient) {
142             throw new Exception("Only client can call this method.");
143         }
144 
145         if (_isConnected || retryCount >= _tcpOption.retryTimes)
146             return;
147 
148         retryCount++;
149         _isConnected = false;
150         this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);
151 
152         version (HUNT_DEBUG)
153             tracef("reconnecting %d...", retryCount);
154         connect(_remoteAddress);
155     }
156 
157     protected override bool doConnect(Address addr)  {
158         try {
159             version (HUNT_DEBUG)
160                 tracef("Connecting to %s...", addr);
161             // Address binded = createAddress(this.socket.addressFamily);
162             // this.socket.bind(binded);
163             version (HAVE_IOCP) {
164                 this.socket.blocking = false;
165                 start();
166                 if(super.doConnect(addr)) {
167                     this.socket.blocking = false;
168                     setKeepalive();
169                     _localAddress = this.socket.localAddress();
170                     _isConnected = true;
171                 } else {
172                     errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
173                     _isConnected = false;
174                 }
175             } else {
176                 this.socket.blocking = true;
177                 if(super.doConnect(addr)) {
178                     this.socket.blocking = false;
179                     setKeepalive();
180                     _localAddress = this.socket.localAddress();
181                     start();
182                     _isConnected = true;
183                 } else {
184                     errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
185                     _isConnected = false;
186                 }
187             }
188         } catch (Throwable ex) {
189             // Must try the best to catch all the exceptions, because it will be executed in another thread.
190             debug warning(ex.msg);
191             version(HUNT_DEBUG) warning(ex);
192             errorOccurred(ErrorCode.CONNECTIONEFUSED,"Connection refused");
193             _isConnected = false;
194         } 
195 
196         if (_connectionHandler !is null) {
197             try {
198                 _connectionHandler(_isConnected);
199             } catch(Throwable ex) {
200                 debug warning(ex.msg);
201                 version(HUNT_DEBUG) warning(ex);
202             }
203         }
204         return true;
205     }
206 
207     // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
208     // http://www.importnew.com/27624.html
209     private void setKeepalive() {
210         version (HAVE_EPOLL) {
211             if (_tcpOption.isKeepalive) {
212                 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
213                 this.setOption(SocketOptionLevel.TCP,
214                         cast(SocketOption) TCP_KEEPCNT, _tcpOption.keepaliveProbes);
215                 // version (HUNT_DEBUG) checkKeepAlive();
216             }
217         } else version (HAVE_IOCP) {
218             if (_tcpOption.isKeepalive) {
219                 this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
220                 // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
221                 //     _tcpOption.keepaliveProbes);
222                 // version (HUNT_DEBUG) checkKeepAlive();
223             }
224         }
225     }
226 
227     version (HUNT_DEBUG) private void checkKeepAlive() {
228         version (HAVE_EPOLL) {
229             int time;
230             int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
231             tracef("ret=%d, time=%d", ret1, time);
232 
233             int interval;
234             int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
235             tracef("ret=%d, interval=%d", ret2, interval);
236 
237             int isKeep;
238             int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
239             tracef("ret=%d, keepalive=%s", ret3, isKeep == 1);
240 
241             int probe;
242             int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
243             tracef("ret=%d, interval=%d", ret4, probe);
244         }
245     }
246 
247     TcpStream connected(ConnectionHandler handler) {
248         _connectionHandler = handler;
249         return this;
250     }
251 
252     TcpStream received(DataReceivedHandler handler) {
253         dataReceivedHandler = handler;
254         return this;
255     }
256 
257     TcpStream writed(SimpleActionHandler handler) {
258         dataWriteDoneHandler = handler;
259         return this;
260     }
261     alias onWritten = writed;
262 
263     TcpStream closed(SimpleEventHandler handler) {
264         closeHandler = handler;
265         return this;
266     }
267 
268     TcpStream disconnected(SimpleEventHandler handler) {
269         disconnectionHandler = handler;
270         return this;
271     }
272 
273     TcpStream error(ErrorEventHandler handler) {
274         errorHandler = handler;
275         return this;
276     }
277 
278     override bool isConnected() nothrow {
279         return _isConnected;
280     }
281 
282     override void start() {
283         if (_isRegistered)
284             return;
285         _inLoop.register(this);
286         _isRegistered = true;
287         version (HAVE_IOCP)
288         {
289         //    this.beginRead();
290         }
291     }
292 
293     void write(ByteBuffer buffer) {
294         assert(buffer !is null);
295 
296         if (!_isConnected) {
297             throw new Exception(format("The connection %s closed!",
298                 this.remoteAddress.toString()));
299         }
300 
301         version (HUNT_IO_DEBUG)
302             infof("data buffered (%d bytes): fd=%d", buffer.limit(), this.handle);
303         _isWritting = true;
304         initializeWriteQueue();
305         _writeQueue.enqueue(buffer);
306         onWrite();
307     }
308 
309     /**
310      * 
311      */
312     void write(const(ubyte)[] data) {
313 
314         version (HUNT_IO_DEBUG_MORE) {
315             infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
316         } else  version (HUNT_IO_DEBUG) {
317             if (data.length <= 32)
318                 infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. $]);
319             else
320                 infof("%d bytes(fd=%d): %(%02X %)", data.length, this.handle, data[0 .. 32]);
321         }
322 
323         if(data is null) {
324             version(HUNT_DEBUG) {
325                 warning("Writting a empty data on connection %s.", this.remoteAddress.toString());
326             }
327             return;
328         }
329 
330         if (!_isConnected) {
331             string msg = format("The connection %s is closed!", this.remoteAddress.toString());
332             throw new Exception(msg);
333         }
334 
335         version (HAVE_IOCP) {
336             return write(BufferUtils.toBuffer(cast(byte[])data));
337         } else {
338 
339             if (_writeQueue is null || (_writeQueue.isEmpty()) && !_isWritting) {
340                 _isWritting = true;
341                 const(ubyte)[] d = data;
342 
343                 // while (!isClosing() && !_isWriteCancelling && d.length > 0) {
344                 while(d !is null) {
345                     if(isWriteCancelling()) {
346                         _errorMessage = format("The connection %s is cancelled!", this.remoteAddress.toString());
347                         _error = true;
348                         warningf(_errorMessage);
349                         throw new Exception(_errorMessage);
350                         // break;
351                     }
352 
353                     if(isClosing() || isClosed()) {
354                         _errorMessage= format("The connection %s is closing or closed!", this.remoteAddress.toString());
355                         _error = true;
356                         warningf("%s, %s", isClosing(), isClosed());
357                         throw new Exception(_errorMessage);
358                         // break;
359                     }
360 
361                     version (HUNT_IO_DEBUG)
362                         infof("to write directly %d bytes, fd=%d", d.length, this.handle);
363                     size_t nBytes = tryWrite(d);
364 
365                     if (nBytes == d.length) {
366                         version (HUNT_IO_DEBUG)
367                             tracef("write all out at once: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
368                         checkAllWriteDone();
369                         break;
370                     } else if (nBytes > 0) {
371                         version (HUNT_IO_DEBUG)
372                             tracef("write out partly: %d / %d bytes, fd=%d", nBytes, d.length, this.handle);
373                         d = d[nBytes .. $];
374                     } else {
375                         version (HUNT_IO_DEBUG)
376                             warningf("buffering data: %d bytes, fd=%d", d.length, this.handle);
377                         initializeWriteQueue();
378                         _writeQueue.enqueue(BufferUtils.toBuffer(cast(byte[]) d));
379                         break;
380                     }
381                 }
382             } else {
383                 write(BufferUtils.toBuffer(cast(byte[]) data));
384             }
385         }
386     }
387 
388     void shutdownInput() {
389         this.socket.shutdown(SocketShutdown.RECEIVE);
390     }
391 
392     void shutdownOutput() {
393         this.socket.shutdown(SocketShutdown.SEND);
394     }
395 
396     override protected void onDisconnected() {
397         version(HUNT_DEBUG) {
398             infof("peer disconnected: fd=%d", this.handle);
399         }
400         if (disconnectionHandler !is null)
401             disconnectionHandler();
402 
403         this.close();
404     }
405 
406 protected:
407     bool _isClient;
408     ConnectionHandler _connectionHandler;
409 
410     override void onRead() {
411         version (HUNT_IO_DEBUG)
412             trace("start to read");
413 
414         version (Posix) {
415             while (!_isClosed && !tryRead()) {
416                 version (HUNT_IO_DEBUG)
417                     trace("continue reading...");
418             }
419         } else {
420             if (!_isClosed)
421             {
422                 doRead();
423             }
424 
425         }
426 
427         //if (this.isError) {
428         //    string msg = format("Socket error on read: fd=%d, code=%d, message: %s",
429         //            this.handle, errno, this.errorMessage);
430         //    debug errorf(msg);
431         //    if (!isClosed())
432         //        errorOccurred(msg);
433         //}
434     }
435 
436     override void onClose() {
437         bool lastConnectStatus = _isConnected;
438         super.onClose();
439         if(lastConnectStatus) {
440             version (HUNT_IO_DEBUG) {
441                 if (_writeQueue !is null && !_writeQueue.isEmpty) {
442                     warningf("Some data has not been sent yet: fd=%d", this.handle);
443                 }
444             }
445             version(HUNT_DEBUG) {
446                 infof("Closing a connection with: %s, fd=%d", this.remoteAddress, this.handle);
447             }
448 
449             resetWriteStatus();
450             _isConnected = false;
451             version (HUNT_IO_DEBUG) {
452                 infof("Raising a event on a TCP stream [%s] is down: fd=%d", 
453                     this.remoteAddress.toString(), this.handle);
454             }
455 
456             if (closeHandler !is null)
457                 closeHandler();
458         }
459     }
460 
461 }