1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.EventLoopGroup;
13 
14 import hunt.event.EventLoop;
15 import hunt.logging;
16 import hunt.system.Memory;
17 import hunt.util.Lifecycle;
18 import hunt.util.worker;
19 
20 import core.atomic;
21 
22 /**
23  * 
24  */
25 class EventLoopGroup : Lifecycle {
26     private TaskQueue _pool;
27     private Worker _worker;
28 
29     this(size_t ioThreadSize = (totalCPUs - 1), size_t workerThreadSize = 0) {
30         size_t _size = ioThreadSize > 0 ? ioThreadSize : 1;
31 
32         version(HUNT_DEBUG) infof("ioThreadSize: %d, workerThreadSize: %d", ioThreadSize, workerThreadSize);
33 
34         _eventLoops = new EventLoop[_size];
35 
36         if(workerThreadSize > 0) {
37             _pool = new MemoryTaskQueue();
38             _worker = new Worker(_pool, workerThreadSize);
39             _worker.run();
40         } 
41 
42         foreach (i; 0 .. _size) {
43             _eventLoops[i] = new EventLoop(i, _size, _worker);
44         }
45     }
46 
47     void start() {
48         start(-1);
49     }
50 
51     Worker worker() {
52         return _worker;
53     }
54 
55     /**
56         timeout: in millisecond
57     */
58     void start(long timeout) {
59         if (cas(&_isRunning, false, true)) {
60             foreach (EventLoop pool; _eventLoops) {
61                 pool.runAsync(timeout);
62             }
63         }
64     }
65 
66     void stop() {
67         if (!cas(&_isRunning, true, false))
68             return;
69 
70         if(_worker !is null) {
71             _worker.stop();
72         }
73 
74         version (HUNT_IO_DEBUG)
75             trace("stopping EventLoopGroup...");
76         foreach (EventLoop pool; _eventLoops) {
77             pool.stop();
78         }
79 
80         version (HUNT_IO_DEBUG)
81             trace("EventLoopGroup stopped.");
82     }
83 
84 	bool isRunning() {
85         return _isRunning;
86     }
87 
88     bool isReady() {
89         
90         foreach (EventLoop pool; _eventLoops) {
91             if(!pool.isReady()) return false;
92         }
93 
94         return true;
95     }
96 
97     @property size_t size() {
98         return _eventLoops.length;
99     }
100 
101     EventLoop nextLoop(size_t factor) {
102        return _eventLoops[factor % _eventLoops.length];
103     }
104 
105     EventLoop opIndex(size_t index) {
106         auto i = index % _eventLoops.length;
107         return _eventLoops[i];
108     }
109 
110     int opApply(scope int delegate(EventLoop) dg) {
111         int ret = 0;
112         foreach (pool; _eventLoops) {
113             ret = dg(pool);
114             if (ret)
115                 break;
116         }
117         return ret;
118     }
119 
120 private:
121     shared int _loopIndex;
122     shared bool _isRunning;
123     EventLoop[] _eventLoops;
124 }