1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.socket.Common;
13 
14 import hunt.collection.ByteBuffer;
15 import hunt.event.EventLoop;
16 import hunt.Exceptions;
17 import hunt.Functions;
18 import hunt.logging;
19 
20 import hunt.util.Common;
21 import hunt.Version;
22 
23 import core.atomic;
24 import core.stdc.stdint;
25 
26 import std.bitmanip;
27 import std.datetime;
28 import std.exception;
29 import std.functional;
30 import std.socket;
31 
32 version (HAVE_IOCP) import SOCKETOPTIONS = core.sys.windows.winsock2;
33 
34 version (Posix) import SOCKETOPTIONS = core.sys.posix.sys.socket;
35 
/// Callback taking a single opaque object argument.
alias ReadCallBack = void delegate(Object obj);
/// Callback taking a read-only byte slice.
alias DataReceivedHandler = void delegate(const ubyte[] data);
/// Callback taking a read-only byte slice and a size;
/// SocketStreamBuffer.finish invokes it with its buffer and send offset.
alias DataWrittenHandler = void delegate(const ubyte[] data, size_t size);
/// Callback taking a std.socket.Socket.
alias AcceptHandler = void delegate(Socket socket);

/// Callback taking a flag that tells whether the operation succeeded.
alias ConnectionHandler = void delegate(bool isSucceeded);

// dfmt off
/// Callback taking a read-only byte slice and an address
/// (presumably the datagram payload and its sender -- confirm against callers).
alias UDPReadCallBack = void delegate(in ubyte[] data, Address addr);
/// Callback taking a selector and a socket.
alias AcceptCallBack = void delegate(Selector loop, Socket socket) ;
// dfmt on
47 
/**
 * A chunk of outgoing data that can be chained to a following chunk.
 *
 * SocketStreamBuffer is the implementation in this module and
 * WriteBufferQueue links instances together through `next`.
 */
interface StreamWriteBuffer {
    // todo Write Data;
    /// Returns: the bytes that still have to be sent.
    const(ubyte)[] remaining();

    /// Advances the send offset by `size`.
    /// Returns: true once nothing is left to send.
    bool pop(size_t size);

    /// Called when sending of this buffer is finished.
    void finish();

    /// Returns: the buffer linked after this one (may be null).
    StreamWriteBuffer next();
    /// Links the given buffer after this one.
    void next(StreamWriteBuffer);

    /// Returns: the size of the underlying data
    /// (SocketStreamBuffer returns its buffer length).
    size_t capacity();
}
65 
66 
/**
 * Marker interface for channels; it declares no members.
 * AbstractChannel in this module implements it.
 */
interface Channel {

}
72 
/**
 * Base class for selectors that channels are registered with.
 *
 * Subclasses supply channel (de)registration, `dispose` and `doSelect`;
 * this class owns the running flag, the loop driver (`onLoop`) and the
 * public `select` entry points, all of which funnel into `doSelect`
 * while holding this object's monitor.
 *
 * See_Also: http://tutorials.jenkov.com/java-nio/selectors.html
 */
abstract class Selector {

    /// Running flag. It is declared `shared`, so every access goes through
    /// core.atomic (atomicLoad / atomicStore).
    protected shared bool _running;
    // protected shared bool _isOpen = true;

    /// Adds a channel to this selector. Returns: implementation-defined success flag.
    abstract bool register(AbstractChannel channel);

    /// Updates the registration of a channel. Returns: implementation-defined success flag.
    abstract bool reregister(AbstractChannel channel);

    /// Removes a channel from this selector. Returns: implementation-defined success flag.
    abstract bool deregister(AbstractChannel channel);

    /**
     * Clears the running flag so that the loop in `onLoop` exits after the
     * iteration that is currently in progress.
     */
    void stop() {
        atomicStore(_running, false);
        version (HUNT_DEBUG) trace("Selector stopped.");
    }

