module hunt.io.channel.AbstractChannel;

import hunt.event.selector.Selector;
import hunt.io.channel.Common;
import hunt.io.IoError;
import hunt.logging.ConsoleLogger;
import hunt.util.worker;

import core.atomic;
import std.bitmanip;
import std.socket : socket_t;


/**
 * Common base for channels that are attached to a `Selector`.
 *
 * The class keeps the native handle, the registration/closing/closed state,
 * a 16-bit flag set indexed by `ChannelFlag`, and the `_priv`/`_next` links
 * of an intrusive doubly linked list of channels.
 *
 * `_isClosing` and `_isClosed` are `shared` and are accessed through
 * `core.atomic` in this class so that `close()` can be raced safely.
 */
abstract class AbstractChannel : Channel {
    /// Native handle of the channel; `socket_t.init` until a subclass assigns it.
    socket_t handle = socket_t.init;

    /// Optional callback invoked by `errorOccurred`.
    ErrorEventHandler errorHandler;

    /// Optional worker; not used by this class itself.
    Worker taskWorker = null;

    protected bool _isRegistered = false;
    private shared bool _isClosing = false;
    protected shared bool _isClosed = false;

    /**
     * Params:
     *   loop = the selector this channel belongs to (returned by `eventLoop`)
     *   type = the channel type (returned by `type`)
     */
    this(Selector loop, ChannelType type) {
        this._inLoop = loop;
        _type = type;
        // 16 flag slots, all initially cleared.
        _flags = BitArray([false, false, false, false, false, false, false,
                false, false, false, false, false, false, false, false, false]);
    }

    /**
     * Returns: the current value of the registration flag.
     */
    bool isRegistered() {
        return _isRegistered;
    }

    /**
     * Returns: `true` while a `close()` call is in progress.
     */
    bool isClosing() {
        return atomicLoad(_isClosing);
    }

    /**
     * Returns: `true` once the channel has been closed.
     */
    bool isClosed() {
        return atomicLoad(_isClosed);
    }

    /**
     * Closes the channel at most once.
     *
     * The caller that wins the compare-and-swap on `_isClosing` runs
     * `doClose()`, marks the channel closed and then runs `onClose()`.
     * Every other caller (already closed, or a close in progress) returns
     * without doing anything.
     *
     * The closing flag is always released when this method exits, including
     * when `doClose()` or `onClose()` throws, so a failed close does not
     * leave the channel permanently stuck in the "closing" state.
     */
    void close() {
        if (atomicLoad(_isClosed) || !cas(&_isClosing, false, true)) {
            reportRedundantClose();
            return;
        }

        // From here on this call owns the closing flag.
        scope (exit)
            atomicStore(_isClosing, false);

        // Another caller may have completed a whole close() between the
        // _isClosed check above and the successful CAS; do not close twice.
        if (atomicLoad(_isClosed)) {
            reportRedundantClose();
            return;
        }

        version (HUNT_IO_DEBUG_MORE)
            tracef("channel[fd=%d] closing...", this.handle);

        // closing
        doClose(); // close
        atomicStore(_isClosed, true);

        // closed
        onClose();

        version (HUNT_IO_DEBUG)
            tracef("channel[fd=%d] closed", this.handle);
    }

    /// Logs (HUNT_IO_DEBUG builds only) that a close() call was ignored.
    private void reportRedundantClose() {
        version (HUNT_IO_DEBUG) {
            warningf("The channel[fd=%d] has already been closed (%s) or closing (%s)",
                    this.handle, _isClosed, _isClosing);
        }
    }

    /**
     * Hook called by `close()` before the channel is marked closed.
     * The default implementation does nothing.
     */
    protected void doClose() {

    }

    /**
     * Clears the registration flag, deregisters the channel from its
     * selector, unlinks it from the channel list and marks it closed.
     */
    void onClose() {
        version (HUNT_IO_DEBUG)
            tracef("onClose [fd=%d]...", this.handle);
        _isRegistered = false;
        _inLoop.deregister(this);
        clear();

        version (HUNT_IO_DEBUG_MORE)
            tracef("onClose done [fd=%d]...", this.handle);

        atomicStore(_isClosed, true);
    }

