1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.event.EventLoopGroup;
13 
14 import hunt.event.EventLoop;
15 import hunt.logging.ConsoleLogger;
16 import hunt.system.Memory;
17 import hunt.util.Lifecycle;
18 import hunt.util.TaskPool;
19 
20 import core.atomic;
21 
/**
 * A fixed-size group of `EventLoop` instances that are started and stopped
 * together, optionally sharing one worker `TaskPool`.
 *
 * The group always holds at least one loop, so the modulo-based lookups
 * (`nextLoop`, `opIndex`) never divide by zero.
 */
class EventLoopGroup : Lifecycle {
    private TaskPool _pool;

    /**
     * Creates the event loops and, when requested, the shared worker pool.
     *
     * Params:
     *  ioThreadSize = number of event loops to create; a value of 0 is
     *                 clamped to 1.
     *  workerThreadSize = size passed to the worker `TaskPool`; when 0 no
     *                 pool is created and `worker()` returns null.
     */
    this(size_t ioThreadSize = (totalCPUs - 1), size_t workerThreadSize = 0) {
        // Guarantee at least one loop even when the computed default is 0.
        immutable size_t loopCount = ioThreadSize > 0 ? ioThreadSize : 1;

        version(HUNT_DEBUG) infof("ioThreadSize: %d, workerThreadSize: %d", ioThreadSize, workerThreadSize);

        _eventLoops = new EventLoop[loopCount];

        if (workerThreadSize > 0) {
            // NOTE(review): the meaning of the second argument (`true`) is
            // defined by TaskPool and is not visible here - confirm.
            _pool = new TaskPool(workerThreadSize, true);
        }

        // Every loop receives its index, the group size and the (possibly
        // null) shared worker pool.
        foreach (i; 0 .. loopCount) {
            _eventLoops[i] = new EventLoop(i, loopCount, _pool);
        }
    }

    /**
     * Starts the group by calling `start(-1)`.
     */
    void start() {
        start(-1);
    }

    /**
     * Starts every event loop asynchronously. Does nothing if the group is
     * already marked as running.
     *
     * Params:
     *  timeout = in milliseconds; forwarded unchanged to
     *            `EventLoop.runAsync`.
     */
    void start(long timeout) {
        // Only the caller that flips the flag false -> true starts the loops.
        if (!cas(&_isRunning, false, true))
            return;

        foreach (EventLoop loop; _eventLoops) {
            loop.runAsync(timeout);
        }
    }

    /**
     * Stops the worker pool (if any) and then every event loop. Does
     * nothing if the group is not marked as running.
     */
    void stop() {
        // Only the caller that flips the flag true -> false performs the stop.
        if (!cas(&_isRunning, true, false))
            return;

        if (_pool !is null) {
            _pool.stop();
        }

        version (HUNT_IO_DEBUG)
            trace("stopping EventLoopGroup...");

        foreach (EventLoop loop; _eventLoops) {
            loop.stop();
        }

        version (HUNT_IO_DEBUG)
            trace("EventLoopGroup stopped.");
    }

    /**
     * Returns: the current value of the running flag.
     */
    bool isRunning() {
        // The flag is written with `cas`; read it atomically as well instead
        // of using a plain load on shared data.
        return atomicLoad(_isRunning);
    }

    /**
     * Returns: true only when every event loop in the group reports
     * `isReady()`.
     */
    bool isReady() {
        foreach (EventLoop loop; _eventLoops) {
            if (!loop.isReady())
                return false;
        }

        return true;
    }

    /**
     * Returns: the shared worker pool, or null when the group was created
     * with `workerThreadSize == 0`.
     */
    TaskPool worker() {
        return _pool;
    }

    /**
     * Returns: the number of event loops in the group.
     */
    @property size_t size() {
        return _eventLoops.length;
    }

    /**
     * Selects an event loop by reducing `factor` modulo the group size.
     *
     * Params:
     *  factor = any value; it is wrapped into the valid index range.
     */
    EventLoop nextLoop(size_t factor) {
        return _eventLoops[factor % _eventLoops.length];
    }

    /**
     * Index operator; `index` wraps around the group size exactly like
     * `nextLoop`.
     */
    EventLoop opIndex(size_t index) {
        return nextLoop(index);
    }

    /**
     * Enables `foreach` over the event loops of the group. Iteration stops
     * as soon as the delegate returns a non-zero value, which is then
     * returned to the caller.
     */
    int opApply(scope int delegate(EventLoop) dg) {
        foreach (loop; _eventLoops) {
            if (auto result = dg(loop))
                return result;
        }
        return 0;
    }

private:
    // Not referenced anywhere in this class; kept so the declaration set
    // stays unchanged.
    shared int _loopIndex;
    // Running flag, only modified through `cas` in start()/stop().
    shared bool _isRunning;
    EventLoop[] _eventLoops;
}