1 module hunt.concurrency.TaskPool;
2 
3 import hunt.concurrency.SimpleQueue;
4 import hunt.logging.ConsoleLogger;
5 import hunt.system.Memory;
6 import hunt.util.Common;
7 
8 import core.thread;
9 import core.atomic;
10 import core.sync.condition;
11 import core.sync.mutex;
12 
13 import std.conv;
14 import std.format;
15 import std.traits;
16 
/// Progress of an `AbstractTask`, kept in its `taskStatus` byte.
private enum TaskStatus : ubyte {
    ready,       /// `run()` has not started yet.
    processing,  /// `run()` is executing `onRun()`.
    done         /// `run()` has returned from `onRun()` (normally or by throwing).
}
22 
/* Atomics code.  These forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to change
       to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/
/// Atomically stores `newVal` into `stuff` (a byte-sized integral or enum).
private void atomicSetUbyte(T)(ref T stuff, T newVal)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    // View the plain variable as shared so core.atomic accepts it.
    shared(T)* target = cast(shared(T)*) &stuff;
    atomicStore(*target, newVal);
}
38 
/// Atomically loads `val` (a byte-sized integral or enum) and returns it as a `ubyte`.
private ubyte atomicReadUbyte(T)(ref T val)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    auto source = cast(shared(T)*) &val;
    return atomicLoad(*source);
}
43 
// Compare-and-swap wrapper for byte-sized integrals and enums: if `stuff`
// equals `testVal` it is atomically replaced by `newVal`. Returns whether the
// swap happened. Wrapping it avoids casts at every enum-typed call site.
private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
        if (__traits(isIntegral, T) && is(T : ubyte)) {
    shared(T)* target = cast(shared(T)*) &stuff;
    return core.atomic.cas(target, testVal, newVal);
}
50 
51 
/**
 * Base class for work items executed by a `TaskPool`.
 *
 * Subclasses supply the actual work in `onRun()`. `run()` tracks progress in
 * `taskStatus` (a `TaskStatus` value) and keeps anything `onRun()` throws in
 * `exception` instead of letting it escape.
 */
class AbstractTask : Runnable {

    /// The `Throwable` raised by `onRun()`, or `null` if none was raised.
    Throwable exception;

    /// Current `TaskStatus`, read and written through this module's atomic helpers.
    ubyte taskStatus = TaskStatus.ready;

    /// Runs `onRun()`, capturing any `Throwable`, then marks the task done.
    final void run() {
        atomicSetUbyte(taskStatus, TaskStatus.processing);

        try {
            onRun();
        } catch (Throwable failure) {
            exception = failure;
            debug warning(failure.msg);
            version(HUNT_DEBUG) {
                warning(failure);
            }
        }

        atomicSetUbyte(taskStatus, TaskStatus.done);
    }

    /// The work performed by this task; called from `run()`.
    abstract protected void onRun();

    /**
     * Whether `run()` has completed.
     *
     * Throws: the captured `exception`, on every call, once the task is done
     * and `onRun()` had thrown.
     */
    bool done() @property {
        if (atomicReadUbyte(taskStatus) != TaskStatus.done)
            return false;

        if (exception !is null)
            throw exception;

        return true;
    }
}
87 
/**
 * A task that calls `fun` with the arguments captured at construction.
 *
 * When `fun` returns a value it is kept in `returnVal`; when `fun` returns
 * by `ref`, `returnVal` holds a pointer to the referenced value instead.
 * Anything thrown by `fun` is caught and stored by `AbstractTask.run`.
 */
class Task(alias fun, Args...) : AbstractTask {
    Args _args; // Arguments forwarded to `fun`, stored at construction.

    static if (Args.length > 0) {
        this(Args args) {
            _args = args;
        }
    } else {
        this() {
        }
    }

