/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.event.EventLoopGroup;

import hunt.event.EventLoop;
import hunt.logging;
import hunt.system.Memory;
import hunt.util.Lifecycle;
import hunt.util.worker;

import core.atomic;

/**
 * Owns a fixed-size array of EventLoop instances and, optionally, a Worker
 * that is handed to every loop at construction time.
 *
 * The running state is kept in a shared flag that is only ever changed with
 * `cas`, so repeated calls to `start` or `stop` are no-ops after the first.
 */
class EventLoopGroup : Lifecycle {
    private TaskQueue _pool;
    private Worker _worker;

    /**
     * Creates the event loops and, if requested, the worker.
     *
     * Params:
     *   ioThreadSize     = number of event loops to create; a value of 0 is
     *                      bumped to 1 so the group is never empty.
     *   workerThreadSize = size passed to the Worker; when 0 no worker (and no
     *                      task queue) is created and the loops receive `null`.
     */
    this(size_t ioThreadSize = (totalCPUs - 1), size_t workerThreadSize = 0) {
        // Always keep at least one loop: the modulo lookups below rely on a
        // non-zero length.
        size_t _size = ioThreadSize > 0 ? ioThreadSize : 1;

        version(HUNT_DEBUG) infof("ioThreadSize: %d, workerThreadSize: %d", ioThreadSize, workerThreadSize);

        _eventLoops = new EventLoop[_size];

        if (workerThreadSize > 0) {
            _pool = new MemoryTaskQueue();
            _worker = new Worker(_pool, workerThreadSize);
            // The worker is started here, at construction, not in start().
            _worker.run();
        }

        foreach (i; 0 .. _size) {
            _eventLoops[i] = new EventLoop(i, _size, _worker);
        }
    }

    /**
     * Starts every loop by forwarding to `start(-1)`.
     */
    void start() {
        // NOTE(review): -1 presumably means "no timeout" to EventLoop.runAsync;
        // confirm against EventLoop.
        start(-1);
    }

    /**
     * Returns: the worker shared by the loops, or `null` when the group was
     * constructed with `workerThreadSize == 0`.
     */
    Worker worker() {
        return _worker;
    }

    /**
     * Starts every loop with `runAsync(timeout)` if the group is not already
     * running; otherwise does nothing.
     *
     * Params:
     *   timeout = in milliseconds; passed unchanged to each loop.
     */
    void start(long timeout) {
        // Only the caller that flips the flag false -> true launches the loops.
        if (cas(&_isRunning, false, true)) {
            foreach (EventLoop pool; _eventLoops) {
                pool.runAsync(timeout);
            }
        }
    }

    /**
     * Stops the worker (if any) and then every loop. Does nothing when the
     * group is not currently running.
     */
    void stop() {
        // Only the caller that flips the flag true -> false performs the stop.
        if (!cas(&_isRunning, true, false))
            return;

        if (_worker !is null) {
            _worker.stop();
        }

        version (HUNT_IO_DEBUG)
            trace("stopping EventLoopGroup...");

        foreach (EventLoop pool; _eventLoops) {
            pool.stop();
        }

        version (HUNT_IO_DEBUG)
            trace("EventLoopGroup stopped.");
    }

    /**
     * Returns: `true` between a successful `start` and the next `stop`.
     */
    bool isRunning() {
        // The flag is written with cas() from start()/stop(); read it
        // atomically as well instead of with a plain load of a shared variable.
        return atomicLoad(_isRunning);
    }

    /**
     * Returns: `true` only if every loop in the group reports `isReady()`.
     */
    bool isReady() {
        foreach (EventLoop pool; _eventLoops) {
            if (!pool.isReady())
                return false;
        }

        return true;
    }

    /**
     * Returns: the number of loops in the group.
     */
    @property size_t size() {
        return _eventLoops.length;
    }

    /**
     * Picks a loop from an arbitrary number.
     *
     * Params:
     *   factor = any value; it is reduced modulo the number of loops.
     * Returns: the loop at index `factor % size`.
     */
    EventLoop nextLoop(size_t factor) {
        return _eventLoops[factor % _eventLoops.length];
    }

    /**
     * Index operator; the index wraps around modulo the number of loops, so
     * it never goes out of bounds.
     */
    EventLoop opIndex(size_t index) {
        auto i = index % _eventLoops.length;
        return _eventLoops[i];
    }

    /**
     * Enables `foreach` over the group's loops. Iteration stops as soon as the
     * delegate returns a non-zero value, which is then returned.
     */
    int opApply(scope int delegate(EventLoop) dg) {
        int ret = 0;
        foreach (pool; _eventLoops) {
            ret = dg(pool);
            if (ret)
                break;
        }
        return ret;
    }

private:
    // NOTE(review): not referenced anywhere in this class; kept in case other
    // code in the module relies on it.
    shared int _loopIndex;
    // Running flag; modified only through cas() in start()/stop().
    shared bool _isRunning;
    EventLoop[] _eventLoops;
}