/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.TcpListener;

import hunt.io.TcpStream;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;

import hunt.event;
import hunt.Exceptions;
import hunt.Functions;
import hunt.logging;
import hunt.util.CompilerHelper;

import std.socket;
import std.exception;
import core.thread;
import core.time;


/// Invoked for every accepted connection, before the stream is started.
alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);
/// Optional factory used instead of the default `TcpStream` construction.
alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);
/// Invoked with an `IoError` when binding or accepting raises a `SocketOSException`.
alias EventErrorHandler = void delegate(IoError error);


/**
 * TCP listening socket registered with an `EventLoop`.
 *
 * Typical use is a fluent chain: `bind(...)`, `listen(...)`, then `start()`.
 * Accepted sockets are wrapped in a `TcpStream` (or whatever the
 * peer-creating handler returns), handed to the accept handler and started.
 */
class TcpListener : AbstractListener {
    private bool _isSslEnabled = false;
    private bool _isBlocking = false;
    private bool _isBinded = false;
    private TcpStreamOptions _tcpStreamoption;
    protected EventHandler _shutdownHandler;

    /// event handlers
    AcceptEventHandler acceptHandler;
    SimpleEventHandler closeHandler;
    PeerCreateHandler peerCreateHandler;
    EventErrorHandler errorHandler;

