1 module hunt.io.channel.AbstractChannel;
2 
3 import hunt.event.selector.Selector;
4 import hunt.io.channel.Common;
5 import hunt.io.IoError;
6 import hunt.logging.ConsoleLogger;
7 
8 import core.atomic;
9 import std.bitmanip;
10 import std.socket : socket_t;
11 
12 
/**
 * Common base for channels that are attached to a Selector.
 *
 * It stores the native socket handle, the registered / closing / closed
 * state, a small set of ChannelFlag bits, and the previous/next links of an
 * intrusive doubly-linked list of channels.
 */
abstract class AbstractChannel : Channel {
    /// Native socket handle; stays at socket_t.init until something assigns it.
    socket_t handle = socket_t.init;
    /// Callback invoked by errorOccurred(); nothing is called while it is null.
    ErrorEventHandler errorHandler;

    protected bool _isRegistered = false;
    // The two flags below are `shared`; they are only touched through
    // core.atomic so every access is an explicit atomic operation.
    private shared bool _isClosing = false;
    protected shared bool _isClosed = false;

    /// Number of bits reserved in the flag set.
    private enum size_t FLAG_COUNT = 16;

    /**
     * Params:
     *   loop = the selector this channel belongs to (returned by eventLoop)
     *   type = the channel type (returned by type)
     */
    this(Selector loop, ChannelType type) {
        this._inLoop = loop;
        _type = type;
        // All flags start cleared.
        _flags = BitArray(new bool[FLAG_COUNT]);
    }

    /**
     * Returns: the current value of the registered flag.
     */
    bool isRegistered() {
        return _isRegistered;
    }

    /**
     * Returns: true while a close() call is in progress.
     */
    bool isClosing() {
        return atomicLoad(_isClosing);
    }

    /**
     * Returns: true once the channel has been marked closed.
     */
    bool isClosed() {
        return atomicLoad(_isClosed);
    }

    /**
     * Closes the channel: runs doClose(), marks the channel closed, then
     * runs onClose().
     *
     * The call does nothing (apart from an optional debug warning) when the
     * channel is already closed or when another close() is in progress.
     */
    void close() {
        // The compare-and-swap lets exactly one caller win the right to close.
        if (!atomicLoad(_isClosed) && cas(&_isClosing, false, true)) {
            version (HUNT_IO_DEBUG_MORE)
                tracef("channel[fd=%d] closing...", this.handle);

            // closing
            doClose();
            atomicStore(_isClosed, true);

            // closed
            onClose();
            atomicStore(_isClosing, false);
            version (HUNT_IO_DEBUG)
                tracef("channel[fd=%d] closed", this.handle);

        } else {
            version (HUNT_IO_DEBUG) {
                warningf("The channel[fd=%d] has already been closed (%s) or closing (%s)",
                 this.handle, atomicLoad(_isClosed), atomicLoad(_isClosing));
            }
        }
    }

    /**
     * Hook called by close() before the channel is marked closed.
     * The base implementation does nothing.
     */
    protected void doClose() {

    }

    /**
     * Clears the registered flag, deregisters this channel from its
     * selector, unlinks it from the channel list and marks it closed.
     */
     void onClose() {
        version (HUNT_IO_DEBUG)
            tracef("onClose [fd=%d]...", this.handle);
        _isRegistered = false;
        _inLoop.deregister(this);
        clear();

        version (HUNT_IO_DEBUG_MORE)
            tracef("onClose done [fd=%d]...", this.handle);

        atomicStore(_isClosed, true);
    }

    /**
     * Reports an error to errorHandler, wrapped in a new IoError.
     *
     * Params:
     *   code = error code passed to IoError
     *   msg  = error message passed to IoError
     */
    protected void errorOccurred(ErrorCode code, string msg) {
        debug warningf("isRegistered: %s, isClosed: %s, msg=%s", _isRegistered, atomicLoad(_isClosed), msg);
        if (errorHandler !is null) {
            errorHandler(new IoError(code, msg));
        }
    }

    /// Base implementation always fails an assertion; presumably overridden by readable channels.
    void onRead() {
        assert(false, "not implemented");
    }

    /// Base implementation always fails an assertion; presumably overridden by writable channels.
    void onWrite() {
        assert(false, "not implemented");
    }

    /// Returns: the current value of the flag at `index`.
    final bool hasFlag(ChannelFlag index) {
        return _flags[index];
    }

    /// The channel type given to the constructor.
    @property ChannelType type() {
        return _type;
    }

    /// The selector given to the constructor.
    @property Selector eventLoop() {
        return _inLoop;
    }

    /**
     * Inserts `next` directly after this channel in the linked list.
     *
     * A null argument or the channel itself is ignored.
     */
    void setNext(AbstractChannel next) {
        // Null would be dereferenced below; a self-link would corrupt the list.
        if (next is null || next is this)
            return;
        next._next = _next;
        next._priv = this;
        if (_next)
            _next._priv = next;
        this._next = next;
    }

    /**
     * Unlinks this channel from the list, connecting its two neighbours
     * (if any) to each other.
     */
    void clear() {
        if (_priv)
            _priv._next = _next;
        if (_next)
            _next._priv = _priv;
        _next = null;
        _priv = null;
    }

    mixin OverrideErro;

protected:
    /// Sets or clears the flag at `index`.
    final void setFlag(ChannelFlag index, bool enable) {
        _flags[index] = enable;
    }

    Selector _inLoop;

private:
    BitArray _flags;
    ChannelType _type;

    // Neighbours in the intrusive channel list; null when absent.
    AbstractChannel _priv;
    AbstractChannel _next;
}
157 
158 
159 
/**
 * Channel whose type is ChannelType.Event. Subclasses provide trigger().
 *
 * NOTE(review): judging by the link below, this is presumably used to wake
 * up a selector that is blocked waiting for events - confirm against the
 * concrete implementations.
 *
 * See_Also:
 *   https://stackoverflow.com/questions/40361869/how-to-wake-up-epoll-wait-before-any-event-happened
 */
abstract class EventChannel : AbstractChannel {

    /// Creates an event channel attached to `loop`.
    this(Selector loop) {
        super(loop, ChannelType.Event);
    }

    /// Must be implemented by concrete event channels.
    abstract void trigger();

    // Disabled override of close(), kept for reference only:
    //
    // override void close() {
    //     if(_isClosing)
    //         return;
    //     _isClosing = true;
    //     version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);

    //     if(isBusy) {
    //         import std.parallelism;
    //         version (HUNT_DEBUG) warning("Close operation delayed");
    //         auto theTask = task(() {
    //             while(isBusy) {
    //                 version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
    //                 // Thread.sleep(20.msecs);
    //             }
    //             super.close();
    //         });
    //         taskPool.put(theTask);
    //     } else {
    //         super.close();
    //     }
    // }
}
191 
/**
 * Mixin that adds a simple error state (a flag plus a message) together
 * with accessors for reading and resetting it.
 */
mixin template OverrideErro() {
    // Backing state; both fields are reset by clearError().
    bool _error = false;
    string _errorMessage;

    /// Returns: the current value of the error flag.
    bool isError() {
        return _error;
    }

    /// Returns: the stored error message.
    string errorMessage() {
        return _errorMessage;
    }

    /// Old, misspelled name of errorMessage.
    deprecated("Using errorMessage instead.")
    alias erroString = errorMessage;

    /// Resets the flag to false and the message to the empty string.
    void clearError() {
        _errorMessage = "";
        _error = false;
    }
}