1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.TcpListener;
13 
14 import hunt.io.TcpStream;
15 import hunt.io.TcpStreamOptions;
16 import hunt.io.IoError;
17 
18 import hunt.event;
19 import hunt.Exceptions;
20 import hunt.Functions;
21 import hunt.logging.ConsoleLogger;
22 import hunt.util.Common;
23 
24 import std.socket;
25 import std.exception;
26 import core.thread;
27 import core.time;
28 
29 
/// Callback invoked with the listener and the stream created for a newly accepted connection.
alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
/// Factory callback that builds the stream for an accepted socket; receives the listener,
/// the accepted socket and the configured buffer size, and returns the stream to use.
alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
/// Callback invoked with an `IoError` describing a failure reported by the listener.
alias EventErrorHandler = void delegate(IoError error);
33 
/**
 * TCP listening socket that is registered with an `EventLoop` and hands every
 * accepted connection to user-supplied callbacks.
 *
 * Typical usage chains the fluent setters, e.g.
 * `listener.accepted(...).error(...).bind(port).listen(backlog).start()`.
 *
 * The listening socket is non-blocking unless `blocking(true)` is called.
 */
class TcpListener : AbstractListener {
    private bool _isSslEnabled = false;
    private bool _isBlocking = false;
    private bool _isBinded = false;
    private TcpStreamOptions _tcpStreamoption;
    protected EventHandler _shutdownHandler;

    /// event handlers
    AcceptEventHandler acceptHandler;
    SimpleEventHandler closeHandler;
    PeerCreateHandler peerCreateHandler;
    EventErrorHandler errorHandler;

