module hunt.concurrency.TaskPool;

import hunt.concurrency.SimpleQueue;
import hunt.logging.ConsoleLogger;
import hunt.system.Memory;
import hunt.util.Common;

import core.thread;
import core.atomic;
import core.sync.condition;
import core.sync.mutex;

import std.conv;
import std.format;
import std.traits;

/// Stages a task moves through; kept in a single byte so it can be read and
/// written with the atomic helpers below.
private enum TaskStatus : ubyte {
    ready,
    processing,
    done
}

/* Atomics code.  These forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to change
       to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/

/// Atomically stores `newVal` into `stuff`.
/// Accepts any integral type that implicitly converts to `ubyte`.
private void atomicSetUbyte(T)(ref T stuff, T newVal)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    // View the plain variable as shared so core.atomic accepts it.
    atomicStore(*cast(shared(T)*) &stuff, newVal);
}

/// Atomically loads `val` and returns it as a `ubyte`.
private ubyte atomicReadUbyte(T)(ref T val)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    // Same shared view as in atomicSetUbyte, this time for a load.
    return atomicLoad(*cast(shared(T)*) &val);
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    // Compare-and-swap on a shared view of `stuff`; returns cas's result.
    return core.atomic.cas(cast(shared(T)*) &stuff, testVal, newVal);
}


/**
 * Base class of every task handled by the pool.
 *
 * `run` drives the status from `ready` through `processing` to `done` and
 * stores anything thrown by `onRun` in `exception` instead of letting it
 * propagate.
 */
class AbstractTask : Runnable {

    /// Whatever `onRun` threw, or null if it completed normally.
    Throwable exception;
    /// Current `TaskStatus`, accessed through the atomic helpers.
    ubyte taskStatus = TaskStatus.ready;

    /// Executes `onRun`, recording any `Throwable` it throws.
    final void run() {
        atomicSetUbyte(taskStatus, TaskStatus.processing);

        try {
            onRun();
        } catch (Throwable caught) {
            exception = caught;
            debug warning(caught.msg);
            version(HUNT_DEBUG) {
                warning(caught);
            }
        }

        atomicSetUbyte(taskStatus, TaskStatus.done);
    }

    /// The actual work; implemented by subclasses.
    abstract protected void onRun();

    /**
     * Returns: `true` once the task has reached `done`, `false` before that.
     * Throws: the stored `exception` if the finished task failed.
     */
    bool done() @property {
        if (atomicReadUbyte(taskStatus) != TaskStatus.done) {
            return false;
        }

        if (exception !is null) {
            throw exception;
        }

        return true;
    }
}

/**
 * A task that calls `fun` with a stored copy of `Args` and, for non-void
 * callables, keeps the result in `returnVal`.
 */
class Task(alias fun, Args...) : AbstractTask {
    Args _args;

    static if (Args.length > 0) {
        this(Args args) {
            _args = args;
        }
    } else {
        this() {
        }
    }

    /**
       The return type of the function called by this `Task`.  This can be
       `void`.
     */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void)) {
        static if (is(typeof(&fun(_args)))) {
            // Ref return: keep a pointer to the referenced value.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val) {
                return *val;
            }

        } else {
            // Value return: keep the value itself.
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val) {
                return val;
            }
        }
    }

    // Invokes `fun` on the concrete task behind `myTask` and stores the
    // result in whichever form `returnVal` was declared with.
    private static void impl(AbstractTask myTask) {
        auto self = cast(typeof(this)) myTask;

        static if (is(ReturnType == void)) {
            fun(self._args);
        } else static if (is(typeof(addressOf(fun(self._args))))) {
            self.returnVal = addressOf(fun(self._args));
        } else {
            self.returnVal = fun(self._args);
        }
    }

    protected override void onRun() {
        impl(this);
    }
}

/// Returns a pointer to the lvalue `val`.
T* addressOf(T)(ref T val) {
    return &val;
}

