module hunt.util.worker.Worker;

import hunt.util.worker.Task;
// import hunt.util.worker.TaskQueue;
import hunt.util.worker.WorkerThread;
import hunt.logging;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;

import std.conv;
import std.concurrency;


/**
 * Dispatches tasks taken from a `TaskQueue` to a fixed-size set of
 * `WorkerThread`s.
 *
 * The worker threads are created and started by the constructor. A separate
 * master thread, started by `run()`, pops tasks from the queue and attaches
 * each one to the first idle worker thread it finds. Every task submitted via
 * `put()` is also recorded in an id-indexed table guarded by a mutex, so it
 * can be looked up with `get()` until it is dropped with `remove()` or
 * `clear()`.
 */
class Worker {

    private size_t _size;
    private Thread _masterThread;
    private WorkerThread[] _workerThreads;
    private Task[size_t] _tasks;   // submitted tasks, keyed by Task.id
    private Mutex _taskLocker;     // guards _tasks


    private TaskQueue _taskQueue;
    private shared bool _isRunning = false;

    /**
     * Creates the worker and immediately starts `size` worker threads.
     *
     * Params:
     *   taskQueue = the queue tasks are pushed to and popped from
     *   size      = the number of worker threads to create
     */
    this(TaskQueue taskQueue, size_t size = 8) {
        _taskQueue = taskQueue;
        _size = size;

        version(HUNT_DEBUG) {
            infof("Worker size: %d", size);
        }

        initialize();
    }

    /// Creates the task-table mutex, then creates and starts every worker thread.
    private void initialize() {
        _taskLocker = new Mutex();
        _workerThreads = new WorkerThread[_size];

        foreach(size_t index; 0 .. _size) {
            WorkerThread thread = new WorkerThread(index);
            thread.start();

            _workerThreads[index] = thread;
        }
    }

    /**
     * Logs the name and state of every worker thread.
     *
     * A thread that reports `Busy` but has no task is logged as a warning;
     * otherwise the thread is traced together with the timing information of
     * its task, when it has one.
     */
    void inspect() {

        foreach(WorkerThread th; _workerThreads) {

            Task task = th.task();

            if(th.state() == WorkerThreadState.Busy) {
                if(task is null) {
                    // Must be the formatting variant, otherwise the "%s"
                    // placeholders are not substituted.
                    warningf("A dead worker thread detected: %s, %s", th.name, th.state());
                } else {
                    tracef("Thread: %s, state: %s, lifeTime: %s", th.name, th.state(), task.lifeTime());
                }
            } else {
                if(task is null) {
                    tracef("Thread: %s, state: %s", th.name, th.state());
                } else {
                    // One specifier per argument: the execution time was
                    // previously passed without a matching "%s".
                    tracef("Thread: %s, state: %s, executionTime: %s",
                        th.name, th.state(), task.executionTime);
                }
            }
        }
    }

    /**
     * Pushes `task` onto the task queue and records it in the task table
     * under `task.id`.
     *
     * Params:
     *   task = the task to submit
     */
    void put(Task task) {
        _taskQueue.push(task);

        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks[task.id] = task;
    }

    /**
     * Looks up a recorded task by its id.
     *
     * Params:
     *   id = the id of the task
     *
     * Returns: the task recorded under `id`.
     *
     * Throws: `Exception` if no task is recorded under `id`.
     */
    Task get(size_t id) {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        auto itemPtr = id in _tasks;
        if(itemPtr is null) {
            throw new Exception("Task does NOT exist: " ~ id.to!string);
        }

        return *itemPtr;
    }

    /**
     * Removes the task recorded under `id` from the task table.
     *
     * Params:
     *   id = the id of the task to forget
     */
    void remove(size_t id) {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }

        _tasks.remove(id);
    }

    /// Removes every recorded task from the task table.
    void clear() {
        _taskLocker.lock();
        scope(exit) {
            _taskLocker.unlock();
        }
        _tasks.clear();

    }

    /**
     * Starts the master (dispatching) thread.
     *
     * The running flag is flipped with a compare-and-swap, so calling this
     * while the worker is already running does nothing.
     */
    void run() {
        bool r = cas(&_isRunning, false, true);
        if(r) {
            _masterThread = new Thread(&doRun);
            _masterThread.start();
        }
    }

    /**
     * Stops the worker: stops and joins every worker thread, clears the task
     * queue and finally joins the master thread.
     *
     * The running flag is flipped with a compare-and-swap, so calling this
     * while the worker is not running does nothing.
     */
    void stop() {
        bool r = cas(&_isRunning, true, false);

        if(r) {
            version(HUNT_IO_DEBUG) {
                info("Stopping all the threads...");
            }


            foreach(size_t index; 0 .. _size) {
                _workerThreads[index].stop();
                _workerThreads[index].join();
            }


            // To stop the master thread as soon as possible.
            // NOTE(review): presumably clear() wakes up a master thread that
            // is blocked in pop() -- confirm against the TaskQueue implementation.
            // _taskQueue.push(null);
            _taskQueue.clear();

            if(_masterThread !is null) {
                _masterThread.join();
            }

            version(HUNT_IO_DEBUG) {
                info("All the threads stopped.");
            }
        }
    }

    /**
     * Scans the worker threads in order.
     *
     * Returns: the first thread reporting itself idle, or `null` if there is none.
     */
    private WorkerThread findIdleThread() {
        foreach(size_t index, WorkerThread thread; _workerThreads) {
            version(HUNT_IO_DEBUG) {
                tracef("Thread: %s, state: %s", thread.name, thread.state);
            }

            if(thread.isIdle())
                return thread;
        }

        return null;
    }

    /**
     * Body of the master thread.
     *
     * While the worker is running, pops a task from the queue and keeps
     * trying to attach it to an idle worker thread, yielding the CPU whenever
     * no thread is idle. Anything thrown during an iteration is logged as a
     * warning and the loop continues.
     */
    private void doRun() {
        while(_isRunning) {
            try {
                version(HUNT_IO_DEBUG) info("running...");
                Task task = _taskQueue.pop();

                // stop() may have been called while pop() was in progress.
                if(!_isRunning) break;

                if(task is null) {
                    version(HUNT_IO_DEBUG) {
                        warning("A null task popped!");
                        inspect();
                    }
                    continue;
                }

                WorkerThread workerThread;
                bool isAttached = false;

                do {
                    workerThread = findIdleThread();

                    // All worker threads are busy!
                    if(workerThread is null) {
                        // version(HUNT_METRIC) {
                        //     _taskQueue.inspect();
                        // }
                        // trace("All worker threads are busy!");
                        // Thread.sleep(1.seconds);
                        // Thread.sleep(10.msecs);
                        Thread.yield();
                    } else {
                        // attatch() may fail, in which case the search is retried.
                        isAttached = workerThread.attatch(task);
                    }
                } while(!isAttached && _isRunning);

            } catch(Throwable ex) {
                warning(ex);
            }
        }

        version(HUNT_IO_DEBUG) info("Worker stopped!");

    }

}