1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.EventLoop;
13 
14 import hunt.io.socket.Common;
15 import hunt.event.selector;
16 import hunt.event.Task;
17 import hunt.logging;
18 
19 import core.thread;
20 import std.parallelism;
21 
22 /**
23 
24 */
25 final class EventLoop : AbstractSelector {
26     private long timeout = -1; // in millisecond
27 
28     this() {
29         super();
30     }
31 
32     /**
33         timeout: in millisecond
34     */
35     void run(long timeout = -1) {
36         this.timeout = timeout;
37         doRun();
38     }
39 
40     /**
41         timeout: in millisecond
42     */
43     void runAsync(long timeout = -1) {
44         this.timeout = timeout;
45         version (HUNT_DEBUG) trace("runAsync ...");
46         // BUG: Reported defects -@zxp at 12/3/2018, 8:17:58 PM
47         // The task may not be executed.
48         // auto runTask = task(&run, timeout);
49         // taskPool.put(runTask); // 
50         Thread th = new Thread(&doRun);
51         th.start();
52     }
53 
54     private void doRun() {
55         if (_running) {
56             version (HUNT_DEBUG) warning("The current eventloop is running!");
57         } else {
58             version (HUNT_DEBUG) trace("running eventloop...");
59             _thread = Thread.getThis();
60             onLoop(&onWeakUp, timeout);
61         }
62     }
63 
64     override void stop() {
65         if(!_running) {
66             version (HUNT_DEBUG) trace("event loop has been stopped.");
67             return;
68         }
69         
70         version (HUNT_DEBUG) trace("Stopping event loop...");
71         if(isLoopThread()) {
72             auto stopTask = task(&stop);
73             taskPool.put(stopTask);
74         } else {
75             _thread = null;
76             super.stop();
77             // dispose();
78         }
79     }
80 
81     // bool isRuning() {
82     //     return (_thread !is null);
83     // }
84 
85     bool isLoopThread() {
86         return _thread is Thread.getThis();
87     }
88 
89     EventLoop postTask(AbstractTask task) {
90         synchronized (this) {
91             _queue.enQueue(task);
92         }
93         return this;
94     }
95 
96     static AbstractTask createTask(alias fun, Args...)(Args args) {
97         return newTask!(fun, Args)(args);
98     }
99 
100     static AbstractTask createTask(F, Args...)(F delegateOrFp, Args args)
101             if (is(typeof(delegateOrFp(args)))) {
102         return newTask(F, Args)(delegateOrFp, args);
103     }
104 
105     protected void onWeakUp() {
106         TaskQueue queue;
107         synchronized (this) {
108             queue = _queue;
109             _queue = TaskQueue();
110         }
111         while (!queue.empty) {
112             auto task = queue.deQueue();
113             task.job();
114         }
115     }
116 
117 private:
118     Thread _thread;
119     TaskQueue _queue;
120 }