module hunt.io.worker.WorkerGroup;

import std.stdio;
import std.container : DList;
import core.thread;
import core.sync.condition;
import core.sync.mutex;
import std.process;
import hunt.io.channel;
import hunt.io.BufferUtils;
import hunt.io.ByteBuffer;
import hunt.io.worker.Worker;


/**
 * Queues tasks and routes them to a fixed set of `Worker!T` instances.
 *
 * Producers call `put`, which wraps the object and buffer in a `Task!T` and
 * appends it to an internal queue. A single dispatcher thread (started by
 * `run`) pops tasks from that queue and hands each one to the worker selected
 * by `task.channel.handle % threadSize`, so tasks with the same handle always
 * reach the same worker.
 *
 * The queue and the exit flag are both guarded by `_mutex`.
 */
class WorkerGroup(T) {

    /**
     * Creates the dispatcher thread (not yet started) and the workers.
     *
     * Params:
     *   threadSize = number of workers to create. A value of 0 is treated as 1,
     *                because `dispatch` selects a worker with a modulo by this
     *                count and would otherwise divide by zero.
     */
    this(size_t threadSize = 1) {
        _mutex = new Mutex();
        _condition = new Condition(_mutex);
        _isExit = false;
        _threadSize = threadSize > 0 ? threadSize : 1;

        _thread = new Thread(() {
            doThreadProc();
        });

        foreach (i; 0 .. _threadSize)
        {
            _workers[i] = new Worker!T();
        }
    }


    private
    {
        bool            _isExit;     // guarded by _mutex
        Condition       _condition;  // signalled on new task and on stop()
        Mutex           _mutex;
        DList!(Task!T)  _queue;      // guarded by _mutex
        size_t          _threadSize; // number of workers, always >= 1
        Thread          _thread;     // dispatcher thread running doThreadProc
        Worker!T[size_t] _workers;   // keyed by index 0 .. _threadSize
    }


    /**
     * Enqueues a new task for the dispatcher thread.
     *
     * Does nothing when either argument is null.
     *
     * Params:
     *   obj    = object the task is created for.
     *   buffer = buffer passed to the task.
     */
    void put(T obj, ByteBuffer buffer)
    {
        if (obj is null || buffer is null)
        {
            return;
        }

        auto task = new Task!T(obj, buffer);

        _mutex.lock();
        // Release the lock even if the insertion throws.
        scope (exit) _mutex.unlock();

        _queue.insertBack(task);
        _condition.notify();
    }

    /**
     * Forwards a task to the worker selected by its channel handle.
     *
     * A null task is ignored.
     */
    void dispatch(Task!T task)
    {
        if (task !is null)
        {
            _workers[task.channel.handle % _threadSize].put(task);
        }
    }

    /**
     * Body of the dispatcher thread: waits for tasks and dispatches them until
     * `stop` is called. Tasks still queued when the exit flag is observed are
     * not dispatched.
     */
    void doThreadProc()
    {
        while (true)
        {
            Task!T task = null;
            {
                _mutex.lock();
                scope (exit) _mutex.unlock();

                // Loop on the predicate: wait() may return spuriously, and
                // stop() wakes us without adding a task.
                while (_queue.empty() && !_isExit)
                {
                    _condition.wait();
                }

                if (_isExit)
                {
                    break;
                }

                task = _queue.front();
                _queue.removeFront();
            }

            // Dispatch outside the lock so producers are not blocked meanwhile.
            dispatch(task);
        }
    }

    /**
     * Requests the dispatcher thread to exit.
     *
     * Sets the exit flag under the mutex and wakes the dispatcher so that it
     * does not stay blocked on an empty queue. Does not wait for the thread to
     * finish.
     */
    void stop()
    {
        _mutex.lock();
        scope (exit) _mutex.unlock();

        _isExit = true;
        _condition.notifyAll();
    }

    /**
     * Calls `run` on every worker and then starts the dispatcher thread.
     */
    void run()
    {
        foreach (worker; _workers.byValue)
        {
            worker.run();
        }
        _thread.start();
    }
}