1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.EventLoopGroup; 13 14 import hunt.event.EventLoop; 15 import hunt.logging.ConsoleLogger; 16 import hunt.system.Memory; 17 import hunt.util.Lifecycle; 18 import hunt.util.TaskPool; 19 20 import core.atomic; 21 22 /** 23 * 24 */ 25 class EventLoopGroup : Lifecycle { 26 private TaskPool _pool; 27 28 this(size_t ioThreadSize = (totalCPUs - 1), size_t workerThreadSize = 0) { 29 size_t _size = ioThreadSize > 0 ? ioThreadSize : 1; 30 31 version(HUNT_DEBUG) infof("ioThreadSize: %d, workerThreadSize: %d", ioThreadSize, workerThreadSize); 32 33 _eventLoops = new EventLoop[_size]; 34 35 if(workerThreadSize > 0) { 36 _pool = new TaskPool(workerThreadSize, true); 37 } 38 39 foreach (i; 0 .. _size) { 40 _eventLoops[i] = new EventLoop(i, _size, _pool); 41 } 42 } 43 44 void start() { 45 start(-1); 46 } 47 48 /** 49 timeout: in millisecond 50 */ 51 void start(long timeout) { 52 if (cas(&_isRunning, false, true)) { 53 foreach (EventLoop pool; _eventLoops) { 54 pool.runAsync(timeout); 55 } 56 } 57 } 58 59 void stop() { 60 if (!cas(&_isRunning, true, false)) 61 return; 62 63 if(_pool !is null) { 64 _pool.stop(); 65 } 66 67 version (HUNT_IO_DEBUG) 68 trace("stopping EventLoopGroup..."); 69 foreach (EventLoop pool; _eventLoops) { 70 pool.stop(); 71 } 72 73 version (HUNT_IO_DEBUG) 74 trace("EventLoopGroup stopped."); 75 } 76 77 bool isRunning() { 78 return _isRunning; 79 } 80 81 bool isReady() { 82 83 foreach (EventLoop pool; _eventLoops) { 84 if(!pool.isReady()) return false; 85 } 86 87 return true; 88 } 89 90 TaskPool worker() { 91 return _pool; 92 } 93 94 @property size_t size() { 95 return _eventLoops.length; 96 } 97 98 EventLoop nextLoop(size_t factor) { 99 return _eventLoops[factor % _eventLoops.length]; 100 } 101 102 EventLoop opIndex(size_t index) { 103 auto i = index % _eventLoops.length; 104 return _eventLoops[i]; 105 } 106 107 int opApply(scope int delegate(EventLoop) dg) { 108 int ret = 0; 109 foreach (pool; _eventLoops) { 110 ret = dg(pool); 111 if (ret) 112 break; 113 } 114 return ret; 115 } 116 117 private: 118 shared int _loopIndex; 119 shared bool _isRunning; 120 EventLoop[] _eventLoops; 121 }