1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.EventLoopGroup; 13 14 import hunt.concurrency.TaskPool; 15 import hunt.event.EventLoop; 16 import hunt.logging.ConsoleLogger; 17 import hunt.system.Memory; 18 import hunt.util.Lifecycle; 19 20 import core.atomic; 21 22 /** 23 * 24 */ 25 class EventLoopGroup : Lifecycle { 26 private TaskPool _pool; 27 28 this(size_t ioThreadSize = (totalCPUs - 1), size_t workerThreadSize = 0) { 29 size_t _size = ioThreadSize > 0 ? ioThreadSize : 1; 30 31 version(HUNT_DEBUG) infof("ioThreadSize: %d, workerThreadSize: %d", ioThreadSize, workerThreadSize); 32 33 _eventLoops = new EventLoop[_size]; 34 35 if(workerThreadSize > 0) { 36 _pool = new TaskPool(workerThreadSize, true); 37 } 38 39 foreach (i; 0 .. _size) { 40 _eventLoops[i] = new EventLoop(i, _size, _pool); 41 } 42 } 43 44 void start() { 45 start(-1); 46 } 47 48 /** 49 timeout: in millisecond 50 */ 51 void start(long timeout) { 52 if (cas(&_isRunning, false, true)) { 53 foreach (EventLoop pool; _eventLoops) { 54 pool.runAsync(timeout); 55 } 56 } 57 } 58 59 void stop() { 60 if (!cas(&_isRunning, true, false)) 61 return; 62 63 version (HUNT_IO_DEBUG) 64 trace("stopping EventLoopGroup..."); 65 foreach (EventLoop pool; _eventLoops) { 66 pool.stop(); 67 } 68 69 version (HUNT_IO_DEBUG) 70 trace("EventLoopGroup stopped."); 71 } 72 73 bool isRunning() { 74 return _isRunning; 75 } 76 77 bool isReady() { 78 79 foreach (EventLoop pool; _eventLoops) { 80 if(!pool.isReady()) return false; 81 } 82 83 return true; 84 } 85 86 @property size_t size() { 87 return _eventLoops.length; 88 } 89 90 EventLoop nextLoop(size_t factor) { 91 return _eventLoops[factor % _eventLoops.length]; 92 } 93 94 // Warning: 95 // It's dangerous because it may lead that the different io channels are 96 // assigned to the same slot in the same EventLoop. 97 // Then the last channel may be overwritten and be collected by GC. 98 deprecated("Using nextLoop(size_t factor) instead.") 99 EventLoop nextLoop() { 100 size_t index = atomicOp!"+="(_loopIndex, 1); 101 if(index > 10000) { 102 index = 0; 103 atomicStore(_loopIndex, 0); 104 } 105 index %= _eventLoops.length; 106 return _eventLoops[index]; 107 } 108 109 EventLoop opIndex(size_t index) { 110 auto i = index % _eventLoops.length; 111 return _eventLoops[i]; 112 } 113 114 int opApply(scope int delegate(EventLoop) dg) { 115 int ret = 0; 116 foreach (pool; _eventLoops) { 117 ret = dg(pool); 118 if (ret) 119 break; 120 } 121 return ret; 122 } 123 124 private: 125 shared int _loopIndex; 126 shared bool _isRunning; 127 EventLoop[] _eventLoops; 128 }