1 module hunt.event.selector.Selector;
2 
3 import hunt.Exceptions;
4 import hunt.Functions;
5 import hunt.io.channel.AbstractChannel;
6 import hunt.io.channel.Common;
7 import hunt.logging.ConsoleLogger;
8 import hunt.util.worker;
9 
10 import core.atomic;
11 import core.memory;
12 import core.thread;
13 
14 
15 /**
16 http://tutorials.jenkov.com/java-nio/selectors.html
17 */
18 abstract class Selector {
19 
20     private shared bool _running = false;
21     private shared bool _isStopping = false;
22     private bool _isReady;
23     protected size_t _id;
24     protected size_t divider;
25     private Worker _taskWorker;
26     // protected AbstractChannel[] channels;
27     protected long idleTime = -1; // in millisecond
28     protected int fd;
29 
30     private long timeout = -1; // in millisecond
31     private Thread _thread;
32 
33     private SimpleEventHandler _startedHandler;
34     private SimpleEventHandler _stoppeddHandler;
35 
36     this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
37         _id = id;
38         _taskWorker = worker;
39         this.divider = divider;
40         // channels = new AbstractChannel[maxChannels];
41     }
42 
43     size_t getId() {
44         return _id;
45     }
46 
47     Worker worker() {
48         return _taskWorker;
49     }
50 
51     bool isReady() {
52         return _isReady;
53     }
54 
55 
56     /**
57      * Tells whether or not this selector is running.
58      *
59      * @return <tt>true</tt> if, and only if, this selector is running
60      */
61     bool isRuning() {
62         return _running;
63     }
64 
65     alias isOpen = isRuning;
66 
67     bool isStopping() {
68         return _isStopping;
69     }
70 
71     bool register(AbstractChannel channel) {
72         assert(channel !is null);
73         channel.taskWorker = _taskWorker;
74         void* context = cast(void*)channel;
75         GC.addRoot(context);
76         GC.setAttr(cast(void*)context, GC.BlkAttr.NO_MOVE);
77         version (HUNT_IO_DEBUG) {
78             int infd = cast(int) channel.handle;
79             tracef("Register channel@%s: fd=%d, selector: %d", context, infd, getId());
80         }        
81         return true;
82     }
83 
84     bool deregister(AbstractChannel channel) {
85         channel.taskWorker = null;
86         void* context = cast(void*)channel;
87         GC.removeRoot(context);
88         GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
89         version(HUNT_IO_DEBUG) {
90             size_t fd = cast(size_t) channel.handle;
91             infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId());
92         }        
93         return true;
94     }
95 
96     protected abstract int doSelect(long timeout);
97 
98     /**
99         timeout: in millisecond
100     */
101     void run(long timeout = -1) {
102         this.timeout = timeout;
103         doRun();
104     }
105 
106     /**
107         timeout: in millisecond
108     */
109     void runAsync(long timeout = -1, SimpleEventHandler handler = null) {
110         if(_running) {
111             version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id);
112             return;
113         }
114         this.timeout = timeout;
115         version (HUNT_IO_DEBUG) trace("runAsync ...");
116         Thread th = new Thread(() { 
117             try {
118                 doRun(handler); 
119             } catch (Throwable t) {
120                 warning(t.msg);
121                 version(HUNT_DEBUG) warning(t.toString());
122             }
123         });
124         // th.isDaemon = true; // unstable
125         th.start();
126     }
127     
128     private void doRun(SimpleEventHandler handler=null) {
129         if(cas(&_running, false, true)) {
130             version (HUNT_IO_DEBUG) trace("running selector...");
131             _thread = Thread.getThis();
132             if(handler !is null) {
133                 handler();
134             }
135             onLoop(timeout);
136         } else {
137             version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id);
138         }  
139     }
140 
141     void stop() {
142         version (HUNT_IO_DEBUG)
143             tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id, _running, _isStopping); 
144         if(cas(&_isStopping, false, true)) {
145             try {
146                 onStop();
147             } catch(Throwable t) {
148                 warning(t.msg);
149                 version(HUNT_DEBUG) warning(t);
150             }
151         }
152     }
153 
154     protected void onStop() {
155         version (HUNT_IO_DEBUG) 
156             tracef("stopping.");
157     }
158 
159     /**
160         timeout: in millisecond
161     */
162     protected void onLoop(long timeout = -1) {
163         _isReady = true;
164         idleTime = timeout;
165 
166         version (HAVE_IOCP) {
167             doSelect(timeout);
168         } else {
169             do {
170                 // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length);
171                 doSelect(timeout);
172                 // infof("Selector rolled once. isRuning: %s", isRuning);
173             } while (!_isStopping);
174         }
175 
176         _isReady = false;
177         _running = false;
178         version(HUNT_IO_DEBUG) infof("Selector %d exited.", _id);
179         dispose();
180     }
181 
182     /**
183         timeout: in millisecond
184     */
185     int select(long timeout) {
186         if (timeout < 0)
187             throw new IllegalArgumentException("Negative timeout");
188         return doSelect((timeout == 0) ? -1 : timeout);
189     }
190 
191     int select() {
192         return doSelect(0);
193     }
194 
195     int selectNow() {
196         return doSelect(0);
197     }
198 
199     void dispose() {
200         _thread = null;
201         _startedHandler = null;
202         _stoppeddHandler = null;
203     }
204     
205     bool isSelfThread() {
206         return _thread is Thread.getThis();
207     }
208 }