1 module hunt.event.selector.Selector;
2 
3 import hunt.Exceptions;
4 import hunt.Functions;
5 import hunt.io.channel.AbstractChannel;
6 import hunt.io.channel.Common;
7 import hunt.logging.ConsoleLogger;
8 
9 import core.atomic;
10 import core.memory;
11 import core.thread;
12 
13 
14 /**
15 http://tutorials.jenkov.com/java-nio/selectors.html
16 */
17 abstract class Selector {
18 
19     private shared bool _running = false;
20     private shared bool _isStopping = false;
21     private bool _isReady;
22     protected size_t _id;
23     protected size_t divider;
24     // protected AbstractChannel[] channels;
25     protected long idleTime = -1; // in millisecond
26     protected int fd;
27 
28     private long timeout = -1; // in millisecond
29     private Thread _thread;
30 
31     private SimpleEventHandler _startedHandler;
32     private SimpleEventHandler _stoppeddHandler;
33 
34     this(size_t id, size_t divider, size_t maxChannels = 1500) {
35         _id = id;
36         this.divider = divider;
37         // channels = new AbstractChannel[maxChannels];
38     }
39 
40     size_t getId() {
41         return _id;
42     }
43 
44     bool isReady() {
45         return _isReady;
46     }
47 
48 
49     /**
50      * Tells whether or not this selector is running.
51      *
52      * @return <tt>true</tt> if, and only if, this selector is running
53      */
54     bool isRuning() {
55         return _running;
56     }
57 
58     alias isOpen = isRuning;
59 
60     bool isStopping() {
61         return _isStopping;
62     }
63 
64     bool register(AbstractChannel channel) {
65         assert(channel !is null);
66         void* context = cast(void*)channel;
67         GC.addRoot(context);
68         GC.setAttr(cast(void*)context, GC.BlkAttr.NO_MOVE);
69         version (HUNT_IO_DEBUG) {
70             int infd = cast(int) channel.handle;
71             tracef("Register channel@%s: fd=%d, selector: %d", context, infd, getId());
72         }        
73         return true;
74         // int infd = cast(int) channel.handle;
75         // size_t index = cast(size_t)(infd / divider);
76 
77         // if (index >= channels.length) {
78         //     debug warningf("expanding channels uplimit to %d", index);
79         //     import std.algorithm : max;
80 
81         //     size_t length = max(cast(size_t)(index * 3 / 2), 16);
82         //     AbstractChannel[] arr = new AbstractChannel[length];
83         //     arr[0 .. channels.length] = channels[0 .. $];
84         //     channels = arr;
85         // }
86 
87         // bool result = true;
88 
89         // debug {
90         //     AbstractChannel oldChannel = channels[index];
91         //     if(oldChannel !is null) {
92         //         result = false;
93         //         version(HUNT_DEBUG) {
94         //             warningf("Register collision, {old channel: %s, fd=%d};  " ~ 
95         //                         "{new channel: %s, fd=%d}; {slot=%d, selector: %d}", 
96         //                 cast(void*)oldChannel, oldChannel.handle,
97         //                 cast(void*)channel, infd,
98         //                 index, getId());
99         //         }
100 
101         //         if(oldChannel.handle != channel.handle) {
102         //             // Try to find a empty slot
103         //             size_t lastIndex = index;
104         //             while(channels[index] !is null) {
105         //                 index = (index + 1) % channels.length;
106         //                 if(index == lastIndex) {
107         //                     warningf("All the slots are full on selector: %d", getId());
108         //                 }
109         //             }
110         //         }
111         //     }
112         // } 
113         
114         // version (HUNT_IO_DEBUG) {
115         //     tracef("register channel: fd=%d, slot=%d, selector: %d", infd, index, getId());
116         // }
117         // channels[index] = channel;
118 
119         // return result;
120     }
121 
122     bool deregister(AbstractChannel channel) {
123         void* context = cast(void*)channel;
124         GC.removeRoot(context);
125         GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
126         version(HUNT_IO_DEBUG) {
127             size_t fd = cast(size_t) channel.handle;
128             infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId());
129         }        
130         return true;
131         // size_t fd = cast(size_t) channel.handle;
132         // size_t index = cast(size_t)(fd / divider);
133         // bool result = true;
134         // debug {
135         //     auto oldChannel = channels[index];
136             
137         //     if(oldChannel is null) {
138         //         result = false;
139         //         version(HUNT_IO_DEBUG) {
140         //             infof("The channel has been deregistered: fd=%d, slot=%d, selector: %d", fd, index, getId());
141         //         }
142         //     } else {
143         //         if(oldChannel !is channel) {
144         //             result = false;
145         //             version(HUNT_DEBUG) {
146         //                 warningf("deregistering a mismatched channel, " ~ 
147         //                     "{old: %s, fd=%d}; {new: %s, fd=%d}, {slot=%d, selector: %d}", 
148         //                     cast(void*)oldChannel, oldChannel.handle, 
149         //                     cast(void*)channel, fd, index, getId());
150         //             }
151         //         } else {
152         //             version (HUNT_IO_DEBUG) {
153         //                 tracef("deregister channel: fd=%d, slot=%d, selector: %d", fd, index, getId());
154         //             }
155         //             channels[index] = null;
156         //         }
157         //     }
158         // } else {
159         //     channels[index] = null;
160         // }
161         
162         // return result;
163     }
164 
165     // bool update(AbstractChannel channel) { return true; }
166 
167     protected abstract int doSelect(long timeout);
168 
169     /**
170         timeout: in millisecond
171     */
172     void run(long timeout = -1) {
173         this.timeout = timeout;
174         doRun();
175     }
176 
177     /**
178         timeout: in millisecond
179     */
180     void runAsync(long timeout = -1, SimpleEventHandler handler = null) {
181         if(_running) {
182             version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id);
183             return;
184         }
185         this.timeout = timeout;
186         version (HUNT_IO_DEBUG) trace("runAsync ...");
187         Thread th = new Thread(() { 
188             try {
189                 doRun(handler); 
190             } catch (Throwable t) {
191                 warning(t.msg);
192                 version(HUNT_DEBUG) warning(t.toString());
193             }
194         });
195         // th.isDaemon = true; // unstable
196         th.start();
197     }
198     
199     private void doRun(SimpleEventHandler handler=null) {
200         if(cas(&_running, false, true)) {
201             version (HUNT_IO_DEBUG) trace("running selector...");
202             _thread = Thread.getThis();
203             if(handler !is null) {
204                 handler();
205             }
206             onLoop(timeout);
207         } else {
208             version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id);
209         }  
210     }
211 
212     void stop() {
213         version (HUNT_IO_DEBUG)
214             tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id, _running, _isStopping); 
215         if(cas(&_isStopping, false, true)) {
216             try {
217                 onStop();
218             } catch(Throwable t) {
219                 warning(t.msg);
220                 version(HUNT_DEBUG) warning(t);
221             }
222         }
223     }
224 
225     protected void onStop() {
226         version (HUNT_IO_DEBUG) 
227             tracef("stopping.");
228     }
229 
230     /**
231         timeout: in millisecond
232     */
233     protected void onLoop(long timeout = -1) {
234         _isReady = true;
235         idleTime = timeout;
236 
237         version (HAVE_IOCP) {
238             doSelect(timeout);
239         } else {
240             do {
241                 // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length);
242                 doSelect(timeout);
243                 // infof("Selector rolled once. isRuning: %s", isRuning);
244             } while (!_isStopping);
245         }
246 
247         _isReady = false;
248         _running = false;
249         version(HUNT_IO_DEBUG) infof("Selector %d exited.", _id);
250         dispose();
251     }
252 
253     /**
254         timeout: in millisecond
255     */
256     int select(long timeout) {
257         if (timeout < 0)
258             throw new IllegalArgumentException("Negative timeout");
259         return doSelect((timeout == 0) ? -1 : timeout);
260     }
261 
262     int select() {
263         return doSelect(0);
264     }
265 
266     int selectNow() {
267         return doSelect(0);
268     }
269 
270     void dispose() {
271         _thread = null;
272         _startedHandler = null;
273         _stoppeddHandler = null;
274     }
275     
276     bool isSelfThread() {
277         return _thread is Thread.getThis();
278     }
279 }