1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.socket.Posix;
13 
14 // dfmt off
15 version(Posix):
16 
17 // dfmt on
18 
19 import hunt.concurrency.thread.Helper;
20 import hunt.Functions;
21 import hunt.io.socket.Common;
22 import hunt.logging;
23 import hunt.system.Error;
24 
25 import std.conv;
26 import std.exception;
27 import std.format;
28 import std.process;
29 import std.socket;
30 import std.string;
31 
32 import core.stdc.errno;
33 import core.stdc.string;
34 import core.sys.posix.sys.socket : accept;
35 
36 // extern (C) nothrow @nogc {
37 //     int     accept4(int, sockaddr*, socklen_t*, int);
38 // }
39 
40 enum int SOCK_CLOEXEC = 0x02000000;	/* Atomically set close-on-exec flag for the
41 				   new descriptor(s).  */
42 enum int SOCK_NONBLOCK = 0x00004000;	/* Atomically mark descriptor(s) as
43 				   non-blocking.  */
44 
45 /**
46 TCP Server
47 */
48 abstract class AbstractListener : AbstractSocketChannel {
49     this(Selector loop, AddressFamily family = AddressFamily.INET) {
50         super(loop, ChannelType.Accept);
51         setFlag(ChannelFlag.Read, true);
52         this.socket = new TcpSocket(family);
53     }
54 
55     protected bool onAccept(scope AcceptHandler handler) {
56         version (HUNT_DEBUG)
57             trace("new connection coming...");
58         this.clearError();
59         // http://man7.org/linux/man-pages/man2/accept.2.html
60         version(HAVE_EPOLL) {
61             // socket_t clientFd = cast(socket_t)(accept4(this.handle, null, null, SOCK_NONBLOCK | SOCK_CLOEXEC));
62             socket_t clientFd = cast(socket_t)(accept(this.handle, null, null));
63         } else {
64             socket_t clientFd = cast(socket_t)(accept(this.handle, null, null));
65         }
66         if (clientFd == socket_t.init)
67             return false;
68 
69         version (HUNT_DEBUG)
70             tracef("Listener fd=%d, client fd=%d", this.handle, clientFd);
71 
72         if (handler !is null)
73             handler(new Socket(clientFd, this.localAddress.addressFamily));
74         return true;
75     }
76 
77     override void onWriteDone() {
78         version (HUNT_DEBUG)
79             tracef("a new connection created");
80     }
81 }
82 
83 /**
84 TCP Client
85 */
86 abstract class AbstractStream : AbstractSocketChannel, Stream {
87     SimpleEventHandler disconnectionHandler;
88 
89     protected bool _isConnected; // It's always true for server.
90     protected AddressFamily _family;
91 
92     this(Selector loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4096 * 2) {
93         this._family = family;
94         _readBuffer = new ubyte[bufferSize];
95         super(loop, ChannelType.TCP);
96         setFlag(ChannelFlag.Read, true);
97         setFlag(ChannelFlag.Write, true);
98         setFlag(ChannelFlag.ETMode, true);
99     }
100 
101     /**
102     */
103     protected bool tryRead() {
104         bool isDone = true;
105         this.clearError();
106         ptrdiff_t len = this.socket.receive(cast(void[]) this._readBuffer);
107         version (HUNT_DEBUG)
108             tracef("reading[fd=%d]: %d nbytes", this.handle, len);
109 
110         if (len > 0) {
111             if (dataReceivedHandler !is null)
112                 dataReceivedHandler(this._readBuffer[0 .. len]);
113 
114             // It's prossible that more data are wainting for read in inner buffer.
115             if (len == _readBuffer.length)
116                 isDone = false;
117         } else if (len == Socket.ERROR) {
118             // https://stackoverflow.com/questions/14595269/errno-35-eagain-returned-on-recv-call
119             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:06:13
120             // check more error status
121             this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK;
122             if (_error) {
123                 this._erroString = getErrorMessage(errno);
124             } else {
125                 debug warningf("write warning: fd=%s, errno=%d, message=%s", this.handle,
126                         errno, getErrorMessage(errno));
127             }
128 
129             if(errno == ECONNRESET) {
130                 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
131                 onDisconnected();
132                 this.close();
133             }
134         }
135         else {
136             version (HUNT_DEBUG)
137                 infof("connection broken: %s, fd:%d", _remoteAddress.toString(), this.handle);
138             onDisconnected();
139             this.close();
140         }
141 
142         return isDone;
143     }
144 
145     protected void onDisconnected() {
146         _isConnected = false;
147         if (disconnectionHandler !is null)
148             disconnectionHandler();
149     }
150 
151     protected bool canWriteAgain = true;
152     int writeRetryLimit = 5;
153     private int writeRetries = 0;
154 
155     /**
156     Warning: It will try the best to write all the data.   
157     TODO: create a test
158     */
159     protected void tryWriteAll(in ubyte[] data) {
160         const nBytes = this.socket.send(data);
161         version (HUNT_DEBUG)
162             tracef("actually sent bytes: %d / %d", nBytes, data.length);
163 
164         if (nBytes > 0) {
165             if (canWriteAgain && nBytes < data.length) { //  && writeRetries < writeRetryLimit
166                 // version (HUNT_DEBUG)
167                 writeRetries++;
168                 tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
169                         writeRetries, nBytes, data.length - nBytes, data.length);
170                 if (writeRetries > writeRetryLimit)
171                     warning("You are writting a big block of data!!!");
172 
173                 tryWriteAll(data[nBytes .. $]);
174             } else
175                 writeRetries = 0;
176         } else if (nBytes == Socket.ERROR) {
177             this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK;
178             if (this._error) {
179                 this._erroString = lastSocketError();
180 
181                 warningf("write error: fd=%s, errno=%d, message=%s", this.handle,
182                         errno, this._erroString);
183 
184                 if(errno == ECONNRESET) {
185                     // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
186                     onDisconnected();
187                     this.close();
188                 }
189             } else {
190                 debug warningf("write error: fd=%s, errno=%d, message=%s", this.handle,
191                         errno, lastSocketError());
192 
193                 if (canWriteAgain && !_isClosed) {
194                     import core.thread;
195                     import core.time;
196 
197                     writeRetries++;
198                     tracef("[%d] rewrite: written %d, remaining: %d, total: %d",
199                             writeRetries, nBytes, data.length - nBytes, data.length);
200                     if (writeRetries > writeRetryLimit)
201                         warning("You are writting a Big block of data!!!");
202                     warning("Wait for a 100 msecs to try again");
203                     Thread.sleep(100.msecs);
204                     tryWriteAll(data);
205                 }
206             }
207         } else {
208             version (HUNT_DEBUG) {
209                 warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
210                 assert(false, "Undefined behavior!");
211             }
212             else {
213                 this._error = true;
214                 this._erroString = lastSocketError();
215             }
216         }
217     }
218 
219     /**
220     Try to write a block of data.
221     */
222     protected ptrdiff_t tryWrite(const ubyte[] data) {
223         const nBytes = this.socket.send(data);
224         version (HUNT_DEBUG)
225             tracef("actually sent : %d / %d bytes, fd=%d", nBytes, data.length, this.handle);
226 
227         if (nBytes > 0) {
228             return nBytes;
229         } else if (nBytes == Socket.ERROR) {
230             // FIXME: Needing refactor or cleanup -@Administrator at 2018-5-8 16:07:38
231             // check more error status
232             // EPIPE/Broken pipe: 
233             // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
234             this._error = errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK;
235             if (_error) {
236                 this._erroString = getErrorMessage(errno);
237             } else {
238                 debug warningf("warning for write: fd=%d, errno=%d, message=%s", this.handle,
239                         errno, getErrorMessage(errno));
240             }
241 
242             if(errno == ECONNRESET) {
243                 // https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean
244                 onDisconnected();
245                 this.close();
246             }
247         } else {
248             version (HUNT_DEBUG) {
249                 warningf("nBytes=%d, message: %s", nBytes, lastSocketError());
250                 assert(false, "Undefined behavior!");
251             }
252             else {
253                 this._error = true;
254                 this._erroString = getErrorMessage(errno);
255             }
256         }
257         return 0;
258     }
259 
260     protected void doConnect(Address addr) {
261         this.socket.connect(addr);
262     }
263 
264     void cancelWrite() {
265         isWriteCancelling = true;
266     }
267 
268     override void onWriteDone() {
269         // notified by kqueue selector when data writing done
270         version (HUNT_DEBUG)
271             tracef("done with data writing");
272     }
273 
274     // protected UbyteArrayObject _readBuffer;
275     private const(ubyte)[] _readBuffer;
276     protected WriteBufferQueue _writeQueue;
277     protected bool isWriteCancelling = false;
278 
279     /**
280     * Warning: The received data is stored a inner buffer. For a data safe, 
281     * you would make a copy of it. 
282     */
283     DataReceivedHandler dataReceivedHandler;
284 
285 }
286 
287 /**
288 UDP Socket
289 */
290 abstract class AbstractDatagramSocket : AbstractSocketChannel {
291     this(Selector loop, AddressFamily family = AddressFamily.INET, int bufferSize = 4096 * 2) {
292         super(loop, ChannelType.UDP);
293         setFlag(ChannelFlag.Read, true);
294         setFlag(ChannelFlag.ETMode, false);
295 
296         this.socket = new UdpSocket(family);
297         // _socket.blocking = false;
298         _readBuffer = new UdpDataObject();
299         _readBuffer.data = new ubyte[bufferSize];
300 
301         if (family == AddressFamily.INET)
302             _bindAddress = new InternetAddress(InternetAddress.PORT_ANY);
303         else if (family == AddressFamily.INET6)
304             _bindAddress = new Internet6Address(Internet6Address.PORT_ANY);
305         else
306             _bindAddress = new UnknownAddress();
307     }
308 
309     final void bind(Address addr) {
310         if (_binded)
311             return;
312         _bindAddress = addr;
313         socket.bind(_bindAddress);
314         _binded = true;
315     }
316 
317     final bool isBind() {
318         return _binded;
319     }
320 
321     Address bindAddr() {
322         return _bindAddress;
323     }
324 
325     protected UdpDataObject _readBuffer;
326     protected bool _binded = false;
327     protected Address _bindAddress;
328 
329     protected bool tryRead(scope ReadCallBack read) {
330         this._readBuffer.addr = createAddress(this.socket.addressFamily, 0);
331         auto data = this._readBuffer.data;
332         scope (exit)
333             this._readBuffer.data = data;
334         auto len = this.socket.receiveFrom(this._readBuffer.data, this._readBuffer.addr);
335         if (len > 0) {
336             this._readBuffer.data = this._readBuffer.data[0 .. len];
337             read(this._readBuffer);
338         }
339         return false;
340     }
341 
342     override void onWriteDone() {
343         // notified by kqueue selector when data writing done
344         version (HUNT_DEBUG)
345             tracef("done with data writing");
346     }
347 }