/// Creates a `Task` around the alias `fun`, capturing `args`.
auto makeTask(alias fun, Args...)(Args args) {
    alias Wrapped = Task!(fun, Args);
    return new Wrapped(args);
}

/// Creates a `Task` around a delegate, function pointer or functor,
/// routing the call through the `run` adapter below.
auto makeTask(F, Args...)(F delegateOrFp, Args args)
        if (is(typeof(delegateOrFp(args)))) // && !isSafeTask!F
{
    alias Wrapped = Task!(run, F, Args);
    return new Wrapped(delegateOrFp, args);
}

// Calls `fpOrDelegate` with `args`.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args) {
    return fpOrDelegate(args);
}

/*
This class serves two purposes:

1.  It distinguishes std.parallelism threads from other threads so that
    the std.parallelism daemon threads can be terminated.

2.  It adds a reference to the pool that the thread is a member of,
    which is also necessary to allow the daemon threads to be properly
    terminated.
*/
final class ParallelismThread : Thread {

    this(void delegate() dg) {
        super(dg);
        taskQueue = new BlockingQueue!(AbstractTask)();
        state = ThreadState.NEW;
    }

    /// Activity of this worker; updated by `TaskPool.executeWorkLoop` and
    /// read by `TaskPool.put` / `TaskPool.checkStatus`.
    ThreadState state;

    /// Routing key of the task most recently assigned by `TaskPool.put`;
    /// reset to -1 by the work loop after each task.
    int factor = -1;

    /// The pool this thread belongs to.
    TaskPool pool;

    /// Tasks waiting to be executed by this worker.
    Queue!(AbstractTask) taskQueue;
}

/// Activity states of a worker thread.
enum ThreadState {
    NEW,
    BUSY,
    IDLE,
    TERMINATED
}

/**
 * Life-cycle states of a `TaskPool`.
 */
enum PoolState : ubyte {
    running,
    finishing,
    stopNow
}

/**
 * Snapshot of worker counts produced by `TaskPool.checkStatus`.
 */
class PoolStatus {
    /// Number of worker threads in the pool.
    size_t total;

    /// Workers whose state is `ThreadState.BUSY`.
    int active;

    /// Workers whose state is `ThreadState.IDLE`.
    int idle;

    /// Not filled in by `checkStatus`; stays 0.
    int waiting;

    override string toString() {
        return format("total: %d, active: %d, idle: %d, waiting: %d",
                total, active, idle, waiting);
    }
}

/**
 * A pool of worker threads, each with its own task queue.  Tasks are routed
 * to a worker by an integer "factor" (see `put`).
 */
class TaskPool {

    private ParallelismThread[] pool;
    private PoolState status = PoolState.running;

    // The index of the current thread (thread-local).
    static size_t threadIndex;

    // The index that the next thread to be initialized in this pool will have.
    shared size_t nextThreadIndex;

    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex; // For waiterCondition

    /// When true, the waiter lock and the condition notifications below
    /// become no-ops.
    bool isSingleTask = false;

    /**
       Default constructor that initializes a `TaskPool` with
       `totalCPUs` - 1 worker threads.  The minus 1 is included because the
       main thread will also be available to do work.

       Note:  On single-core machines, the primitives provided by `TaskPool`
              operate transparently in single-threaded mode.
     */
    this() {
        this(totalCPUs - 1);
    }

    /**
       Allows for custom number of worker threads.

       Params:
           nWorkers = number of workers; 0 is promoted to 1.
           isDaemon = daemon flag applied to every worker after it is started.
     */
    this(size_t nWorkers, bool isDaemon = false) {
        if (nWorkers == 0)
            nWorkers = 1;

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);
        nextThreadIndex = 0;

        pool = new ParallelismThread[nWorkers];

        foreach (size_t index, ref ParallelismThread poolThread; pool) {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.name = "worker-thread-" ~ index.to!string();
            poolThread.start();
        }

