1 module hunt.io.channel.AbstractSocketChannel; 2 3 import hunt.event.selector.Selector; 4 import hunt.io.channel.AbstractChannel; 5 import hunt.io.channel.Common; 6 import hunt.logging.ConsoleLogger; 7 8 import core.time; 9 import std.functional; 10 import std.socket; 11 import core.stdc.stdint; 12 13 /** 14 */ 15 abstract class AbstractSocketChannel : AbstractChannel { 16 17 protected shared bool _isWritting = false; // keep a data write operation atomic 18 19 this(Selector loop, ChannelType type) { 20 super(loop, type); 21 } 22 23 // Busy with reading or writting 24 protected bool isBusy() { 25 return false; 26 } 27 28 protected @property void socket(Socket s) { 29 this.handle = s.handle(); 30 version (Posix) { 31 s.blocking = false; 32 } 33 _socket = s; 34 version (HUNT_DEBUG_MORE) 35 infof("new socket: fd=%d", this.handle); 36 } 37 38 protected @property Socket socket() { 39 return _socket; 40 } 41 42 protected Socket _socket; 43 44 override void close() { 45 // if (_isClosing) { 46 // // debug warningf("already closed [fd=%d]", this.handle); 47 // return; 48 // } 49 // _isClosing = true; 50 version (HUNT_IO_MORE) 51 tracef("socket channel closing [fd=%d]...", this.handle); 52 version (HAVE_IOCP) 53 { 54 super.close(); 55 } else 56 { 57 if (isBusy()) { 58 import std.parallelism; 59 60 version (HUNT_DEBUG) 61 warning("Close operation delayed"); 62 auto theTask = task(() { 63 super.close(); 64 while (isBusy()) { 65 version (HUNT_DEBUG) 66 infof("waitting for idle [fd=%d]...", this.handle); 67 // Thread.sleep(20.msecs); 68 } 69 }); 70 taskPool.put(theTask); 71 } else { 72 super.close(); 73 } 74 } 75 } 76 77 /// Get a socket option. 78 /// Returns: The number of bytes written to $(D result). 79 /// returns the length, in bytes, of the actual result - very different from getsockopt() 80 pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, void[] result) @trusted { 81 return this._socket.getOption(level, option, result); 82 } 83 84 /// Common case of getting integer and boolean options. 85 pragma(inline) final int getOption(SocketOptionLevel level, 86 SocketOption option, ref int32_t result) @trusted { 87 return this._socket.getOption(level, option, result); 88 } 89 90 /// Get the linger option. 91 pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, 92 ref Linger result) @trusted { 93 return this._socket.getOption(level, option, result); 94 } 95 96 /// Get a timeout (duration) option. 97 pragma(inline) final void getOption(SocketOptionLevel level, 98 SocketOption option, ref Duration result) @trusted { 99 this._socket.getOption(level, option, result); 100 } 101 102 /// Set a socket option. 103 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, void[] value) @trusted { 104 this._socket.setOption(forward!(level, option, value)); 105 } 106 107 /// Common case for setting integer and boolean options. 108 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, int32_t value) @trusted { 109 this._socket.setOption(forward!(level, option, value)); 110 } 111 112 /// Set the linger option. 113 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Linger value) @trusted { 114 this._socket.setOption(forward!(level, option, value)); 115 } 116 117 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Duration value) @trusted { 118 this._socket.setOption(forward!(level, option, value)); 119 } 120 121 final @property @trusted Address remoteAddress() { 122 return _remoteAddress; 123 } 124 125 protected Address _remoteAddress; 126 127 final @property @trusted Address localAddress() { 128 return _localAddress; 129 } 130 131 protected Address _localAddress; 132 133 version (HAVE_IOCP) { 134 void setRead(size_t bytes) { 135 readLen = bytes; 136 } 137 138 protected size_t readLen; 139 } 140 141 void start(); 142 143 void onWriteDone() { 144 assert(false, "unimplemented"); 145 } 146 147 }