1 module hunt.event.selector.Selector;
2 
3 import hunt.Exceptions;
4 import hunt.Functions;
5 import hunt.io.channel.AbstractChannel;
6 import hunt.io.channel.Common;
7 import hunt.logging.ConsoleLogger;
8 
9 import core.atomic;
10 import core.memory;
11 import core.thread;
12 
13 
14 /**
15 http://tutorials.jenkov.com/java-nio/selectors.html
16 */
17 abstract class Selector {
18 
19     private shared bool _running = false;
20     private shared bool _isStopping = false;
21     private bool _isReady;
22     protected size_t _id;
23     protected size_t divider;
24     protected long idleTime = -1; // in millisecond
25     protected int fd;
26 
27     private long timeout = -1; // in millisecond
28     private Thread _thread;
29 
30     private SimpleEventHandler _startedHandler;
31     private SimpleEventHandler _stoppeddHandler;
32 
33     this(size_t id, size_t divider, size_t maxChannels = 1500) {
34         _id = id;
35         this.divider = divider;
36     }
37 
38     size_t getId() {
39         return _id;
40     }
41 
42     bool isReady() {
43         return _isReady;
44     }
45 
46 
47     /**
48      * Tells whether or not this selector is running.
49      *
50      * @return <tt>true</tt> if, and only if, this selector is running
51      */
52     bool isRuning() {
53         return _running;
54     }
55 
56     alias isOpen = isRuning;
57 
58     bool isStopping() {
59         return _isStopping;
60     }
61 
62     bool register(AbstractChannel channel) {
63         assert(channel !is null);
64         void* context = cast(void*)channel;
65         GC.addRoot(context);
66         GC.setAttr(cast(void*)context, GC.BlkAttr.NO_MOVE);
67         version (HUNT_IO_DEBUG) {
68             int infd = cast(int) channel.handle;
69             tracef("Register channel@%s: fd=%d, selector: %d", context, infd, getId());
70         }        
71         return true;
72     }
73 
74     bool deregister(AbstractChannel channel) {
75         void* context = cast(void*)channel;
76         GC.removeRoot(context);
77         GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
78         version(HUNT_IO_DEBUG) {
79             size_t fd = cast(size_t) channel.handle;
80             infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId());
81         }        
82         return true;
83     }
84 
85     bool update(AbstractChannel channel) { return true; }
86 
87     protected abstract int doSelect(long timeout);
88 
89     /**
90         timeout: in millisecond
91     */
92     void run(long timeout = -1) {
93         this.timeout = timeout;
94         doRun();
95     }
96 
97     /**
98         timeout: in millisecond
99     */
100     void runAsync(long timeout = -1, SimpleEventHandler handler = null) {
101         if(_running) {
102             version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id);
103             return;
104         }
105         this.timeout = timeout;
106         version (HUNT_IO_DEBUG) trace("runAsync ...");
107         Thread th = new Thread(() { 
108             try {
109                 doRun(handler); 
110             } catch (Throwable t) {
111                 warning(t.msg);
112                 version(HUNT_DEBUG) warning(t.toString());
113             }
114         });
115         // th.isDaemon = true; // unstable
116         th.start();
117     }
118     
119     private void doRun(SimpleEventHandler handler=null) {
120         if(cas(&_running, false, true)) {
121             version (HUNT_IO_DEBUG) trace("running selector...");
122             _thread = Thread.getThis();
123             if(handler !is null) {
124                 handler();
125             }
126             onLoop(timeout);
127         } else {
128             version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id);
129         }  
130     }
131 
132     void stop() {
133         version (HUNT_IO_DEBUG)
134             tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id, _running, _isStopping); 
135         if(cas(&_isStopping, false, true)) {
136             try {
137                 onStop();
138             } catch(Throwable t) {
139                 warning(t.msg);
140                 version(HUNT_DEBUG) warning(t);
141             }
142         }
143     }
144 
145     protected void onStop() {
146         version (HUNT_IO_DEBUG) 
147             tracef("stopping.");
148     }
149 
150     /**
151         timeout: in millisecond
152     */
153     protected void onLoop(long timeout = -1) {
154         _isReady = true;
155         idleTime = timeout;
156 
157         version (HAVE_IOCP) {
158             doSelect(timeout);
159         } else {
160             do {
161                 // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length);
162                 doSelect(timeout);
163                 // infof("Selector rolled once. isRuning: %s", isRuning);
164             } while (!_isStopping);
165         }
166 
167         _isReady = false;
168         _running = false;
169         version(HUNT_IO_DEBUG) infof("Selector %d exited.", _id);
170         dispose();
171     }
172 
173     /**
174         timeout: in millisecond
175     */
176     int select(long timeout) {
177         if (timeout < 0)
178             throw new IllegalArgumentException("Negative timeout");
179         return doSelect((timeout == 0) ? -1 : timeout);
180     }
181 
182     int select() {
183         return doSelect(0);
184     }
185 
186     int selectNow() {
187         return doSelect(0);
188     }
189 
190     void dispose() {
191         _thread = null;
192         _startedHandler = null;
193         _stoppeddHandler = null;
194     }
195     
196     bool isSelfThread() {
197         return _thread is Thread.getThis();
198     }
199 }