    /**
    The return type of the function called by this `Task`.  This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void)) {
        // `&fun(_args)` only compiles when `fun` returns an lvalue, i.e. by ref.
        static if (is(typeof(&fun(_args)))) {
            // Ref return.
            ReturnType* returnVal;

            // Turns the stored pointer back into a reference.
            ref ReturnType fixRef(ReturnType* val) {
                return *val;
            }

        } else {
            ReturnType returnVal;

            // Identity overload; presumably lets callers write `fixRef(returnVal)`
            // for both ref and value returns — no caller is visible here.
            ref ReturnType fixRef(ref ReturnType val) {
                return val;
            }
        }
    }

    // Calls `fun` with the task's stored arguments and records the result:
    // nothing for void, a pointer for ref returns, a copy otherwise.
    // NOTE(review): assumes `myTask` really is a `typeof(this)`; the class
    // downcast yields null otherwise.
    private static void impl(AbstractTask myTask) {
        auto myCastedTask = cast(typeof(this)) myTask;
        static if (is(ReturnType == void)) {
            fun(myCastedTask._args);
        } else static if (is(typeof(addressOf(fun(myCastedTask._args))))) {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        } else {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    // Invoked by AbstractTask.run().
    protected override void onRun() {
        impl(this);
    }
}
142 
/// Returns a pointer to `val`. `Task` uses it to keep the result of
/// functions that return by `ref`.
T* addressOf(T)(ref T val) {
    T* pointer = &val;
    return pointer;
}
146 
/**
 * Creates a `Task` that will call the aliased function `fun` with `args`.
 * The task is only constructed here; scheduling it is up to the caller.
 */
auto makeTask(alias fun, Args...)(Args args) {
    auto task = new Task!(fun, Args)(args);
    return task;
}
150 
/**
 * Creates a `Task` around a delegate, function pointer or functor.
 *
 * The callable becomes the task's first stored argument and is invoked
 * through the `run` adapter, so `delegateOrFp(args)` must be a valid call.
 * The task is only constructed here; scheduling it is up to the caller.
 */
auto makeTask(F, Args...)(F delegateOrFp, Args args)
        if (is(typeof(delegateOrFp(args)))) // && !isSafeTask!F
{
    alias CallableTask = Task!(run, F, Args);
    auto created = new CallableTask(delegateOrFp, args);
    return created;
}
156 
// Calls `fpOrDelegate` with `args` and returns its result.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases: `makeTask(F, Args...)` instantiates
// `Task!(run, F, Args)`, so the callable itself is the task's first stored
// argument. `args` are taken by `ref`, so they must be lvalues (such as the
// task's stored `_args`).
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args) {
    return fpOrDelegate(args);
}
163 
/**
 * A worker thread owned by a `TaskPool`.
 *
 * Besides the `Thread` itself it carries the per-worker bookkeeping the pool
 * uses when dispatching:
 *   - `taskQueue`: this worker's own queue of pending tasks,
 *   - `state`: its `ThreadState` (`NEW` until the work loop starts),
 *   - `factor`: the factor of the task most recently routed to it
 *     (-1 when unset; the work loop resets it after each task),
 *   - `pool`: the owning `TaskPool`.
 */
final class ParallelismThread : Thread {

    ThreadState state;
    int factor = -1;

    TaskPool pool;
    Queue!(AbstractTask) taskQueue;

    this(void delegate() dg) {
        super(dg);
        this.taskQueue = new BlockingQueue!(AbstractTask)();
        this.state = ThreadState.NEW;
    }
}
188 
/// Lifecycle state of a `ParallelismThread`, as tracked by its `TaskPool`.
enum ThreadState {
    NEW,        /// Constructed; the work loop has not started yet.
    BUSY,       /// Running a task.
    IDLE,       /// Inside the work loop but not running a task.
    TERMINATED  /// The work loop has exited.
}
195 
/**
 * Lifecycle state of a `TaskPool`.
 */
