module hunt.util.TaskPool;

import hunt.io.SimpleQueue;
import hunt.logging.ConsoleLogger;
import hunt.system.Memory;
import hunt.util.Runnable;


import core.thread;
import core.atomic;
import core.sync.condition;
import core.sync.mutex;

import std.conv;
import std.format;
import std.traits;

// Progress marker for a task. Backed by `ubyte` so that it satisfies the
// `is(T : ubyte)` constraint of the atomic*Ubyte helpers below.
private enum TaskStatus : ubyte {
    ready,
    processing,
    done
}

/* Atomics code.  These forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to
       change to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/

// Atomically writes `newVal` into `stuff` by viewing `stuff` as shared and
// forwarding to core.atomic.atomicStore.
private void atomicSetUbyte(T)(ref T stuff, T newVal)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    shared(T)* target = cast(shared(T)*) &stuff;
    atomicStore(*target, newVal);
}

// Atomically reads `val` (viewed as shared) through core.atomic.atomicLoad
// and returns it as a plain ubyte.
private ubyte atomicReadUbyte(T)(ref T val)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    shared(T)* source = cast(shared(T)*) &val;
    return atomicLoad(*source);
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    // Compare-and-swap on `stuff` viewed as shared; true when the swap happened.
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}


/**
 * Base class for a unit of work.
 *
 * `run` drives the subclass hook `onRun`, keeps `taskStatus` up to date and
 * stores anything thrown by `onRun` in `exception` instead of propagating it.
 */
class AbstractTask : Runnable {

    // Whatever `onRun` threw, or null when it completed normally.
    Throwable exception;
    // Holds a `TaskStatus` value; read and written through the atomic helpers.
    ubyte taskStatus = TaskStatus.ready;

    /**
     * Executes the task: marks it `processing`, calls `onRun`, captures any
     * `Throwable` into `exception`, then marks it `done`.
     */
    final void run() {
        atomicSetUbyte(taskStatus, TaskStatus.processing);
        try {
            onRun();
        } catch (Throwable failure) {
            exception = failure;
            debug warning(failure.msg);
            version(HUNT_DEBUG) {
                warning(failure);
            }
        }

        atomicSetUbyte(taskStatus, TaskStatus.done);
    }

    /// The actual work; implemented by subclasses.
    abstract protected void onRun();

    /**
     * Returns: `true` once `run` has marked the task `done`, `false` before.
     * Throws: the captured `exception` when the task is done and `onRun`
     *         had thrown.
     */
    bool done() @property {
        if (atomicReadUbyte(taskStatus) != TaskStatus.done) {
            return false;
        }
        if (exception !is null) {
            throw exception;
        }
        return true;
    }
}

/**
 * A task that calls `fun` with a stored copy of `Args` and keeps the result
 * (if any) in `returnVal`.
 */
class Task(alias fun, Args...) : AbstractTask {
    Args _args;

    static if (Args.length > 0) {
        this(Args args) {
            _args = args;
        }
    } else {
        this() {
        }
    }

    /**
       The return type of the function called by this `Task`.  This can be
       `void`.
     */
    alias ReturnType = typeof(fun(_args));

    static if (is(ReturnType == void)) {
        // Nothing to store for void functions.
    } else static if (is(typeof(&fun(_args)))) {
        // Ref return: keep a pointer to the referenced value.
        ReturnType* returnVal;

        ref ReturnType fixRef(ReturnType* val) {
            return *val;
        }

    } else {
        // Value return: keep a copy.
        ReturnType returnVal;

        ref ReturnType fixRef(ref ReturnType val) {
            return val;
        }
    }

    // Invokes `fun` on the arguments stored in `task` (which must be an
    // instance of this `Task` type) and records the result when there is one.
    private static void impl(AbstractTask task) {
        auto self = cast(typeof(this)) task;
        static if (is(ReturnType == void)) {
            fun(self._args);
        } else static if (is(typeof(addressOf(fun(self._args))))) {
            self.returnVal = addressOf(fun(self._args));
        } else {
            self.returnVal = fun(self._args);
        }
    }

