1 module hunt.event.selector.Selector; 2 3 import hunt.Exceptions; 4 import hunt.Functions; 5 import hunt.io.channel.AbstractChannel; 6 import hunt.io.channel.Common; 7 import hunt.logging; 8 import hunt.util.worker; 9 10 import core.atomic; 11 import core.memory; 12 import core.thread; 13 14 15 /** 16 http://tutorials.jenkov.com/java-nio/selectors.html 17 */ 18 abstract class Selector { 19 20 private shared bool _running = false; 21 private shared bool _isStopping = false; 22 private bool _isReady; 23 protected size_t _id; 24 protected size_t divider; 25 private Worker _taskWorker; 26 // protected AbstractChannel[] channels; 27 protected long idleTime = -1; // in millisecond 28 protected int fd; 29 30 private long timeout = -1; // in millisecond 31 private Thread _thread; 32 33 private SimpleEventHandler _startedHandler; 34 private SimpleEventHandler _stoppeddHandler; 35 36 this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) { 37 _id = id; 38 _taskWorker = worker; 39 this.divider = divider; 40 // channels = new AbstractChannel[maxChannels]; 41 } 42 43 size_t getId() { 44 return _id; 45 } 46 47 Worker worker() { 48 return _taskWorker; 49 } 50 51 bool isReady() { 52 return _isReady; 53 } 54 55 56 /** 57 * Tells whether or not this selector is running. 58 * 59 * @return <tt>true</tt> if, and only if, this selector is running 60 */ 61 bool isRuning() { 62 return _running; 63 } 64 65 alias isOpen = isRuning; 66 67 bool isStopping() { 68 return _isStopping; 69 } 70 71 bool register(AbstractChannel channel) { 72 assert(channel !is null); 73 channel.taskWorker = _taskWorker; 74 void* context = cast(void*)channel; 75 GC.addRoot(context); 76 GC.setAttr(cast(void*)context, GC.BlkAttr.NO_MOVE); 77 version (HUNT_IO_DEBUG) { 78 int infd = cast(int) channel.handle; 79 infof("Register channel@%s: fd=%d, selector: %d", context, infd, getId()); 80 } 81 return true; 82 } 83 84 bool deregister(AbstractChannel channel) { 85 channel.taskWorker = null; 86 void* context = cast(void*)channel; 87 GC.removeRoot(context); 88 GC.clrAttr(context, GC.BlkAttr.NO_MOVE); 89 version(HUNT_IO_DEBUG) { 90 size_t fd = cast(size_t) channel.handle; 91 infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId()); 92 } 93 return true; 94 } 95 96 protected abstract int doSelect(long timeout); 97 98 /** 99 timeout: in millisecond 100 */ 101 void run(long timeout = -1) { 102 this.timeout = timeout; 103 doRun(); 104 } 105 106 /** 107 timeout: in millisecond 108 */ 109 void runAsync(long timeout = -1, SimpleEventHandler handler = null) { 110 if(_running) { 111 version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id); 112 return; 113 } 114 115 this.timeout = timeout; 116 version (HUNT_IO_DEBUG) tracef("runAsync ... Thread: %d", Thread.getAll().length); 117 118 _workThread = new Thread(() { 119 try { 120 doRun(handler); 121 } catch (Throwable t) { 122 warning(t.msg); 123 version(HUNT_DEBUG) warning(t.toString()); 124 } 125 }); 126 // th.isDaemon = true; // unstable 127 _workThread.start(); 128 129 // BUG: Reported defects -@zhangxueping at 2021-10-12T18:25:30+08:00 130 // https://issues.dlang.org/show_bug.cgi?id=22346 131 // import std.parallelism; 132 133 // auto workerTask = task(() { 134 // try { 135 // doRun(handler); 136 // } catch (Throwable t) { 137 // warning(t.msg); 138 // version(HUNT_DEBUG) warning(t.toString()); 139 // } 140 // }); 141 142 // taskPool.put(workerTask); 143 } 144 145 private Thread _workThread; 146 147 148 private void doRun(SimpleEventHandler handler=null) { 149 if(cas(&_running, false, true)) { 150 version (HUNT_IO_DEBUG) trace("running selector..."); 151 _thread = Thread.getThis(); 152 if(handler !is null) { 153 handler(); 154 } 155 onLoop(timeout); 156 } else { 157 version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id); 158 } 159 } 160 161 void stop() { 162 version (HUNT_IO_DEBUG) 163 tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id, _running, _isStopping); 164 165 if(cas(&_isStopping, false, true)) { 166 try { 167 onStop(); 168 } catch(Throwable t) { 169 warning(t.msg); 170 version(HUNT_DEBUG) warning(t); 171 } 172 173 if(_workThread !is null) 174 _workThread.join; 175 } 176 } 177 178 protected void onStop() { 179 version (HUNT_IO_DEBUG) 180 tracef("stopping."); 181 } 182 183 /** 184 timeout: in millisecond 185 */ 186 protected void onLoop(long timeout = -1) { 187 _isReady = true; 188 idleTime = timeout; 189 190 version (HAVE_IOCP) { 191 doSelect(timeout); 192 } else { 193 do { 194 // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length); 195 doSelect(timeout); 196 // infof("Selector rolled once. isRuning: %s", isRuning); 197 } while (!_isStopping); 198 } 199 200 _isReady = false; 201 _running = false; 202 version(HUNT_IO_DEBUG) infof("Selector %d exited.", _id); 203 dispose(); 204 } 205 206 /** 207 timeout: in millisecond 208 */ 209 int select(long timeout) { 210 if (timeout < 0) 211 throw new IllegalArgumentException("Negative timeout"); 212 return doSelect((timeout == 0) ? -1 : timeout); 213 } 214 215 int select() { 216 return doSelect(0); 217 } 218 219 int selectNow() { 220 return doSelect(0); 221 } 222 223 void dispose() { 224 _thread = null; 225 _startedHandler = null; 226 _stoppeddHandler = null; 227 } 228 229 bool isSelfThread() { 230 return _thread is Thread.getThis(); 231 } 232 233 override string toString() { 234 import std.format; 235 string str = format("Selector%d", this.getId()); 236 237 return str; 238 } 239 }