1 module hunt.concurrency.ForkJoinTaskHelper;
2 
3 import hunt.Exceptions;
4 import hunt.util.WeakReference;
5 
6 import core.sync.condition;
7 import core.sync.mutex;
8 import core.thread;
9 
10 import std.exception;
11 
12 alias ReentrantLock = Mutex;
13 
14 
/**
 * Minimal view of a fork/join task as needed by the pool, its workers
 * and the exception table.
 */
interface IForkJoinTask {
    /**
     * Returns the task's run-control status word.
     *
     * All control bits are packed into one int so that the status can be
     * updated atomically. The value starts at zero and stays nonnegative
     * while the task is incomplete. On completion the sign bit (DONE) is
     * set, optionally combined with ABNORMAL (cancelled or exceptional)
     * and THROWN (an exception has been stored). A task that has blocked
     * joiners waiting on it carries the SIGNAL bit; completing such a task
     * wakes the waiters via notifyAll, and woken waiters help signal the
     * others.
     *
     * The control bits use only part of the upper 16 bits; the lower bits
     * are available for user-defined tags.
     *
     * Accessed directly by the pool and its workers.
     */
    int getStatus();

    int doExec();

    void internalWait(long timeout);

    bool cancel(bool mayInterruptIfRunning);

    /**
     * Best-effort cancellation used while workers and pools shut down.
     *
     * `cancel` is specified not to throw, but during shutdown there is no
     * way to recover if it does, so anything it throws is deliberately
     * discarded here.
     *
     * Params:
     *     t = task to cancel; `null` and already-completed tasks
     *         (negative status) are left untouched.
     */
    static void cancelIgnoringExceptions(IForkJoinTask t) {
        // Nothing to do for a missing task.
        if (t is null)
            return;

        // A negative status means the task has already completed.
        if (t.getStatus() < 0)
            return;

        try {
            t.cancel(false);
        } catch (Throwable swallowed) {
            // Intentionally ignored: no recourse during shutdown.
        }
    }
}
54 
55 
56 
/**
 * Entry of the exception table: associates a (weakly referenced) task
 * with the throwable it produced.
 *
 * The table is a chained hash table that compares keys by identity, is
 * fully locked, and holds its keys through weak references. Its capacity
 * is fixed: exceptions only need to be kept until joiners have read
 * them, so the table should never stay large for long. Because there is
 * no way to know when the last joiner has finished, keys are weak and
 * stale entries are expunged on every operation (which is why full
 * locking is used). In addition, some thread of any ForkJoinPool calls
 * helpExpungeStaleExceptions when its pool becomes quiescent.
 */
final class ExceptionNode : WeakReference!IForkJoinTask {
    /// Throwable recorded for the task.
    Throwable ex;

    /// Next node in the same hash bucket chain.
    ExceptionNode next;

    /// Id of the thread that created this node; an id rather than a
    /// thread reference is stored to avoid weak cycles.
    ThreadID thrower;

    /// Hash of the task, captured up front so it remains available
    /// after the weak reference has been cleared.
    size_t hashCode;

    /**
     * Creates a node for `task` and links it in front of `next`.
     *
     * Params:
     *     task = task the exception belongs to (held weakly by the base class)
     *     ex   = throwable to record
     *     next = current head of the bucket chain, may be `null`
     */
    this(IForkJoinTask task, Throwable ex, ExceptionNode next) {
        // Values derived from the current context and the task itself.
        this.hashCode = hashOf(task);
        this.thrower = Thread.getThis().id;

        // Payload and chain link.
        this.next = next;
        this.ex = ex;

        super(task);
    }
}
82 
83 
/**
 * Process-wide helpers shared by fork/join tasks: the table of exceptions
 * thrown by tasks (with its lock) and utilities for relaying a stored
 * throwable to a caller.
 */
struct ForkJoinTaskHelper {

    // Exception table support

    /**
     * Hash table of exceptions thrown by tasks, to enable reporting
     * by callers. Because exceptions are rare, we don't directly keep
     * them with task objects, but instead use a weak ref table.  Note
     * that cancellation exceptions don't appear in the table, but are
     * instead recorded as status values.
     *
     * The exception table has a fixed capacity.
     */
    package __gshared ExceptionNode[] exceptionTable;

    /** Lock protecting access to exceptionTable. */
    package __gshared ReentrantLock exceptionTableLock;

    /** Reference queue of stale exceptionally completed tasks. */
    // private __gshared ReferenceQueue!(IForkJoinTask) exceptionTableRefQueue;

    /// One-time initialisation of the table (32 buckets) and its lock.
    shared static this() {
        exceptionTable = new ExceptionNode[32];
        exceptionTableLock = new ReentrantLock();
        // exceptionTableRefQueue = new ReferenceQueue!(IForkJoinTask)();
    }

    /**
     * Polls stale refs and removes them. Call only while holding lock.
     *
     * Not implemented yet: the body only reports the missing
     * implementation. The intended algorithm (from the reference
     * implementation) is kept below as a comment.
     */
    package static void expungeStaleExceptions() {
        // TODO: Tasks pending completion -@zxp at 12/22/2018, 5:04:07 PM
        //
        // NOTE(review): presumably `false` makes this report the gap without
        // throwing — confirm against hunt.Exceptions.implementationMissing.
        implementationMissing(false);
        // for (Object x; (x = exceptionTableRefQueue.poll()) !is null;) {
        //     ExceptionNode en = cast(ExceptionNode)x;
        //     if (en !is null) {
        //         ExceptionNode[] t = exceptionTable;
        //         int i = en.hashCode & (t.length - 1);
        //         ExceptionNode e = t[i];
        //         ExceptionNode pred = null;
        //         while (e !is null) {
        //             ExceptionNode next = e.next;
        //             if (e == x) {
        //                 if (pred is null)
        //                     t[i] = next;
        //                 else
        //                     pred.next = next;
        //                 break;
        //             }
        //             pred = e;
        //             e = next;
        //         }
        //     }
        // }
    }

    /**
     * If lock is available, polls stale refs and removes them.
     * Called from ForkJoinPool when pools become quiescent.
     *
     * Never blocks: when the lock cannot be taken immediately the call
     * returns without doing anything.
     */
    static void helpExpungeStaleExceptions() {
        ReentrantLock lock = exceptionTableLock;
        if (!lock.tryLock())
            return;

        // Release the lock on every exit path, including exceptions.
        scope(exit) lock.unlock();
        expungeStaleExceptions();
    }

    /**
     * Relays a stored throwable to the caller by throwing it.
     *
     * Params:
     *     ex = throwable to rethrow; `null` results in an `Error`.
     *
     * Throws: `ex` itself, or `Error("Unknown Exception")` when `ex` is null.
     */
    static void rethrow(Throwable ex) {
        uncheckedThrow!(RuntimeException)(ex);
    }

    /**
     * Throws `t` unchanged, whatever its dynamic type.
     *
     * The Java original used a generic cast that is erased at runtime to
     * get around checked exceptions. D has no checked exceptions, and a
     * class downcast in D is checked at runtime and yields `null` when the
     * object is not a `T`, so casting here would turn every throwable that
     * is not a `T` into `throw null`. The throwable is therefore thrown
     * as-is; `T` is kept only so existing instantiations keep compiling.
     *
     * Params:
     *     T = nominal throwable type (unused at runtime)
     *     t = throwable to throw
     *
     * Throws: `t`, or `Error("Unknown Exception")` when `t` is null.
     */
    static void uncheckedThrow(T)(Throwable t) if(is(T : Throwable)) {
        if (t is null)
            throw new Error("Unknown Exception");
        throw t;
    }
}