enum PoolState : ubyte {
    running,    /// Normal operation.
    finishing,  /// `finish()` has been requested.
    stopNow     /// `stop()` has been requested; workers leave at their next loop check.
}
204 
/**
 * Point-in-time counters describing a `TaskPool`'s workers, as filled in by
 * `TaskPool.checkStatus()`.
 */
class PoolStatus {
    size_t total;  /// Number of worker threads in the pool.
    int active;    /// Workers whose state was `ThreadState.BUSY`.
    int idle;      /// Workers whose state was `ThreadState.IDLE`.
    int waiting;   /// NOTE(review): not set by `checkStatus()`; stays 0 there.

    override string toString() {
        enum layout = "total: %d, active: %d, idle: %d, waiting: %d";
        return format(layout, total, active, idle, waiting);
    }
}
222 
/**
 * A fixed-size pool of `ParallelismThread` workers.
 *
 * Every worker owns its own task queue. `put(factor, task)` routes a task to
 * the worker at index `|factor| % nWorkers`, so tasks sharing a factor
 * normally run on the same worker. If that worker is busy with a task of a
 * different factor, `put` hands the task to an idle worker when one exists.
 */
class TaskPool {

    private ParallelismThread[] pool;
    private PoolState status = PoolState.running;

    // The instanceStartIndex of the next instance that will be created.
    // __gshared size_t nextInstanceIndex = 1;

    // The index of the first thread in this instance.
    // immutable size_t instanceStartIndex;

    // The index of the current thread. A plain (non-shared) static is
    // thread-local in D, so every thread has its own copy; worker threads set
    // theirs in startWorkLoop(). The copy is shared by all TaskPool instances
    // on that thread.
    static size_t threadIndex;

    // The index that the next thread to be initialized in this pool will have.
    shared size_t nextThreadIndex;

    Condition workerCondition;
    Condition waiterCondition;
    Mutex queueMutex;
    Mutex waiterMutex; // For waiterCondition

    bool isSingleTask = false;

    /**
    Default constructor that initializes a `TaskPool` with
    `totalCPUs` - 1 worker threads.

    Note:  At least one worker thread is always started, so when
           `totalCPUs - 1` is 0 (e.g. a single-core machine) the pool runs
           with exactly one worker.
     */
    this() {
        this(totalCPUs - 1);
    }
    
    /**
    Allows for custom number of worker threads.

    Params:
        nWorkers = number of worker threads to start; `0` is treated as `1`.
        isDaemon = value assigned to `isDaemon` of every worker thread.
    */
    this(size_t nWorkers, bool isDaemon = false) {
        if (nWorkers == 0)
            nWorkers = 1;

        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);
        nextThreadIndex = 0;

        pool = new ParallelismThread[nWorkers];

        foreach (size_t index, ref ParallelismThread poolThread; pool) {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.name = "worker-thread-" ~ index.to!string();
            poolThread.start();
        }

