1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.event.EventLoopGroup; 13 14 import hunt.system.Memory; 15 import hunt.event.EventLoop; 16 import hunt.logging; 17 18 class EventLoopGroup { 19 20 this(uint size = (totalCPUs - 1)) { 21 // assert(size <= totalCPUs && size >= 0); 22 this._size = size > 0 ? size : 1; 23 eventLoopPool = new EventLoop[this._size]; 24 25 foreach (i; 0 .. this._size) { 26 eventLoopPool[i] = new EventLoop(); 27 } 28 } 29 30 /** 31 timeout: in millisecond 32 */ 33 void start(long timeout = -1) { 34 if (_started) 35 return; 36 _started = true; 37 foreach (i; 0 .. this._size) { 38 eventLoopPool[i].runAsync(timeout); 39 } 40 } 41 42 void stop() { 43 version (HUNT_DEBUG) 44 trace("stopping EventLoopGroup..."); 45 if (!_started) 46 return; 47 foreach (i; 0 .. this._size) { 48 eventLoopPool[i].stop(); 49 } 50 _started = false; 51 52 version (HUNT_DEBUG) 53 trace("EventLoopGroup stopped."); 54 } 55 56 @property size_t size() { 57 return _size; 58 } 59 60 EventLoop nextLoop() { 61 import core.atomic; 62 int index = atomicOp!"+="(_loopIndex, 1); 63 if(index > 10000) { 64 index = 0; 65 atomicStore(_loopIndex, 0); 66 } 67 index %= _size; 68 return eventLoopPool[index]; 69 } 70 private shared int _loopIndex; 71 72 EventLoop opIndex(size_t index) { 73 auto i = index % _size; 74 return eventLoopPool[i]; 75 } 76 77 int opApply(scope int delegate(EventLoop) dg) { 78 int ret = 0; 79 foreach (i; 0 .. this._size) { 80 ret = dg(eventLoopPool[i]); 81 if (ret) 82 break; 83 } 84 return ret; 85 } 86 87 private: 88 bool _started; 89 uint _size; 90 91 EventLoop[] eventLoopPool; 92 93 }