/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.TcpListener;

import hunt.io.TcpStream;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;

import hunt.event;
import hunt.Exceptions;
import hunt.Functions;
import hunt.logging.ConsoleLogger;
import hunt.util.Common;

import std.socket;
import std.exception;
import core.thread;
import core.time;


/// Invoked for every accepted connection, before the stream is started.
alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
/// Factory hook that builds the TcpStream for a freshly accepted socket.
alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
/// Receives socket errors raised while binding or accepting.
alias EventErrorHandler = void delegate(IoError error);

/**
 * TCP listening socket driven by an EventLoop.
 *
 * Typical usage is a fluent chain: configure the handlers, then
 * `bind(...)`, `listen(...)` and `start()`. For each accepted socket a
 * TcpStream is created (through `peerCreateHandler` when one is installed),
 * passed to `acceptHandler`, and then started.
 */
class TcpListener : AbstractListener {
    private bool _isSslEnabled = false;
    private bool _isBlocking = false;
    private bool _isBinded = false;
    private TcpStreamOptions _tcpStreamoption;
    protected EventHandler _shutdownHandler;

    /// event handlers
    AcceptEventHandler acceptHandler;
    SimpleEventHandler closeHandler;
    PeerCreateHandler peerCreateHandler;
    EventErrorHandler errorHandler;

