1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.TcpListener;
13 
14 import hunt.io.TcpStream;
15 import hunt.io.TcpStreamOptions;
16 import hunt.io.IoError;
17 
18 import hunt.event;
19 import hunt.Exceptions;
20 import hunt.Functions;
21 import hunt.logging;
22 import hunt.util.CompilerHelper;
23 
24 import std.socket;
25 import std.exception;
26 import core.thread;
27 import core.time;
28 
29 
/// Called by TcpListener.onRead for each accepted connection, with the
/// listener itself and the stream created for the new peer.
alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
/// Optional factory used by TcpListener.onRead to build the stream for an
/// accepted socket; it receives the listener, the accepted socket and the
/// configured buffer size. When unset, the listener constructs a TcpStream itself.
alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
/// Receives an IoError when TcpListener catches a SocketOSException while
/// binding or accepting.
alias EventErrorHandler = void delegate(IoError error);
33 
34 
/**
 * A TCP listening socket that is registered with an EventLoop and hands every
 * accepted connection to user-supplied delegates.
 *
 * The configuration methods return `this` so calls can be chained, e.g.
 * `listener.reusePort(true).bind(port).listen(backlog).start()`.
 *
 * Socket failures raised while binding or accepting are converted to IoError
 * and delivered to `errorHandler`; when no handler is installed they are
 * logged instead of being dropped.
 */
class TcpListener : AbstractListener {
    private bool _isSslEnabled = false;
    private bool _isBlocking = false;
    private bool _isBinded = false;
    private TcpStreamOptions _tcpStreamoption;
    protected EventHandler _shutdownHandler;

    /// event handlers
    AcceptEventHandler acceptHandler;
    SimpleEventHandler closeHandler;
    PeerCreateHandler peerCreateHandler;
    EventErrorHandler errorHandler;

