module hunt.event.selector.Selector;

import hunt.Exceptions;
import hunt.Functions;
import hunt.io.channel.AbstractChannel;
import hunt.io.channel.Common;
import hunt.logging.ConsoleLogger;

import core.atomic;
import core.memory;
import core.thread;


/**
 * Base class of an I/O event selector.
 *
 * A selector runs a selection loop either on the calling thread (`run`) or on
 * a newly created thread (`runAsync`). The actual wait is delegated to the
 * abstract `doSelect`, which subclasses implement.
 *
 * See_Also: http://tutorials.jenkov.com/java-nio/selectors.html
 */
abstract class Selector {

    // Lifecycle flags. They are touched from more than one thread (the loop
    // thread and whoever calls stop()/runAsync()), so every access in this
    // class goes through core.atomic.
    private shared bool _running = false;
    private shared bool _isStopping = false;
    private bool _isReady;
    protected size_t _id;
    protected size_t divider;
    protected long idleTime = -1; // in millisecond
    protected int fd;

    private long timeout = -1; // in millisecond
    private Thread _thread;

    private SimpleEventHandler _startedHandler;
    private SimpleEventHandler _stoppeddHandler;

    /**
     * Params:
     *   id          = identifier returned by `getId` and shown in log messages
     *   divider     = stored in the protected `divider` field; not used by this class
     *   maxChannels = accepted but not used by this class
     */
    this(size_t id, size_t divider, size_t maxChannels = 1500) {
        _id = id;
        this.divider = divider;
    }

    /// Returns: the identifier given to the constructor.
    size_t getId() {
        return _id;
    }

    /// Returns: `true` while the selection loop in `onLoop` is active.
    bool isReady() {
        return _isReady;
    }


    /**
     * Tells whether or not this selector is running.
     *
     * Returns: `true` if, and only if, this selector is running
     */
    bool isRuning() {
        return atomicLoad(_running);
    }

    alias isOpen = isRuning;

    /// Returns: `true` once `stop` has been requested.
    bool isStopping() {
        return atomicLoad(_isStopping);
    }

    /**
     * Pins the channel object for the garbage collector: it is added as a GC
     * root and flagged `NO_MOVE`.
     *
     * NOTE(review): presumably subclasses hand the raw object pointer to the
     * OS event facility, which is why it must stay alive and in place - confirm
     * against the platform selectors.
     *
     * Params:
     *   channel = the channel to register; must not be null
     *
     * Returns: always `true` in this base implementation.
     */
    bool register(AbstractChannel channel) {
        assert(channel !is null);
        void* context = cast(void*)channel;
        GC.addRoot(context);
        GC.setAttr(context, GC.BlkAttr.NO_MOVE);
        version (HUNT_IO_DEBUG) {
            int infd = cast(int) channel.handle;
            tracef("Register channel@%s: fd=%d, selector: %d", context, infd, getId());
        }
        return true;
    }