    /// Releases the selector's resources; `onLoop` calls it after the loop ends.
    abstract void dispose();

    /**
     * Tells whether or not this selector is open.
     *
     * Returns: the current value of the running flag, i.e. true between the
     * start of `onLoop` and a call to `stop`.
     */
    bool isOpen() {
        return atomicLoad(_running);
    }

    /// Alternative name for `isOpen` (spelling kept for existing callers).
    alias isRuning = isOpen;

    /**
     * Runs the selector loop until `stop` is called, then disposes the selector.
     *
     * Each iteration first invokes `weakup` and then performs one locked
     * `doSelect(timeout)`. The body always runs at least once.
     *
     * Params:
     *   weakup  = delegate invoked at the start of every iteration
     *   timeout = value handed to `doSelect`, in milliseconds (default -1)
     */
    protected void onLoop(scope void delegate() weakup, long timeout = -1) {
        // _running is shared: use the same atomic access as stop()/isOpen()
        // instead of a plain store/load.
        atomicStore(_running, true);
        do {
            // version (HUNT_DEBUG) trace("Selector rolled once.");
            weakup();
            lockAndDoSelect(timeout);
        } while (atomicLoad(_running));
        dispose();
    }

    /**
     * Performs one locked select.
     *
     * Params:
     *   timeout = timeout in milliseconds; must not be negative. A value of 0
     *             is forwarded to `doSelect` as -1 (presumably "no timeout" --
     *             the meaning is defined by the subclass).
     * Returns: whatever `doSelect` returns.
     * Throws: IllegalArgumentException when `timeout` is negative.
     */
    int select(long timeout) {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }

    /// Same as `select(0)`.
    int select() {
        return select(0);
    }

    /// Performs one locked select, passing a timeout of 0 to `doSelect`.
    int selectNow() {
        return lockAndDoSelect(0);
    }

    /**
     * Implementation hook that performs the actual selection.
     *
     * Params:
     *   timeout = timeout in milliseconds; this class passes -1, 0 or a
     *             positive value
     */
    protected abstract int doSelect(long timeout);

    /// Calls `doSelect(timeout)` while holding this object's monitor.
    private int lockAndDoSelect(long timeout) {
        synchronized (this) {
            // if (!isOpen())
            //     throw new ClosedSelectorException();
            // synchronized (publicKeys) {
            //     synchronized (publicSelectedKeys) {
            //         return doSelect(timeout);
            //     }
            // }
            return doSelect(timeout);
        }
    }
}
150 
/**
 * Base class for channels handled by a Selector.
 *
 * It keeps the native handle, the registration/closing state, a set of
 * ChannelFlag bits and the links of an intrusive doubly linked list of
 * channels (`setNext` / `clear`).
 */
abstract class AbstractChannel : Channel {
    /// Native handle of the channel; socket_t.init until a subclass assigns it.
    socket_t handle = socket_t.init;
    /// Optional callback invoked by `errorOccurred`.
    ErrorEventHandler errorHandler;

    protected bool _isRegistered = false;
    protected bool _isClosing = false;
    protected bool _isClosed = false;

    /**
     * Params:
     *   loop = selector this channel belongs to
     *   type = kind of channel
     */
    this(Selector loop, ChannelType type) {
        this._inLoop = loop;
        _type = type;
        // ChannelFlag values are used directly as bit indices, so the array
        // needs ChannelFlag.max + 1 bits. A fixed 16-bit array left
        // ChannelFlag.ETMode (16) out of range.
        _flags = BitArray(new bool[ChannelFlag.max + 1]);
    }

    /// Returns: the registration flag (cleared by `onClose`).
    bool isRegistered() {
        return _isRegistered;
    }

    /// Returns: true when the channel is closed or a close is in progress.
    bool isClosed() {
        return _isClosing || _isClosed;
    }