    protected override void onRun() {
        impl(this);
    }
}

/// Returns a pointer to the referenced value.
T* addressOf(T)(ref T val) {
    return &val;
}

/// Creates a `Task` that will call the alias `fun` with `args`.
auto makeTask(alias fun, Args...)(Args args) {
    auto created = new Task!(fun, Args)(args);
    return created;
}

/// Creates a `Task` that will call `delegateOrFp` with `args`, going through
/// the `run` adapter below.
auto makeTask(F, Args...)(F delegateOrFp, Args args)
        if (is(typeof(delegateOrFp(args)))) // && !isSafeTask!F
{
    auto created = new Task!(run, F, Args)(delegateOrFp, args);
    return created;
}

// Calls `fpOrDelegate` with `args`.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args) {
    return fpOrDelegate(args);
}

/*
This class serves two purposes:

1.  It distinguishes std.parallelism threads from other threads so that
    the std.parallelism daemon threads can be terminated.

2.  It adds a reference to the pool that the thread is a member of,
    which is also necessary to allow the daemon threads to be properly
    terminated.
*/
final class ParallelismThread : Thread {

    this(void delegate() dg) {
        super(dg);
        // Each worker owns its queue; the queue is built with a 10 second timeout.
        taskQueue = new BlockingQueue!(AbstractTask)(10.seconds);
        state = ThreadState.NEW;
    }

    // Updated by TaskPool.executeWorkLoop; read by TaskPool.checkStatus.
    ThreadState state;
    // Last factor routed to this worker by TaskPool.put; -1 when none is pending.
    int factor = -1;

    TaskPool pool;
    Queue!(AbstractTask) taskQueue;
}

/// State of a single worker as tracked by `TaskPool.executeWorkLoop`.
enum ThreadState {
    NEW,
    BUSY,
    IDLE,
    TERMINATED
}

/**
 * State of the whole pool.
 *
 * `running` is the initial state, `finishing` is entered by
 * `TaskPool.finish` and `stopNow` by `TaskPool.stop`.
 */
enum PoolState : ubyte {
    running,
    finishing,
    stopNow
}

/**
 * Snapshot returned by `TaskPool.checkStatus`.
 */
class PoolStatus {
    /// Number of worker threads in the pool.
    size_t total;

    /// Workers whose state was `ThreadState.BUSY`.
    int active;

    /// Workers whose state was `ThreadState.IDLE`.
    int idle;

    /// Not populated by `TaskPool.checkStatus`; stays 0.
    int waiting;

    override string toString() {
        return format("total: %d, active: %d, idle: %d, waiting: %d",
                total, active, idle, waiting);
    }
}

/**
 * A fixed-size set of worker threads, each with its own task queue.
 * `put` routes a task to the worker selected by `factor % pool.length`.
 */
class TaskPool {

    private ParallelismThread[] pool;
    private PoolState status = PoolState.running;

    // The instanceStartIndex of the next instance that will be created.
    // __gshared size_t nextInstanceIndex = 1;

    // The index of the first thread in this instance.
    // immutable size_t instanceStartIndex;

    // The index of the current thread (a D `static` is thread-local).
    // A thread that never ran startWorkLoop sees the default value 0.
    static size_t threadIndex;

    // The index that the next thread to be initialized in this pool will have.
    shared size_t nextThreadIndex;

    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex; // For waiterCondition

    bool isSingleTask = false;

    /**
       Default constructor that initializes a `TaskPool` with
       `totalCPUs` - 1 worker threads.

       Note:  When `totalCPUs` - 1 is 0 the delegated constructor still
       creates one worker thread.
     */
    this() {
        this(totalCPUs - 1);
    }

    /**
       Allows for custom number of worker threads.

       Params:
           nWorkers = number of worker threads; 0 is treated as 1.
           isDaemon = daemon flag applied to every worker after it is started.
     */
    this(size_t nWorkers, bool isDaemon = false) {
        if (nWorkers == 0)
            nWorkers = 1;

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);
        nextThreadIndex = 0;

