module hunt.util.worker.WorkerThread;

import hunt.util.Closeable;
import hunt.util.ResoureManager;
import hunt.util.worker.Task;
import hunt.util.worker.Worker;

import hunt.logging;

import core.atomic;
import core.memory;
import core.thread;
import core.sync.condition;
import core.sync.mutex;
import std.conv;


/**
 * States a WorkerThread moves through.
 *
 * A thread starts as Idle, is switched to Busy by `attatch`, goes back to
 * Idle at the end of each loop iteration in `run`, and becomes Stopped once
 * `stop` has been called.
 */
enum WorkerThreadState {
    Idle,
    Busy, // occupied
    Stopped
}

/**
 * Tells whether the calling thread is a WorkerThread.
 *
 * Returns: true if `Thread.getThis()` can be cast to WorkerThread.
 */
bool inWorkerThread() {
    WorkerThread th = cast(WorkerThread) Thread.getThis();
    return th !is null;
}

/**
 * A thread that repeatedly waits for a Task to be attached and executes it.
 *
 * The task hand-off is guarded by `_mutex` / `_condition`; the state is
 * switched between Idle and Busy with `cas`.
 */
class WorkerThread : Thread {

    private shared WorkerThreadState _state;
    private size_t _index;
    private Task _task;
    private Duration _timeout;

    private Condition _condition;
    private Mutex _mutex;

    /**
     * Params:
     *   index     = number of this worker; used to build the thread name
     *               ("WorkerThread-<index>").
     *   timeout   = maximum duration of a single wait for a task in `doRun`
     *               before the wait is retried.
     *   stackSize = forwarded to the Thread constructor.
     */
    this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) {
        _index = index;
        _timeout = timeout;
        _state = WorkerThreadState.Idle;
        _mutex = new Mutex();
        _condition = new Condition(_mutex);
        this.name = "WorkerThread-" ~ _index.to!string();
        super(&run, stackSize);
    }

    /**
     * Marks the thread as Stopped and wakes it up if it is waiting for a task.
     */
    void stop() {
        version(HUNT_IO_DEBUG) {
            infof("Stopping thread %s", this.name);
        }
        _state = WorkerThreadState.Stopped;

        // Notify under the mutex so a thread blocked in doRun() re-checks the state.
        _mutex.lock();
        scope (exit) {
            _mutex.unlock();
        }
        _condition.notify();
    }

    /// Returns: true if the current state is Busy.
    bool isBusy() {
        return _state == WorkerThreadState.Busy;
    }

    /// Returns: true if the current state is Idle.
    bool isIdle() {
        return _state == WorkerThreadState.Idle;
    }

    /// Returns: the current state.
    WorkerThreadState state() {
        return _state;
    }

    /// Returns: the index passed to the constructor.
    size_t index() {
        return _index;
    }

    /// Returns: the currently attached task, or null if there is none.
    Task task() {
        return _task;
    }

    /**
     * Tries to hand a task to this thread.
     *
     * The state is switched from Idle to Busy with `cas`; only if that
     * succeeds is the task stored and the thread notified.
     *
     * Params:
     *   task = the task to run; must not be null (checked with `assert`).
     *
     * Returns: true if the task was attached, false if the thread was not Idle.
     */
    bool attatch(Task task) {
        assert(task !is null);
        bool r = cas(&_state, WorkerThreadState.Idle, WorkerThreadState.Busy);

        if (r) {
            version(HUNT_IO_DEBUG) {
                infof("attatching task %d with thread %s", task.id, this.name);
            }

            _mutex.lock();
            scope (exit) {
                _mutex.unlock();
            }
            _task = task;
            _condition.notify();

        } else {
            warningf("%s is unavailable. state: %s", this.name(), _state);
        }

        return r;
    }

    /**
     * Thread entry point: calls `doRun` in a loop until the state is Stopped.
     *
     * Anything thrown by `doRun` is logged and swallowed so the loop keeps going.
     */
    private void run() nothrow {
        while (_state != WorkerThreadState.Stopped) {

            // Runs at the end of every iteration, after the try/catch below.
            scope (exit) {
                version (HUNT_IO_DEBUG) {
                    tracef("%s Done. state: %s", this.name(), _state);
                }

                collectResoure();
                _task = null;

                if(_state != WorkerThreadState.Stopped) {
                    // Only a Busy thread is moved back to Idle.
                    bool r = cas(&_state, WorkerThreadState.Busy, WorkerThreadState.Idle);
                    version(HUNT_IO_DEBUG) {
                        if(!r) {
                            warningf("Failed to set thread %s to Idle, its state is %s", this.name, _state);
                        }
                    }
                }
            }

            try {
                doRun();
            } catch (Throwable ex) {
                warning(ex);
            }
        }

        version (HUNT_DEBUG) tracef("%s Stopped. state: %s", this.name(), _state);
    }

    // NOTE(review): not referenced anywhere in this class; kept as-is.
    private bool _isWaiting = false;

    /**
     * Waits until a task is attached or the thread is stopped, then executes
     * the task (if any) outside the lock.
     */
    private void doRun() {
        Task task;

        {
            _mutex.lock();
            // Release the mutex on every exit path, including exceptions thrown
            // by the wait or by logging. run() swallows the exception and keeps
            // looping, so a mutex left locked here would block attatch()/stop()
            // on other threads forever.
            scope (exit) {
                _mutex.unlock();
            }

            task = _task;
            while(task is null && _state != WorkerThreadState.Stopped) {
                // r is false when the wait ended because _timeout elapsed.
                bool r = _condition.wait(_timeout);
                task = _task;

                version(HUNT_IO_DEBUG) {
                    if(!r && _state == WorkerThreadState.Busy) {
                        if(task is null) {
                            // Two specifiers for two arguments; the former third
                            // "%s" had no matching argument.
                            warningf("No task attatched on a busy thread %s in %s", this.name, _timeout);
                        } else {
                            warningf("more tests need for this status, thread %s in %s", this.name, _timeout);
                        }
                    }
                }
            }
        }

        if(task !is null) {
            version(HUNT_IO_DEBUG) {
                tracef("Try to exeucte task %d in thread %s, its status: %s", task.id, this.name, task.status);
            }
            task.execute();
        }
    }
}