1 module hunt.io.channel.AbstractChannel;
2 
3 import hunt.event.selector.Selector;
4 import hunt.io.channel.Common;
5 import hunt.io.IoError;
6 import hunt.logging.ConsoleLogger;
7 
8 import core.atomic;
9 import std.bitmanip;
10 import std.socket : socket_t;
11 
12 
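/**
 * Base class for channels that are attached to a Selector.
 *
 * Keeps the registered / closing / closed state of the channel, a set of
 * ChannelFlag bits, and the links to the previous and next channel.
 */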
abstract class AbstractChannel : Channel {
    /// Socket handle of this channel; reported as `fd` in the log messages.
    socket_t handle = socket_t.init;
    /// Optional callback invoked by errorOccurred(); ignored while null.
    ErrorEventHandler errorHandler;

    protected bool _isRegistered = false;
    // Both flags are `shared` and are only touched through core.atomic.
    private shared bool _isClosing = false;
    protected shared bool _isClosed = false;

    /**
     * Params:
     *   loop = selector stored as the owner of this channel (see eventLoop)
     *   type = channel type reported by the `type` property
     */
    this(Selector loop, ChannelType type) {
        this._inLoop = loop;
        _type = type;
        // 16 flag slots, all cleared (a fresh bool[] is all-false).
        _flags = BitArray(new bool[16]);
    }

    /**
     * Returns: the current value of the registered flag.
     */
    bool isRegistered() {
        return _isRegistered;
    }

    /**
     * Returns: true while a close() call is in progress.
     */
    bool isClosing() {
        return atomicLoad(_isClosing);
    }

    /**
     * Returns: true once onClose() has marked the channel as closed.
     */
    bool isClosed() {
        return atomicLoad(_isClosed);
    }

    /**
     * Closes the channel: runs onClose() followed by doClose().
     *
     * Only one caller at a time may perform the close; a call made while
     * another close is in progress, or after the channel has been closed,
     * does nothing (apart from a warning in HUNT_IO_DEBUG builds).
     */
    void close() {
        // Reject early when already closed or when somebody else owns the close.
        if (atomicLoad(_isClosed) || !cas(&_isClosing, false, true)) {
            warnRedundantClose();
            return;
        }

        // From here on this caller owns the closing flag. Release it on every
        // exit path, so an exception thrown by onClose()/doClose() cannot leave
        // the channel permanently flagged as "closing".
        scope (exit) atomicStore(_isClosing, false);

        // A concurrent close() may have completed between the first check and
        // the successful cas above; without this re-check the channel would be
        // closed twice.
        if (atomicLoad(_isClosed)) {
            warnRedundantClose();
            return;
        }

        version (HUNT_IO_DEBUG_MORE) tracef("channel[fd=%d] closing...", this.handle);
        onClose(); // closing
        doClose(); // close
        // closed
        version (HUNT_IO_DEBUG) tracef("channel[fd=%d] closed", this.handle);
    }

    /// Emits the "already closed or closing" warning in HUNT_IO_DEBUG builds.
    private void warnRedundantClose() {
        version (HUNT_IO_DEBUG) {
            warningf("The channel[fd=%d] has already been closed (%s) or closing (%s)",
                 this.handle, atomicLoad(_isClosed), atomicLoad(_isClosing));
        }
    }

    /**
     * Hook called by close() after onClose(). Empty by default.
     */
    protected void doClose() {

    }

    /**
     * First stage of close(): clears the registered flag, deregisters the
     * channel from its selector, unlinks it from its neighbours and finally
     * marks it as closed.
     */
    void onClose() {
        version (HUNT_IO_DEBUG)
            tracef("onClose [fd=%d]...", this.handle);
        _isRegistered = false;
        _inLoop.deregister(this);
        clear();

        version (HUNT_IO_DEBUG_MORE)
            tracef("onClose done [fd=%d]...", this.handle);

        atomicStore(_isClosed, true);
    }

    /**
     * Wraps `code` and `msg` in an IoError and hands it to errorHandler,
     * if one is set.
     *
     * Params:
     *   code = error code forwarded to IoError
     *   msg  = message forwarded to IoError
     */
    protected void errorOccurred(ErrorCode code, string msg) {
        debug warningf("isRegistered: %s, isClosed: %s, msg=%s", _isRegistered,
                atomicLoad(_isClosed), msg);
        if (errorHandler !is null) {
            errorHandler(new IoError(code, msg));
        }
    }

    /// Default read handler: fails with "not implemented".
    void onRead() {
        assert(false, "not implemented");
    }

    /// Default write handler: fails with "not implemented".
    void onWrite() {
        assert(false, "not implemented");
    }

    /**
     * Returns: the state of the flag bit addressed by `index`.
     */
    final bool hasFlag(ChannelFlag index) {
        return _flags[index];
    }

    /// The channel type given to the constructor.
    @property ChannelType type() {
        return _type;
    }

    /// The selector given to the constructor.
    @property Selector eventLoop() {
        return _inLoop;
    }

    /**
     * Inserts `next` directly after this channel in the doubly linked chain.
     *
     * Nothing happens when `next` is null, is this channel itself, or already
     * is the direct successor (re-inserting it would make it point at itself).
     *
     * NOTE(review): `next` is not detached from a chain it may already belong
     * to; callers presumably pass an unlinked channel - confirm.
     */
    void setNext(AbstractChannel next) {
        if (next is null || next is this || next is _next)
            return;
        next._next = _next;
        next._priv = this;
        if (_next)
            _next._priv = next;
        this._next = next;
    }

    /**
     * Unlinks this channel from the chain: its neighbours are connected to
     * each other and both of its own links are reset to null.
     */
    void clear() {
        if (_priv)
            _priv._next = _next;
        if (_next)
            _next._priv = _priv;
        _next = null;
        _priv = null;
    }

    mixin OverrideErro;

protected:
    /// Sets or clears the flag bit addressed by `index`.
    final void setFlag(ChannelFlag index, bool enable) {
        _flags[index] = enable;
    }

    Selector _inLoop;

private:
    BitArray _flags;
    ChannelType _type;

    // Previous / next neighbours in the channel chain.
    AbstractChannel _priv;
    AbstractChannel _next;
}
151 
152 
153 
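/**
    Channel of type ChannelType.Event; subclasses implement trigger().

    Background on waking up a blocked epoll_wait before any event has happened:
    https://stackoverflow.com/questions/40361869/how-to-wake-up-epoll-wait-before-any-event-happened
*/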
abstract class EventChannel : AbstractChannel {

    /// Builds the channel on `loop` and tags it as ChannelType.Event.
    this(Selector loop) {
        super(loop, ChannelType.Event);
    }

    /// Must be provided by subclasses; no behaviour is defined at this level.
    abstract void trigger();

    // Disabled draft of a close() override that would postpone the close
    // while the channel is busy. Kept for reference only; it is not compiled.
    //
    // override void close() {
    //     if(_isClosing)
    //         return;
    //     _isClosing = true;
    //     version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);

    //     if(isBusy) {
    //         import std.parallelism;
    //         version (HUNT_DEBUG) warning("Close operation delayed");
    //         auto theTask = task(() {
    //             while(isBusy) {
    //                 version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
    //                 // Thread.sleep(20.msecs);
    //             }
    //             super.close();
    //         });
    //         taskPool.put(theTask);
    //     } else {
    //         super.close();
    //     }
    // }
}
185 
mixin template OverrideErro() {
    // State injected into the host type: an error flag and its message.
    bool _error = false;
    string _errorMessage;

    /// Returns: the current value of the error flag.
    bool isError() {
        return _error;
    }

    /// Returns: the stored error message.
    string errorMessage() {
        return _errorMessage;
    }

    // Old, misspelled name kept as a deprecated alias of errorMessage.
    deprecated("Using errorMessage instead.")
    alias erroString = errorMessage;

    /// Resets the error state: empty message, flag cleared.
    void clearError() {
        _errorMessage = "";
        _error = false;
    }
}