module hunt.event.selector.Selector;

import hunt.Exceptions;
import hunt.Functions;
import hunt.io.channel.AbstractChannel;
import hunt.io.channel.Common;
import hunt.logging.ConsoleLogger;
import hunt.util.worker;

import core.atomic;
import core.memory;
import core.thread;


/**
 * Base class of an event selector: it owns the select loop, the run/stop
 * state flags and the channel (de)registration bookkeeping, and leaves the
 * actual polling to subclasses through `doSelect`.
 *
 * See_Also: http://tutorials.jenkov.com/java-nio/selectors.html
 */
abstract class Selector {

    // Lifecycle flags. They are touched from the selector thread and from
    // whichever thread calls runAsync()/stop(), so every access goes through
    // core.atomic (cas / atomicLoad / atomicStore).
    private shared bool _running = false;
    private shared bool _isStopping = false;
    // True only while onLoop() is executing.
    private bool _isReady;
    protected size_t _id;
    protected size_t divider;
    private Worker _taskWorker;
    // protected AbstractChannel[] channels;
    protected long idleTime = -1; // in millisecond
    // Not referenced in this class; available to subclasses.
    protected int fd;

    private long timeout = -1; // in millisecond
    // Thread that is executing doRun(); used by isSelfThread().
    private Thread _thread;

    // Only cleared in dispose(); never assigned or invoked in this class.
    private SimpleEventHandler _startedHandler;
    private SimpleEventHandler _stoppeddHandler;