    /**
     * Performs the actual close: updates the state flags, deregisters the
     * channel from its selector (skipped when HAVE_IOCP is set) and unlinks
     * it from the channel list.
     */
    protected void onClose() {
        _isRegistered = false;
        _isClosed = true;
        _isClosing = false;
        version (HAVE_IOCP) {
        }
        else {
            _inLoop.deregister(this);
        }
        clear();

        version (HUNT_DEBUG) tracef("closed [fd=%d]...", this.handle);
    }

    /// Forwards `msg` to `errorHandler` when one is set.
    protected void errorOccurred(string msg) {
        debug warningf("isRegistered: %s, isClosed: %s, msg=%s", _isRegistered, _isClosed, msg);
        if (errorHandler !is null) {
            errorHandler(msg);
        }
    }

    /// Read-event hook; the base implementation asserts. Subclasses override it.
    void onRead() {
        assert(false, "not implemented");
    }

    /// Write-event hook; the base implementation asserts. Subclasses override it.
    void onWrite() {
        assert(false, "not implemented");
    }

    /// Returns: whether the bit for `index` is set.
    final bool hasFlag(ChannelFlag index) {
        return _flags[index];
    }

    /// Returns: the channel type given to the constructor.
    @property ChannelType type() {
        return _type;
    }

    /// Returns: the selector given to the constructor.
    @property Selector eventLoop() {
        return _inLoop;
    }

    /**
     * Closes the channel unless it is already closed.
     *
     * Only `_isClosed` is tested here (not `isClosed()`), because
     * AbstractSocketChannel.close sets `_isClosing` before delegating to
     * this method.
     */
    void close() {
        if (!_isClosed) {
            version (HUNT_DEBUG)
                tracef("channel[fd=%d] closing...", this.handle);
            onClose();
            version (HUNT_DEBUG)
                tracef("channel[fd=%d] closed...", this.handle);
        }
        else {
            debug warningf("The channel[fd=%d] has already been closed", this.handle);
        }
    }

    /**
     * Inserts `next` into the channel list directly after this channel.
     *
     * Passing null or this channel itself is ignored.
     * NOTE(review): `next` is not unlinked from a list it may already be in;
     * callers appear to be expected to pass an unlinked channel.
     */
    void setNext(AbstractChannel next) {
        if (next is null || next is this)
            return; // Nothing to link / can't set to self
        next._next = _next;
        next._priv = this;
        if (_next)
            _next._priv = next;
        this._next = next;
    }

    /// Unlinks this channel from the channel list, joining its neighbours.
    void clear() {
        if (_priv)
            _priv._next = _next;
        if (_next)
            _next._priv = _priv;
        _next = null;
        _priv = null;
    }

    mixin OverrideErro;

protected:
    /// Sets or clears the bit for `index`.
    final void setFlag(ChannelFlag index, bool enable) {
        _flags[index] = enable;
    }

    Selector _inLoop;

private:
    BitArray _flags;
    ChannelType _type;

    // Previous / next channel in the intrusive list.
    AbstractChannel _priv;
    AbstractChannel _next;
}
269 
/**
 * Channel whose type is ChannelType.Event.
 */
class EventChannel : AbstractChannel {
    /// Creates the channel for the given selector with type ChannelType.Event.
    this(Selector loop) {
        super(loop, ChannelType.Event);
    }

    /// Always asserts in this class; presumably meant to be overridden by
    /// concrete event channels (no override is visible in this module).
    void call() {
        assert(false);
    }

    // Disabled delayed-close implementation, kept for reference.
    // override void close() {
    //     if(_isClosing)
    //         return;
    //     _isClosing = true;
    //     version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);

    //     if(isBusy) {
    //         import std.parallelism;
    //         version (HUNT_DEBUG) warning("Close operation delayed");
    //         auto theTask = task(() {
    //             while(isBusy) {
    //                 version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
    //                 // Thread.sleep(20.msecs);
    //             }
    //             super.close();
    //         });
    //         taskPool.put(theTask);
    //     } else {
    //         super.close();
    //     }
    // }
}
303 
/**
 * Mixin that gives the host type an error flag and an error text together
 * with accessors and a reset method. The template itself never sets the
 * fields; the host code assigns `_error` / `_erroString`.
 */
