/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.io.TcpListener;

import hunt.event;

import std.socket;
import std.exception;
import hunt.logging;
import core.thread;
import core.time;

import hunt.Functions;
import hunt.io.TcpStream;

/// Called for each accepted connection, right before the new stream is started.
alias AcceptEventHandler = void delegate(TcpListener sender, TcpStream stream);

/// Optional factory used instead of the default `new TcpStream(...)` to wrap an accepted socket.
alias PeerCreateHandler = TcpStream delegate(TcpListener sender, Socket socket, size_t bufferSize);

/**
 * TCP listener built on top of AbstractListener.
 *
 * The configuration methods (onConnectionAccepted, onPeerCreating, onShutdown,
 * bind, reusePort, listen) return `this` so that calls can be chained.
 * After start() the listener is registered with its event loop; each accepted
 * socket is wrapped in a TcpStream, passed to the accept handler and started.
 */
class TcpListener : AbstractListener {
    /// Currently not read anywhere in this class.
    private bool isSslEnabled = false;
    /// Options handed to every TcpStream created by the default peer factory.
    private TcpStreamOption _tcpStreamoption;
    /// Fallback handler invoked by close() when no closeHandler is set.
    protected EventHandler _shutdownHandler;

    /// event handlers
    AcceptEventHandler acceptHandler;
    SimpleEventHandler closeHandler;
    PeerCreateHandler peerCreateHandler;

    /**
     * Creates a listener attached to the given event loop.
     *
     * Params:
     *   loop       = event loop the listener registers with in start()
     *   family     = address family of the listening socket
     *   bufferSize = buffer size stored in the stream options (and forwarded
     *                to the base class when built with HAVE_IOCP)
     */
    this(EventLoop loop, AddressFamily family = AddressFamily.INET, size_t bufferSize = 4 * 1024) {
        _tcpStreamoption = TcpStreamOption.createOption();
        _tcpStreamoption.bufferSize = bufferSize;
        version (HAVE_IOCP)
            super(loop, family, bufferSize);
        else
            super(loop, family);
    }

    /// Sets the handler that receives every accepted stream. Returns `this`.
    TcpListener onConnectionAccepted(AcceptEventHandler handler) {
        acceptHandler = handler;
        return this;
    }

    /// Sets a custom factory for the stream wrapping an accepted socket. Returns `this`.
    TcpListener onPeerCreating(PeerCreateHandler handler) {
        peerCreateHandler = handler;
        return this;
    }

    /// Sets the handler used by close() when no closeHandler is installed. Returns `this`.
    TcpListener onShutdown(EventHandler handler) {
        _shutdownHandler = handler;
        return this;
    }

    /// Binds to the address obtained from parseAddress(ip, port). Returns `this`.
    TcpListener bind(string ip, ushort port) {
        bind(parseAddress(ip, port));
        return this;
    }

    /// Binds to the address createAddress yields for the socket's family and `port`. Returns `this`.
    TcpListener bind(ushort port) {
        bind(createAddress(this.socket.addressFamily, port));
        return this;
    }

    /**
     * Binds the listening socket to `addr`, switches it to non-blocking mode
     * and caches the resulting local address. Returns `this`.
     */
    TcpListener bind(Address addr) {
        this.socket.bind(addr);
        this.socket.blocking = false;
        _localAddress = _socket.localAddress();
        return this;
    }

    /// The local address cached by the last successful bind().
    Address bindingAddress() {
        return _localAddress;
    }

    /**
     * Enables or disables address/port reuse on the listening socket.
     *
     * SO_REUSEADDR is always set to `use`. On Posix SO_REUSEPORT is set to the
     * same value. On Windows, when reuse is turned off, SO_EXCLUSIVEADDRUSE is
     * enabled in addition.
     *
     * Returns: `this`.
     */
    TcpListener reusePort(bool use) {
        this.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, use);

        version (Posix) {
            import core.sys.posix.sys.socket;

            this.socket.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_REUSEPORT, use);
        } else version (Windows) {
            // The predefined version identifier is `Windows` (case-sensitive);
            // with the lowercase spelling this branch was never compiled.
            import core.sys.windows.winsock2;

            if (!use)
                this.socket.setOption(SocketOptionLevel.SOCKET,
                        cast(SocketOption) SO_EXCLUSIVEADDRUSE, true);
        }

        return this;
    }

    /// Puts the socket into listening state with the given backlog. Returns `this`.
    TcpListener listen(int backlog) {
        this.socket.listen(backlog);
        return this;
    }

    /// Registers the listener with its event loop; with HAVE_IOCP also issues the first accept.
    override void start() {
        _inLoop.register(this);
        _isRegistered = true;
        version (HAVE_IOCP)
            this.doAccept();
    }

    /**
     * Notifies the close handler, or the shutdown handler when no close
     * handler is set, and then runs onClose().
     */
    override void close() {
        if (closeHandler !is null) {
            closeHandler();
        } else if (_shutdownHandler !is null) {
            _shutdownHandler(this, null);
        }
        this.onClose();
    }

    /**
     * Read-event callback: performs one onAccept() call and dispatches the
     * accepted socket. If the listener is in an error state afterwards, the
     * error is logged and the listener is closed.
     */
    protected override void onRead() {
        version (HUNT_DEBUG) {
            trace("start to listen");
            trace("listening...");
        }

        // NOTE(review): a `while (canRead && this.isRegistered)` loop around the
        // accept call was commented out in the original, so onAccept() is
        // called once per read event and its return value is not used.
        onAccept((Socket socket) {
            version (HUNT_DEBUG) {
                infof("new connection from %s, fd=%d",
                    socket.remoteAddress.toString(), socket.handle());
            }

            if (acceptHandler is null) {
                // Nobody will ever own this connection; release it right away
                // instead of leaving the peer connected until the GC runs.
                socket.close();
                return;
            }

            TcpStream stream;
            if (peerCreateHandler is null)
                stream = new TcpStream(_inLoop, socket, _tcpStreamoption);
            else
                stream = peerCreateHandler(this, socket, _tcpStreamoption.bufferSize);

            if (stream is null) {
                // A custom factory may decline a peer; avoid dereferencing null.
                error("listener error: ", "peer creation handler returned null");
                socket.close();
                return;
            }

            acceptHandler(this, stream);
            stream.start();
        });

        if (this.isError) {
            error("listener error: ", this.erroString);
            this.close();
        }
    }
}