        pool = new ParallelismThread[nWorkers];

        // Each slot is filled in before its thread is started.
        foreach (size_t index, ref ParallelismThread poolThread; pool) {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.name = "worker-thread-" ~ index.to!string();
            poolThread.start();
        }

        this.isDaemon = isDaemon;
    }

    /// Daemon flag of the workers, as reported by the first worker.
    bool isDaemon() @property @trusted {
        return pool[0].isDaemon;
    }

    /// Ditto
    void isDaemon(bool newVal) @property @trusted {
        foreach (thread; pool) {
            thread.isDaemon = newVal;
        }
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread.  It then calls executeWorkLoop().
    private void startWorkLoop() {
        // Initialize thread index.
        // NOTE(review): the index is handed out in the order threads reach
        // this line, which is not guaranteed to match the slot (and name) the
        // thread was created for.
        size_t index = atomicOp!("+=")(nextThreadIndex, 1);
        threadIndex = index - 1;

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate.  It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    //
    // The loop ends when the pool is in `stopNow`, or when it is in
    // `finishing` and dequeue() hands back no task.
    private void executeWorkLoop() {
        ParallelismThread workerThread = pool[threadIndex];
        workerThread.state = ThreadState.IDLE;
        scope(exit) {
            version(HUNT_IO_DEBUG) infof("Thread exited, threadIndex: %d, name: %s", threadIndex, workerThread.name());
            workerThread.state = ThreadState.TERMINATED;
        }

        while (atomicReadUbyte(status) != PoolState.stopNow) {
            AbstractTask task = workerThread.taskQueue.dequeue();
            if (task is null) {
                version(HUNT_IO_DEBUG) {
                    // warningf("A empty task returned on thread: %s", workerThread.name);
                }

                // Honor finish(): without this exit the loop only ever ended
                // on stopNow, so finish(true) could never return.
                // NOTE(review): assumes a null result means the queue had
                // nothing to hand out (e.g. its timeout elapsed) - confirm
                // against BlockingQueue.dequeue.
                if (atomicReadUbyte(status) == PoolState.finishing) {
                    break;
                }
            } else {
                version(HUNT_IO_DEBUG) infof("running a task in thread: %s", workerThread.name());
                workerThread.state = ThreadState.BUSY;

                scope(exit) {
                    workerThread.state = ThreadState.IDLE;
                    workerThread.factor = -1;

                    version(HUNT_IO_DEBUG) infof("A task finished in thread: %s", workerThread.name());
                }

                doJob(task);
            }
        }
    }

    // Runs one task; anything that escapes job.run() is logged, not rethrown.
    private void doJob(AbstractTask job) nothrow {
        try {
            job.run();
        } catch(Throwable t) {
            warning(t.msg);
            version(HUNT_DEBUG) warning(t);
        }
    }

    private void waiterLock() {
        if (!isSingleTask)
            waiterMutex.lock();
    }

    private void waiterUnlock() {
        if (!isSingleTask)
            waiterMutex.unlock();
    }

    private void wait() {
        if (!isSingleTask)
            workerCondition.wait();
    }

    private void notify() {
        if (!isSingleTask)
            workerCondition.notify();
    }

    private void notifyAll() {
        if (!isSingleTask)
            workerCondition.notifyAll();
    }

    private void waitUntilCompletion() {
        waiterCondition.wait();
    }

    private void notifyWaiters() {
        if (!isSingleTask)
            waiterCondition.notifyAll();
    }

    /**
       Switches the pool to `stopNow`.  Each work loop re-checks the state
       before asking its queue for the next task, so a worker that is
       currently running a task or waiting in dequeue() exits after that call
       returns.
     */
    void stop() @trusted {
        // queueLock();
        // scope(exit) queueUnlock();
        atomicSetUbyte(status, PoolState.stopNow);
        notifyAll();
    }

    /**
       Switches a running pool to `finishing`: every work loop exits the next
       time its queue hands back no task.

       Params:
           blocking = when true, the calling thread also enters the work loop
                      and then joins every worker thread before returning.
     */
    void finish(bool blocking = false) @trusted {
        {
            // queueLock();
            // scope(exit) queueUnlock();
            // Only running -> finishing; a pool already in stopNow stays there.
            atomicCasUbyte(status, PoolState.running, PoolState.finishing);
            notifyAll();
        }
        if (blocking) {
            // Use this thread as a worker until everything is finished.
            // stopWorkLoop();
            // taskQueue.wakeup();
            // NOTE(review): a caller that is not a pool worker has
            // threadIndex 0, so it works on pool[0]'s queue and also updates
            // pool[0].state.
            executeWorkLoop();

            foreach (t; pool) {
                // Maybe there should be something here to prevent a thread
                // from calling join() on itself if this function is called
                // from a worker thread in the same pool, but:
                //
                // 1.  Using an if statement to skip join() would result in
                //     finish() returning without all tasks being finished.
                //
                // 2.  If an exception were thrown, it would bubble up to the
                //     Task from which finish() was called and likely be
                //     swallowed.
                t.join();
            }
        }
    }

    /**
       Queues `task` on the worker selected by `|factor| % pool.length`.

       Params:
           factor = routing key; negative values are mapped to their
                    magnitude (`int.min` is mapped to `int.max`).
           task   = the task to enqueue.
     */
    void put(int factor, AbstractTask task) {
        int nWorkers = cast(int)pool.length;
        // -int.min overflows back to int.min, which would produce a negative
        // index below; clamp it to the largest representable magnitude.
        if (factor == int.min) {
            factor = int.max;
        } else if (factor < 0) {
            factor = -factor;
        }
        int i = factor % nWorkers;
        ParallelismThread selectedThread = pool[i];

        version(HUNT_IO_DEBUG) {
            if(selectedThread.state != ThreadState.IDLE) {
                int lastFactor = selectedThread.factor;
                if(lastFactor != -1 && lastFactor != factor) {
                    warningf("The %s is busy. For factor, last: %d, current: %d. ", selectedThread.name(),
                        lastFactor, factor);
                }
            }
        }
        selectedThread.factor = factor;

        version(HUNT_IO_DEBUG) {
            tracef("Selected worker thread[%d]: %s, factor: %d", i, selectedThread.name(), factor);
            // PoolStatus status = checkStatus();
            // warning(status.toString());
        }

        selectedThread.taskQueue.enqueue(task);
    }

    // private ParallelismThread getIdleThread(int startIndex = 0) {
    //     int j = 0;
    //     int nWorkers = cast(int)pool.length;
    //     ParallelismThread selectedThread;

    //     do {
    //         startIndex = (startIndex+1) % nWorkers;
    //         selectedThread = pool[startIndex];
    //         version(HUNT_IO_DEBUG) {
    //             warningf("The worker thread status: %s, index: %d",
    //                 selectedThread.name(), startIndex);
    //         }

    //         j++;
    //         if(j >= nWorkers) {
    //             j = 0;
    //             version(HUNT_DEBUG) warning("All the worker threads are busy. Then nothing selected.");
    //             // No idle thread found after checking the whole pool.
    //             // No wait. Select one randomly.
    //             // Warning: It may be in a dead-lock status;
    //             break;
    //         }
    //     } while(selectedThread.state != ThreadState.IDLE);

    //     return selectedThread;
    // }

    /**
       Counts the workers that are currently BUSY and IDLE.

       Returns: a new `PoolStatus` with `total`, `active` and `idle` filled in.
     */
    PoolStatus checkStatus() {
        PoolStatus status = new PoolStatus();

        status.total = pool.length;
        foreach (size_t index, ParallelismThread th; pool) {
            version(HUNT_IO_DEBUG) {
                tracef("Thread[%d]: %s, state: %s", index, th.name, th.state);
            }
            if(th.state == ThreadState.BUSY) {
                status.active++;
            } else if(th.state == ThreadState.IDLE) {
                status.idle++;
            }
            // tracef("Current: %s, WorkerThread: %s, state: %s", Thread.getThis().name(), t.name(), t.state);
        }

        return status;
    }
}