1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.TcpStream;
13 
14 import hunt.event;
15 
16 import hunt.logging;
17 import hunt.Functions;
18 
19 import std.format;
20 import std.socket;
21 import std.exception;
22 import std.socket;
23 import core.thread;
24 import core.time;
25 
26 version(HAVE_EPOLL) {
27     import core.sys.linux.netinet.tcp : TCP_KEEPCNT;
28 }
29 
/**
 * Settings bundle consumed by TcpStream: endpoint defaults, TCP keepalive
 * tuning, I/O buffer size and client retry limits.
 *
 * See http://www.tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
 * for background on the three keepalive knobs.
 */
class TcpStreamOption {
    /// Default host address (loopback).
    string ip = "127.0.0.1";

    /// Default port.
    ushort port = 8080;

    /// Idle time, in seconds, between the last data packet sent (plain ACKs
    /// are not counted as data) and the first keepalive probe.
    int keepaliveTime = 7200;

    /// Delay, in seconds, between subsequent keepalive probes, regardless of
    /// what the connection has exchanged in the meantime.
    int keepaliveInterval = 75;

    /// Number of unacknowledged probes sent before the connection is
    /// considered dead and the application layer is notified.
    int keepaliveProbes = 9;

    /// Whether TcpStream should apply the keepalive values above to its socket.
    bool isKeepalive = false;

    /// Buffer size in bytes; TcpStream forwards it to its base-class constructor.
    size_t bufferSize = 8 * 1024;

    /// Upper bound on the number of TcpStream.reconnect() attempts.
    int retryTimes = 5;

    /// Pause between retries.
    /// NOTE(review): not read anywhere in this file; presumably used by callers.
    Duration retryInterval = 2.seconds;

    /**
     * Builds the option set TcpStream falls back to when none is supplied:
     * keepalive enabled with short timings (15 s idle, 3 s interval,
     * 5 probes) and an 8 KiB buffer.
     *
     * Returns: a freshly allocated, independently mutable option object.
     */
    static TcpStreamOption createOption() {
        auto tuned = new TcpStreamOption();
        with (tuned) {
            isKeepalive = true;
            keepaliveTime = 15;
            keepaliveInterval = 3;
            keepaliveProbes = 5;
            bufferSize = 8 * 1024;
        }
        return tuned;
    }

    /// Creates an option object carrying the field defaults declared above.
    this() {
    }
}
67 
/**
 * TCP stream driven by a Selector event loop.
 *
 * An instance is either a client (it creates its own socket and is connected
 * via connect()/reconnect()) or the server side of an already accepted
 * socket. Outgoing data is queued as StreamWriteBuffer objects and flushed
 * in onWrite(); incoming data is pulled in onRead().
 */
class TcpStream : AbstractStream {
    /// Invoked at the very end of onClose(), after the socket has been closed.
    SimpleEventHandler closeHandler;

    private TcpStreamOption _tcpOption;

    /// Number of reconnect() attempts made since the last successful connect.
    private int retryCount = 0;

    /**
     * Creates a client-side stream with a fresh, unconnected TCP socket.
     *
     * Params:
     *   loop   = selector the stream registers with once connected
     *   family = address family of the socket to create
     *   option = stream settings; TcpStreamOption.createOption() is used when null
     */
    this(Selector loop, AddressFamily family = AddressFamily.INET, TcpStreamOption option = null) {
        _tcpOption = (option is null) ? TcpStreamOption.createOption() : option;
        super(loop, family, _tcpOption.bufferSize);
        this.socket = new Socket(family, SocketType.STREAM, ProtocolType.TCP);

        _isClient = true;
        _isConnected = false;
    }

