/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.FutureTask;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.Executors;
import hunt.concurrency.Future;
import hunt.concurrency.thread;
import hunt.Exceptions;
import hunt.util.Common;

import core.thread;
import core.time;

import hunt.concurrency.thread;
import hunt.logging.ConsoleLogger;


/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param (V) The result type returned by this FutureTask's {@code get} methods
 */
class FutureTask(V) : RunnableFuture!(V) {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)).
     *
     * The numeric ordering of the constants is significant: the code
     * below relies on comparisons such as {@code s <= COMPLETING},
     * {@code s >= CANCELLED} and {@code s >= INTERRUPTING}.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private shared(int) state;
    private enum int NEW          = 0;
    private enum int COMPLETING   = 1;
    private enum int NORMAL       = 2;
    private enum int EXCEPTIONAL  = 3;
    private enum int CANCELLED    = 4;
    private enum int INTERRUPTING = 5;
    private enum int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable!(V) callable;
    /** The result to return from get(); absent when V is void */
    static if(!is(V == void)) {
        private V outcome; // non-volatile, protected by state reads/writes
    }
    /** The throwable recorded by setException(), reported via get() */
    private Throwable exception;
    /** The thread running the callable; CASed during run() */
    private Thread runner;
    /** Treiber stack of waiting threads */
    private WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome when {@code s == NORMAL} (nothing for void)
     * @throws CancellationException if {@code s >= CANCELLED}
     * @throws ExecutionException wrapping the recorded throwable otherwise
     */
    private V report(int s) {
        if (s == NORMAL) {
            static if(!is(V == void)) {
                return outcome;
            } else {
                return;
            }
        }

        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException(exception);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    this(Callable!(V) callable) {
        if (callable is null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * When {@code V} is {@code void} the constructor takes only the
     * runnable; otherwise it additionally takes the result to return.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion
     *        (non-void {@code V} only). If you don't need a particular
     *        result, consider using constructions of the form:
     *        {@code auto f = new FutureTask!(void)(runnable)}
     * @throws NullPointerException if the runnable is null
     */
    static if(is(V == void)) {
        this(Runnable runnable) {
            // Fail fast here, as the Callable constructor does, instead of
            // depending on what Executors.callable does with a null task.
            if (runnable is null)
                throw new NullPointerException();
            this.callable = Executors.callable(runnable);
            this.state = NEW;       // ensure visibility of callable
        }
    } else {
        this(Runnable runnable, V result) {
            // Fail fast here, as the Callable constructor does, instead of
            // depending on what Executors.callable does with a null task.
            if (runnable is null)
                throw new NullPointerException();
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    }

    /**
     * @return true once the state is CANCELLED, INTERRUPTING or INTERRUPTED
     */
    bool isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * @return true as soon as the state has left NEW (this includes the
     *         transient COMPLETING and INTERRUPTING states)
     */
    bool isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task. Succeeds only if the state can be
     * moved away from NEW by this call.
     *
     * @param mayInterruptIfRunning if true, the runner thread (when it is
     *        a ThreadEx) is interrupted and the final state is INTERRUPTED;
     *        otherwise the final state is CANCELLED
     * @return false if the task had already left the NEW state,
     *         true otherwise
     */
    bool cancel(bool mayInterruptIfRunning) {
        if (!(state == NEW && AtomicHelper.compareAndSet(state, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // Only ThreadEx exposes interrupt(); for any other
                    // Thread the cast yields null and nothing is signalled.
                    ThreadEx t = cast(ThreadEx) runner;
                    if (t !is null)
                        t.interrupt();
                } finally { // final state
                    AtomicHelper.store(state, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * Waits if necessary for the computation to complete, then
     * retrieves its result.
     *
     * @throws CancellationException {@inheritDoc}
     */
    V get() {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, Duration.zero);
        return report(s);
    }

    /**
     * Waits at most {@code timeout} for the computation to complete,
     * then retrieves its result.
     *
     * @param timeout the maximum time to wait
     * @throws CancellationException {@inheritDoc}
     * @throws TimeoutException if the task is still not past COMPLETING
     *         when the wait ends
     */
    V get(Duration timeout) {
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, timeout)) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value (non-void {@code V} only; the void
     *        specialisation takes no argument)
     */
    static if(is(V == void)) {
        protected void set() {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                // nothing to store for a void task
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    } else {
        protected void set(V v) {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                outcome = v;
                AtomicHelper.store(state, NORMAL);  // final state
                finishCompletion();
            }
        }
    }

    /**
     * Runs the wrapped callable on the calling thread and records its
     * outcome via {@code set} or {@code setException}. Returns
     * immediately, without running, if the task is no longer NEW or
     * another thread has already claimed {@code runner}.
     */
    void run() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return;
        try {
            Callable!(V) c = callable;
            if (c !is null && state == NEW) {
                // Only the handling of the return value differs between
                // the void and non-void instantiations.
                static if(is(V == void)) {
                    bool ran;
                    try {
                        c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set();
                } else {
                    V result;
                    bool ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = V.init;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
            exception = t;
            AtomicHelper.store(state, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected bool runAndReset() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return false;
        bool ran = false;
        int s = state;
        try {
            Callable!(V) c = callable;
            if (c !is null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller; expected to be
     *        INTERRUPTING or INTERRUPTED
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        assert(state == INTERRUPTED);

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // NOTE(review): the paragraph above argues against clearing, yet
        // the call below does clear the current thread's interrupt flag.
        // Confirm that this is the intended behaviour for this port.
        ThreadEx.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        Thread thread;
        WaitNode next;
        this() { thread = Thread.getThis(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) !is null;) {
            // Detach the whole stack in one CAS, then walk it privately.
            if (AtomicHelper.compareAndSet(waiters, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t !is null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next is null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param timeout time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if ThreadEx.interrupted() reports an
     *         interrupt while the task is still NEW
     */
    private int awaitDone(bool timed, Duration timeout) {
        // The code below is very delicate, to achieve these goals:
        // - read the clock exactly once for each call to park
        // - if timeout <= 0, return promptly without allocating a
        //   WaitNode or reading the clock
        // - if we suffer a spurious wakeup, do no worse than to
        //   park-spin for a while
        MonoTime startTime = MonoTime.zero;    // MonoTime.zero means not yet parked
        WaitNode q = null;
        bool queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q !is null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING) {
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            } else if (ThreadEx.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            } else if (q is null) {
                if (timed && timeout <= Duration.zero)
                    return s;
                q = new WaitNode();
            } else if (!queued) {
                // Push our node onto the head of the waiter stack.
                queued = AtomicHelper.compareAndSet!(WaitNode)(waiters, q.next = waiters, q);
            } else if (timed) {
                Duration parkDuration;
                if (startTime == MonoTime.zero) { // first time
                    startTime = MonoTime.currTime;
                    // Keep the "not yet parked" sentinel distinguishable
                    // from a genuine clock reading of zero.
                    if (startTime == MonoTime.zero)
                        startTime = MonoTime(1);
                    parkDuration = timeout;
                } else {
                    Duration elapsed = MonoTime.currTime - startTime;
                    if (elapsed >= timeout) {
                        removeWaiter(q);
                        return state;
                    }
                    parkDuration = timeout - elapsed;
                }
                // reading the clock may be slow; recheck before parking
                if (state < COMPLETING) {
                    LockSupport.park(this, parkDuration);
                }
            } else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; null is accepted and ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node !is null) {
            // A null thread marks the node as dead; the sweep below
            // unlinks every dead node it encounters.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q !is null; q = s) {
                    s = q.next;
                    if (q.thread !is null)
                        pred = q;
                    else if (pred !is null) {
                        pred.next = s;
                        if (pred.thread is null) // check for race
                            continue retry;
                    }
                    else if (!AtomicHelper.compareAndSet(waiters, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * @implSpec
     * The default implementation returns a string identifying this
     * FutureTask, as well as its completion state.  The state, in
     * brackets, contains one of the strings {@code "Completed normally"},
     * {@code "Completed exceptionally"}, {@code "Cancelled"}, or {@code
     * "Not completed"}.
     *
     * @return a string representation of this FutureTask
     */
    override string toString() {
        string status;
        switch (state) {
        case NORMAL:
            status = "[Completed normally]";
            break;
        case EXCEPTIONAL: {
            // A subclass may have called setException(null); do not
            // dereference a null throwable while building the string.
            Throwable ex = exception;
            status = "[Completed exceptionally: "
                ~ (ex is null ? "null" : ex.toString()) ~ "]";
            break;
        }
        case CANCELLED:
        case INTERRUPTING:
        case INTERRUPTED:
            status = "[Cancelled]";
            break;
        default: {
            Callable!V c = this.callable;
            status = (c is null)
                ? "[Not completed]"
                : "[Not completed, task = " ~ (cast(Object) c).toString() ~ "]";
        }
        }
        return super.toString() ~ status;
    }

}