1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.FutureTask;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.Executors;
16 import hunt.concurrency.Future;
17 import hunt.concurrency.thread;
18 import hunt.Exceptions;
19 import hunt.util.Common;
20 
21 static if(CompilerHelper.isGreaterThan(2093)) {
22     import core.thread.osthread;
23 } else {
24     import core.thread;
25 }
26 
27 import core.time;
28 
29 import hunt.concurrency.thread;
30 import hunt.logging.ConsoleLogger;
31 
32 
33 /**
34  * A cancellable asynchronous computation.  This class provides a base
35  * implementation of {@link Future}, with methods to start and cancel
36  * a computation, query to see if the computation is complete, and
37  * retrieve the result of the computation.  The result can only be
38  * retrieved when the computation has completed; the {@code get}
39  * methods will block if the computation has not yet completed.  Once
40  * the computation has completed, the computation cannot be restarted
41  * or cancelled (unless the computation is invoked using
42  * {@link #runAndReset}).
43  *
44  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
45  * {@link Runnable} object.  Because {@code FutureTask} implements
46  * {@code Runnable}, a {@code FutureTask} can be submitted to an
47  * {@link Executor} for execution.
48  *
49  * <p>In addition to serving as a standalone class, this class provides
50  * {@code protected} functionality that may be useful when creating
51  * customized task classes.
52  *
53  * @author Doug Lea
54  * @param (V) The result type returned by this FutureTask's {@code get} methods
55  */
56 class FutureTask(V) : RunnableFuture!(V) {
57     /*
58      * Revision notes: This differs from previous versions of this
59      * class that relied on AbstractQueuedSynchronizer, mainly to
60      * avoid surprising users about retaining interrupt status during
61      * cancellation races. Sync control in the current design relies
62      * on a "state" field updated via CAS to track completion, along
63      * with a simple Treiber stack to hold waiting threads.
64      */
65 
66     /**
67      * The run state of this task, initially NEW.  The run state
68      * transitions to a terminal state only in methods set,
69      * setException, and cancel.  During completion, state may take on
70      * values of COMPLETING (while outcome is being set) or
71      * INTERRUPTING (only while interrupting the runner to satisfy a
72      * cancel(true)). Transitions from these intermediate to final
73      * states use cheaper ordered/lazy writes because values are unique
74      * and cannot be further modified.
75      *
76      * Possible state transitions:
77      * NEW -> COMPLETING -> NORMAL
78      * NEW -> COMPLETING -> EXCEPTIONAL
79      * NEW -> CANCELLED
80      * NEW -> INTERRUPTING -> INTERRUPTED
81      */
82     private shared(int) state;
83     private enum int NEW          = 0;
84     private enum int COMPLETING   = 1;
85     private enum int NORMAL       = 2;
86     private enum int EXCEPTIONAL  = 3;
87     private enum int CANCELLED    = 4;
88     private enum int INTERRUPTING = 5;
89     private enum int INTERRUPTED  = 6;
90 
91     /** The underlying callable; nulled out after running */
92     private Callable!(V) callable;
93     /** The result to return or exception to throw from get() */
94     static if(!is(V == void)) {
95         private V outcome; // non-volatile, protected by state reads/writes
96     }
97     private Throwable exception;
98     /** The thread running the callable; CASed during run() */
99     private Thread runner;
100     /** Treiber stack of waiting threads */
101     private WaitNode waiters;
102 
103     /**
104      * Returns result or throws exception for completed task.
105      *
106      * @param s completed state value
107      */
108 
109     private V report(int s) {
110         // Object x = outcome;
111         if (s == NORMAL) {
112             static if(!is(V == void)) {
113                 return outcome; // cast(V)
114             } else {
115                 return ; // cast(V)
116             }
117         }
118             
119         if (s >= CANCELLED)
120             throw new CancellationException();
121         throw new ExecutionException(exception);
122     }
123 
124     /**
125      * Creates a {@code FutureTask} that will, upon running, execute the
126      * given {@code Callable}.
127      *
128      * @param  callable the callable task
129      * @throws NullPointerException if the callable is null
130      */
131     this(Callable!(V) callable) {
132         if (callable is null)
133             throw new NullPointerException();
134         this.callable = callable;
135         this.state = NEW;       // ensure visibility of callable
136     }
137 
138     /**
139      * Creates a {@code FutureTask} that will, upon running, execute the
140      * given {@code Runnable}, and arrange that {@code get} will return the
141      * given result on successful completion.
142      *
143      * @param runnable the runnable task
144      * @param result the result to return on successful completion. If
145      * you don't need a particular result, consider using
146      * constructions of the form:
147      * {@code Future<?> f = new FutureTask!(void)(runnable, null)}
148      * @throws NullPointerException if the runnable is null
149      */
150 static if(is(V == void)) {
151     this(Runnable runnable) {
152         this.callable = Executors.callable(runnable);
153         this.state = NEW;       // ensure visibility of callable
154     }
155 } else {
156     this(Runnable runnable, V result) {
157         this.callable = Executors.callable(runnable, result);
158         this.state = NEW;       // ensure visibility of callable
159     }
160 }
161 
162     bool isCancelled() {
163         return state >= CANCELLED;
164     }
165 
166     bool isDone() {
167         return state != NEW;
168     }
169 
170     bool cancel(bool mayInterruptIfRunning) {
171         if (!(state == NEW && AtomicHelper.compareAndSet(state, NEW,
172             mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
173             return false;
174         try {    // in case call to interrupt throws exception
175             if (mayInterruptIfRunning) {
176                 try {
177                     ThreadEx t = cast(ThreadEx) runner;
178                     if (t !is null)
179                         t.interrupt();
180                 } finally { // final state
181                     AtomicHelper.store(state, INTERRUPTED);
182                 }
183             }
184         } finally {
185             finishCompletion();
186         }
187         return true;
188     }
189 
190     /**
191      * @throws CancellationException {@inheritDoc}
192      */
193     V get() {
194         int s = state;
195         if (s <= COMPLETING)
196             s = awaitDone(false, Duration.zero);
197         return report(s);
198     }
199 
200     /**
201      * @throws CancellationException {@inheritDoc}
202      */
203     V get(Duration timeout) {
204         int s = state;
205         if (s <= COMPLETING &&
206             (s = awaitDone(true, timeout)) <= COMPLETING)
207             throw new TimeoutException();
208         return report(s);
209     }
210 
211     /**
212      * Protected method invoked when this task transitions to state
213      * {@code isDone} (whether normally or via cancellation). The
214      * default implementation does nothing.  Subclasses may override
215      * this method to invoke completion callbacks or perform
216      * bookkeeping. Note that you can query status inside the
217      * implementation of this method to determine whether this task
218      * has been cancelled.
219      */
220     protected void done() { }
221 
222     /**
223      * Sets the result of this future to the given value unless
224      * this future has already been set or has been cancelled.
225      *
226      * <p>This method is invoked internally by the {@link #run} method
227      * upon successful completion of the computation.
228      *
229      * @param v the value
230      */
231 
232 static if(is(V == void)) {
233     protected void set() {
234         if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
235             // outcome = v;
236             AtomicHelper.store(state, NORMAL);  // final state
237             finishCompletion();
238         }
239     }
240 
241     void run() {
242         if (state != NEW ||
243             !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
244             return;
245         try {
246             Callable!(V) c = callable;
247             if (c !is null && state == NEW) {
248                 bool ran;
249                 try {
250                     c.call();
251                     ran = true;
252                 } catch (Throwable ex) {
253                     ran = false;
254                     setException(ex);
255                 }
256                 if (ran)
257                     set();
258             }
259         } finally {
260             // runner must be non-null until state is settled to
261             // prevent concurrent calls to run()
262             runner = null;
263             // state must be re-read after nulling runner to prevent
264             // leaked interrupts
265             int s = state;
266             if (s >= INTERRUPTING)
267                 handlePossibleCancellationInterrupt(s);
268         }
269     }    
270 } else {
271     protected void set(V v) {
272         if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
273             outcome = v;
274             AtomicHelper.store(state, NORMAL);  // final state
275             finishCompletion();
276         }
277     }
278 
279     void run() {
280         if (state != NEW ||
281             !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
282             return;
283         try {
284             Callable!(V) c = callable;
285             if (c !is null && state == NEW) {
286                 V result;
287                 bool ran;
288                 try {
289                     result = c.call();
290                     ran = true;
291                 } catch (Throwable ex) {
292                     result = V.init;
293                     ran = false;
294                     setException(ex);
295                 }
296                 if (ran)
297                     set(result);
298             }
299         } finally {
300             // runner must be non-null until state is settled to
301             // prevent concurrent calls to run()
302             runner = null;
303             // state must be re-read after nulling runner to prevent
304             // leaked interrupts
305             int s = state;
306             if (s >= INTERRUPTING)
307                 handlePossibleCancellationInterrupt(s);
308         }
309     }    
310 }
311 
312     /**
313      * Causes this future to report an {@link ExecutionException}
314      * with the given throwable as its cause, unless this future has
315      * already been set or has been cancelled.
316      *
317      * <p>This method is invoked internally by the {@link #run} method
318      * upon failure of the computation.
319      *
320      * @param t the cause of failure
321      */
322     protected void setException(Throwable t) {
323         if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
324             exception = t;
325             AtomicHelper.store(state, EXCEPTIONAL); // final state
326             finishCompletion();
327         }
328     }
329 
330     /**
331      * Executes the computation without setting its result, and then
332      * resets this future to initial state, failing to do so if the
333      * computation encounters an exception or is cancelled.  This is
334      * designed for use with tasks that intrinsically execute more
335      * than once.
336      *
337      * @return {@code true} if successfully run and reset
338      */
339     protected bool runAndReset() {
340         if (state != NEW ||
341             !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
342             return false;
343         bool ran = false;
344         int s = state;
345         try {
346             Callable!(V) c = callable;
347             if (c !is null && s == NEW) {
348                 try {
349                     c.call(); // don't set result
350                     ran = true;
351                 } catch (Throwable ex) {
352                     setException(ex);
353                 }
354             }
355         } finally {
356             // runner must be non-null until state is settled to
357             // prevent concurrent calls to run()
358             runner = null;
359             // state must be re-read after nulling runner to prevent
360             // leaked interrupts
361             s = state;
362             if (s >= INTERRUPTING)
363                 handlePossibleCancellationInterrupt(s);
364         }
365         return ran && s == NEW;
366     }
367 
368     /**
369      * Ensures that any interrupt from a possible cancel(true) is only
370      * delivered to a task while in run or runAndReset.
371      */
372     private void handlePossibleCancellationInterrupt(int s) {
373         // It is possible for our interrupter to stall before getting a
374         // chance to interrupt us.  Let's spin-wait patiently.
375         if (s == INTERRUPTING)
376             while (state == INTERRUPTING)
377                 Thread.yield(); // wait out pending interrupt
378 
379         assert(state == INTERRUPTED);
380 
381         // We want to clear any interrupt we may have received from
382         // cancel(true).  However, it is permissible to use interrupts
383         // as an independent mechanism for a task to communicate with
384         // its caller, and there is no way to clear only the
385         // cancellation interrupt.
386         //
387         ThreadEx.interrupted();
388     }
389 
390     /**
391      * Simple linked list nodes to record waiting threads in a Treiber
392      * stack.  See other classes such as Phaser and SynchronousQueue
393      * for more detailed explanation.
394      */
395     static final class WaitNode {
396         Thread thread;
397         WaitNode next;
398         this() { thread = Thread.getThis(); }
399     }
400 
401     /**
402      * Removes and signals all waiting threads, invokes done(), and
403      * nulls out callable.
404      */
405     private void finishCompletion() {
406         // assert state > COMPLETING;
407         for (WaitNode q; (q = waiters) !is null;) {
408             if (AtomicHelper.compareAndSet(waiters, q, null)) {
409                 for (;;) {
410                     Thread t = q.thread;
411                     if (t !is null) {
412                         q.thread = null;
413                         LockSupport.unpark(t);
414                     }
415                     WaitNode next = q.next;
416                     if (next is null)
417                         break;
418                     q.next = null; // unlink to help gc
419                     q = next;
420                 }
421                 break;
422             }
423         }
424 
425         done();
426 
427         callable = null;        // to reduce footprint
428     }
429 
430     /**
431      * Awaits completion or aborts on interrupt or timeout.
432      *
433      * @param timed true if use timed waits
434      * @param duration time to wait, if timed
435      * @return state upon completion or at timeout
436      */
437     private int awaitDone(bool timed, Duration timeout) {
438         // The code below is very delicate, to achieve these goals:
439         // - call nanoTime exactly once for each call to park
440         // - if nanos <= 0L, return promptly without allocation or nanoTime
441         // - if nanos == Long.MIN_VALUE, don't underflow
442         // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
443         //   and we suffer a spurious wakeup, we will do no worse than
444         //   to park-spin for a while
445         MonoTime startTime = MonoTime.zero;    // Special value 0L means not yet parked
446         WaitNode q = null;
447         bool queued = false;
448         for (;;) {
449             int s = state;
450             if (s > COMPLETING) {
451                 if (q !is null)
452                     q.thread = null;
453                 return s;
454             } else if (s == COMPLETING) {
455                 // We may have already promised (via isDone) that we are done
456                 // so never return empty-handed or throw InterruptedException
457                 Thread.yield();
458             } else if (ThreadEx.interrupted()) {
459                 removeWaiter(q);
460                 throw new InterruptedException();
461             } else if (q is null) {
462                 if (timed && timeout <= Duration.zero)
463                     return s;
464                 q = new WaitNode();
465             } else if (!queued) {
466                 queued = AtomicHelper.compareAndSet!(WaitNode)(waiters, q.next = waiters, q);
467             } else if (timed) {
468                 Duration parkDuration;
469                 if (startTime == MonoTime.zero) { // first time
470                     startTime = MonoTime.currTime;
471                     if (startTime == MonoTime.zero)
472                         startTime = MonoTime(1);
473                     parkDuration = timeout;
474                 } else {                    
475                     Duration elapsed = MonoTime.currTime - startTime;
476                     if (elapsed >= timeout) {
477                         removeWaiter(q);
478                         return state;
479                     }
480                     parkDuration = timeout - elapsed;
481                 }
482                 // nanoTime may be slow; recheck before parking
483                 if (state < COMPLETING) {
484                     LockSupport.park(this, parkDuration);
485                 }
486             } else {
487                 LockSupport.park(this);
488             }
489         }
490     }
491 
492     /**
493      * Tries to unlink a timed-out or interrupted wait node to avoid
494      * accumulating garbage.  Internal nodes are simply unspliced
495      * without CAS since it is harmless if they are traversed anyway
496      * by releasers.  To avoid effects of unsplicing from already
497      * removed nodes, the list is retraversed in case of an apparent
498      * race.  This is slow when there are a lot of nodes, but we don't
499      * expect lists to be long enough to outweigh higher-overhead
500      * schemes.
501      */
502     private void removeWaiter(WaitNode node) {
503         if (node !is null) {
504             node.thread = null;
505             retry:
506             for (;;) {          // restart on removeWaiter race
507                 for (WaitNode pred = null, q = waiters, s; q !is null; q = s) {
508                     s = q.next;
509                     if (q.thread !is null)
510                         pred = q;
511                     else if (pred !is null) {
512                         pred.next = s;
513                         if (pred.thread is null) // check for race
514                             continue retry;
515                     }
516                     else if (!AtomicHelper.compareAndSet(waiters, q, s))
517                         continue retry;
518                 }
519                 break;
520             }
521         }
522     }
523 
524     /**
525      * Returns a string representation of this FutureTask.
526      *
527      * @implSpec
528      * The default implementation returns a string identifying this
529      * FutureTask, as well as its completion state.  The state, in
530      * brackets, contains one of the strings {@code "Completed Normally"},
531      * {@code "Completed Exceptionally"}, {@code "Cancelled"}, or {@code
532      * "Not completed"}.
533      *
534      * @return a string representation of this FutureTask
535      */
536     override string toString() {
537         string status;
538         switch (state) {
539         case NORMAL:
540             status = "[Completed normally]";
541             break;
542         case EXCEPTIONAL:
543             status = "[Completed exceptionally: " ~ exception.toString() ~ "]";
544             break;
545         case CANCELLED:
546         case INTERRUPTING:
547         case INTERRUPTED:
548             status = "[Cancelled]";
549             break;
550         default:
551             Callable!V callable = this.callable;
552             status = (callable is null)
553                 ? "[Not completed]"
554                 : "[Not completed, task = " ~ (cast(Object)callable).toString() ~ "]";
555         }
556         return super.toString() ~ status;
557     }
558 
559 }