module hunt.concurrency.ForkJoinTaskHelper;

import hunt.Exceptions;
import hunt.util.WeakReference;

import core.sync.condition;
import core.sync.mutex;
import core.thread;

import std.exception;

alias ReentrantLock = Mutex;


/**
 * The task operations used by the helpers in this module (and, per the
 * comments below, by the pool and its workers).
 */
interface IForkJoinTask {
    /**
     * The status field holds run control status bits packed into a
     * single int to ensure atomicity. Status is initially zero, and
     * takes on nonnegative values until completed, upon which it
     * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
     * exceptional) and THROWN (in which case an exception has been
     * stored). Tasks with dependent blocked waiting joiners have the
     * SIGNAL bit set. Completion of a task with SIGNAL set awakens
     * any waiters via notifyAll. (Waiters also help signal others
     * upon completion.)
     *
     * These control bits occupy only (some of) the upper half (16
     * bits) of status field. The lower bits are used for user-defined
     * tags.
     */
    int getStatus(); // accessed directly by pool and workers

    // NOTE(review): presumably runs the task and returns its status;
    // the contract is defined by the implementing class -- confirm there.
    int doExec();

    // NOTE(review): presumably blocks the caller for up to `timeout`;
    // the unit is not visible from this file -- confirm with implementers.
    void internalWait(long timeout);

    bool cancel(bool mayInterruptIfRunning);

    /**
     * Cancels, ignoring any exceptions thrown by cancel. Used during
     * worker and pool shutdown. Cancel is spec'ed not to throw any
     * exceptions, but if it does anyway, we have no recourse during
     * shutdown, so guard against this case.
     *
     * Does nothing when `t` is null or its status is already negative.
     *
     * Params:
     *  t = the task to cancel; may be null
     */
    static void cancelIgnoringExceptions(IForkJoinTask t) {
        if (t !is null && t.getStatus() >= 0) {
            try {
                t.cancel(false);
            } catch (Throwable ignore) {
                // Deliberately swallowed: best-effort cancellation only.
            }
        }
    }
}



/**
 * Key-value nodes for exception table. The chained hash table
 * uses identity comparisons, full locking, and weak references
 * for keys. The table has a fixed capacity because it only
 * maintains task exceptions long enough for joiners to access
 * them, so should never become very large for sustained
 * periods. However, since we do not know when the last joiner
 * completes, we must use weak references and expunge them. We do
 * so on each operation (hence full locking). Also, some thread in
 * any ForkJoinPool will call helpExpungeStaleExceptions when its
 * pool becomes isQuiescent.
 */
final class ExceptionNode : WeakReference!IForkJoinTask {
    Throwable ex;
    ExceptionNode next;
    ThreadID thrower; // use id not ref to avoid weak cycles
    size_t hashCode; // store task hashCode before weak ref disappears

    /**
     * Creates a node for `task`, recording the calling thread's id as
     * the thrower.
     *
     * Params:
     *  task = the task that completed exceptionally (held weakly)
     *  ex   = the exception to associate with the task
     *  next = the node this one links to in its bucket chain, or null
     */
    this(IForkJoinTask task, Throwable ex, ExceptionNode next) {
        this.ex = ex;
        this.next = next;
        this.thrower = Thread.getThis().id();
        this.hashCode = hashOf(task);
        super(task);
    }
}


/**
 * Namespace for the shared exception table and the exception
 * re-throwing helpers.
 */
struct ForkJoinTaskHelper {

    // Exception table support

    /**
     * Hash table of exceptions thrown by tasks, to enable reporting
     * by callers. Because exceptions are rare, we don't directly keep
     * them with task objects, but instead use a weak ref table. Note
     * that cancellation exceptions don't appear in the table, but are
     * instead recorded as status values.
     *
     * The exception table has a fixed capacity.
     */
    package __gshared ExceptionNode[] exceptionTable;

    /** Lock protecting access to exceptionTable. */
    package __gshared ReentrantLock exceptionTableLock;

    /** Reference queue of stale exceptionally completed tasks. */
    // private __gshared ReferenceQueue!(IForkJoinTask) exceptionTableRefQueue;

    /// Allocates the 32-bucket table and its lock once per process.
    shared static this() {
        exceptionTable = new ExceptionNode[32];
        exceptionTableLock = new ReentrantLock();
        // exceptionTableRefQueue = new ReferenceQueue!(IForkJoinTask)();
    }

    /**
     * Polls stale refs and removes them. Call only while holding lock.
     *
     * Not implemented yet: the body only calls
     * `implementationMissing(false)`. The intended algorithm is kept
     * below, commented out, until a reference queue is available.
     */
    package static void expungeStaleExceptions() {
        // TODO: Tasks pending completion -@zxp at 12/22/2018, 5:04:07 PM
        //
        implementationMissing(false);
        // for (Object x; (x = exceptionTableRefQueue.poll()) !is null;) {
        //     ExceptionNode en = cast(ExceptionNode)x;
        //     if (en !is null) {
        //         ExceptionNode[] t = exceptionTable;
        //         int i = en.hashCode & (t.length - 1);
        //         ExceptionNode e = t[i];
        //         ExceptionNode pred = null;
        //         while (e !is null) {
        //             ExceptionNode next = e.next;
        //             if (e == x) {
        //                 if (pred is null)
        //                     t[i] = next;
        //                 else
        //                     pred.next = next;
        //                 break;
        //             }
        //             pred = e;
        //             e = next;
        //         }
        //     }
        // }
    }

    /**
     * If lock is available, polls stale refs and removes them.
     * Called from ForkJoinPool when pools become quiescent.
     *
     * Never blocks: when the lock cannot be taken immediately the call
     * returns without doing anything.
     */
    static void helpExpungeStaleExceptions() {
        ReentrantLock lock = exceptionTableLock;
        if (lock.tryLock()) {
            // Release the lock on every exit path, including a throw.
            scope (exit) lock.unlock();
            expungeStaleExceptions();
        }
    }

    /**
     * Relays `ex` to the caller by throwing it.
     *
     * Params:
     *  ex = the throwable to rethrow; when null an `Error` is thrown
     *       instead (see `uncheckedThrow`)
     */
    static void rethrow(Throwable ex) {
        uncheckedThrow!(RuntimeException)(ex);
    }

    /**
     * Throws `t` unchanged.
     *
     * The Java original cast the throwable to a generic type that is
     * erased at runtime, purely to get around checked exceptions. D has
     * no checked exceptions, and a D class cast is checked at runtime:
     * it yields null when `t` is not actually a `T`. Throwing the result
     * of `cast(T) t` would therefore throw a null reference and lose the
     * original exception, so the throwable is thrown directly.
     *
     * `T` is kept only so that existing instantiations keep compiling.
     *
     * Params:
     *  t = the throwable to throw
     *
     * Throws: `t` itself, or `Error("Unknown Exception")` when `t` is null.
     */
    static void uncheckedThrow(T)(Throwable t) if (is(T : Throwable)) {
        if (t !is null)
            throw t; // no cast: a failed class cast would produce null
        else
            throw new Error("Unknown Exception");
    }
}