    /**
     * Creates the listener and stores `bufferSize` in the options later
     * passed to every accepted stream.
     *
     * Params:
     *   loop       = event loop the listener will be registered with
     *   family     = address family of the listening socket
     *   bufferSize = buffer size recorded in the stream options
     */
    this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024) {
        _tcpStreamoption = TcpStreamOptions.create();
        _tcpStreamoption.bufferSize = bufferSize;
        // The IOCP base constructor additionally takes the buffer size.
        version (HAVE_IOCP)
            super(loop, family, bufferSize);
        else
            super(loop, family);
    }

    /// Sets the handler called for each accepted connection. Returns `this`.
    TcpListener accepted(AcceptEventHandler handler) {
        acceptHandler = handler;
        return this;
    }

    /// Sets the handler notified of bind/accept socket errors. Returns `this`.
    TcpListener error(EventErrorHandler handler) {
        errorHandler = handler;
        return this;
    }

    /// Sets the factory used to build the stream for an accepted socket. Returns `this`.
    TcpListener onPeerCreating(PeerCreateHandler handler) {
        peerCreateHandler = handler;
        return this;
    }

    /// Sets the handler used by `close()` when no `closeHandler` is set. Returns `this`.
    TcpListener onShutdown(EventHandler handler) {
        _shutdownHandler = handler;
        return this;
    }

    /// Binds to the address parsed from `ip` and `port`. Returns `this`.
    TcpListener bind(string ip, ushort port) {
        return bind(parseAddress(ip, port));
    }

    /// Binds to `port` on an address built for the socket's own address family. Returns `this`.
    TcpListener bind(ushort port) {
        return bind(createAddress(this.socket.addressFamily, port));
    }

    /**
     * Enables `REUSEADDR`, binds the socket to `addr`, applies the configured
     * blocking mode and records the resulting local address.
     *
     * A `SocketOSException` is not propagated: it is forwarded to
     * `errorHandler` when one is installed and logged otherwise, and the
     * listener stays unbound.
     *
     * Returns: `this`, whether or not binding succeeded.
     */
    TcpListener bind(Address addr) {
        try {
            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
            this.socket.bind(addr);
            this.socket.blocking = _isBlocking;
            _localAddress = _socket.localAddress();
            _isBinded = true;
        } catch (SocketOSException e) {
            // NOTE(review): every socket failure is reported as ADDRINUSE,
            // whatever the underlying OS error was - confirm this is intended.
            if (errorHandler !is null) {
                this.errorHandler(new IoError(ErrorCode.ADDRINUSE, e.msg));
            } else {
                // Without a handler the failure used to vanish silently.
                hunt.logging.error("bind error: ", e.msg);
            }
        }
        return this;
    }

    /// Local address recorded by the last successful `bind`.
    Address bindingAddress() {
        return _localAddress;
    }

    /// Stores the blocking mode and applies it to the socket immediately.
    void blocking(bool flag) {
        _isBlocking = flag;
        // if(_isBinded)
        this.socket.blocking = flag;
    }

    /// Blocking mode last requested through `blocking(bool)`.
    bool blocking() {
        return _isBlocking;
    }

    /**
     * Configures port sharing; must be called before `bind`.
     *
     * On Posix both `REUSEADDR` and `SO_REUSEPORT` are set to `flag`; on
     * Windows `SO_EXCLUSIVEADDRUSE` is set to `!flag`.
     *
     * https://stackoverflow.com/questions/14388706/socket-options-so-reuseaddr-and-so-reuseport-how-do-they-differ-do-they-mean-t
     * https://www.cnblogs.com/xybaby/p/7341579.html
     * https://rextester.com/BUAFK86204
     *
     * Throws: `IOException` if the listener is already bound.
     * Returns: `this`.
     */
    TcpListener reusePort(bool flag) {
        if (_isBinded) {
            throw new IOException("Must be set before binding.");
        }

        version (Posix) {
            import core.sys.posix.sys.socket;

            this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, flag);
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, flag);
        } else version (Windows) {
            // https://docs.microsoft.com/en-us/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
            // https://docs.microsoft.com/zh-cn/windows/win32/winsock/so-exclusiveaddruse
            // TODO: Tasks pending completion -@Administrator at 2020-05-25T15:04:42+08:00
            // More tests needed
            import core.sys.windows.winsock2;
            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
        }

        return this;
    }

    /// Puts the socket into listening state with the given backlog. Returns `this`.
    TcpListener listen(int backlog) {
        this.socket.listen(backlog);
        return this;
    }

    /// Registers the listener with its event loop; on IOCP also issues the first accept.
    override void start() {
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
            this.doAccept();
    }

    /**
     * Runs the close callback and then `onClose()`.
     *
     * NOTE(review): the shutdown handler only runs when `closeHandler` is
     * unset - confirm that the two are meant to be mutually exclusive.
     */
    override void close() {
        if (closeHandler !is null)
            closeHandler();
        else if (_shutdownHandler !is null)
            _shutdownHandler(this, null);
        this.onClose();
    }

    /**
     * Read-event callback: accepts a pending connection via `onAccept`, wraps
     * the socket in a stream, notifies `acceptHandler` and starts the stream.
     *
     * If the listener reports an error afterwards it is logged and the
     * listener is closed. A `SocketOSException` is forwarded to
     * `errorHandler` when one is installed and logged otherwise.
     */
    protected override void onRead() {
        version (HUNT_DEBUG)
            trace("start to listen");

        // The original retry loop (`while (canRead && this.isRegistered)`) is
        // disabled, so the result of onAccept is intentionally not used.
        version (HUNT_DEBUG)
            trace("listening...");

        try {
            onAccept((Socket socket) {

                version (HUNT_DEBUG) {
                    infof("new connection from %s, fd=%d",
                        socket.remoteAddress.toString(), socket.handle());
                }

                if (acceptHandler is null) {
                    // Nobody will ever own this connection: release it now
                    // instead of leaving the descriptor open.
                    version (HUNT_DEBUG)
                        trace("no accept handler set, closing the accepted socket");
                    socket.close();
                    return;
                }

                TcpStream stream;
                if (peerCreateHandler is null) {
                    stream = new TcpStream(_inLoop, socket, _tcpStreamoption);
                } else {
                    stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);
                }

                if (stream is null) {
                    // The factory produced nothing; calling start() on it
                    // would dereference null, so drop the connection.
                    version (HUNT_DEBUG)
                        trace("peer create handler returned null, closing the accepted socket");
                    socket.close();
                    return;
                }

                acceptHandler(this, stream);
                stream.start();
            });

            if (this.isError) {
                hunt.logging.error("listener error: ", this.errorMessage);
                this.close();
            }
        } catch (SocketOSException e) {
            if (errorHandler !is null) {
                errorHandler(new IoError(ErrorCode.OTHER, e.msg));
            } else {
                // Without a handler the failure used to vanish silently.
                hunt.logging.error("accept error: ", e.msg);
            }
        }
    }
}



// Fallback SO_REUSEPORT definitions for linux targets.
// NOTE(review): presumably the druntime bindings lack the constant for these
// compiler/architecture/runtime combinations - confirm.
// The branches form a single else-chain so that at most one definition is
// emitted; separate blocks would declare the enum twice on e.g. AArch64 + musl.
// dfmt off
version(linux):
// dfmt on
static if (CompilerHelper.isLessThan(2078)) {
    version (X86) {
        enum SO_REUSEPORT = 15;
    } else version (X86_64) {
        enum SO_REUSEPORT = 15;
    } else version (MIPS32) {
        enum SO_REUSEPORT = 0x0200;
    } else version (MIPS64) {
        enum SO_REUSEPORT = 0x0200;
    } else version (PPC) {
        enum SO_REUSEPORT = 15;
    } else version (PPC64) {
        enum SO_REUSEPORT = 15;
    } else version (ARM) {
        enum SO_REUSEPORT = 15;
    } else version (AArch64) {
        enum SO_REUSEPORT = 15;
    } else version (CRuntime_Musl) {
        enum SO_REUSEPORT = 15;
    }
} else version (AArch64) {
    enum SO_REUSEPORT = 15;
} else version (CRuntime_Musl) {
    enum SO_REUSEPORT = 15;
}