    /**
     * Reverses `register`: removes the GC root and clears the `NO_MOVE` flag.
     *
     * Returns: always `true` in this base implementation.
     */
    bool deregister(AbstractChannel channel) {
        void* context = cast(void*)channel;
        GC.removeRoot(context);
        GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
        version(HUNT_IO_DEBUG) {
            size_t fd = cast(size_t) channel.handle;
            infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId());
        }
        return true;
    }

    /// No-op in this base class. Returns: always `true`.
    bool update(AbstractChannel channel) { return true; }

    /**
     * Performs one selection operation; implemented by subclasses.
     *
     * Params:
     *   timeout = in millisecond. This class passes `-1` where the caller asked
     *             for no timeout and `0` from `selectNow`; the exact meaning is
     *             defined by the subclass.
     */
    protected abstract int doSelect(long timeout);

    /**
     * Runs the selection loop on the calling thread and returns when the loop
     * ends. Logs and returns immediately if the selector is already running.
     *
     * Params:
     *   timeout = in millisecond
     */
    void run(long timeout = -1) {
        this.timeout = timeout;
        doRun();
    }

    /**
     * Runs the selection loop on a new thread. Does nothing if the selector is
     * already running.
     *
     * Any `Throwable` escaping the loop is logged on the new thread and not
     * propagated.
     *
     * Params:
     *   timeout = in millisecond
     *   handler = optional callback invoked on the new thread right before the
     *             loop starts
     */
    void runAsync(long timeout = -1, SimpleEventHandler handler = null) {
        if (atomicLoad(_running)) {
            version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id);
            return;
        }
        this.timeout = timeout;
        version (HUNT_IO_DEBUG) trace("runAsync ...");
        Thread th = new Thread(() {
            try {
                doRun(handler);
            } catch (Throwable t) {
                warning(t.msg);
                version(HUNT_DEBUG) warning(t.toString());
            }
        });
        // th.isDaemon = true; // unstable
        th.start();
    }

    // Claims the running flag, records the current thread, invokes the optional
    // handler and enters the loop.
    private void doRun(SimpleEventHandler handler = null) {
        if (!cas(&_running, false, true)) {
            version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id);
            return;
        }

        // If the handler (or the loop) throws, give the claim back; otherwise
        // the selector would stay marked as running with no thread behind it.
        scope (failure) atomicStore(_running, false);

        version (HUNT_IO_DEBUG) trace("running selector...");
        _thread = Thread.getThis();
        if (handler !is null) {
            handler();
        }
        onLoop(timeout);
    }

    /**
     * Requests the selector to stop. Only the first call has an effect: it
     * sets the stopping flag and invokes `onStop`, logging (not propagating)
     * anything `onStop` throws.
     */
    void stop() {
        version (HUNT_IO_DEBUG)
            tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id,
                    atomicLoad(_running), atomicLoad(_isStopping));
        if (cas(&_isStopping, false, true)) {
            try {
                onStop();
            } catch (Throwable t) {
                warning(t.msg);
                version(HUNT_DEBUG) warning(t);
            }
        }
    }

    /// Hook invoked once by `stop`; the base implementation only traces.
    protected void onStop() {
        version (HUNT_IO_DEBUG)
            tracef("stopping.");
    }

    /**
     * The selection loop. Without `HAVE_IOCP` it calls `doSelect` repeatedly
     * until `stop` has been requested; with `HAVE_IOCP` it calls `doSelect`
     * once.
     *
     * On the way out - including when `doSelect` throws - the ready and
     * running flags are cleared and `dispose` is called.
     *
     * Params:
     *   timeout = in millisecond
     */
    protected void onLoop(long timeout = -1) {
        _isReady = true;
        idleTime = timeout;

        try {
            version (HAVE_IOCP) {
                doSelect(timeout);
            } else {
                do {
                    // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length);
                    doSelect(timeout);
                    // infof("Selector rolled once. isRuning: %s", isRuning);
                } while (!atomicLoad(_isStopping));
            }
        } finally {
            // Runs on normal exit and on exceptions so the state never claims
            // "running" after the loop has gone away.
            _isReady = false;
            atomicStore(_running, false);
            version(HUNT_IO_DEBUG) infof("Selector %d exited.", _id);
            dispose();
        }
    }

    /**
     * Performs one selection operation with the given timeout.
     *
     * Params:
     *   timeout = in millisecond; `0` means "no timeout" and is passed to
     *             `doSelect` as `-1`
     *
     * Throws: IllegalArgumentException if `timeout` is negative.
     */
    int select(long timeout) {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return doSelect((timeout == 0) ? -1 : timeout);
    }

    /**
     * Performs one selection operation without a timeout; equivalent to
     * `select(0)`, i.e. `doSelect(-1)`.
     */
    int select() {
        return select(0);
    }

    /// Performs one selection operation with a zero timeout (`doSelect(0)`).
    int selectNow() {
        return doSelect(0);
    }

    /// Drops the references to the loop thread and the event handlers.
    void dispose() {
        _thread = null;
        _startedHandler = null;
        _stoppeddHandler = null;
    }

    /// Returns: `true` if the caller is the thread recorded when the loop was started.
    bool isSelfThread() {
        return _thread is Thread.getThis();
    }
}