1 module hunt.util.worker.WorkerThread; 2 3 import hunt.util.Closeable; 4 import hunt.util.ResoureManager; 5 import hunt.util.worker.Task; 6 import hunt.util.worker.Worker; 7 8 import hunt.logging.ConsoleLogger; 9 10 import core.atomic; 11 import core.memory; 12 import core.thread; 13 import core.sync.condition; 14 import core.sync.mutex; 15 import std.conv; 16 17 18 19 enum WorkerThreadState { 20 Idle, 21 Busy, // occupied 22 Stopped 23 } 24 25 bool inWorkerThread() { 26 WorkerThread th = cast(WorkerThread) Thread.getThis(); 27 return th !is null; 28 } 29 30 /** 31 * 32 */ 33 class WorkerThread : Thread { 34 35 private shared WorkerThreadState _state; 36 private size_t _index; 37 private Task _task; 38 private Duration _timeout; 39 40 private Condition _condition; 41 private Mutex _mutex; 42 43 this(size_t index, Duration timeout = 5.seconds, size_t stackSize = 0) { 44 _index = index; 45 _timeout = timeout; 46 _state = WorkerThreadState.Idle; 47 _mutex = new Mutex(); 48 _condition = new Condition(_mutex); 49 this.name = "WorkerThread-" ~ _index.to!string(); 50 super(&run, stackSize); 51 } 52 53 void stop() { 54 _state = WorkerThreadState.Stopped; 55 } 56 57 bool isBusy() { 58 return _state == WorkerThreadState.Busy; 59 } 60 61 bool isIdle() { 62 return _state == WorkerThreadState.Idle; 63 } 64 65 WorkerThreadState state() { 66 return _state; 67 } 68 69 size_t index() { 70 return _index; 71 } 72 73 Task task() { 74 return _task; 75 } 76 77 bool attatch(Task task) { 78 assert(task !is null); 79 bool r = cas(&_state, WorkerThreadState.Idle, WorkerThreadState.Busy); 80 81 if (r) { 82 version(HUNT_IO_DEBUG) { 83 infof("attatching task %d with thread %s", task.id, this.name); 84 } 85 86 _mutex.lock(); 87 scope (exit) { 88 _mutex.unlock(); 89 } 90 _task = task; 91 _condition.notify(); 92 93 } else { 94 warningf("%s is unavailable. state: %s", this.name(), _state); 95 } 96 97 return r; 98 } 99 100 private void run() nothrow { 101 while (_state != WorkerThreadState.Stopped) { 102 103 scope (exit) { 104 version (HUNT_IO_DEBUG) { 105 tracef("%s Done. state: %s", this.name(), _state); 106 } 107 108 collectResoure(); 109 _task = null; 110 bool r = cas(&_state, WorkerThreadState.Busy, WorkerThreadState.Idle); 111 if(!r) { 112 warningf("Failed to set thread %s to Idle, its state is %s", this.name, _state); 113 } 114 } 115 116 try { 117 doRun(); 118 } catch (Throwable ex) { 119 warning(ex); 120 } 121 } 122 123 version (HUNT_DEBUG) tracef("%s Stopped. state: %s", this.name(), _state); 124 } 125 126 private bool _isWaiting = false; 127 128 private void doRun() { 129 _mutex.lock(); 130 131 Task task = _task; 132 while(task is null && _state != WorkerThreadState.Stopped) { 133 bool r = _condition.wait(_timeout); 134 task = _task; 135 136 version(HUNT_IO_DEBUG) { 137 if(!r && _state == WorkerThreadState.Busy) { 138 if(task is null) { 139 warningf("No task attatched on a busy thread %s in %s, task: %s", this.name, _timeout); 140 } else { 141 warningf("more tests need for this status, thread %s in %s", this.name, _timeout); 142 } 143 } 144 } 145 } 146 147 _mutex.unlock(); 148 149 if(task !is null) { 150 version(HUNT_IO_DEBUG) { 151 tracef("Try to exeucte task %d in thread %s, its status: %s", task.id, this.name, task.status); 152 } 153 task.execute(); 154 } 155 } 156 }