        this.isDaemon = isDaemon;
    }

    /// Whether the worker threads are daemon threads (read from the first worker).
    bool isDaemon() @property @trusted {
        return pool[0].isDaemon;
    }

    /// Ditto
    void isDaemon(bool newVal) @property @trusted {
        foreach (thread; pool) {
            thread.isDaemon = newVal;
        }
    }

    // This function performs initialization for each thread that affects
    // thread local storage and therefore must be done from within the
    // worker thread.  It then calls executeWorkLoop().
    //
    // NOTE(review): indices are handed out in the order threads reach this
    // point, which need not match their slot in `pool`. For example, the thread
    // named "worker-thread-1" may end up serving pool[0]'s queue. Each queue
    // still gets exactly one consumer, but thread names in logs can mislead.
    private void startWorkLoop() {
        // Initialize thread index.
        size_t index = atomicOp!("+=")(nextThreadIndex, 1);
        threadIndex = index - 1;

        executeWorkLoop();
    }

    // This is the main work loop that worker threads spend their time in
    // until they terminate. Each worker only consumes its own queue,
    // pool[threadIndex].taskQueue.
    //
    // The loop ends when the pool is stopped (`stopNow`). While the pool is
    // `finishing`, it also ends once this worker's own queue yields no task.
    // In that case only this worker leaves; the others keep draining their
    // own queues, so tasks already queued on them are not abandoned.
    private void executeWorkLoop() {
        ParallelismThread workerThread = pool[threadIndex];
        workerThread.state = ThreadState.IDLE;
        scope(exit) {
            version(HUNT_IO_DEBUG) infof("Thread exited, threadIndex: %d, name: %s", threadIndex, workerThread.name());
            workerThread.state = ThreadState.TERMINATED;
        }

        while (atomicReadUbyte(status) != PoolState.stopNow) {
            AbstractTask task = workerThread.taskQueue.dequeue();
            if (task is null) {
                // NOTE(review): presumably dequeue() returns null when no task
                // arrived (e.g. after a timeout); confirm against SimpleQueue.
                if (atomicReadUbyte(status) == PoolState.finishing)
                    break;
                continue;
            }

            version(HUNT_IO_DEBUG) infof("running a task in thread: %s", workerThread.name());
            workerThread.state = ThreadState.BUSY;

            doJob(task);

            workerThread.state = ThreadState.IDLE;
            workerThread.factor = -1;

            version(HUNT_IO_DEBUG) infof("A task finished in thread: %s", workerThread.name());
        }
    }

    // Runs one task, logging anything that escapes it instead of letting it
    // kill the worker thread.
    private void doJob(AbstractTask job) nothrow {        
        try {
            job.run();
        } catch(Throwable t) {
            warning(t.msg);
            version(HUNT_DEBUG) warning(t);
        }
    }

    private void waiterLock() {
        if (!isSingleTask)
            waiterMutex.lock();
    }

    private void waiterUnlock() {
        if (!isSingleTask)
            waiterMutex.unlock();
    }

    private void wait() {
        if (!isSingleTask)
            workerCondition.wait();
    }

    private void notify() {
        if (!isSingleTask)
            workerCondition.notify();
    }

    private void notifyAll() {
        if (!isSingleTask)
            workerCondition.notifyAll();
    }

    private void waitUntilCompletion() {
        waiterCondition.wait();
    }

    private void notifyWaiters() {
        if (!isSingleTask)
            waiterCondition.notifyAll();
    }

    /// Switches the pool to `stopNow`: workers leave their loop at the next
    /// check, even if tasks are still queued.
    void stop() @trusted {
        // queueLock();
        // scope(exit) queueUnlock();
        atomicSetUbyte(status, PoolState.stopNow);
        notifyAll();
    }

    /**
    Switches a running pool to `finishing`. Each worker then leaves its loop
    once its own queue yields no task.

    Params:
        blocking = when true, wait (join) until every worker thread has exited.

    Note: with `blocking == true` this must not be called from one of this
    pool's own worker threads, because that thread would call `join()` on
    itself.
    */
    void finish(bool blocking = false) @trusted {
        atomicCasUbyte(status, PoolState.running, PoolState.finishing);
        notifyAll();

        if (!blocking)
            return;

        // The calling thread deliberately does not enter executeWorkLoop().
        // Every worker owns its own queue, so the caller could only "help" by
        // consuming someone else's queue (pool[threadIndex], i.e. pool[0] on a
        // thread that never ran startWorkLoop()). That would run tasks from
        // that queue concurrently with its worker. It would also mark that
        // worker TERMINATED while it is still running.
        foreach (t; pool) {
            // Maybe there should be something here to prevent a thread
            // from calling join() on itself if this function is called
            // from a worker thread in the same pool, but:
            //
            // 1.  Using an if statement to skip join() would result in
            //     finish() returning without all tasks being finished.
            //
            // 2.  If an exception were thrown, it would bubble up to the
            //     Task from which finish() was called and likely be
            //     swallowed.
            t.join();
        }
    }

    /**
    Queues `task` on one of the workers.

    The worker is picked as `|factor| % nWorkers`. If that worker is busy
    with a task of a different factor, the task goes to an idle worker when
    one exists; otherwise it stays on the originally picked worker.

    Params:
        factor = routing key; tasks with the same factor normally go to the
                 same worker. Negative values are folded to their magnitude.
        task   = the task to run.
    */
    void put(int factor, AbstractTask task) {
        int nWorkers = cast(int)pool.length;
        if(factor<0) factor = -factor;
        // `-int.min` overflows back to int.min, so take the magnitude in
        // `long`; otherwise `factor % nWorkers` could yield a negative index.
        immutable long magnitude = factor < 0 ? -cast(long) factor : factor;
        int i = cast(int)(magnitude % nWorkers);
        ParallelismThread selectedThread = pool[i];

        if(selectedThread.state != ThreadState.IDLE) {
            version(HUNT_IO_DEBUG) {
                warningf("The %s is busy, index: %d, factor: %d", selectedThread.name(), i, factor);
            }

            int lastFactor = selectedThread.factor;
            if(lastFactor != -1 && lastFactor != factor) {
                version(HUNT_DEBUG) {
                    warningf("A factor collision detected, last: %d, current: %d. " ~ 
                        "To remove this warning, increase the size of the task pool please!", 
                        lastFactor, factor);
                }

                // Search from the slot index rather than the raw factor, so the
                // `+ 1` inside getIdleThread cannot overflow for int.max.
                ParallelismThread newSelectedThread = getIdleThread(i);
                if(newSelectedThread !is null) {
                    selectedThread = newSelectedThread;
                } else {
                    // TODO: Tasks pending completion -@zhangxueping at 2020-04-26T10:00:58+08:00
                    // Create a new worker thread.
                    // Until then the task stays on the busy worker picked above.
                }
            }
        }

        // Remember which factor this worker is now serving.
        selectedThread.factor = factor;

        version(HUNT_IO_DEBUG) {
            tracef("selected a thread: %s, index: %d", selectedThread.name(), i);
            PoolStatus poolStatus = checkStatus();
            warning(poolStatus.toString());
        }

        selectedThread.taskQueue.enqueue(task);
    }

    /**
    Looks for an idle worker. It probes the other `nWorkers - 1` workers in
    order, starting right after `startIndex` and wrapping around.

    Returns: the first worker whose state is `ThreadState.IDLE`, or `null`
    when none of the probed workers is idle.
    */
    private ParallelismThread getIdleThread(int startIndex = 0) {
        int nWorkers = cast(int)pool.length;

        // Normalize to [0, nWorkers) so no probe index can become negative.
        int index = startIndex % nWorkers;
        if (index < 0)
            index += nWorkers;

        foreach (_; 1 .. nWorkers) {
            index = (index + 1) % nWorkers;
            ParallelismThread candidate = pool[index];
            version(HUNT_IO_DEBUG) {
                warningf("The worker thread status: %s, index: %d", 
                candidate.name(), index);
            }

            if (candidate.state == ThreadState.IDLE)
                return candidate;
        }

        version(HUNT_DEBUG) warning("All the worker threads are busy. Then nothing selected.");
        return null;
    }

    /// Counts the workers currently in `BUSY` and `IDLE` state.
    /// `PoolStatus.waiting` is not filled in.
    PoolStatus checkStatus() {
        PoolStatus snapshot = new PoolStatus();

        snapshot.total = pool.length;
        foreach (ParallelismThread th; pool) {
            if(th.state == ThreadState.BUSY) {
                snapshot.active++;
            } else if(th.state == ThreadState.IDLE) {
                snapshot.idle++;
            }
            // tracef("Current: %s, WorkerThread: %s, state: %s", Thread.getThis().name(), t.name(), t.state);
        }

        return snapshot;
    }
}