1 module hunt.event.selector.Selector;
2 
3 import hunt.Exceptions;
4 import hunt.Functions;
5 import hunt.io.channel.AbstractChannel;
6 import hunt.io.channel.Common;
7 import hunt.logging;
8 import hunt.util.worker;
9 
10 import core.atomic;
11 import core.memory;
12 import core.thread;
13 
14 
15 /**
16 http://tutorials.jenkov.com/java-nio/selectors.html
17 */
18 abstract class Selector {
19 
20     private shared bool _running = false;
21     private shared bool _isStopping = false;
22     private bool _isReady;
23     protected size_t _id;
24     protected size_t divider;
25     private Worker _taskWorker;
26     // protected AbstractChannel[] channels;
27     protected long idleTime = -1; // in millisecond
28     protected int fd;
29 
30     private long timeout = -1; // in millisecond
31     private Thread _thread;
32 
33     private SimpleEventHandler _startedHandler;
34     private SimpleEventHandler _stoppeddHandler;
35 
36     this(size_t id, size_t divider, Worker worker = null, size_t maxChannels = 1500) {
37         _id = id;
38         _taskWorker = worker;
39         this.divider = divider;
40         // channels = new AbstractChannel[maxChannels];
41     }
42 
43     size_t getId() {
44         return _id;
45     }
46 
47     Worker worker() {
48         return _taskWorker;
49     }
50 
51     bool isReady() {
52         return _isReady;
53     }
54 
55 
56     /**
57      * Tells whether or not this selector is running.
58      *
59      * @return <tt>true</tt> if, and only if, this selector is running
60      */
61     bool isRuning() {
62         return _running;
63     }
64 
65     alias isOpen = isRuning;
66 
67     bool isStopping() {
68         return _isStopping;
69     }
70 
71     bool register(AbstractChannel channel) {
72         assert(channel !is null);
73         channel.taskWorker = _taskWorker;
74         void* context = cast(void*)channel;
75         GC.addRoot(context);
76         GC.setAttr(cast(void*)context, GC.BlkAttr.NO_MOVE);
77         version (HUNT_IO_DEBUG) {
78             int infd = cast(int) channel.handle;
79             infof("Register channel@%s: fd=%d, selector: %d", context, infd, getId());
80         }        
81         return true;
82     }
83 
84     bool deregister(AbstractChannel channel) {
85         channel.taskWorker = null;
86         void* context = cast(void*)channel;
87         GC.removeRoot(context);
88         GC.clrAttr(context, GC.BlkAttr.NO_MOVE);
89         version(HUNT_IO_DEBUG) {
90             size_t fd = cast(size_t) channel.handle;
91             infof("The channel@%s has been deregistered: fd=%d, selector: %d", context, fd, getId());
92         }        
93         return true;
94     }
95 
96     protected abstract int doSelect(long timeout);
97 
98     /**
99         timeout: in millisecond
100     */
101     void run(long timeout = -1) {
102         this.timeout = timeout;
103         doRun();
104     }
105 
106     /**
107         timeout: in millisecond
108     */
109     void runAsync(long timeout = -1, SimpleEventHandler handler = null) {
110         if(_running) {
111             version (HUNT_IO_DEBUG) warningf("The current selector %d has being running already!", _id);
112             return;
113         }
114 
115         this.timeout = timeout;
116         version (HUNT_IO_DEBUG) tracef("runAsync ... Thread: %d", Thread.getAll().length);
117         
118         _workThread = new Thread(() { 
119             try {
120                 doRun(handler); 
121             } catch (Throwable t) {
122                 warning(t.msg);
123                 version(HUNT_DEBUG) warning(t.toString());
124             }
125         });
126         // th.isDaemon = true; // unstable
127         _workThread.start();
128 
129         // BUG: Reported defects -@zhangxueping at 2021-10-12T18:25:30+08:00
130         // https://issues.dlang.org/show_bug.cgi?id=22346
131         // import std.parallelism;
132 
133         // auto workerTask = task(() { 
134         //     try {
135         //         doRun(handler); 
136         //     } catch (Throwable t) {
137         //         warning(t.msg);
138         //         version(HUNT_DEBUG) warning(t.toString());
139         //     }
140         // });
141         
142         // taskPool.put(workerTask);
143     }
144 
145     private Thread _workThread;
146 
147     
148     private void doRun(SimpleEventHandler handler=null) {
149         if(cas(&_running, false, true)) {
150             version (HUNT_IO_DEBUG) trace("running selector...");
151             _thread = Thread.getThis();
152             if(handler !is null) {
153                 handler();
154             }
155             onLoop(timeout);
156         } else {
157             version (HUNT_DEBUG) warningf("The current selector %d has being running already!", _id);
158         }  
159     }
160 
161     void stop() {
162         version (HUNT_IO_DEBUG)
163             tracef("Stopping selector %d. _running=%s, _isStopping=%s", _id, _running, _isStopping); 
164         
165         if(cas(&_isStopping, false, true)) {
166             try {
167                 onStop();
168             } catch(Throwable t) {
169                 warning(t.msg);
170                 version(HUNT_DEBUG) warning(t);
171             }
172 
173             if(_workThread !is null)
174                 _workThread.join;
175         }
176     }
177 
178     protected void onStop() {
179         version (HUNT_IO_DEBUG) 
180             tracef("stopping.");
181     }
182 
183     /**
184         timeout: in millisecond
185     */
186     protected void onLoop(long timeout = -1) {
187         _isReady = true;
188         idleTime = timeout;
189 
190         version (HAVE_IOCP) {
191             doSelect(timeout);
192         } else {
193             do {
194                 // version(HUNT_THREAD_DEBUG) warningf("Threads: %d", Thread.getAll().length);
195                 doSelect(timeout);
196                 // infof("Selector rolled once. isRuning: %s", isRuning);
197             } while (!_isStopping);
198         }
199 
200         _isReady = false;
201         _running = false;
202         version(HUNT_IO_DEBUG) infof("Selector %d exited.", _id);
203         dispose();
204     }
205 
206     /**
207         timeout: in millisecond
208     */
209     int select(long timeout) {
210         if (timeout < 0)
211             throw new IllegalArgumentException("Negative timeout");
212         return doSelect((timeout == 0) ? -1 : timeout);
213     }
214 
215     int select() {
216         return doSelect(0);
217     }
218 
219     int selectNow() {
220         return doSelect(0);
221     }
222 
223     void dispose() {
224         _thread = null;
225         _startedHandler = null;
226         _stoppeddHandler = null;
227     }
228     
229     bool isSelfThread() {
230         return _thread is Thread.getThis();
231     }
232 
233     override string toString() {
234         import std.format;
235         string str = format("Selector%d", this.getId());
236 
237         return str;
238     }    
239 }