1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.FutureTask;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.Executors;
16 import hunt.concurrency.Future;
17 import hunt.concurrency.thread;
18 import hunt.Exceptions;
19 import hunt.util.Common;
20 
21 import core.thread;
22 import core.time;
23 
24 import hunt.concurrency.thread;
25 import hunt.logging.ConsoleLogger;
26 
27 
28 /**
29  * A cancellable asynchronous computation.  This class provides a base
30  * implementation of {@link Future}, with methods to start and cancel
31  * a computation, query to see if the computation is complete, and
32  * retrieve the result of the computation.  The result can only be
33  * retrieved when the computation has completed; the {@code get}
34  * methods will block if the computation has not yet completed.  Once
35  * the computation has completed, the computation cannot be restarted
36  * or cancelled (unless the computation is invoked using
37  * {@link #runAndReset}).
38  *
39  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
40  * {@link Runnable} object.  Because {@code FutureTask} implements
41  * {@code Runnable}, a {@code FutureTask} can be submitted to an
42  * {@link Executor} for execution.
43  *
44  * <p>In addition to serving as a standalone class, this class provides
45  * {@code protected} functionality that may be useful when creating
46  * customized task classes.
47  *
48  * @since 1.5
49  * @author Doug Lea
50  * @param (V) The result type returned by this FutureTask's {@code get} methods
51  */
class FutureTask(V) : RunnableFuture!(V) {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states are plain stores (no CAS) because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering matters: the code relies on
     * {@code s > COMPLETING} meaning "terminal" and
     * {@code s >= CANCELLED} meaning "cancelled".
     */
    private shared(int) state;
    private enum int NEW          = 0;
    private enum int COMPLETING   = 1;
    private enum int NORMAL       = 2;
    private enum int EXCEPTIONAL  = 3;
    private enum int CANCELLED    = 4;
    private enum int INTERRUPTING = 5;
    private enum int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable!(V) callable;
    /** The result to return from get(); absent when V is void */
    static if(!is(V == void)) {
        private V outcome; // non-volatile, protected by state reads/writes
    }
    /** The throwable to wrap in an ExecutionException from get() */
    private Throwable exception;
    /** The thread running the callable; CASed during run() */
    private Thread runner;
    /** Treiber stack of waiting threads */
    private WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome when {@code s == NORMAL} (nothing for
     *         {@code V == void})
     * @throws CancellationException if {@code s >= CANCELLED}
     * @throws ExecutionException wrapping the stored throwable otherwise
     */
    private V report(int s) {
        if (s == NORMAL) {
            static if(!is(V == void)) {
                return outcome;
            } else {
                return ;
            }
        }

        // CANCELLED, INTERRUPTING and INTERRUPTED all count as cancelled.
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException(exception);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    this(Callable!(V) callable) {
        if (callable is null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    static if(is(V == void)) {
        /**
         * Creates a {@code FutureTask} that will, upon running, execute
         * the given {@code Runnable}. There is no result value, e.g.
         * {@code auto f = new FutureTask!(void)(runnable)}.
         *
         * @param runnable the runnable task
         * @throws NullPointerException if the runnable is null
         */
        this(Runnable runnable) {
            // Enforce the documented contract here instead of relying on
            // whatever Executors.callable does with a null argument.
            if (runnable is null)
                throw new NullPointerException();
            this.callable = Executors.callable(runnable);
            this.state = NEW;       // ensure visibility of callable
        }
    } else {
        /**
         * Creates a {@code FutureTask} that will, upon running, execute
         * the given {@code Runnable}, and arrange that {@code get} will
         * return the given result on successful completion.
         *
         * @param runnable the runnable task
         * @param result the result to return on successful completion. If
         * you don't need a particular result, consider using
         * {@code new FutureTask!(void)(runnable)} instead.
         * @throws NullPointerException if the runnable is null
         */
        this(Runnable runnable, V result) {
            // Enforce the documented contract here instead of relying on
            // whatever Executors.callable does with a null argument.
            if (runnable is null)
                throw new NullPointerException();
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    }

    /**
     * Returns {@code true} if this task was cancelled, i.e. its state is
     * CANCELLED, INTERRUPTING or INTERRUPTED.
     */
    bool isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * Returns {@code true} once the state has left NEW. Note that this
     * includes the transient COMPLETING state, during which the outcome
     * is still being stored.
     */
    bool isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task. Succeeds only if the state can be
     * moved away from NEW by this call.
     *
     * @param mayInterruptIfRunning if {@code true}, the runner thread (when
     *        it is a {@code ThreadEx}) is interrupted and the state ends
     *        as INTERRUPTED; otherwise the state becomes CANCELLED
     * @return {@code false} if the task had already left the NEW state,
     *         {@code true} otherwise
     */
    bool cancel(bool mayInterruptIfRunning) {
        if (!(state == NEW && AtomicHelper.compareAndSet(state, NEW,
            mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // A runner that is not a ThreadEx yields null here
                    // and is therefore not interrupted.
                    ThreadEx t = cast(ThreadEx) runner;
                    if (t !is null)
                        t.interrupt();
                } finally { // final state
                    AtomicHelper.store(state, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * Waits if necessary for the computation to complete, then returns
     * its result.
     *
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw
     * @throws InterruptedException if the waiting thread was interrupted
     */
    V get() {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, Duration.zero);
        return report(s);
    }

    /**
     * Waits at most {@code timeout} for the computation to complete, then
     * returns its result.
     *
     * @param timeout the maximum time to wait
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw
     * @throws InterruptedException if the waiting thread was interrupted
     * @throws TimeoutException if the task is still not complete when
     *         the wait ends
     */
    V get(Duration timeout) {
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, timeout)) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    static if(is(V == void)) {
        /**
         * Marks this future as completed normally unless it has already
         * been set or has been cancelled.
         *
         * <p>This method is invoked internally by the {@link #run} method
         * upon successful completion of the computation.
         */
        protected void set() {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    } else {
        /**
         * Sets the result of this future to the given value unless
         * this future has already been set or has been cancelled.
         *
         * <p>This method is invoked internally by the {@link #run} method
         * upon successful completion of the computation.
         *
         * @param v the value
         */
        protected void set(V v) {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                outcome = v;
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    }

    /**
     * Runs the wrapped callable on the calling thread and records its
     * result via {@code set}, or the thrown {@code Throwable} via
     * {@link #setException}. Does nothing if the task is not in the NEW
     * state or if another thread has already claimed the runner slot.
     */
    void run() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return;
        try {
            Callable!(V) c = callable;
            if (c !is null && state == NEW) {
                static if(is(V == void)) {
                    bool ran;
                    try {
                        c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set();
                } else {
                    V result;
                    bool ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = V.init;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            }
        } finally {
            releaseRunner();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
            exception = t;
            AtomicHelper.store(state, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected bool runAndReset() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return false;
        bool ran = false;
        int s = state;
        try {
            Callable!(V) c = callable;
            if (c !is null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // Pick up the state as observed after the runner slot was
            // released; it decides the return value below.
            s = releaseRunner();
        }
        return ran && s == NEW;
    }

    /**
     * Common epilogue of {@link #run} and {@link #runAndReset}: releases
     * the runner slot and waits out a cancellation interrupt that may
     * still be in flight.
     *
     * @return the state as re-read after the runner was cleared
     */
    private int releaseRunner() {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
        return s;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller; INTERRUPTING or
     *          INTERRUPTED
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        assert(state == INTERRUPTED);

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // NOTE(review): despite the caveat above, this port calls
        // ThreadEx.interrupted() unconditionally and discards the result.
        // Presumably that call tests and clears the current thread's
        // interrupt flag -- confirm against hunt.concurrency.thread and
        // confirm that clearing a non-cancellation interrupt is intended.
        ThreadEx.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        Thread thread;
        WaitNode next;
        /** Captures the constructing thread as the waiter. */
        this() { thread = Thread.getThis(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) !is null;) {
            // Detach the whole stack in one CAS; on failure re-read the
            // head and try again.
            if (AtomicHelper.compareAndSet(waiters, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t !is null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next is null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param timeout time to wait, if timed; ignored otherwise
     * @return state upon completion or at timeout
     * @throws InterruptedException if {@code ThreadEx.interrupted()}
     *         reports an interrupt while the task is still NEW
     */
    private int awaitDone(bool timed, Duration timeout) {
        // The code below is very delicate, to achieve these goals:
        // - read MonoTime.currTime exactly once for each call to park
        // - if timeout <= 0, return promptly without allocating a
        //   WaitNode or reading the clock
        // - after a spurious wakeup, re-park only for the remaining time
        MonoTime startTime = MonoTime.zero;    // MonoTime.zero means not yet parked
        WaitNode q = null;
        bool queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                // Terminal state reached: mark our node as dead so
                // releasers and removeWaiter skip it.
                if (q !is null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING) {
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            } else if (ThreadEx.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            } else if (q is null) {
                if (timed && timeout <= Duration.zero)
                    return s;
                q = new WaitNode();
            } else if (!queued) {
                // Push our node on top of the waiter stack; retried on the
                // next loop iteration if the CAS loses a race.
                queued = AtomicHelper.compareAndSet!(WaitNode)(waiters, q.next = waiters, q);
            } else if (timed) {
                Duration parkDuration;
                if (startTime == MonoTime.zero) { // first time
                    startTime = MonoTime.currTime;
                    // Keep MonoTime.zero reserved as the "not yet parked" marker.
                    if (startTime == MonoTime.zero)
                        startTime = MonoTime(1);
                    parkDuration = timeout;
                } else {
                    Duration elapsed = MonoTime.currTime - startTime;
                    if (elapsed >= timeout) {
                        removeWaiter(q);
                        return state;
                    }
                    parkDuration = timeout - elapsed;
                }
                // reading the clock may be slow; recheck before parking
                if (state < COMPLETING) {
                    LockSupport.park(this, parkDuration);
                }
            } else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; a null node is ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node !is null) {
            // A null thread marks the node as dead; the sweep below
            // unlinks every dead node it encounters.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q !is null; q = s) {
                    s = q.next;
                    if (q.thread !is null)
                        pred = q;
                    else if (pred !is null) {
                        pred.next = s;
                        if (pred.thread is null) // check for race
                            continue retry;
                    }
                    else if (!AtomicHelper.compareAndSet(waiters, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * @implSpec
     * The default implementation returns a string identifying this
     * FutureTask, as well as its completion state.  The state, in
     * brackets, contains one of the strings {@code "Completed normally"},
     * {@code "Completed exceptionally"}, {@code "Cancelled"}, or {@code
     * "Not completed"}.
     *
     * @return a string representation of this FutureTask
     */
    override string toString() {
        string status;
        switch (state) {
        case NORMAL:
            status = "[Completed normally]";
            break;
        case EXCEPTIONAL: {
            // A subclass may have called setException(null); avoid
            // dereferencing a null throwable.
            Throwable cause = exception;
            status = "[Completed exceptionally: "
                ~ (cause is null ? "null" : cause.toString()) ~ "]";
            break;
        }
        case CANCELLED:
        case INTERRUPTING:
        case INTERRUPTED:
            status = "[Cancelled]";
            break;
        default: {
            // Snapshot the field once: finishCompletion() may null it
            // concurrently.
            Callable!V task = this.callable;
            status = (task is null)
                ? "[Not completed]"
                : "[Not completed, task = " ~ (cast(Object)task).toString() ~ "]";
        }
        }
        return super.toString() ~ status;
    }

}