1 module hunt.io.worker.Worker; 2 3 import hunt.io.ByteBuffer; 4 import core.thread; 5 import core.sync.condition; 6 import core.sync.mutex; 7 import std.container : DList; 8 import hunt.io.channel; 9 10 class Task(T) { 11 this (T obj , ByteBuffer buffer) 12 { 13 this.channel = obj; 14 this.buffer = buffer; 15 } 16 T channel; 17 ByteBuffer buffer; 18 } 19 20 21 class Worker(T) { 22 23 this() 24 { 25 _isExit = false; 26 _mutex = new Mutex(); 27 _condition = new Condition(_mutex); 28 _isExit = false; 29 _thread = new Thread(() { 30 doThreadProc(); 31 }); 32 } 33 34 void doThreadProc() 35 { 36 do 37 { 38 Task!T task = null; 39 { 40 _condition.mutex().lock(); 41 if (_queue.empty()) 42 { 43 _condition.wait(); 44 }else 45 { 46 task = _queue.front(); 47 _queue.removeFront(); 48 } 49 _condition.mutex().unlock(); 50 } 51 52 if (_isExit) 53 { 54 break; 55 } 56 if (task !is null && !((cast(AbstractChannel)(task.channel)).isClosed())) 57 { 58 auto handle = task.channel.getDataReceivedHandler(); 59 if (handle !is null) 60 { 61 handle(task.buffer); 62 } 63 } 64 } while (!_isExit); 65 } 66 67 void run() 68 { 69 _thread.start(); 70 } 71 72 void put(Task!T task) 73 { 74 if(task !is null) 75 { 76 _condition.mutex().lock(); 77 _queue.insertBack(task); 78 _condition.notify(); 79 _condition.mutex().unlock(); 80 } 81 } 82 83 private { 84 bool _isExit; 85 Condition _condition; 86 Mutex _mutex; 87 Thread _thread; 88 DList!(Task!T) _queue; 89 } 90 91 } 92