    /**
     * Wraps an already connected socket (server side).
     *
     * Records the socket's remote and local addresses, marks the stream as
     * connected and applies the keepalive settings.
     *
     * Params:
     *   loop   = selector the stream will register with
     *   socket = connected socket to wrap
     *   option = stream settings; TcpStreamOption.createOption() is used when null
     */
    this(Selector loop, Socket socket, TcpStreamOption option = null) {
        _tcpOption = (option is null) ? TcpStreamOption.createOption() : option;
        super(loop, socket.addressFamily, _tcpOption.bufferSize);
        this.socket = socket;
        _remoteAddress = socket.remoteAddress();
        _localAddress = socket.localAddress();

        _isClient = false;
        _isConnected = true;
        setKeepalive();
    }

    /// Replaces the stream settings; `option` must not be null.
    void options(TcpStreamOption option) @property {
        assert(option !is null);
        this._tcpOption = option;
    }

    /// Returns the settings currently in use.
    TcpStreamOption options() @property {
        return this._tcpOption;
    }

    /// Reports the inherited `_isWritting` flag.
    override bool isBusy() {
        return _isWritting;
    }

    /// Convenience overload: parses `ip`/`port` and connects to that address.
    void connect(string ip, ushort port) {
        connect(parseAddress(ip, port));
    }

    /**
     * Starts connecting to `addr` on the std.parallelism task pool.
     *
     * Does nothing when the stream is already connected. The outcome is
     * reported through the handler installed with onConnected().
     * NOTE(review): because doConnect runs as a pool task, that handler is
     * presumably invoked on a worker thread - confirm callers expect this.
     */
    void connect(Address addr) {
        if (_isConnected)
            return;

        _remoteAddress = addr;
        import std.parallelism;
        auto connectionTask = task(&doConnect, addr);
        taskPool.put(connectionTask);
        // doConnect(addr);
    }

    /**
     * Retries the connection to the last remote address with a fresh socket.
     *
     * Does nothing when already connected or when `retryTimes` attempts
     * have been used since the last successful connect.
     *
     * Throws: Exception when called on a server-side stream.
     */
    void reconnect() {
        if(!_isClient) {
            throw new Exception("Only client can call this method.");
        }

        if (_isConnected || retryCount >= _tcpOption.retryTimes)
            return;

        retryCount++;
        _isConnected = false;

        // Release the previous descriptor now instead of leaving it to GC
        // finalization; after a failed connect nothing else closes it.
        // Socket.close() is nothrow, so closing an already closed socket is harmless.
        if (this.socket !is null)
            this.socket.close();
        this.socket = new Socket(this._family, SocketType.STREAM, ProtocolType.TCP);

        version (HUNT_DEBUG) tracef("reconnecting %d...", retryCount);
        connect(_remoteAddress);
    }

    /**
     * Performs the blocking connect and, on success, switches the socket to
     * non-blocking mode, applies keepalive, records the local address and
     * registers the stream with the selector.
     *
     * Failures are logged as warnings rather than propagated; in both cases
     * the connection handler (if any) receives the resulting state.
     */
    protected override void doConnect(Address addr) {
        try {
            version (HUNT_DEBUG) tracef("connecting to %s...", addr);
            // Address binded = createAddress(this.socket.addressFamily);
            // this.socket.bind(binded);
            this.socket.blocking = true;
            super.doConnect(addr);
            this.socket.blocking = false;
            _isConnected = true;
            // A successful connect starts a new retry budget; without this a
            // long-lived client could only ever reconnect `retryTimes` times in total.
            retryCount = 0;
            setKeepalive();
            _localAddress = this.socket.localAddress();
            start();
        } catch (Exception ex) {
            warning(ex.message);
        }

        if (_connectionHandler !is null)
            _connectionHandler(_isConnected);
    }


