module hunt.event.selector.Selector;

import hunt.Exceptions;
import hunt.Functions;
import hunt.io.channel.AbstractChannel;
import hunt.io.channel.Common;
import hunt.logging.ConsoleLogger;

import core.atomic;
import core.memory;
import core.thread;


/**
 * Base class of an event selector: it owns the run/stop state machine and the
 * select loop, and leaves the actual readiness polling to `doSelect`, which
 * concrete subclasses must implement.
 *
 * The API is modelled on Java NIO selectors.
 *
 * See_Also: http://tutorials.jenkov.com/java-nio/selectors.html
 */
abstract class Selector {

    // Run-state flags. They are touched from the loop thread as well as from
    // callers of runAsync()/stop(), hence `shared` and atomic access only.
    private shared bool _running = false;
    private shared bool _isStopping = false;
    // True only while onLoop() is executing.
    private bool _isReady;
    protected size_t _id;
    // Not used by this base class any more (it belonged to a removed
    // fd-to-slot channel table); still stored for subclasses.
    protected size_t divider;
    protected long idleTime = -1; // in millisecond; set to the loop timeout by onLoop()
    // Not assigned anywhere in this class.
    // NOTE(review): presumably the OS-level selector handle set by subclasses - confirm.
    protected int fd;

    private long timeout = -1; // in millisecond
    // Thread currently executing doRun(); cleared again by dispose().
    private Thread _thread;

    // Never assigned or invoked in this class, only cleared by dispose().
    private SimpleEventHandler _startedHandler;
    private SimpleEventHandler _stoppeddHandler;

    /**
     * Params:
     *   id          = identifier reported by getId() and used in log messages
     *   divider     = stored in `divider`; not interpreted by this class
     *   maxChannels = currently ignored; kept so existing callers still compile
     */
    this(size_t id, size_t divider, size_t maxChannels = 1500) {
        _id = id;
        this.divider = divider;
    }

    /// Returns: the identifier passed to the constructor.
    size_t getId() {
        return _id;
    }

    /// Returns: `true` while the select loop (onLoop) is executing.
    bool isReady() {
        return _isReady;
    }

    /**
     * Tells whether or not this selector is running.
     *
     * Returns: `true` if, and only if, this selector is running
     */
    bool isRuning() {
        return atomicLoad(_running);
    }

    alias isOpen = isRuning;

    /// Returns: `true` once stop() has been called. The flag is never reset by this class.
    bool isStopping() {
        return atomicLoad(_isStopping);
    }

    /**
     * Registers a channel with this selector.
     *
     * The base implementation only pins the channel object for the garbage
     * collector: it is added as a GC root and its block is marked NO_MOVE.
     * NOTE(review): presumably because subclasses hand the raw object address
     * to the OS notification facility - confirm against the implementations.
     *
     * Params:
     *   channel = the channel to register; must not be null
     * Returns: always `true`
     */
    bool register(AbstractChannel channel) {
        assert(channel !is null);
        void* context = cast(void*) channel;
        GC.addRoot(context);
        GC.setAttr(context, GC.BlkAttr.NO_MOVE);
        version (HUNT_IO_DEBUG) {
            int infd = cast(int) channel.handle;
            tracef("Register channel@%s: fd=%d, selector: %d", context, infd, getId());
        }
        return true;
    }