    /**
     * Creates a listener attached to `loop`.
     *
     * Params:
     *  loop       = event loop the listener registers with in `start()`
     *  family     = address family of the listening socket
     *  bufferSize = buffer size stored in the stream options and handed to
     *               streams created for accepted peers
     */
    this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024) {
        _tcpStreamoption = TcpStreamOptions.create();
        _tcpStreamoption.bufferSize = bufferSize;
        version (HAVE_IOCP)
            super(loop, family, bufferSize);
        else
            super(loop, family);
    }

    /// Sets the handler called for each accepted connection. Returns `this` for chaining.
    TcpListener accepted(AcceptEventHandler handler) {
        acceptHandler = handler;
        return this;
    }

    /// Sets the handler that receives bind/accept socket errors. Returns `this` for chaining.
    TcpListener error(EventErrorHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Sets the factory used to build the TcpStream of an accepted socket. Returns `this` for chaining.
    TcpListener onPeerCreating(PeerCreateHandler handler) {
        peerCreateHandler = handler;
        return this;
    }

    /// Sets the handler used by `close()` when no `closeHandler` is installed. Returns `this` for chaining.
    TcpListener onShutdown(EventHandler handler) {
        _shutdownHandler = handler;
        return this;
    }

    /// Binds to the given textual IP address and port.
    TcpListener bind(string ip, ushort port) {
        return bind(parseAddress(ip, port));
    }

    /// Binds to `port` using an address created for the socket's own address family.
    TcpListener bind(ushort port) {
        return bind(createAddress(this.socket.addressFamily, port));
    }

    /**
     * Binds the listening socket to `addr`.
     *
     * SO_REUSEADDR is enabled before binding and the configured blocking
     * mode is applied afterwards. A SocketOSException is not propagated:
     * it is forwarded to `errorHandler` (always tagged ErrorCode.ADDRINUSE,
     * whatever the underlying OS error was), or logged when no handler is
     * installed so that the failure is never lost silently.
     *
     * Returns: `this`, also when binding failed.
     */
    TcpListener bind(Address addr) {
        try {
            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
            this.socket.bind(addr);
            this.socket.blocking = _isBlocking;
            _localAddress = _socket.localAddress();
            _isBinded = true;
        } catch (SocketOSException e) {
            if (errorHandler !is null) {
                this.errorHandler(new IoError(ErrorCode.ADDRINUSE, e.msg));
            } else {
                // Fully qualified: the class member `error` hides the logging function.
                hunt.logging.ConsoleLogger.error("bind failed: ", e.msg);
            }
        }
        return this;
    }

    /// Local address recorded by the last successful `bind`.
    Address bindingAddress() {
        return _localAddress;
    }

    /// Stores the blocking mode and applies it to the socket immediately.
    void blocking(bool flag) {
        _isBlocking = flag;
        // if(_isBinded)
        this.socket.blocking = flag;
    }

    /// Returns the blocking mode last requested through `blocking(bool)`.
    bool blocking() {
        return _isBlocking;
    }

    /**
     * Enables or disables port sharing; must be called before `bind`.
     *
     * On Posix both SO_REUSEADDR and SO_REUSEPORT are set to `flag`; on
     * Windows SO_EXCLUSIVEADDRUSE is set to the inverse of `flag`.
     *
     * https://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t
     * https://www.cnblogs.com/xybaby/p/7341579.html
     * https://rextester.com/BUAFK86204
     *
     * Throws: IOException when the socket has already been bound.
     */
    TcpListener reusePort(bool flag) {
        if (_isBinded) {
            throw new IOException("Must be set before binding.");
        }

        version (Posix) {
            import core.sys.posix.sys.socket;

            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, flag);
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, flag);
        } else version (Windows) {
            // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
            // https://docs.microsoft.com/zh-cn/windows/win32/winsock/so-exclusiveaddruse
            // TODO: Tasks pending completion -@Administrator at 2020-05-25T15:04:42+08:00
            // More tests needed
            import core.sys.windows.winsock2;
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
        }

        return this;
    }

    /// Puts the socket into listening state with the given backlog. Returns `this` for chaining.
    TcpListener listen(int backlog) {
        this.socket.listen(backlog);
        return this;
    }

    /// Registers the listener with its event loop; under HAVE_IOCP the first accept is issued as well.
    override void start() {
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
            this.doAccept();
    }

    /**
     * Notifies the close handler (or, when none is set, the shutdown
     * handler) and then runs `onClose()`.
     */
    override void close() {
        if (closeHandler !is null)
            closeHandler();
        else if (_shutdownHandler !is null)
            _shutdownHandler(this, null);
        this.onClose();
    }

    /**
     * Read-event callback: accepts a pending connection and dispatches it.
     *
     * `onAccept` is invoked once per call (a retry loop was left disabled
     * in the original code). The listener closes itself when it reports an
     * error afterwards. A SocketOSException is forwarded to `errorHandler`
     * as ErrorCode.OTHER, or logged when no handler is installed.
     */
    protected override void onRead() {
        version (HUNT_DEBUG)
            trace("start to listen");

        try {
            version (HUNT_DEBUG)
                trace("listening...");

            onAccept((Socket socket) {

                version (HUNT_DEBUG) {
                    infof("new connection from %s, fd=%d",
                        socket.remoteAddress.toString(), socket.handle());
                }

                if (acceptHandler is null) {
                    // Nobody can take ownership of the connection: release it
                    // now instead of leaving the descriptor open until the GC
                    // happens to finalize the Socket.
                    socket.close();
                    return;
                }

                TcpStream stream;
                if (peerCreateHandler is null) {
                    stream = new TcpStream(_inLoop, socket, _tcpStreamoption);
                } else {
                    stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);
                }

                if (stream is null) {
                    // A factory that declines the peer must not crash the
                    // listener with a null dereference.
                    hunt.logging.ConsoleLogger.error("peer create handler returned null; ",
                        "dropping the connection");
                    socket.close();
                    return;
                }

                acceptHandler(this, stream);
                stream.start();
            });

            if (this.isError) {
                hunt.logging.ConsoleLogger.error("listener error: ", this.errorMessage);
                this.close();
            }
        } catch (SocketOSException e) {
            if (errorHandler !is null) {
                errorHandler(new IoError(ErrorCode.OTHER, e.msg));
            } else {
                hunt.logging.ConsoleLogger.error("accept failed: ", e.msg);
            }
        }
    }
}



// dfmt off
version(linux):
// dfmt on
// Fallback SO_REUSEPORT values, compiled only when CompilerHelper reports a
// front end older than 2078.
static if (CompilerHelper.isLessThan(2078)) {
    version (X86) {
        enum SO_REUSEPORT = 15;
    } else version (X86_64) {
        enum SO_REUSEPORT = 15;
    } else version (MIPS32) {
        enum SO_REUSEPORT = 0x0200;
    } else version (MIPS64) {
        enum SO_REUSEPORT = 0x0200;
    } else version (PPC) {
        enum SO_REUSEPORT = 15;
    } else version (PPC64) {
        enum SO_REUSEPORT = 15;
    } else version (ARM) {
        enum SO_REUSEPORT = 15;
    }

}

// SO_REUSEPORT fallback for AArch64 and musl targets (presumably because the
// runtime bindings in use there do not provide the constant - verify against
// the druntime version being targeted).
// The two cases are chained with `else`: as independent blocks they would
// both declare the enum on an AArch64 + musl build, which is a
// conflicting-declaration compile error.
version (AArch64) {
    enum SO_REUSEPORT = 15;
} else version (CRuntime_Musl) {
    enum SO_REUSEPORT = 15;
}