mixin template OverrideErro() {
    /// Error flag, false by default.
    bool _error = false;
    /// Error text belonging to the flag.
    string _erroString;

    /// Returns: the current error flag.
    bool isError() {
        return _error;
    }

    /// Returns: the current error text.
    string erroString() {
        return _erroString;
    }

    /// Resets the error text to the empty string and the flag to false.
    void clearError() {
        _erroString = "";
        _error = false;
    }
}
321 
/// Kind of a channel, stored in AbstractChannel and returned by its `type` property.
/// Values are consecutive ubytes starting at 0.
enum ChannelType : ubyte {
    Accept = 0,
    TCP,
    UDP,
    Timer,
    Event, // used by EventChannel
    File,
    None
}
331 
/// Per-channel flags. AbstractChannel.hasFlag / setFlag use the numeric value
/// directly as a bit index, so the values are positions, not bit masks.
enum ChannelFlag : ushort {
    None = 0,
    Read,  // 1
    Write, // 2

    OneShot = 8,
    ETMode = 16 // presumably "edge-triggered mode" -- confirm against the selector implementations
}
340 
/// Plain holder pairing an address with a byte buffer
/// (presumably one UDP datagram and its peer -- confirm against users).
final class UdpDataObject {
    Address addr;
    ubyte[] data;
}
345 
/// Wraps a single value of type T in a class instance, so it can be passed
/// where an Object reference is expected.
final class BaseTypeObject(T) {
    T data;
}
349 
/// Exception type whose constructors are generated by
/// std.exception.basicExceptionCtors.
class LoopException : Exception {
    mixin basicExceptionCtors;
}
353 
354 
/**
 * Marker interface for streams; it declares no members.
 */
interface Stream {

}
360 
/**
 * Base class for channels backed by a std.socket.Socket.
 *
 * It stores the socket and its handle, offers thin wrappers around the
 * socket's getOption / setOption overloads, exposes the local and remote
 * addresses set by subclasses, and defers `close` while the channel reports
 * itself busy.
 */
abstract class AbstractSocketChannel : AbstractChannel {

    protected bool _isWritting = false;

    /**
     * Params:
     *   loop = selector this channel belongs to
     *   type = kind of channel
     */
    this(Selector loop, ChannelType type) {
        super(loop, type);
    }

    /// Busy with reading or writing. The base implementation always returns
    /// false; subclasses override it to delay `close`.
    protected bool isBusy() { return false; }

    /**
     * Installs the socket used by this channel: records its handle and, on
     * Posix, switches it to non-blocking mode.
     */
    protected @property void socket(Socket s) {
        this.handle = s.handle();
        version (Posix) {
            s.blocking = false;
        }
        _socket = s;
        version (HUNT_DEBUG)
            infof("new socket: fd=%d", this.handle);
    }

    /// Returns: the socket installed with the setter (null before that).
    protected @property Socket socket() {
        return _socket;
    }

    protected Socket _socket;

    /**
     * Closes the channel once.
     *
     * A second call while closing (or after it) only logs a warning in debug
     * builds. When `isBusy` is true the actual close is handed to a
     * std.parallelism task that waits until the channel is idle.
     */
    override void close() {
        if(_isClosing) {
            // warningf, not warning: the message contains a format specifier.
            debug warningf("already closed [fd=%d]", this.handle);
            return;
        }
        _isClosing = true;
        version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);

