1 module hunt.io.channel.AbstractSocketChannel; 2 3 import hunt.event.selector.Selector; 4 import hunt.io.channel.AbstractChannel; 5 import hunt.io.channel.Common; 6 import hunt.logging; 7 8 import core.time; 9 import std.functional; 10 import std.socket; 11 import core.stdc.stdint; 12 13 /** 14 * 15 */ 16 abstract class AbstractSocketChannel : AbstractChannel { 17 18 protected shared bool _isWritting = false; // keep a data write operation atomic 19 20 this(Selector loop, ChannelType type) { 21 super(loop, type); 22 } 23 24 // Busy with reading or writting 25 protected bool isBusy() { 26 return false; 27 } 28 29 protected @property void socket(Socket s) { 30 this.handle = s.handle(); 31 version (Posix) { 32 s.blocking = false; 33 } 34 _socket = s; 35 version (HUNT_DEBUG_MORE) 36 infof("new socket: fd=%d", this.handle); 37 } 38 39 protected @property Socket socket() { 40 return _socket; 41 } 42 43 protected Socket _socket; 44 45 override void close() { 46 // if (_isClosing) { 47 // // debug warningf("already closed [fd=%d]", this.handle); 48 // return; 49 // } 50 // _isClosing = true; 51 version (HUNT_IO_MORE) 52 tracef("socket channel closing [fd=%d]...", this.handle); 53 version (HAVE_IOCP) 54 { 55 super.close(); 56 } else 57 { 58 if (isBusy()) { 59 import std.parallelism; 60 61 version (HUNT_DEBUG) 62 warning("Close operation delayed"); 63 auto theTask = task(() { 64 super.close(); 65 while (isBusy()) { 66 version (HUNT_DEBUG) 67 infof("waitting for idle [fd=%d]...", this.handle); 68 // Thread.sleep(20.msecs); 69 } 70 }); 71 taskPool.put(theTask); 72 } else { 73 super.close(); 74 } 75 } 76 } 77 78 /// Get a socket option. 79 /// Returns: The number of bytes written to $(D result). 80 /// returns the length, in bytes, of the actual result - very different from getsockopt() 81 pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, void[] result) @trusted { 82 return this._socket.getOption(level, option, result); 83 } 84 85 /// Common case of getting integer and boolean options. 86 pragma(inline) final int getOption(SocketOptionLevel level, 87 SocketOption option, ref int32_t result) @trusted { 88 return this._socket.getOption(level, option, result); 89 } 90 91 /// Get the linger option. 92 pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, 93 ref Linger result) @trusted { 94 return this._socket.getOption(level, option, result); 95 } 96 97 /// Get a timeout (duration) option. 98 pragma(inline) final void getOption(SocketOptionLevel level, 99 SocketOption option, ref Duration result) @trusted { 100 this._socket.getOption(level, option, result); 101 } 102 103 /// Set a socket option. 104 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, void[] value) @trusted { 105 this._socket.setOption(forward!(level, option, value)); 106 } 107 108 /// Common case for setting integer and boolean options. 109 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, int32_t value) @trusted { 110 this._socket.setOption(forward!(level, option, value)); 111 } 112 113 /// Set the linger option. 114 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Linger value) @trusted { 115 this._socket.setOption(forward!(level, option, value)); 116 } 117 118 pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Duration value) @trusted { 119 this._socket.setOption(forward!(level, option, value)); 120 } 121 122 final @property @trusted Address remoteAddress() { 123 return _remoteAddress; 124 } 125 126 protected Address _remoteAddress; 127 128 final @property @trusted Address localAddress() { 129 return _localAddress; 130 } 131 132 protected Address _localAddress; 133 134 version (HAVE_IOCP) { 135 void setRead(size_t bytes) { 136 readLen = bytes; 137 } 138 139 protected size_t readLen; 140 } 141 142 void start(); 143 144 void onWriteDone() { 145 assert(false, "unimplemented"); 146 } 147 148 }