    // www.tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
    // http://www.importnew.com/27624.html
    /**
     * Applies the keepalive settings to the socket when `isKeepalive` is set.
     *
     * With HAVE_EPOLL the probe count is set as well, through TCP_KEEPCNT;
     * with HAVE_IOCP only idle time and interval are applied. Builds with
     * neither version defined do nothing.
     */
    private void setKeepalive() {
        version(HAVE_EPOLL) {
            if(_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                    _tcpOption.keepaliveProbes);
                // version (HUNT_DEBUG) checkKeepAlive();
            }
        } else version(HAVE_IOCP) {
            if(_tcpOption.isKeepalive) {
                this.socket.setKeepAlive(_tcpOption.keepaliveTime, _tcpOption.keepaliveInterval);
                // this.setOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT,
                //     _tcpOption.keepaliveProbes);
                version (HUNT_DEBUG) checkKeepAlive();
            }
        }
    }

    /// Debug helper: traces the keepalive values currently set on the socket.
    /// Only has a body when HAVE_EPOLL is defined.
    version (HUNT_DEBUG)
    private void checkKeepAlive() {
        version(HAVE_EPOLL) {
        // The module-level import only brings in TCP_KEEPCNT; pull in the
        // other option names here so this helper compiles on its own.
        import core.sys.linux.netinet.tcp : TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT;

        int time ;
        int ret1 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPIDLE, time);
        tracef("ret=%d, time=%d", ret1, time);

        int interval;
        int ret2 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPINTVL, interval);
        tracef("ret=%d, interval=%d", ret2, interval);

        int isKeep;
        int ret3 = getOption(SocketOptionLevel.SOCKET, SocketOption.KEEPALIVE, isKeep);
        tracef("ret=%d, keepalive=%s", ret3, isKeep==1);

        int probe;
        int ret4 = getOption(SocketOptionLevel.TCP, cast(SocketOption) TCP_KEEPCNT, probe);
        tracef("ret=%d, probes=%d", ret4, probe);
        }
    }

    /// Installs the handler that receives the outcome of a connection attempt.
    TcpStream onConnected(ConnectionHandler cback) {
        _connectionHandler = cback;
        return this;
    }

    /// Installs the handler stored in `dataReceivedHandler`.
    TcpStream onDataReceived(DataReceivedHandler handler) {
        dataReceivedHandler = handler;
        return this;
    }

    // TcpStream onDataWritten(DataWrittenHandler handler)
    // {
    //     sentHandler = handler;
    //     return this;
    // }

    /// Installs the handler invoked at the end of onClose().
    TcpStream onClosed(SimpleEventHandler handler) {
        closeHandler = handler;
        return this;
    }

    /// Installs the handler stored in `disconnectionHandler`.
    TcpStream onDisconnected(SimpleEventHandler handler) {
        disconnectionHandler = handler;
        return this;
    }

    /// Installs the handler stored in `errorHandler`.
    TcpStream onError(ErrorEventHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// True while the stream considers itself connected.
    bool isConnected() nothrow {
        return _isConnected;
    }

    /// Registers the stream with its selector once; with HAVE_IOCP it also
    /// issues the first read.
    override void start() {
        if (_isRegistered)
            return;
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
            this.beginRead();
    }

    /**
     * Queues `buffer` for sending and triggers a write attempt.
     *
     * The buffer is dropped (with a warning in debug builds) when the stream
     * is not connected. With HAVE_IOCP a write already in flight leaves the
     * buffer queued; otherwise onWrite() is called directly.
     */
    void write(StreamWriteBuffer buffer) {
        assert(buffer !is null);

        if (!_isConnected) {
            debug warningf("The connection (fd=%d) has been closed!", this.handle);
            return;
        }

        _writeQueue.enQueue(buffer);

        version (HAVE_IOCP) {
            if (_isWritting) {
                version (HUNT_DEBUG)
                    infof("Busy in writting, data buffered (%d bytes)", buffer.capacity);
            } else
                tryWrite();
        } else {
            onWrite();
        }
    }

    /// safe for big data sending
    /// Wraps `data` in a SocketStreamBuffer (constructed with `handler`) and
    /// queues it; empty input is ignored.
    void write(const ubyte[] data, DataWrittenHandler handler = null) {
        if (data.length == 0)
            return;

        write(new SocketStreamBuffer(data, handler));
    }

    /// Shuts down the receiving half of the socket.
    void shutdownInput() {
        this.socket.shutdown(SocketShutdown.RECEIVE);
    }

    /// Shuts down the sending half of the socket.
    void shutdownOutput() {
        this.socket.shutdown(SocketShutdown.SEND);
    }

protected:
    /// True for streams created through the client constructor.
    bool _isClient;

    /// Receives the result of connection attempts; see onConnected().
    ConnectionHandler _connectionHandler;

    /**
     * Read-readiness callback: on Posix keeps calling tryRead() until it
     * returns true or the stream is closed; elsewhere delegates to doRead().
     * A pending socket error is reported through errorOccurred().
     */
    override void onRead() {
        version (HUNT_DEBUG)
            trace("start to read");

        version (Posix) {
            while (!_isClosed && !tryRead()) {
                version (HUNT_DEBUG)
                    trace("continue reading...");
            }
        } else {
            doRead();
        }

        if (this.isError) {
            string msg = format("Socket error on read: fd=%d, message: %s",
                    this.handle, this.erroString);
            // version (HUNT_DEBUG)
            // msg is already formatted; log it as data so a '%' in the OS
            // error text is not taken for a format specifier.
            debug errorf("%s", msg);
            errorOccurred(msg);
        }
    }

    /**
     * Close callback: discards unsent buffers, marks the stream as
     * disconnected, shuts down and closes the socket, runs the base-class
     * close logic and finally invokes `closeHandler` if one is set.
     */
    override void onClose() {
        version (HUNT_DEBUG) {
            if (!_writeQueue.empty) {
                warning("Some data has not been sent yet.");
            }

            infof("connection closed with: %s", this.remoteAddress);
        }

        _writeQueue.clear();
        _isConnected = false;
        this.socket.shutdown(SocketShutdown.BOTH);
        this.socket.close();
        super.onClose();

        if (closeHandler)
            closeHandler();
    }

    /**
     * Write-readiness callback.
     *
     * If the stream is not yet marked connected, this call marks it
     * connected, records the remote address, notifies the connection
     * handler and returns. Otherwise it drains the write queue: each buffer
     * is written until fully consumed, then dequeued and finished. The loop
     * stops on a socket error, on write cancellation or when the stream is
     * no longer registered.
     */
    override void onWrite() {
        if (!_isConnected) {
            _isConnected = true;
            _remoteAddress = socket.remoteAddress();

            if (_connectionHandler)
                _connectionHandler(true);
            return;
        }

        // Removes the head of the queue and signals its completion.
        void finishFront() {
            auto q = _writeQueue.deQueue();
            if (q is null)
                warning("StreamWriteBuffer is null");
            else
                q.finish();
        }

        // bool canWrite = true;
        version (HUNT_DEBUG)
            tracef("start to write [fd=%d]", this.handle);

        while (_isRegistered && !isWriteCancelling && !_writeQueue.empty) {
            version (HUNT_DEBUG)
                tracef("writting [fd=%d]...", this.handle);

            StreamWriteBuffer writeBuffer = _writeQueue.front();
            const(ubyte[]) data = writeBuffer.remaining();
            if (data.length == 0) {
                // Nothing left in this buffer: complete it and move on.
                finishFront();
                continue;
            }

            this.clearError();
            // NOTE(review): if tryWrite returns 0 without flagging an error
            // (e.g. the socket would block), this loop retries immediately -
            // confirm tryWrite's contract rules that out.
            size_t nBytes = tryWrite(data);
            if (nBytes > 0 && writeBuffer.pop(nBytes)) {
                version (HUNT_DEBUG)
                    tracef("writing done: %d bytes, fd: %d", nBytes, this.handle);
                finishFront();
            }

            if (this.isError) {
                string msg = format("Socket error on write: fd=%d, message=%s",
                        this.handle, this.erroString);
                // Log the pre-formatted message as data, not as a format string.
                debug errorf("%s", msg);
                errorOccurred(msg);
                break;
            }
        }
    }
}