        this.isDaemon = isDaemon;
    }

    /// Daemon flag of the pool, read from the first worker.
    bool isDaemon() @property @trusted {
        return pool[0].isDaemon;
    }

    /// Ditto
    void isDaemon(bool newVal) @property @trusted {
        foreach (thread; pool) {
            thread.isDaemon = newVal;
        }
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread.  It then calls executeWorkLoop().
    private void startWorkLoop() {
        // Initialize thread index.
        // NOTE(review): the index comes from a shared counter, so it follows
        // the order in which workers reach this line, not necessarily their
        // position in `pool` - confirm this is acceptable.
        size_t index = atomicOp!("+=")(nextThreadIndex, 1);
        threadIndex = index - 1;

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate.  It's also entered by non-worker threads when
    // finish() is called with the blocking variable set to true.
    private void executeWorkLoop() {
        ParallelismThread workerThread = pool[threadIndex];
        workerThread.state = ThreadState.IDLE;
        scope(exit) {
            version(HUNT_IO_DEBUG) infof("Thread exited, threadIndex: %d, name: %s", threadIndex, workerThread.name());
            workerThread.state = ThreadState.TERMINATED;
        }

        while (atomicReadUbyte(status) != PoolState.stopNow) {
            AbstractTask task = workerThread.taskQueue.dequeue();
            if (task is null) {
                // Nothing was dequeued: while finishing, this promotes the
                // whole pool to stopNow and leaves the loop.
                if (atomicReadUbyte(status) == PoolState.finishing) {
                    atomicSetUbyte(status, PoolState.stopNow);
                    break;
                }
            } else {
                version(HUNT_IO_DEBUG) infof("running a task in thread: %s", workerThread.name());
                workerThread.state = ThreadState.BUSY;

                doJob(task);

                workerThread.state = ThreadState.IDLE;
                workerThread.factor = -1;

                version(HUNT_IO_DEBUG) infof("A task finished in thread: %s", workerThread.name());
            }
        }

    }

    // Runs one task; anything that escapes job.run() is logged and dropped.
    private void doJob(AbstractTask job) nothrow {
        try {
            job.run();
        } catch(Throwable t) {
            warning(t.msg);
            version(HUNT_DEBUG) warning(t);
        }
    }

    private void waiterLock() {
        if (!isSingleTask)
            waiterMutex.lock();
    }

    private void waiterUnlock() {
        if (!isSingleTask)
            waiterMutex.unlock();
    }

    private void wait() {
        if (!isSingleTask)
            workerCondition.wait();
    }

    private void notify() {
        if (!isSingleTask)
            workerCondition.notify();
    }

    private void notifyAll() {
        if (!isSingleTask)
            workerCondition.notifyAll();
    }

    private void waitUntilCompletion() {
        waiterCondition.wait();
    }

    private void notifyWaiters() {
        if (!isSingleTask)
            waiterCondition.notifyAll();
    }

    /// Switches the pool to `stopNow` and signals `workerCondition`.
    /// NOTE(review): workers block in `taskQueue.dequeue()`, not on
    /// `workerCondition`; whether they observe the new state promptly
    /// depends on the queue implementation - confirm.
    void stop() @trusted {
        atomicSetUbyte(status, PoolState.stopNow);
        notifyAll();
    }

    /**
       Moves a running pool to `finishing`.

       Params:
           blocking = when true, the calling thread enters the work loop
                      itself and afterwards joins every worker thread.
     */
    void finish(bool blocking = false) @trusted {
        {
            atomicCasUbyte(status, PoolState.running, PoolState.finishing);
            notifyAll();
        }
        if (blocking) {
            // Use this thread as a worker until everything is finished.
            executeWorkLoop();

            foreach (t; pool) {
                // Maybe there should be something here to prevent a thread
                // from calling join() on itself if this function is called
                // from a worker thread in the same pool, but:
                //
                // 1.  Using an if statement to skip join() would result in
                //     finish() returning without all tasks being finished.
                //
                // 2.  If an exception were thrown, it would bubble up to the
                //     Task from which finish() was called and likely be
                //     swallowed.
                t.join();
            }
        }
    }

    /**
       Queues `task` on a worker chosen from `factor`.

       The worker at `abs(factor) % pool.length` is preferred.  If it is not
       idle and is currently tagged with a different factor, an idle worker
       is used instead when one exists; otherwise the task is still queued
       on the preferred worker.

       Params:
           factor = routing key; negative values are folded to their
                    absolute value (`int.min` is folded to `int.max`).
           task   = the task to enqueue.
     */
    void put(int factor, AbstractTask task) {
        immutable int nWorkers = cast(int) pool.length;

        if (factor < 0) {
            // -int.min wraps back to int.min, which would yield a negative
            // index below; clamp that single value instead.
            factor = (factor == int.min) ? int.max : -factor;
        }

        int i = factor % nWorkers;
        ParallelismThread selectedThread = pool[i];

        if (selectedThread.state != ThreadState.IDLE) {
            version(HUNT_IO_DEBUG) {
                warningf("The %s is busy, index: %d, factor: %d", selectedThread.name(), i, factor);
            }

            int lastFactor = selectedThread.factor;
            if (lastFactor != -1 && lastFactor != factor) {
                version(HUNT_DEBUG) {
                    warningf("A factor collision detected, last: %d, current: %d. " ~
                        "To remove this warning, increase the size of the task pool please!",
                        lastFactor, factor);
                }

                ParallelismThread newSelectedThread = getIdleThread(factor);
                if (newSelectedThread !is null) {
                    selectedThread = newSelectedThread;
                }
                // else: every worker is busy, so stay on the preferred one.
                // TODO: Tasks pending completion -@zhangxueping at 2020-04-26T10:00:58+08:00
                // Create a new worker thread.
            }
        }

        // Whichever worker was chosen is now tagged with this factor.
        selectedThread.factor = factor;

        version(HUNT_IO_DEBUG) {
            tracef("selected a thread: %s, index: %d", selectedThread.name(), i);
            PoolStatus status = checkStatus();
            warning(status.toString());
        }

        selectedThread.taskQueue.enqueue(task);
    }

    /**
       Looks for an idle worker, probing every worker once, starting with the
       one after `startIndex` and ending with `startIndex` itself.

       Params:
           startIndex = any int; it is reduced modulo the pool size first so
                        that large or negative values cannot produce an
                        out-of-range index.
       Returns: the first idle worker found, or null when all are busy.
     */
    private ParallelismThread getIdleThread(int startIndex = 0) {
        immutable int nWorkers = cast(int) pool.length;

        // Normalise before adding 1: (int.max + 1) would wrap negative.
        int index = startIndex % nWorkers;
        if (index < 0)
            index += nWorkers;

        foreach (_; 0 .. nWorkers) {
            index = (index + 1) % nWorkers;
            ParallelismThread candidate = pool[index];
            version(HUNT_IO_DEBUG) {
                warningf("The worker thread status: %s, index: %d",
                    candidate.name(), index);
            }

            if (candidate.state == ThreadState.IDLE)
                return candidate;
        }

        // No idle thread found after checking the whole pool.
        version(HUNT_DEBUG) warning("All the worker threads are busy. Then nothing selected.");
        return null;
    }

    /// Counts busy and idle workers into a fresh `PoolStatus`.
    PoolStatus checkStatus() {
        PoolStatus status = new PoolStatus();

        status.total = pool.length;
        foreach (ParallelismThread th; pool) {
            if(th.state == ThreadState.BUSY) {
                status.active++;
            } else if(th.state == ThreadState.IDLE) {
                status.idle++;
            }
        }

        return status;
    }
}