1 module hunt.io.channel.AbstractChannel; 2 3 import hunt.event.selector.Selector; 4 import hunt.io.channel.Common; 5 import hunt.io.IoError; 6 import hunt.logging; 7 import hunt.util.worker; 8 9 import core.atomic; 10 import std.bitmanip; 11 import std.socket : socket_t; 12 13 14 /** 15 * 16 */ 17 abstract class AbstractChannel : Channel { 18 socket_t handle = socket_t.init; 19 ErrorEventHandler errorHandler; 20 21 Worker taskWorker = null; 22 23 protected bool _isRegistered = false; 24 private shared bool _isClosing = false; 25 protected shared bool _isClosed = false; 26 27 this(Selector loop, ChannelType type) { 28 this._inLoop = loop; 29 _type = type; 30 _flags = BitArray([false, false, false, false, false, false, false, 31 false, false, false, false, false, false, false, false, false]); 32 } 33 34 /** 35 * 36 */ 37 bool isRegistered() { 38 return _isRegistered; 39 } 40 41 /** 42 * 43 */ 44 bool isClosing() { 45 return _isClosing; 46 } 47 48 /** 49 * 50 */ 51 bool isClosed() { 52 return _isClosed; 53 } 54 55 /** 56 * 57 */ 58 void close() { 59 if (!_isClosed && cas(&_isClosing, false, true) ) { 60 version (HUNT_IO_DEBUG_MORE) 61 tracef("channel[fd=%d] closing...", this.handle); 62 63 // closing 64 doClose(); // close 65 _isClosed = true; 66 67 // closed 68 onClose(); 69 _isClosing = false; 70 version (HUNT_IO_DEBUG) 71 tracef("channel[fd=%d] closed", this.handle); 72 73 } else { 74 version (HUNT_IO_DEBUG) { 75 warningf("The channel[fd=%d] has already been closed (%s) or closing (%s)", 76 this.handle, _isClosed, _isClosing); 77 } 78 } 79 } 80 81 protected void doClose() { 82 83 } 84 85 void onClose() { 86 version (HUNT_IO_DEBUG) 87 tracef("onClose [fd=%d]...", this.handle); 88 _isRegistered = false; 89 _isClosed = true; 90 clear(); 91 92 _inLoop.deregister(this); 93 version (HUNT_IO_DEBUG_MORE) 94 tracef("onClose done [fd=%d]...", this.handle); 95 } 96 97 protected void errorOccurred(ErrorCode code, string msg) { 98 debug warningf("isRegistered: %s, _isClosing: %s, isClosed: %s, code: %s, msg=%s", 99 _isRegistered, _isClosing, _isClosed, code, msg); 100 101 if (errorHandler !is null) { 102 errorHandler(new IoError(code, msg)); 103 } 104 } 105 106 void onRead() { 107 assert(false, "not implemented"); 108 } 109 110 void onWrite() { 111 assert(false, "not implemented"); 112 } 113 114 final bool hasFlag(ChannelFlag index) { 115 return _flags[index]; 116 } 117 118 @property ChannelType type() { 119 return _type; 120 } 121 122 @property Selector eventLoop() { 123 return _inLoop; 124 } 125 126 void setNext(AbstractChannel next) { 127 if (next is this) 128 return; // Can't set to self 129 next._next = _next; 130 next._priv = this; 131 if (_next) 132 _next._priv = next; 133 this._next = next; 134 } 135 136 void clear() { 137 if (_priv) 138 _priv._next = _next; 139 if (_next) 140 _next._priv = _priv; 141 _next = null; 142 _priv = null; 143 } 144 145 mixin OverrideErro; 146 147 protected: 148 final void setFlag(ChannelFlag index, bool enable) { 149 _flags[index] = enable; 150 } 151 152 Selector _inLoop; 153 154 private: 155 BitArray _flags; 156 ChannelType _type; 157 158 AbstractChannel _priv; 159 AbstractChannel _next; 160 } 161 162 163 164 /** 165 https://stackoverflow.com/questions/40361869/how-to-wake-up-epoll-wait-before-any-event-happened 166 */ 167 class EventChannel : AbstractChannel { 168 this(Selector loop) { 169 super(loop, ChannelType.Event); 170 } 171 172 abstract void trigger(); 173 // override void close() { 174 // if(_isClosing) 175 // return; 176 // _isClosing = true; 177 // version (HUNT_DEBUG) tracef("closing [fd=%d]...", this.handle); 178 179 // if(isBusy) { 180 // import std.parallelism; 181 // version (HUNT_DEBUG) warning("Close operation delayed"); 182 // auto theTask = task(() { 183 // while(isBusy) { 184 // version (HUNT_DEBUG) infof("waitting for idle [fd=%d]...", this.handle); 185 // // Thread.sleep(20.msecs); 186 // } 187 // super.close(); 188 // }); 189 // taskPool.put(theTask); 190 // } else { 191 // super.close(); 192 // } 193 // } 194 } 195 196 mixin template OverrideErro() { 197 bool isError() { 198 return _error; 199 } 200 201 deprecated("Using errorMessage instead.") 202 alias erroString = errorMessage; 203 204 string errorMessage() { 205 return _errorMessage; 206 } 207 208 void clearError() { 209 _error = false; 210 _errorMessage = ""; 211 } 212 213 bool _error = false; 214 string _errorMessage; 215 }