    /**
     * Creates a listener attached to `loop`.
     *
     * Params:
     *   loop       = event loop the listener is registered with in start()
     *   family     = address family forwarded to the base listener
     *   bufferSize = buffer size stored in the stream options used for
     *                accepted peers (also forwarded to the base class when
     *                built with HAVE_IOCP)
     */
    this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024) {
        _tcpStreamoption = TcpStreamOptions.create();
        _tcpStreamoption.bufferSize = bufferSize;
        version (HAVE_IOCP)
            super(loop, family, bufferSize);
        else
            super(loop, family);
    }

    /// Sets the delegate invoked for every accepted connection. Returns `this`.
    TcpListener accepted(AcceptEventHandler handler) {
        acceptHandler = handler;
        return this;
    }

    /// Sets the delegate that receives bind/accept errors. Returns `this`.
    TcpListener error(EventErrorHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Sets the factory used to create the stream for an accepted socket. Returns `this`.
    TcpListener onPeerCreating(PeerCreateHandler handler) {
        peerCreateHandler = handler;
        return this;
    }

    /// Sets the handler invoked by close() when no `closeHandler` is set. Returns `this`.
    TcpListener onShutdown(EventHandler handler) {
        _shutdownHandler = handler;
        return this;
    }

    /// Binds to the address parsed from `ip` and `port`. See bind(Address).
    TcpListener bind(string ip, ushort port) {
        return bind(parseAddress(ip, port));
    }

    /// Binds to `port` using an address created for the socket's own address
    /// family. See bind(Address).
    TcpListener bind(ushort port) {
        return bind(createAddress(this.socket.addressFamily, port));
    }

    /**
     * Enables SO_REUSEADDR, binds the socket to `addr`, applies the configured
     * blocking mode and records the resulting local address.
     *
     * A SocketOSException is not propagated: it is reported through
     * `errorHandler` (or logged when none is set) and the listener stays
     * unbound. The error code is ADDRINUSE only when the OS actually reported
     * "address in use"; every other failure is reported as OTHER.
     *
     * Returns: `this`, whether or not binding succeeded.
     */
    TcpListener bind(Address addr) {
        try {
            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
            this.socket.bind(addr);
            this.socket.blocking = _isBlocking;
            _localAddress = _socket.localAddress();
            _isBinded = true;
        } catch (SocketOSException e) {
            dispatchError(bindErrorCode(e), e.msg);
        }
        return this;
    }

    /// The local address recorded by the last successful bind().
    Address bindingAddress() {
        return _localAddress;
    }

    /// Stores the blocking mode and applies it to the socket immediately.
    void blocking(bool flag) {
        _isBlocking = flag;
        // if(_isBinded)
        this.socket.blocking = flag;
    }

    /// The blocking mode last requested through blocking(bool).
    bool blocking() {
        return _isBlocking;
    }

    /**
     * Enables or disables port sharing between listeners. Must be called
     * before bind().
     *
     * On Posix this sets both SO_REUSEADDR and SO_REUSEPORT to `flag`; on
     * Windows it sets SO_EXCLUSIVEADDRUSE to the opposite of `flag`.
     *
     * Throws: IOException when the listener is already bound.
     *
     * https://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t
     * https://www.cnblogs.com/xybaby/p/7341579.html
     * https://rextester.com/BUAFK86204
     */
    TcpListener reusePort(bool flag) {
        if(_isBinded) {
            throw new IOException("Must be set before binding.");
        }

        version (Posix) {
            import core.sys.posix.sys.socket;

            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, flag);
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, flag);
        } else version (Windows) {
            // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
            // https://docs.microsoft.com/zh-cn/windows/win32/winsock/so-exclusiveaddruse
            // TODO: Tasks pending completion -@Administrator at 2020-05-25T15:04:42+08:00
            // More tests needed
            import core.sys.windows.winsock2;
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
        }

        return this;
    }

    /// Puts the socket into listening state with the given backlog. Returns `this`.
    TcpListener listen(int backlog) {
        this.socket.listen(backlog);
        return this;
    }

    /// Registers the listener with its event loop; with HAVE_IOCP the first
    /// accept is also issued here.
    override void start() {
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
            this.doAccept();
    }

    /// Notifies `closeHandler` if set, otherwise the shutdown handler if set,
    /// and then runs onClose().
    override void close() {
        if (closeHandler !is null)
            closeHandler();
        else if (_shutdownHandler !is null)
            _shutdownHandler(this, null);
        this.onClose();
    }

    /**
     * Read-event callback: performs one accept pass and dispatches the new
     * connection.
     *
     * For each accepted socket a stream is created (through
     * `peerCreateHandler` when set, otherwise a plain TcpStream), passed to
     * `acceptHandler` and started. A socket nobody can take ownership of
     * (no `acceptHandler`, or the factory returned null) is closed right away.
     * If the listener is in the error state afterwards it is logged and closed.
     */
    protected override void onRead() {
        version (HUNT_DEBUG)
            trace("start to listen");

        // Only one accept pass is made per call; the original code had a
        // surrounding loop commented out.
        version (HUNT_DEBUG)
            trace("listening...");

        try {
            onAccept((Socket socket) {
                version (HUNT_DEBUG) {
                    infof("new connection from %s, fd=%d",
                        socket.remoteAddress.toString(), socket.handle());
                }

                if (acceptHandler is null) {
                    // No consumer for this connection: release the descriptor
                    // now instead of leaving it open until collected.
                    socket.close();
                    return;
                }

                TcpStream stream;
                if (peerCreateHandler is null)
                    stream = new TcpStream(_inLoop, socket, _tcpStreamoption);
                else
                    stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);

                if (stream is null) {
                    // A factory that declines the peer must not crash the accept path.
                    hunt.logging.error("listener error: ", "peer creation returned null");
                    socket.close();
                    return;
                }

                acceptHandler(this, stream);
                stream.start();
            });

            if (this.isError) {
                hunt.logging.error("listener error: ", this.errorMessage);
                this.close();
            }
        } catch (SocketOSException e) {
            dispatchError(ErrorCode.OTHER, e.msg);
        }
    }

    /// Delivers an IoError to `errorHandler`, or logs it when no handler is set
    /// so the failure is never silently lost.
    private void dispatchError(ErrorCode code, string message) {
        if (errorHandler !is null)
            errorHandler(new IoError(code, message));
        else
            hunt.logging.error("listener error: ", message);
    }

    /// Maps the OS error carried by a bind failure to ADDRINUSE or OTHER.
    private static ErrorCode bindErrorCode(SocketOSException e) {
        version (Windows) {
            import core.sys.windows.winsock2 : WSAEADDRINUSE;
            enum int addrInUse = WSAEADDRINUSE;
        } else {
            import core.stdc.errno : EADDRINUSE;
            enum int addrInUse = EADDRINUSE;
        }
        return e.errorCode == addrInUse ? ErrorCode.ADDRINUSE : ErrorCode.OTHER;
    }
}
213 
214 
215 
// dfmt off
version(linux):
// dfmt on

// Fallback definitions of SO_REUSEPORT for Linux targets.
// NOTE(review): presumably these cover toolchains whose runtime bindings do
// not declare the constant - confirm against the supported compiler list.
//
// The branches form ONE mutually exclusive chain so that the constant is
// declared at most once. Separate `version` blocks would declare it twice on
// targets that satisfy several conditions at once (for example AArch64 with
// musl, or an old compiler on x86_64 with musl). The architecture-specific
// value is checked first, so MIPS keeps 0x0200 even when musl is in use.
static if (CompilerHelper.isLessThan(2078)) {
    version (X86) {
        enum SO_REUSEPORT = 15;
    } else version (X86_64) {
        enum SO_REUSEPORT = 15;
    } else version (MIPS32) {
        enum SO_REUSEPORT = 0x0200;
    } else version (MIPS64) {
        enum SO_REUSEPORT = 0x0200;
    } else version (PPC) {
        enum SO_REUSEPORT = 15;
    } else version (PPC64) {
        enum SO_REUSEPORT = 15;
    } else version (ARM) {
        enum SO_REUSEPORT = 15;
    } else version (AArch64) {
        enum SO_REUSEPORT = 15;
    } else version (CRuntime_Musl) {
        enum SO_REUSEPORT = 15;
    }
} else version (AArch64) {
    enum SO_REUSEPORT = 15;
} else version (CRuntime_Musl) {
    enum SO_REUSEPORT = 15;
}