    /**
     * Undoes register(): removes the GC root for the channel object and clears
     * the NO_MOVE attribute on its block.
     *
     * Params:
     *   channel = the channel to deregister
     * Returns: always `true`
     */
    bool deregister(AbstractChannel channel) {
        void* context = cast(void*) channel;
        GC.removeRoot(context);
        GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
        version (HUNT_IO_DEBUG) {
            size_t fd = cast(size_t) channel.handle;
            infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId());
        }
        return true;
    }

    /**
     * Performs one selection pass. Implemented by subclasses.
     *
     * Params:
     *   timeout = in millisecond. NOTE(review): following the Java NIO model,
     *             -1 is presumed to mean "wait indefinitely" and 0 "poll
     *             without blocking" - confirm against the implementations.
     */
    protected abstract int doSelect(long timeout);

    /**
     * Runs the select loop on the calling thread; returns when the loop ends.
     *
     * Params:
     *   timeout = in millisecond; handed to every doSelect() call of the loop
     */
    void run(long timeout = -1) {
        this.timeout = timeout;
        doRun();
    }

    /**
     * Runs the select loop on a newly created thread. Does nothing if the
     * selector is already running. Anything thrown on that thread is logged
     * and not propagated.
     *
     * Params:
     *   timeout = in millisecond; handed to every doSelect() call of the loop
     *   handler = optional callback invoked on the new thread just before the
     *             loop starts
     */
    void runAsync(long timeout = -1, SimpleEventHandler handler = null) {
        if (isRuning()) {
            version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id);
            return;
        }
        this.timeout = timeout;
        version (HUNT_IO_DEBUG) trace("runAsync ...");
        Thread th = new Thread(() {
            try {
                doRun(handler);
            } catch (Throwable t) {
                warning(t.msg);
                version (HUNT_DEBUG) warning(t.toString());
            }
        });
        // th.isDaemon = true; // unstable
        th.start();
    }

    /// Claims the running flag, records the current thread and enters the loop.
    private void doRun(SimpleEventHandler handler = null) {
        // The CAS guarantees that at most one thread enters the loop.
        if (!cas(&_running, false, true)) {
            version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id);
            return;
        }

        // If the start-up handler or the loop throws, do not leave the
        // selector permanently marked as running.
        scope (failure) atomicStore(_running, false);

        version (HUNT_IO_DEBUG) trace("running selector...");
        _thread = Thread.getThis();
        if (handler !is null) {
            handler();
        }
        onLoop(timeout);
    }

    /**
     * Requests the select loop to end. Only the first call has an effect: it
     * sets the stopping flag and invokes onStop(). Anything thrown by onStop()
     * is logged and not propagated.
     */
    void stop() {
        version (HUNT_IO_DEBUG)
            tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id,
                    atomicLoad(_running), atomicLoad(_isStopping));
        if (cas(&_isStopping, false, true)) {
            try {
                onStop();
            } catch (Throwable t) {
                warning(t.msg);
                version (HUNT_DEBUG) warning(t);
            }
        }
    }

    /// Hook invoked once by stop(); the base implementation only traces.
    protected void onStop() {
        version (HUNT_IO_DEBUG)
            tracef("stopping.");
    }

    /**
     * The select loop. Without HAVE_IOCP it calls doSelect() repeatedly until
     * stop() has been requested (at least once); with HAVE_IOCP it calls
     * doSelect() exactly once.
     *
     * On exit - normal or by exception - the ready/running flags are cleared
     * and dispose() is called.
     *
     * Params:
     *   timeout = in millisecond; passed to doSelect() and stored in idleTime
     */
    protected void onLoop(long timeout = -1) {
        _isReady = true;
        idleTime = timeout;

        // Cleanup must also happen when doSelect() throws, otherwise the
        // selector would report isRuning() == true forever.
        scope (exit) {
            _isReady = false;
            atomicStore(_running, false);
            version (HUNT_IO_DEBUG) infof("Selector %d exited.", _id);
            dispose();
        }

        version (HAVE_IOCP) {
            doSelect(timeout);
        } else {
            do {
                // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length);
                doSelect(timeout);
            } while (!isStopping());
        }
    }

    /**
     * Performs a single selection pass with a timeout.
     *
     * Params:
     *   timeout = in millisecond; 0 is translated to -1 before it is passed
     *             to doSelect()
     * Returns: whatever doSelect() returns
     * Throws: IllegalArgumentException if timeout is negative
     */
    int select(long timeout) {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return doSelect((timeout == 0) ? -1 : timeout);
    }

    /**
     * Performs a single selection pass without a timeout, i.e. the same as
     * `select(0)`, which reaches doSelect() as -1.
     *
     * Returns: whatever doSelect() returns
     */
    int select() {
        // Previously this passed 0 straight to doSelect(), which made it
        // indistinguishable from selectNow().
        return select(0);
    }

    /**
     * Performs a single selection pass, passing a timeout of 0 to doSelect().
     *
     * Returns: whatever doSelect() returns
     */
    int selectNow() {
        return doSelect(0);
    }

    /// Drops the references to the loop thread and the event handlers.
    void dispose() {
        _thread = null;
        _startedHandler = null;
        _stoppeddHandler = null;
    }

    /// Returns: `true` if the caller is the thread currently running the loop.
    bool isSelfThread() {
        return _thread is Thread.getThis();
    }
}