1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.io.TcpListener;
13 
14 import hunt.event;
15 
16 import std.socket;
17 import std.exception;
18 import hunt.logging;
19 import core.thread;
20 import core.time;
21 
22 import hunt.Functions;
23 import hunt.io.TcpStream;
24 
25 alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
26 alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
27 
28 /**
29 */
30 class TcpListener : AbstractListener {
31     private bool isSslEnabled = false;
32     private TcpStreamOption _tcpStreamoption;
33     protected EventHandler _shutdownHandler;
34 
35     /// event handlers
36     AcceptEventHandler acceptHandler;
37     SimpleEventHandler closeHandler;
38     PeerCreateHandler peerCreateHandler;
39 
40     this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024) {
41         _tcpStreamoption = TcpStreamOption.createOption();
42         _tcpStreamoption.bufferSize = bufferSize;
43         version (HAVE_IOCP)
44             super(loop, family, bufferSize);
45         else
46             super(loop, family);
47     }
48 
49     TcpListener onConnectionAccepted(AcceptEventHandler handler) {
50         acceptHandler = handler;
51         return this;
52     }
53 
54     TcpListener onPeerCreating(PeerCreateHandler handler) {
55         peerCreateHandler = handler;
56         return this;
57     }
58 
59     TcpListener onShutdown(EventHandler handler) {
60         _shutdownHandler = handler;
61         return this;
62     }
63 
64     TcpListener bind(string ip, ushort port) {
65         bind(parseAddress(ip, port));
66         return this;
67     }
68 
69     TcpListener bind(ushort port) {
70         bind(createAddress(this.socket.addressFamily, port));
71         return this;
72     }
73 
74     TcpListener bind(Address addr) {
75         this.socket.bind(addr);
76         this.socket.blocking = false;
77         _localAddress = _socket.localAddress();
78         return this;
79     }
80 
81     Address bindingAddress() {
82         return _localAddress;
83     }
84 
85     TcpListener reusePort(bool use) {
86         this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, use);
87 
88         version (Posix) {
89             import core.sys.posix.sys.socket;
90 
91             this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, use);
92         } else version (windows) {
93             import core.sys.windows.winsock2;
94 
95             if (!use)
96                 this.socket.setOption(SocketOptionLevel.SOCKET,
97                         cast(SocketOption) SO_EXCLUSIVEADDRUSE, true);
98         }
99 
100         return this;
101     }
102 
103     TcpListener listen(int backlog) {
104         this.socket.listen(backlog);
105         return this;
106     }
107 
108     override void start() {
109         _inLoop.register(this);
110         _isRegistered = true;
111         version (HAVE_IOCP)
112             this.doAccept();
113     }
114 
115     override void close() {
116         if (closeHandler !is null)
117             closeHandler();
118         else if (_shutdownHandler !is null)
119             _shutdownHandler(this, null);
120         this.onClose();
121     }
122 
123     protected override void onRead() {
124         bool canRead = true;
125         version (HUNT_DEBUG)
126             trace("start to listen");
127         // while(canRead && this.isRegistered) // why??
128         {
129             version (HUNT_DEBUG)
130                 trace("listening...");
131 
132             canRead = onAccept((Socket socket) {
133 
134                 version (HUNT_DEBUG) {
135                     infof("new connection from %s, fd=%d",
136                         socket.remoteAddress.toString(), socket.handle());
137                 }
138 
139                 if (acceptHandler !is null) {
140                     TcpStream stream;
141                     if (peerCreateHandler is null) {
142                         stream = new TcpStream(_inLoop, socket, _tcpStreamoption);
143                     }
144                     else
145                         stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);
146 
147                     acceptHandler(this, stream);
148                     stream.start();
149                 }
150             });
151 
152             if (this.isError) {
153                 canRead = false;
154                 error("listener error: ", this.erroString);
155                 this.close();
156             }
157         }
158     }
159 }