        if(isBusy) {
            import std.parallelism;
            version (HUNT_DEBUG) warning("Close operation delayed");
            auto theTask = task(() {
                // NOTE(review): this spins without sleeping until isBusy
                // turns false; the sleep below is disabled in the original.
                while(isBusy) {
                    version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
                    // Thread.sleep(20.msecs);
                }
                super.close();
            });
            taskPool.put(theTask);
        } else {
            super.close();
        }
    }

    /// Get a socket option.
    /// Returns: The number of bytes written to $(D result).
    //returns the length, in bytes, of the actual result - very different from getsockopt()
    pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, void[] result) @trusted {
        return this._socket.getOption(level, option, result);
    }

    /// Common case of getting integer and boolean options.
    /// Returns: the value returned by the socket's getOption.
    pragma(inline) final int getOption(SocketOptionLevel level,
            SocketOption option, ref int32_t result) @trusted {
        return this._socket.getOption(level, option, result);
    }

    /// Get the linger option.
    /// Returns: the value returned by the socket's getOption.
    pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option,
            ref Linger result) @trusted {
        return this._socket.getOption(level, option, result);
    }

    /// Get a timeout (duration) option.
    pragma(inline) final void getOption(SocketOptionLevel level,
            SocketOption option, ref Duration result) @trusted {
        this._socket.getOption(level, option, result);
    }

    /// Set a socket option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, void[] value) @trusted {
        this._socket.setOption(level, option, value);
    }

    /// Common case for setting integer and boolean options.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, int32_t value) @trusted {
        this._socket.setOption(level, option, value);
    }

    /// Set the linger option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Linger value) @trusted {
        this._socket.setOption(level, option, value);
    }

    /// Set a timeout (duration) option.
    pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Duration value) @trusted {
        this._socket.setOption(level, option, value);
    }

    /// Returns: the stored remote address (assigned by subclasses; may be null).
    final @property @trusted Address remoteAddress() {
        return _remoteAddress;
    }

    protected Address _remoteAddress;

    /// Returns: the stored local address (assigned by subclasses; may be null).
    final @property @trusted Address localAddress() {
        return _localAddress;
    }

    protected Address _localAddress;

    version (HAVE_IOCP) {
        /// Records the number of bytes of the last read.
        void setRead(size_t bytes) {
            readLen = bytes;
        }

        protected size_t readLen;
    }

    /// Starts the channel.
    /// NOTE(review): declared without a body and without `abstract`;
    /// concrete subclasses are expected to provide the implementation.
    void start();

    /// Write-completion hook; the base implementation asserts.
    void onWriteDone() {
        assert(false, "unimplemented");
    }

}
485 
/**
 * StreamWriteBuffer over a read-only byte slice, with an optional handler
 * that is notified when sending is finished.
 *
 * Invariant: the send offset never exceeds the buffer length, so
 * `remaining` is always a valid slice, also after `finish`.
 */
class SocketStreamBuffer : StreamWriteBuffer {

    /**
     * Params:
     *   data    = bytes to send; the slice is referenced, not copied
     *   handler = optional callback invoked by `finish`
     */
    this(const(ubyte)[] data, DataWrittenHandler handler = null) {
        _buffer = data;
        _pos = 0;
        _sentHandler = handler;
    }

    /// Returns: the part of the buffer that has not been sent yet
    /// (empty after `finish`).
    const(ubyte)[] remaining() {
        return _buffer[_pos .. $];
    }

    /**
     * Advances the send offset.
     *
     * Params:
     *   size = number of bytes that were sent; values larger than the
     *          remaining length are clamped to it
     * Returns: true once the whole buffer has been consumed.
     */
    bool pop(size_t size) {
        // Clamp so that _pos can neither overflow nor pass the end, which
        // would make remaining() slice out of range.
        immutable size_t left = _buffer.length - _pos;
        _pos += (size < left) ? size : left;
        return _pos >= _buffer.length;
    }

    /**
     * Notifies the handler (if any) with the buffer and the number of bytes
     * sent, then releases the handler and the buffer.
     */
    void finish() {
        if (_sentHandler)
            _sentHandler(_buffer, _pos);
        _sentHandler = null;
        _buffer = null;
        // Keep the offset consistent with the now empty buffer.
        _pos = 0;
    }

