1 module hunt.io.channel.AbstractSocketChannel;
2 
3 import hunt.event.selector.Selector;
4 import hunt.io.channel.AbstractChannel;
5 import hunt.io.channel.Common;
6 import hunt.logging;
7 
8 import core.time;
9 import std.functional;
10 import std.socket;
11 import core.stdc.stdint;
12 
13 /**
14  * 
15  */
16 abstract class AbstractSocketChannel : AbstractChannel {
17 
18     protected shared bool _isWritting = false; // keep a data write operation atomic
19 
20     this(Selector loop, ChannelType type) {
21         super(loop, type);
22     }
23 
24     // Busy with reading or writting
25     protected bool isBusy() {
26         return false;
27     }
28 
29     protected @property void socket(Socket s) {
30         this.handle = s.handle();
31         version (Posix) {
32             s.blocking = false;
33         }
34         _socket = s;
35         version (HUNT_DEBUG_MORE)
36             infof("new socket: fd=%d", this.handle);
37     }
38 
39     protected @property Socket socket() {
40         return _socket;
41     }
42 
43     protected Socket _socket;
44 
45     override void close() {
46         // if (_isClosing) {
47         //     // debug warningf("already closed [fd=%d]", this.handle);
48         //     return;
49         // }
50         // _isClosing = true;
51         version (HUNT_IO_MORE)
52             tracef("socket channel closing [fd=%d]...", this.handle);
53         version (HAVE_IOCP)
54         {
55             super.close();
56         } else
57         {
58             if (isBusy()) {
59                 import std.parallelism;
60 
61                 version (HUNT_DEBUG)
62                     warning("Close operation delayed");
63                 auto theTask = task(() {
64                     super.close();
65                     while (isBusy()) {
66                         version (HUNT_DEBUG)
67                             infof("waitting for idle [fd=%d]...", this.handle);
68                         // Thread.sleep(20.msecs);
69                     }
70                 });
71                 taskPool.put(theTask);
72             } else {
73                 super.close();
74             }
75         }
76     }
77 
78     /// Get a socket option.
79     /// Returns: The number of bytes written to $(D result).
80     ///    returns the length, in bytes, of the actual result - very different from getsockopt()
81     pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option, void[] result) @trusted {
82         return this._socket.getOption(level, option, result);
83     }
84 
85     /// Common case of getting integer and boolean options.
86     pragma(inline) final int getOption(SocketOptionLevel level,
87             SocketOption option, ref int32_t result) @trusted {
88         return this._socket.getOption(level, option, result);
89     }
90 
91     /// Get the linger option.
92     pragma(inline) final int getOption(SocketOptionLevel level, SocketOption option,
93             ref Linger result) @trusted {
94         return this._socket.getOption(level, option, result);
95     }
96 
97     /// Get a timeout (duration) option.
98     pragma(inline) final void getOption(SocketOptionLevel level,
99             SocketOption option, ref Duration result) @trusted {
100         this._socket.getOption(level, option, result);
101     }
102 
103     /// Set a socket option.
104     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, void[] value) @trusted {
105         this._socket.setOption(forward!(level, option, value));
106     }
107 
108     /// Common case for setting integer and boolean options.
109     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, int32_t value) @trusted {
110         this._socket.setOption(forward!(level, option, value));
111     }
112 
113     /// Set the linger option.
114     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Linger value) @trusted {
115         this._socket.setOption(forward!(level, option, value));
116     }
117 
118     pragma(inline) final void setOption(SocketOptionLevel level, SocketOption option, Duration value) @trusted {
119         this._socket.setOption(forward!(level, option, value));
120     }
121 
122     final @property @trusted Address remoteAddress() {
123         return _remoteAddress;
124     }
125 
126     protected Address _remoteAddress;
127 
128     final @property @trusted Address localAddress() {
129         return _localAddress;
130     }
131 
132     protected Address _localAddress;
133 
134     version (HAVE_IOCP) {
135         void setRead(size_t bytes) {
136             readLen = bytes;
137         }
138 
139         protected size_t readLen;
140     }
141 
142     void start();
143 
144     void onWriteDone() {
145         assert(false, "unimplemented");
146     }
147 
148 }