1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.EventLoopGroup;
13 
14 import hunt.concurrency.TaskPool;
15 import hunt.event.EventLoop;
16 import hunt.logging.ConsoleLogger;
17 import hunt.system.Memory;
18 import hunt.util.Lifecycle;
19 
20 import core.atomic;
21 
/**
 * A fixed-size group of EventLoop instances that are started and stopped
 * together.
 *
 * The group always holds at least one loop. When a worker thread count is
 * requested, one TaskPool is created and the same instance is handed to every
 * loop; otherwise the loops receive a null pool.
 */
class EventLoopGroup : Lifecycle {
    private TaskPool _pool;

    /**
     * Creates the loops of the group without starting them.
     *
     * Params:
     *  ioThreadSize = number of EventLoop instances to create. A value of 0
     *      (which is what the default yields when totalCPUs is 1) is
     *      replaced by 1.
     *  workerThreadSize = when greater than 0, a TaskPool of this size is
     *      created and passed to every EventLoop.
     */
    this(size_t ioThreadSize = (totalCPUs - 1), size_t workerThreadSize = 0) {
        // Never build an empty group: the index arithmetic below relies on
        // _eventLoops.length being non-zero.
        size_t loopCount = ioThreadSize > 0 ? ioThreadSize : 1;

        version(HUNT_DEBUG) infof("ioThreadSize: %d, workerThreadSize: %d", ioThreadSize, workerThreadSize);

        _eventLoops = new EventLoop[loopCount];

        if (workerThreadSize > 0) {
            _pool = new TaskPool(workerThreadSize, true);
        }

        // _pool stays null when no worker threads were requested.
        foreach (i; 0 .. loopCount) {
            _eventLoops[i] = new EventLoop(i, loopCount, _pool);
        }
    }

    /**
     * Starts every loop with a timeout of -1.
     */
    void start() {
        start(-1);
    }

    /**
     * Starts every loop of the group by calling runAsync on it.
     *
     * Does nothing when the group is already marked as running.
     *
     * Params:
     *  timeout = in milliseconds; forwarded unchanged to each loop.
     */
    void start(long timeout) {
        // The compare-and-swap lets exactly one caller win the
        // stopped -> running transition.
        if (!cas(&_isRunning, false, true))
            return;

        foreach (EventLoop loop; _eventLoops) {
            loop.runAsync(timeout);
        }
    }

    /**
     * Stops every loop of the group.
     *
     * Does nothing when the group is not marked as running.
     */
    void stop() {
        // Only the caller that flips running -> stopped goes on to stop the loops.
        if (!cas(&_isRunning, true, false))
            return;

        version (HUNT_IO_DEBUG)
            trace("stopping EventLoopGroup...");

        foreach (EventLoop loop; _eventLoops) {
            loop.stop();
        }

        version (HUNT_IO_DEBUG)
            trace("EventLoopGroup stopped.");
    }

    /**
     * Returns: true between a successful start() and the following stop().
     */
    bool isRunning() {
        // _isRunning is shared and modified with cas(); read it atomically too
        // instead of relying on a plain load of a shared variable.
        return atomicLoad(_isRunning);
    }

    /**
     * Returns: true only when every loop in the group reports isReady().
     */
    bool isReady() {
        foreach (EventLoop loop; _eventLoops) {
            if (!loop.isReady())
                return false;
        }

        return true;
    }

    /**
     * Returns: the number of loops in the group.
     */
    @property size_t size() {
        return _eventLoops.length;
    }

    /**
     * Selects a loop from a caller-supplied value.
     *
     * Params:
     *  factor = any number; it is reduced modulo the group size, so the same
     *      factor always maps to the same loop.
     *
     * Returns: the loop at index (factor % size).
     */
    EventLoop nextLoop(size_t factor) {
        return _eventLoops[factor % _eventLoops.length];
    }

    // Warning:
    //  It's dangerous because it may lead to different io channels being
    //  assigned to the same slot in the same EventLoop.
    //  Then the last channel may be overwritten and be collected by GC.
    /**
     * Selects a loop by advancing an internal shared counter.
     *
     * Returns: the loop at index (counter % size).
     */
    deprecated("Using nextLoop(size_t factor) instead.")
    EventLoop nextLoop() {
        size_t index = atomicOp!"+="(_loopIndex, 1);

        // Restart the counter once it passes 10000 so that it does not keep growing.
        if (index > 10000) {
            index = 0;
            atomicStore(_loopIndex, 0);
        }

        index %= _eventLoops.length;
        return _eventLoops[index];
    }

    /**
     * Index operator; the index wraps around the group size instead of
     * going out of bounds.
     *
     * Returns: the loop at index (index % size).
     */
    EventLoop opIndex(size_t index) {
        auto slot = index % _eventLoops.length;
        return _eventLoops[slot];
    }

    /**
     * Enables `foreach (EventLoop loop; group)`.
     *
     * Iteration stops as soon as the delegate returns a non-zero value, and
     * that value is returned to the caller.
     */
    int opApply(scope int delegate(EventLoop) dg) {
        int ret = 0;
        foreach (loop; _eventLoops) {
            ret = dg(loop);
            if (ret)
                break;
        }
        return ret;
    }

private:
    // Counter used by the deprecated nextLoop().
    shared int _loopIndex;
    // Set by start() and cleared by stop(); always accessed through core.atomic.
    shared bool _isRunning;
    // Created once in the constructor; its length is always at least 1.
    EventLoop[] _eventLoops;
}