    /**
     * Creates a listener attached to `loop`.
     *
     * Params:
     *   loop       = event loop the listener will be registered with by `start()`
     *   family     = address family of the listening socket
     *   bufferSize = buffer size stored in the stream options used for accepted peers
     */
    this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024) {
        _tcpStreamoption = TcpStreamOptions.create();
        _tcpStreamoption.bufferSize = bufferSize;
        version (HAVE_IOCP)
            super(loop, family, bufferSize);
        else
            super(loop, family);
    }

    /// Sets the callback invoked for every accepted connection. Returns `this` for chaining.
    TcpListener accepted(AcceptEventHandler handler) {
        acceptHandler = handler;
        return this;
    }

    /// Sets the callback invoked when binding or accepting fails. Returns `this` for chaining.
    TcpListener error(EventErrorHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Sets the factory used to build the stream of an accepted socket. Returns `this` for chaining.
    TcpListener onPeerCreating(PeerCreateHandler handler) {
        peerCreateHandler = handler;
        return this;
    }

    /// Sets the handler invoked by `close()` when no `closeHandler` is set. Returns `this` for chaining.
    TcpListener onShutdown(EventHandler handler) {
        _shutdownHandler = handler;
        return this;
    }

    /// Binds to the address parsed from `ip` and `port`.
    TcpListener bind(string ip, ushort port) {
        return bind(parseAddress(ip, port));
    }

    /// Binds to `port` on an address created for the socket's own address family.
    TcpListener bind(ushort port) {
        return bind(createAddress(this.socket.addressFamily, port));
    }

    /**
     * Enables `SO_REUSEADDR`, binds the socket to `addr` and applies the
     * configured blocking mode.
     *
     * A `SocketOSException` raised while doing so is not propagated: it is
     * forwarded to `errorHandler` (always tagged `ErrorCode.ADDRINUSE`,
     * whatever the underlying OS error was) or, when no handler is set,
     * logged so the failure does not go unnoticed. In that case the listener
     * stays unbound.
     *
     * Returns: `this` for chaining.
     */
    TcpListener bind(Address addr) {
        try {
            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
            this.socket.bind(addr);
            this.socket.blocking = _isBlocking;
            _localAddress = _socket.localAddress();
            _isBinded = true;
        } catch (SocketOSException e) {
            if (errorHandler !is null) {
                this.errorHandler(new IoError(ErrorCode.ADDRINUSE, e.msg));
            } else {
                // Fully qualified: a bare `error` would resolve to the `error(...)` setter above.
                hunt.logging.ConsoleLogger.error("bind failed: ", e.msg);
            }
        }
        return this;
    }

    /// Local address recorded by the last successful `bind`.
    Address bindingAddress() {
        return _localAddress;
    }

    /// Stores the blocking mode and applies it to the socket immediately.
    void blocking(bool flag) {
        _isBlocking = flag;
        // if(_isBinded)
        this.socket.blocking = flag;
    }

    /// Blocking mode last requested through `blocking(bool)` (defaults to `false`).
    bool blocking() {
        return _isBlocking;
    }

    /**
     * Configures port sharing for the listening socket.
     *
     * On Posix both `SO_REUSEADDR` and `SO_REUSEPORT` are set to `flag`;
     * on Windows `SO_EXCLUSIVEADDRUSE` is set to `!flag`.
     *
     * Throws: `IOException` when called after a successful `bind`.
     * Returns: `this` for chaining.
     *
     * https://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t
     * https://www.cnblogs.com/xybaby/p/7341579.html
     * https://rextester.com/BUAFK86204
     */
    TcpListener reusePort(bool flag) {
        if(_isBinded) {
            throw new IOException("Must be set before binding.");
        }

        version (Posix) {
            import core.sys.posix.sys.socket;

            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, flag);
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, flag);
        } else version (Windows) {
            // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
            // https://docs.microsoft.com/zh-cn/windows/win32/winsock/so-exclusiveaddruse
            // TODO: Tasks pending completion -@Administrator at 2020-05-25T15:04:42+08:00
            // More tests needed
            import core.sys.windows.winsock2;
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
        }

        return this;
    }

    /// Puts the socket into listening state with the given `backlog`. Returns `this` for chaining.
    TcpListener listen(int backlog) {
        this.socket.listen(backlog);
        return this;
    }

    /// Registers the listener with its event loop; under `HAVE_IOCP` also issues the first accept.
    override void start() {
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
            this.doAccept();
    }

    /**
     * Notifies the close callback and then runs `onClose()`.
     *
     * `closeHandler` takes precedence; `_shutdownHandler` is only invoked
     * when no `closeHandler` is set.
     */
    override void close() {
        if (closeHandler !is null)
            closeHandler();
        else if (_shutdownHandler !is null)
            _shutdownHandler(this, null);
        this.onClose();
    }

    /**
     * Accepts a pending connection and dispatches it.
     *
     * For each accepted socket a `TcpStream` is created (through
     * `peerCreateHandler` when set), passed to `acceptHandler` and then
     * started. When nobody can take ownership of the socket (no
     * `acceptHandler`, or the factory returned `null`) it is closed right
     * away instead of being left open. If the listener reports an error
     * afterwards it is logged and the listener is closed.
     */
    protected override void onRead() {
        version (HUNT_DEBUG)
            trace("start to listen");

        version (HUNT_DEBUG)
            trace("listening...");

        try {
            onAccept((Socket socket) {

                version (HUNT_DEBUG) {
                    infof("new connection from %s, fd=%d",
                        socket.remoteAddress.toString(), socket.handle());
                }

                if (acceptHandler is null) {
                    // Nobody will ever own this connection; release the descriptor now.
                    socket.close();
                    return;
                }

                TcpStream stream;
                if (peerCreateHandler is null) {
                    stream = new TcpStream(_inLoop, socket, _tcpStreamoption);
                } else {
                    stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);
                }

                if (stream is null) {
                    // The factory declined the connection; avoid dereferencing null below.
                    hunt.logging.ConsoleLogger.error("peer creation returned null, closing connection");
                    socket.close();
                    return;
                }

                acceptHandler(this, stream);
                stream.start();
            });

            if (this.isError) {
                hunt.logging.ConsoleLogger.error("listener error: ", this.errorMessage);
                this.close();
            }
        } catch (SocketOSException e) {
            if (errorHandler !is null) {
                errorHandler(new IoError(ErrorCode.OTHER, e.msg));
            } else {
                hunt.logging.ConsoleLogger.error("accept failed: ", e.msg);
            }
        }
    }
}
212 
213 
214 
// dfmt off
version(linux):
// dfmt on

// Module-level fallback definitions of SO_REUSEPORT for Linux targets.
// NOTE(review): presumably these cover toolchains/targets whose druntime socket
// bindings do not declare the constant - confirm against the supported compilers.
//
// The branches form one mutually exclusive chain so that the constant is declared
// at most once. Previously the AArch64 and CRuntime_Musl blocks were independent
// of each other and of the legacy table, so e.g. an AArch64 + musl build declared
// SO_REUSEPORT twice. Every configuration that compiled before gets the same value.
static if (CompilerHelper.isLessThan(2078)) {
    version (X86) {
        enum SO_REUSEPORT = 15;
    } else version (X86_64) {
        enum SO_REUSEPORT = 15;
    } else version (MIPS32) {
        enum SO_REUSEPORT = 0x0200;
    } else version (MIPS64) {
        enum SO_REUSEPORT = 0x0200;
    } else version (PPC) {
        enum SO_REUSEPORT = 15;
    } else version (PPC64) {
        enum SO_REUSEPORT = 15;
    } else version (ARM) {
        enum SO_REUSEPORT = 15;
    } else version (AArch64) {
        enum SO_REUSEPORT = 15;
    } else version (CRuntime_Musl) {
        enum SO_REUSEPORT = 15;
    }
} else {
    version (AArch64) {
        enum SO_REUSEPORT = 15;
    } else version (CRuntime_Musl) {
        enum SO_REUSEPORT = 15;
    }
}