    /// Returns: the buffer linked after this one (may be null).
    StreamWriteBuffer next() {
        return _next;
    }

    /// Links `v` after this buffer.
    void next(StreamWriteBuffer v) {
        _next = v;
    }

    /// Returns: the length of the underlying buffer (0 after `finish`).
    size_t capacity() {
        return _buffer.length;
    }

private:
    StreamWriteBuffer _next;
    size_t _pos = 0;
    const(ubyte)[] _buffer;
    DataWrittenHandler _sentHandler;
}
533 
/**
 * FIFO queue of StreamWriteBuffer objects, linked through the buffers' own
 * `next` property. Only the head and the tail are stored here.
 */
struct WriteBufferQueue {
    /// Returns: the head of the queue, or null when the queue is empty.
    StreamWriteBuffer front() nothrow @safe {
        return _first;
    }

    /// Returns: true when the queue holds no buffer.
    bool empty() nothrow @safe {
        return _first is null;
    }

    /// Empties the queue and cuts the `next` link of every queued buffer.
    void clear() {
        for (StreamWriteBuffer node = _first; node !is null; node = _first) {
            // Move the head forward before detaching the node.
            _first = node.next;
            node.next = null;
        }

        _last = null;
        _first = null;
    }

    /// Appends `wsite` at the tail; its `next` link is reset to null.
    void enQueue(StreamWriteBuffer wsite) {
        assert(wsite);
        if (_last is null) {
            // Empty queue: the new buffer becomes the head as well.
            _first = wsite;
        } else {
            _last.next = wsite;
        }
        wsite.next = null;
        _last = wsite;
    }

    /// Removes and returns the head, or returns null when the queue is empty.
    /// The returned buffer keeps its `next` link.
    StreamWriteBuffer deQueue() {
        // assert(_first && _last);
        StreamWriteBuffer head = _first;
        if (head !is null)
            _first = head.next;

        // Queue ran empty: drop the tail reference too.
        if (_first is null)
            _last = null;

        return head;
    }

private:
    StreamWriteBuffer _last = null;
    StreamWriteBuffer _first = null;
}
584 
585 
/**
 * Creates a wildcard address for the given family and port.
 *
 * Params:
 *   family = AddressFamily.INET6 yields an Internet6Address for "::";
 *            every other value yields an InternetAddress
 *   port   = port number, InternetAddress.PORT_ANY by default
 * Returns: the newly created address.
 */
Address createAddress(AddressFamily family = AddressFamily.INET,
    ushort port=InternetAddress.PORT_ANY) {
    if (family != AddressFamily.INET6)
        return new InternetAddress(port);

    // Internet6Address(port) is avoided here: bug on windows
    return new Internet6Address("::", port);
}
597 
598 
// dfmt off
// Everything from here to the end of the module is compiled on Linux only.
version(linux):
// dfmt on
// Fallback definition of SO_REUSEPORT, compiled only when
// CompilerHelper.isLessThan(2078) is true (presumably front-end versions whose
// runtime does not declare the constant -- confirm against CompilerHelper).
// Architectures not listed below get no definition from this block.
static if (CompilerHelper.isLessThan(2078)) {
    version (X86) {
        enum SO_REUSEPORT = 15;
    }
    else version (X86_64) {
        enum SO_REUSEPORT = 15;
    }
    else version (MIPS32) {
        enum SO_REUSEPORT = 0x0200;
    }
    else version (MIPS64) {
        enum SO_REUSEPORT = 0x0200;
    }
    else version (PPC) {
        enum SO_REUSEPORT = 15;
    }
    else version (PPC64) {
        enum SO_REUSEPORT = 15;
    }
    else version (ARM) {
        enum SO_REUSEPORT = 15;
    }
}