1 module hunt.util.TaskPool;
2 
3 import hunt.io.SimpleQueue;
4 import hunt.logging;
5 import hunt.system.Memory;
6 import hunt.util.Runnable;
7 
8 
9 import core.thread;
10 import core.atomic;
11 import core.sync.condition;
12 import core.sync.mutex;
13 
14 import std.conv;
15 import std.format;
16 import std.traits;
17 
18 private enum TaskStatus : ubyte {
19     ready,
20     processing,
21     done
22 }
23 
24 /* Atomics code.  These forward to core.atomic, but are written like this
25    for two reasons:
26 
27    1.  They used to actually contain ASM code and I don' want to have to change
28        to directly calling core.atomic in a zillion different places.
29 
30    2.  core.atomic has some misc. issues that make my use cases difficult
31        without wrapping it.  If I didn't wrap it, casts would be required
32        basically everywhere.
33 */
34 private void atomicSetUbyte(T)(ref T stuff, T newVal)
35         if (__traits(isIntegral, T) && is(T : ubyte)) {
36     //core.atomic.cas(cast(shared) &stuff, stuff, newVal);
37     atomicStore(*(cast(shared)&stuff), newVal);
38 }
39 
40 private ubyte atomicReadUbyte(T)(ref T val)
41         if (__traits(isIntegral, T) && is(T : ubyte)) {
42     return atomicLoad(*(cast(shared)&val));
43 }
44 
45 // This gets rid of the need for a lot of annoying casts in other parts of the
46 // code, when enums are involved.
47 private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal)
48         if (__traits(isIntegral, T) && is(T : ubyte)) {
49     return core.atomic.cas(cast(shared)&stuff, testVal, newVal);
50 }
51 
52 
53 /**
54  * 
55  */
56 class AbstractTask : Runnable {
57 
58     Throwable exception;
59     ubyte taskStatus = TaskStatus.ready;
60 
61     final void run() {
62         atomicSetUbyte(taskStatus, TaskStatus.processing);
63         try {
64             onRun();
65         } catch (Throwable e) {
66             exception = e;
67             debug warning(e.msg);
68             version(HUNT_DEBUG) {
69                 warning(e);
70             } 
71         }
72 
73         atomicSetUbyte(taskStatus, TaskStatus.done);
74     }
75 
76     abstract protected void onRun();
77 
78     bool done() @property {
79         if (atomicReadUbyte(taskStatus) == TaskStatus.done) {
80             if (exception) {
81                 throw exception;
82             }
83             return true;
84         }
85         return false;
86     }
87 }
88 
89 /**
90  * 
91  */
92 class Task(alias fun, Args...) : AbstractTask {
93     Args _args;
94 
95     static if (Args.length > 0) {
96         this(Args args) {
97             _args = args;
98         }
99     } else {
100         this() {
101         }
102     }
103 
104     /**
105     The return type of the function called by this `Task`.  This can be
106     `void`.
107     */
108     alias ReturnType = typeof(fun(_args));
109 
110     static if (!is(ReturnType == void)) {
111         static if (is(typeof(&fun(_args)))) {
112             // Ref return.
113             ReturnType* returnVal;
114 
115             ref ReturnType fixRef(ReturnType* val) {
116                 return *val;
117             }
118 
119         } else {
120             ReturnType returnVal;
121 
122             ref ReturnType fixRef(ref ReturnType val) {
123                 return val;
124             }
125         }
126     }
127 
128     private static void impl(AbstractTask myTask) {
129         auto myCastedTask = cast(typeof(this)) myTask;
130         static if (is(ReturnType == void)) {
131             fun(myCastedTask._args);
132         } else static if (is(typeof(addressOf(fun(myCastedTask._args))))) {
133             myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
134         } else {
135             myCastedTask.returnVal = fun(myCastedTask._args);
136         }
137     }
138 
139     protected override void onRun() {
140         impl(this);
141     }
142 }
143 
144 T* addressOf(T)(ref T val) {
145     return &val;
146 }
147 
148 auto makeTask(alias fun, Args...)(Args args) {
149     return new Task!(fun, Args)(args);
150 }
151 
152 auto makeTask(F, Args...)(F delegateOrFp, Args args)
153         if (is(typeof(delegateOrFp(args)))) // && !isSafeTask!F
154         {
155     return new Task!(run, F, Args)(delegateOrFp, args);
156 }
157 
158 // Calls `fpOrDelegate` with `args`.  This is an
159 // adapter that makes `Task` work with delegates, function pointers and
160 // functors instead of just aliases.
161 ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args) {
162     return fpOrDelegate(args);
163 }
164 
165 /*
166 This class serves two purposes:
167 
168 1.  It distinguishes std.parallelism threads from other threads so that
169     the std.parallelism daemon threads can be terminated.
170 
171 2.  It adds a reference to the pool that the thread is a member of,
172     which is also necessary to allow the daemon threads to be properly
173     terminated.
174 */
175 final class ParallelismThread : Thread {
176 
177     this(void delegate() dg) {
178         super(dg);
179         taskQueue = new BlockingQueue!(AbstractTask)(10.seconds);
180         state = ThreadState.NEW;
181     }
182 
183     ThreadState state;
184     int factor = -1;
185 
186     TaskPool pool;
187     Queue!(AbstractTask) taskQueue;
188 }
189 
190 enum ThreadState {
191     NEW,
192     BUSY,
193     IDLE,
194     TERMINATED
195 }
196 
197 /**
198  * 
199  */
200 enum PoolState : ubyte {
201     running,
202     finishing,
203     stopNow
204 }
205 
206 /**
207  * 
208  */
209 class PoolStatus {
210     size_t total;
211 
212     int active;
213 
214     int idle;
215     
216     int waiting;
217 
218     override string toString() {
219         return format("total: %d, active: %d, idle: %d, waiting: %d", 
220             total, active, idle, waiting);
221     }
222 }
223 
224 /** 
225  *
226  */
227 class TaskPool {
228 
229     private ParallelismThread[] pool;
230     private PoolState status = PoolState.running;
231 
232     // The instanceStartIndex of the next instance that will be created.
233     // __gshared size_t nextInstanceIndex = 1;
234 
235     // The index of the first thread in this instance.
236     // immutable size_t instanceStartIndex;
237 
238     // The index of the current thread.
239     static size_t threadIndex;
240 
241     // The index that the next thread to be initialized in this pool will have.
242     shared size_t nextThreadIndex;
243 
244     Condition workerCondition;
245     Condition waiterCondition;
246     Mutex queueMutex;
247     Mutex waiterMutex; // For waiterCondition
248 
249     bool isSingleTask = false;
250 
251     /**
252     Default constructor that initializes a `TaskPool` with
253     `totalCPUs` - 1 worker threads.  The minus 1 is included because the
254     main thread will also be available to do work.
255 
256     Note:  On single-core machines, the primitives provided by `TaskPool`
257            operate transparently in single-threaded mode.
258      */
259     this() {
260         this(totalCPUs - 1);
261     }
262     
263     /**
264     Allows for custom number of worker threads.
265     */
266     this(size_t nWorkers, bool isDaemon = false) {
267         if (nWorkers == 0)
268             nWorkers = 1;
269 
270         queueMutex = new Mutex(this);
271         waiterMutex = new Mutex();
272         workerCondition = new Condition(queueMutex);
273         waiterCondition = new Condition(waiterMutex);
274         nextThreadIndex = 0;
275 
276         pool = new ParallelismThread[nWorkers];
277 
278         foreach (size_t index, ref ParallelismThread poolThread; pool) {
279             poolThread = new ParallelismThread(&startWorkLoop);
280             poolThread.pool = this;
281             poolThread.name = "worker-thread-" ~ index.to!string();
282             poolThread.start();
283         }
284 
285         this.isDaemon = isDaemon;
286     }
287 
288     bool isDaemon() @property @trusted {
289         return pool[0].isDaemon;
290     }
291 
292     /// Ditto
293     void isDaemon(bool newVal) @property @trusted {
294         foreach (thread; pool) {
295             thread.isDaemon = newVal;
296         }
297     }
298 
299     // This function performs initialization for each thread that affects
300     // thread local storage and therefore must be done from within the
301     // worker thread.  It then calls executeWorkLoop().
302     private void startWorkLoop() {
303         // Initialize thread index.
304         size_t index = atomicOp!("+=")(nextThreadIndex, 1);
305         threadIndex = index - 1;
306 
307         executeWorkLoop();
308     }
309 
310     // This is the main work loop that worker threads spend their time in
311     // until they terminate.  It's also entered by non-worker threads when
312     // finish() is called with the blocking variable set to true.
313     private void executeWorkLoop() {
314         ParallelismThread workerThread = pool[threadIndex];
315         workerThread.state = ThreadState.IDLE;
316         scope(exit) {
317             version(HUNT_IO_DEBUG) infof("Thread exited, threadIndex: %d, name: %s", threadIndex, workerThread.name());
318             workerThread.state = ThreadState.TERMINATED;
319         }
320 
321         while (atomicReadUbyte(status) != PoolState.stopNow) {
322             AbstractTask task = workerThread.taskQueue.dequeue();
323             if (task is null) {
324                 version(HUNT_IO_DEBUG) {
325                     warningf("A empty task returned on thread: %s", workerThread.name);
326                 }
327             } else {
328                 version(HUNT_IO_DEBUG) infof("running a task in thread: %s", workerThread.name());
329                 workerThread.state = ThreadState.BUSY;
330 
331                 scope(exit) {
332                     workerThread.state = ThreadState.IDLE;
333                     workerThread.factor = -1;
334 
335                     version(HUNT_IO_DEBUG) infof("A task finished in thread: %s", workerThread.name());
336                 }
337                 
338                 doJob(task);
339             }
340         }
341     }
342 
343     private void doJob(AbstractTask job) nothrow {        
344         try {
345             job.run();
346         } catch(Throwable t) {
347             warning(t.msg);
348             version(HUNT_DEBUG) warning(t);
349         }
350     }
351 
352     private void waiterLock() {
353         if (!isSingleTask)
354             waiterMutex.lock();
355     }
356 
357     private void waiterUnlock() {
358         if (!isSingleTask)
359             waiterMutex.unlock();
360     }
361 
362     private void wait() {
363         if (!isSingleTask)
364             workerCondition.wait();
365     }
366 
367     private void notify() {
368         if (!isSingleTask)
369             workerCondition.notify();
370     }
371 
372     private void notifyAll() {
373         if (!isSingleTask)
374             workerCondition.notifyAll();
375     }
376 
377     private void waitUntilCompletion() {
378         waiterCondition.wait();
379     }
380 
381     private void notifyWaiters() {
382         if (!isSingleTask)
383             waiterCondition.notifyAll();
384     }
385 
386     void stop() @trusted {
387         // queueLock();
388         // scope(exit) queueUnlock();
389         atomicSetUbyte(status, PoolState.stopNow);
390         notifyAll();
391     }
392 
393     void finish(bool blocking = false) @trusted {
394         {
395             // queueLock();
396             // scope(exit) queueUnlock();
397             atomicCasUbyte(status, PoolState.running, PoolState.finishing);
398             notifyAll();
399         }
400         if (blocking) {
401             // Use this thread as a worker until everything is finished.
402             // stopWorkLoop();
403             // taskQueue.wakeup();
404             executeWorkLoop();
405 
406             foreach (t; pool) {
407                 // Maybe there should be something here to prevent a thread
408                 // from calling join() on itself if this function is called
409                 // from a worker thread in the same pool, but:
410                 //
411                 // 1.  Using an if statement to skip join() would result in
412                 //     finish() returning without all tasks being finished.
413                 //
414                 // 2.  If an exception were thrown, it would bubble up to the
415                 //     Task from which finish() was called and likely be
416                 //     swallowed.
417                 t.join();
418             }
419         }
420     }
421 
422     void put(int factor, AbstractTask task) {
423         int nWorkers = cast(int)pool.length;
424         if(factor<0) factor = -factor;
425         int i = factor % nWorkers;
426         ParallelismThread selectedThread = pool[i];
427 
428         version(HUNT_IO_DEBUG) {
429             if(selectedThread.state != ThreadState.IDLE) {
430                 int lastFactor = selectedThread.factor;
431                 if(lastFactor != -1 && lastFactor != factor) {
432                     warningf("The %s is busy. For factor, last: %d, current: %d. ", selectedThread.name(), 
433                         lastFactor, factor);
434                 }
435             }
436         }
437         selectedThread.factor = factor;
438 
439         version(HUNT_IO_DEBUG) {
440             tracef("Selected worker thread[%d]: %s, factor: %d", i, selectedThread.name(), factor);
441             // PoolStatus status = checkStatus();
442             // warning(status.toString());
443         }
444 
445         selectedThread.taskQueue.enqueue(task);
446     }
447 
448     // private ParallelismThread getIdleThread(int startIndex = 0) {
449     //     int j = 0;
450     //     int nWorkers = cast(int)pool.length;
451     //     ParallelismThread selectedThread;
452 
453     //     do {            
454     //         startIndex = (startIndex+1) % nWorkers;
455     //         selectedThread = pool[startIndex];
456     //         version(HUNT_IO_DEBUG) {
457     //             warningf("The worker thread status: %s, index: %d", 
458     //             selectedThread.name(), startIndex);
459     //         }
460 
461     //         j++;
462     //         if(j >= nWorkers) {
463     //             j = 0;
464     //             version(HUNT_DEBUG) warning("All the worker threads are busy. Then nothing selected.");
465     //             // No idle thread found after checking the whole pool.
466     //             // No wait. Select one randomly.
467     //             // Warning: It may be in a dead-lock status;
468     //             break;
469     //         }
470     //     } while(selectedThread.state != ThreadState.IDLE);
471 
472     //     return selectedThread;
473     // }
474 
475     PoolStatus checkStatus() {
476         PoolStatus status = new PoolStatus();
477 
478         status.total = pool.length;
479         foreach (size_t index, ParallelismThread th; pool) {
480             version(HUNT_IO_DEBUG) {
481                 tracef("Thread[%d]: %s, state: %s", index, th.name, th.state);
482             }
483             if(th.state == ThreadState.BUSY) {
484                 status.active++;
485             } else if(th.state == ThreadState.IDLE) {
486                 status.idle++;
487             }
488             // tracef("Current: %s, WorkerThread: %s, state: %s", Thread.getThis().name(), t.name(), t.state);
489         }
490 
491         return status;
492     }
493 }