    /**
     * Params:
     *   id          = identifier returned by `getId`.
     *   divider     = stored in the protected `divider` field; not used by
     *                 this class itself.
     *   worker      = task worker assigned to every channel on `register`;
     *                 may be null.
     *   maxChannels = currently unused (the channel table is commented out).
     */
    this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
        _id = id;
        _taskWorker = worker;
        this.divider = divider;
        // channels = new AbstractChannel[maxChannels];
    }

    /// Returns the identifier given to the constructor.
    size_t getId() {
        return _id;
    }

    /// Returns the task worker given to the constructor (may be null).
    Worker worker() {
        return _taskWorker;
    }

    /// Returns true while the select loop (`onLoop`) is executing.
    bool isReady() {
        return _isReady;
    }


    /**
     * Tells whether or not this selector is running.
     *
     * @return <tt>true</tt> if, and only if, this selector is running
     */
    bool isRuning() {
        return atomicLoad(_running);
    }

    alias isOpen = isRuning;

    /// Returns true once `stop` has been called.
    bool isStopping() {
        return atomicLoad(_isStopping);
    }

    /**
     * Attaches a channel to this selector.
     *
     * Assigns the selector's task worker to the channel, then registers the
     * channel object as a GC root and marks its block NO_MOVE.
     * NOTE(review): presumably this is because subclasses hand the raw
     * pointer to the OS event API - confirm against the implementations.
     *
     * Params:
     *   channel = the channel to register; must not be null.
     *
     * Returns: always true in this base implementation.
     */
    bool register(AbstractChannel channel) {
        assert(channel !is null);
        channel.taskWorker = _taskWorker;
        void* context = cast(void*)channel;
        GC.addRoot(context);
        GC.setAttr(context, GC.BlkAttr.NO_MOVE);
        version (HUNT_IO_DEBUG) {
            int infd = cast(int) channel.handle;
            tracef("Register channel@%s: fd=%d, selector: %d", context, infd, getId());
        }
        return true;
    }

    /**
     * Detaches a channel from this selector, undoing what `register` did:
     * clears the channel's task worker, removes the GC root and clears the
     * NO_MOVE attribute.
     *
     * Params:
     *   channel = the channel to deregister; must not be null.
     *
     * Returns: always true in this base implementation.
     */
    bool deregister(AbstractChannel channel) {
        // Same contract as register(): fail loudly in debug builds instead of
        // dereferencing a null channel below.
        assert(channel !is null);
        channel.taskWorker = null;
        void* context = cast(void*)channel;
        GC.removeRoot(context);
        GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
        version(HUNT_IO_DEBUG) {
            size_t fd = cast(size_t) channel.handle;
            infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId());
        }
        return true;
    }

    /**
     * Performs one polling round; implemented by subclasses.
     *
     * Params:
     *   timeout = in millisecond. `select(long)` maps a caller-supplied 0 to
     *             -1 and the loop defaults to -1.
     */
    protected abstract int doSelect(long timeout);

    /**
     * Runs the select loop on the calling thread; returns when the loop ends.
     *
     * Params:
     *   timeout = in millisecond
     */
    void run(long timeout = -1) {
        this.timeout = timeout;
        doRun();
    }

    /**
     * Runs the select loop on a newly created thread.
     *
     * Does nothing if the selector is already running. Anything thrown on
     * the new thread is caught and logged as a warning.
     *
     * Params:
     *   timeout = in millisecond
     *   handler = optional callback invoked on the new thread right before
     *             the loop starts.
     */
    void runAsync(long timeout = -1, SimpleEventHandler handler = null) {
        if (atomicLoad(_running)) {
            version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id);
            return;
        }
        this.timeout = timeout;
        version (HUNT_IO_DEBUG) trace("runAsync ...");
        Thread th = new Thread(() {
            try {
                doRun(handler);
            } catch (Throwable t) {
                warning(t.msg);
                version(HUNT_DEBUG) warning(t.toString());
            }
        });
        // th.isDaemon = true; // unstable
        th.start();
    }

    /**
     * Claims the running state and enters the loop on the current thread.
     * The compare-and-swap lets only one caller through; any other caller
     * returns immediately.
     */
    private void doRun(SimpleEventHandler handler=null) {
        if (cas(&_running, false, true)) {
            version (HUNT_IO_DEBUG) trace("running selector...");
            _thread = Thread.getThis();
            if (handler !is null) {
                handler();
            }
            onLoop(timeout);
        } else {
            version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id);
        }
    }

    /**
     * Requests the select loop to end.
     *
     * Only the first call has an effect: it flips the stopping flag and
     * invokes `onStop`. Anything thrown by `onStop` is caught and logged.
     */
    void stop() {
        version (HUNT_IO_DEBUG)
            tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id,
                    atomicLoad(_running), atomicLoad(_isStopping));
        if (cas(&_isStopping, false, true)) {
            try {
                onStop();
            } catch(Throwable t) {
                warning(t.msg);
                version(HUNT_DEBUG) warning(t);
            }
        }
    }

    /// Hook called once from `stop`; the base implementation only traces.
    protected void onStop() {
        version (HUNT_IO_DEBUG)
            tracef("stopping.");
    }

    /**
     * The select loop.
     *
     * With HAVE_IOCP a single `doSelect` call is made; otherwise `doSelect`
     * is called repeatedly until `stop` has been requested. On the way out
     * the ready/running flags are reset and `dispose` is called.
     *
     * Params:
     *   timeout = in millisecond
     */
    protected void onLoop(long timeout = -1) {
        _isReady = true;
        idleTime = timeout;

        try {
            version (HAVE_IOCP) {
                doSelect(timeout);
            } else {
                do {
                    // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length);
                    doSelect(timeout);
                    // infof("Selector rolled once. isRuning: %s", isRuning);
                } while (!atomicLoad(_isStopping));
            }
        } finally {
            // Runs on normal exit and when doSelect throws, so a failed loop
            // does not leave the selector flagged as running with its
            // resources undisposed.
            _isReady = false;
            atomicStore(_running, false);
            version(HUNT_IO_DEBUG) infof("Selector %d exited.", _id);
            dispose();
        }
    }

    /**
     * Performs a single polling round with a timeout.
     *
     * Params:
     *   timeout = in millisecond; 0 is forwarded to `doSelect` as -1.
     *
     * Throws: IllegalArgumentException if `timeout` is negative.
     */
    int select(long timeout) {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return doSelect((timeout == 0) ? -1 : timeout);
    }

    /**
     * Performs a single polling round with a timeout of 0.
     *
     * NOTE(review): this is identical to `selectNow`, whereas `select(0)`
     * forwards -1. Looks like it may have been meant to forward -1 as well -
     * confirm with callers before changing.
     */
    int select() {
        return doSelect(0);
    }

    /// Performs a single polling round with a timeout of 0.
    int selectNow() {
        return doSelect(0);
    }

    /// Drops the references to the loop thread and the event handlers.
    void dispose() {
        _thread = null;
        _startedHandler = null;
        _stoppeddHandler = null;
    }

    /// Returns true when called from the thread that is running the loop.
    bool isSelfThread() {
        return _thread is Thread.getThis();
    }
}