1 module hunt.io.channel.AbstractChannel;
2 
3 import hunt.event.selector.Selector;
4 import hunt.io.channel.Common;
5 import hunt.io.IoError;
6 import hunt.logging;
7 import hunt.util.worker;
8 
9 import core.atomic;
10 import std.bitmanip;
11 import std.socket : socket_t;
12 
13 
14 /**
15  *
16  */
17 abstract class AbstractChannel : Channel {
18     socket_t handle = socket_t.init;
19     ErrorEventHandler errorHandler;
20     
21     Worker taskWorker = null;
22 
23     protected bool _isRegistered = false;
24     private shared bool _isClosing = false;
25     protected shared bool _isClosed = false;
26 
27     this(Selector loop, ChannelType type) {
28         this._inLoop = loop;
29         _type = type;
30         _flags = BitArray([false, false, false, false, false, false, false,
31                 false, false, false, false, false, false, false, false, false]);
32     }
33 
34     /**
35      *
36      */
37     bool isRegistered() {
38         return _isRegistered;
39     }
40 
41     /**
42      *
43      */
44     bool isClosing() {
45         return _isClosing;
46     }
47 
48     /**
49      *
50      */
51     bool isClosed() {
52         return _isClosed;
53     }
54 
55     /**
56      *
57      */
58     void close() {
59         if (!_isClosed && cas(&_isClosing, false, true) ) {
60             version (HUNT_IO_DEBUG_MORE)
61                 tracef("channel[fd=%d] closing...", this.handle);
62 
63             // closing
64             doClose(); // close
65             _isClosed = true;
66 
67             // closed
68             onClose(); 
69             _isClosing = false;
70             version (HUNT_IO_DEBUG)
71                 tracef("channel[fd=%d] closed", this.handle);
72 
73         } else {
74             version (HUNT_IO_DEBUG) {
75                 warningf("The channel[fd=%d] has already been closed (%s) or closing (%s)",
76                  this.handle, _isClosed, _isClosing);
77             }
78         }
79     }
80 
81     protected void doClose() {
82 
83     }
84 
85      void onClose() {
86         version (HUNT_IO_DEBUG)
87             tracef("onClose [fd=%d]...", this.handle);
88         _isRegistered = false;
89         _isClosed = true;
90         clear();
91 
92         _inLoop.deregister(this);
93         version (HUNT_IO_DEBUG_MORE)
94             tracef("onClose done [fd=%d]...", this.handle);
95     }
96 
97     protected void errorOccurred(ErrorCode code, string msg) {
98         debug warningf("isRegistered: %s, _isClosing: %s, isClosed: %s, code: %s, msg=%s", 
99             _isRegistered, _isClosing, _isClosed, code, msg);
100             
101         if (errorHandler !is null) {
102             errorHandler(new IoError(code, msg));
103         }
104     }
105 
106     void onRead() {
107         assert(false, "not implemented");
108     }
109 
110     void onWrite() {
111         assert(false, "not implemented");
112     }
113 
114     final bool hasFlag(ChannelFlag index) {
115         return _flags[index];
116     }
117 
118     @property ChannelType type() {
119         return _type;
120     }
121 
122     @property Selector eventLoop() {
123         return _inLoop;
124     }
125 
126     void setNext(AbstractChannel next) {
127         if (next is this)
128             return; // Can't set to self
129         next._next = _next;
130         next._priv = this;
131         if (_next)
132             _next._priv = next;
133         this._next = next;
134     }
135 
136     void clear() {
137         if (_priv)
138             _priv._next = _next;
139         if (_next)
140             _next._priv = _priv;
141         _next = null;
142         _priv = null;
143     }
144 
145     mixin OverrideErro;
146 
147 protected:
148     final void setFlag(ChannelFlag index, bool enable) {
149         _flags[index] = enable;
150     }
151 
152     Selector _inLoop;
153 
154 private:
155     BitArray _flags;
156     ChannelType _type;
157 
158     AbstractChannel _priv;
159     AbstractChannel _next;
160 }
161 
162 
163 
164 /**
165     https://stackoverflow.com/questions/40361869/how-to-wake-up-epoll-wait-before-any-event-happened
166 */
167 class EventChannel : AbstractChannel {
168     this(Selector loop) {
169         super(loop, ChannelType.Event);
170     }
171 
172     abstract void trigger();
173     // override void close() {
174     //     if(_isClosing)
175     //         return;
176     //     _isClosing = true;
177     //     version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);
178 
179     //     if(isBusy) {
180     //         import std.parallelism;
181     //         version (HUNT_DEBUG) warning("Close operation delayed");
182     //         auto theTask = task(() {
183     //             while(isBusy) {
184     //                 version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
185     //                 // Thread.sleep(20.msecs);
186     //             }
187     //             super.close();
188     //         });
189     //         taskPool.put(theTask);
190     //     } else {
191     //         super.close();
192     //     }
193     // }
194 }
195 
196 mixin template OverrideErro() {
197     bool isError() {
198         return _error;
199     }
200 
201     deprecated("Using errorMessage instead.")
202     alias erroString = errorMessage;
203 
204     string errorMessage() {
205         return _errorMessage;
206     }
207 
208     void clearError() {
209         _error = false;
210         _errorMessage = "";
211     }
212 
213     bool _error = false;
214     string _errorMessage;
215 }