    /**
     * Reports an error to `errorHandler`, if one is set, wrapped in a new
     * `IoError`. In debug builds the channel state and message are logged
     * first.
     *
     * Params:
     *   code = error code passed to `IoError`
     *   msg  = error message passed to `IoError`
     */
    protected void errorOccurred(ErrorCode code, string msg) {
        debug warningf("isRegistered: %s, isClosed: %s, msg=%s", _isRegistered, _isClosed, msg);
        if (errorHandler !is null) {
            errorHandler(new IoError(code, msg));
        }
    }

    /// Read-event hook; the base implementation always fails an assertion.
    void onRead() {
        assert(false, "not implemented");
    }

    /// Write-event hook; the base implementation always fails an assertion.
    void onWrite() {
        assert(false, "not implemented");
    }

    /**
     * Returns: the state of the flag at `index`.
     */
    final bool hasFlag(ChannelFlag index) {
        return _flags[index];
    }

    /// The channel type given at construction.
    @property ChannelType type() {
        return _type;
    }

    /// The selector given at construction.
    @property Selector eventLoop() {
        return _inLoop;
    }

    /**
     * Inserts `next` directly after this channel in the doubly linked list.
     *
     * A `null` argument or the channel itself is ignored. `next` is not
     * unlinked from any list it may already be part of; its links are simply
     * overwritten.
     */
    void setNext(AbstractChannel next) {
        if (next is null || next is this)
            return; // Nothing to link / can't set to self
        next._next = _next;
        next._priv = this;
        if (_next)
            _next._priv = next;
        this._next = next;
    }

    /**
     * Unlinks this channel from the doubly linked list, joining its
     * neighbours to each other, and resets both of its own links.
     */
    void clear() {
        if (_priv)
            _priv._next = _next;
        if (_next)
            _next._priv = _priv;
        _next = null;
        _priv = null;
    }

    mixin OverrideErro;

protected:
    /// Sets or clears the flag at `index`.
    final void setFlag(ChannelFlag index, bool enable) {
        _flags[index] = enable;
    }

    Selector _inLoop;

private:
    BitArray _flags;
    ChannelType _type;

    // Previous / next channel in the intrusive doubly linked list.
    AbstractChannel _priv;
    AbstractChannel _next;
}



/**
 * Channel of type `ChannelType.Event` whose subclasses implement `trigger()`.
 *
 * See: https://stackoverflow.com/questions/40361869/how-to-wake-up-epoll-wait-before-any-event-happened
 */
class EventChannel : AbstractChannel {
    this(Selector loop) {
        super(loop, ChannelType.Event);
    }

    /// Fires the event; implemented by subclasses.
    abstract void trigger();
    // override void close() {
    //     if(_isClosing)
    //         return;
    //     _isClosing = true;
    //     version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle);

    //     if(isBusy) {
    //         import std.parallelism;
    //         version (HUNT_DEBUG) warning("Close operation delayed");
    //         auto theTask = task(() {
    //             while(isBusy) {
    //                 version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle);
    //                 // Thread.sleep(20.msecs);
    //             }
    //             super.close();
    //         });
    //         taskPool.put(theTask);
    //     } else {
    //         super.close();
    //     }
    // }
}

/**
 * Mixes in a simple error state: an error flag, an error message, and
 * accessors to read and reset them.
 */
mixin template OverrideErro() {
    /// Returns: the current error flag.
    bool isError() {
        return _error;
    }

    deprecated("Using errorMessage instead.")
    alias erroString = errorMessage;

    /// Returns: the current error message.
    string errorMessage() {
        return _errorMessage;
    }

    /// Resets the error flag and empties the error message.
    void clearError() {
        _error = false;
        _errorMessage = "";
    }

    bool _error = false;
    string _errorMessage;
}