/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.FutureTask;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.Executors;
import hunt.concurrency.Future;
import hunt.concurrency.thread;
import hunt.Exceptions;
import hunt.util.Common;

static if(CompilerHelper.isGreaterThan(2093)) {
    import core.thread.osthread;
} else {
    import core.thread;
}

import core.time;

import hunt.concurrency.thread;
import hunt.logging.ConsoleLogger;


/**
 * A cancellable asynchronous computation. This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation. The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed. Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object. Because {@code FutureTask} provides a
 * {@code run()} method through {@code RunnableFuture}, it can be handed
 * to an executor for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality ({@code done}, {@code set},
 * {@code setException}, {@code runAndReset}) that may be useful when
 * creating customized task classes.
 *
 * <p>When {@code V} is {@code void} the class is compiled without an
 * {@code outcome} field, {@code set()} takes no argument, and the
 * Runnable constructor takes no result value.
 *
 * @author Doug Lea
 * @param (V) The result type returned by this FutureTask's {@code get} methods
 */
class FutureTask(V) : RunnableFuture!(V) {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     */

    /**
     * The run state of this task, initially NEW. The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel. During completion, state may take on
     * values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate states to the
     * final states are written with AtomicHelper.store rather than a
     * CAS, because the values are unique and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering matters: the code compares with
     * "<= COMPLETING" (not finished yet), ">= CANCELLED" (cancelled in
     * any form) and ">= INTERRUPTING" (cancel(true) was requested).
     */
    private shared(int) state;
    private enum int NEW = 0;
    private enum int COMPLETING = 1;
    private enum int NORMAL = 2;
    private enum int EXCEPTIONAL = 3;
    private enum int CANCELLED = 4;
    private enum int INTERRUPTING = 5;
    private enum int INTERRUPTED = 6;

    /** The underlying callable; nulled out by finishCompletion() */
    private Callable!(V) callable;
    /** The result to return from get(); absent when V is void */
    static if(!is(V == void)) {
        private V outcome; // non-volatile, protected by state reads/writes
    }
    /** The throwable recorded by setException(); wrapped and thrown by report() */
    private Throwable exception;
    /** The thread running the callable; CASed during run() */
    private Thread runner;
    /** Treiber stack of waiting threads */
    private WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome when {@code s == NORMAL} (nothing when V is void)
     * @throws CancellationException if {@code s >= CANCELLED}
     * @throws ExecutionException wrapping the recorded throwable for any
     *         other value of {@code s}
     */

    private V report(int s) {
        if (s == NORMAL) {
            static if(!is(V == void)) {
                return outcome;
            } else {
                return ; // V is void: nothing to hand back
            }
        }

        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException(exception);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param callable the callable task
     * @throws NullPointerException if the callable is null
     */
    this(Callable!(V) callable) {
        if (callable is null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW; // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}. The runnable is adapted to a Callable via
     * {@code Executors.callable}.
     *
     * <p>Two forms exist, selected at compile time:
     * <ul>
     * <li>V is void: {@code this(Runnable runnable)}, e.g.
     *     {@code auto f = new FutureTask!(void)(runnable)}</li>
     * <li>otherwise: {@code this(Runnable runnable, V result)}, where
     *     {@code result} is passed to {@code Executors.callable} as the
     *     value to produce on successful completion</li>
     * </ul>
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion
     *        (non-void form only)
     *
     * NOTE(review): no null check is performed here; a null runnable is
     * presumably rejected by Executors.callable with a
     * NullPointerException -- confirm against that helper.
     */
    static if(is(V == void)) {
        this(Runnable runnable) {
            this.callable = Executors.callable(runnable);
            this.state = NEW; // ensure visibility of callable
        }
    } else {
        this(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW; // ensure visibility of callable
        }
    }

    /**
     * Returns {@code true} if this task was cancelled, i.e. the state is
     * CANCELLED, INTERRUPTING or INTERRUPTED.
     */
    bool isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * Returns {@code true} as soon as the state has left NEW. Note that
     * this includes the transient COMPLETING and INTERRUPTING states.
     */
    bool isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task.
     *
     * <p>Succeeds only if the state can be moved away from NEW by this
     * call: to INTERRUPTING when {@code mayInterruptIfRunning} is true,
     * otherwise directly to CANCELLED. In the interrupting case the
     * current runner is interrupted if it is a {@code ThreadEx}, and the
     * state is then stored as INTERRUPTED. Waiters are always released
     * through finishCompletion(), even if interrupting throws.
     *
     * @param mayInterruptIfRunning whether the thread executing the task
     *        should be interrupted
     * @return {@code false} if the task had already left NEW,
     *         {@code true} otherwise
     */
    bool cancel(bool mayInterruptIfRunning) {
        if (!(state == NEW && AtomicHelper.compareAndSet(state, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try { // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // runner may be null (not started) or a plain Thread;
                    // the cast yields null in both cases and nothing is interrupted
                    ThreadEx t = cast(ThreadEx) runner;
                    if (t !is null)
                        t.interrupt();
                } finally { // final state
                    AtomicHelper.store(state, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * Waits without a time limit for the task to finish, then returns
     * its result.
     *
     * @return the computed result (nothing when V is void)
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw
     * @throws InterruptedException if the waiting thread is interrupted
     *         (raised from awaitDone)
     */
    V get() {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, Duration.zero); // timeout argument is ignored when untimed
        return report(s);
    }

    /**
     * Waits at most {@code timeout} for the task to finish, then returns
     * its result.
     *
     * @param timeout the maximum time to wait
     * @return the computed result (nothing when V is void)
     * @throws TimeoutException if the state is still NEW or COMPLETING
     *         when awaitDone returns
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the computation threw
     * @throws InterruptedException if the waiting thread is interrupted
     *         (raised from awaitDone)
     */
    V get(Duration timeout) {
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, timeout)) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing. Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     *
     * <p>It is called from finishCompletion(), after the waiters have
     * been unparked and before {@code callable} is cleared.
     */
    protected void done() { }

    /**
     * {@code set} / {@code run}, in a void and a non-void flavour.
     *
     * <p>{@code set} records the result of this future unless it has
     * already been set or has been cancelled (the CAS from NEW to
     * COMPLETING fails in those cases and the call is a no-op). It is
     * invoked internally by {@code run} upon successful completion of
     * the computation. The non-void form takes the value {@code v} to
     * store; the void form takes no argument.
     *
     * <p>{@code run} executes the callable on the calling thread. It
     * returns immediately if the state is not NEW or if another thread
     * has already installed itself as {@code runner}. A throwable raised
     * by the callable is recorded with {@code setException}; otherwise
     * the result is recorded with {@code set}.
     */

    static if(is(V == void)) {
        protected void set() {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                // no outcome to store for void
                AtomicHelper.store(state, NORMAL); // final state
                finishCompletion();
            }
        }

        void run() {
            if (state != NEW ||
                !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
                return;
            try {
                Callable!(V) c = callable;
                if (c !is null && state == NEW) {
                    bool ran;
                    try {
                        c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set();
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    } else {
        protected void set(V v) {
            if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
                // outcome is written while state is COMPLETING, before
                // the final state is published
                outcome = v;
                AtomicHelper.store(state, NORMAL); // final state
                finishCompletion();
            }
        }

        void run() {
            if (state != NEW ||
                !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
                return;
            try {
                Callable!(V) c = callable;
                if (c !is null && state == NEW) {
                    V result;
                    bool ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = V.init; // discarded: set(result) is skipped when ran is false
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (AtomicHelper.compareAndSet(state, NEW, COMPLETING)) {
            exception = t;
            AtomicHelper.store(state, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Executes the computation without setting its result, leaving this
     * future in its initial (NEW) state so that it can be run again.
     * Fails to do so if the computation encounters an exception or is
     * cancelled. This is designed for use with tasks that intrinsically
     * execute more than once.
     *
     * @return {@code true} if the callable ran without throwing and the
     *         state is still NEW afterwards
     */
    protected bool runAndReset() {
        if (state != NEW ||
            !AtomicHelper.compareAndSet(runner, null, Thread.getThis()))
            return false;
        bool ran = false;
        int s = state;
        try {
            Callable!(V) c = callable;
            if (c !is null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller; callers only invoke
     *        this with INTERRUPTING or INTERRUPTED
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us. Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        assert(state == INTERRUPTED);

        // We want to clear any interrupt we may have received from
        // cancel(true). However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // NOTE(review): despite the caveat above, the call below is made
        // unconditionally. Presumably ThreadEx.interrupted() tests and
        // clears the calling thread's interrupt flag; if so, an interrupt
        // that did not come from cancel(true) is discarded here as
        // well -- confirm this is intended.
        ThreadEx.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack. Each node captures the thread that constructs it; a node
     * whose {@code thread} is null is treated as cancelled/consumed by
     * finishCompletion() and removeWaiter().
     */
    static final class WaitNode {
        Thread thread;
        WaitNode next;
        this() { thread = Thread.getThis(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     *
     * <p>Called only after the state has moved past COMPLETING (from
     * set, setException and cancel).
     */
    private void finishCompletion() {
        // Detach the whole stack with one CAS; retry if a waiter pushed
        // itself concurrently.
        for (WaitNode q; (q = waiters) !is null;) {
            if (AtomicHelper.compareAndSet(waiters, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t !is null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next is null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null; // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param timeout time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if {@code ThreadEx.interrupted()}
     *         reports an interrupt while the task is still NEW
     */
    private int awaitDone(bool timed, Duration timeout) {
        // The loop below is delicate. Each iteration performs exactly one
        // step (allocate node, enqueue node, or park) and then re-reads
        // state, with these goals:
        // - read MonoTime.currTime once for each call to park
        // - if timeout <= 0, return promptly without allocating a node
        //   or reading the clock
        // - after a spurious wakeup, only park for the remaining time
        MonoTime startTime = MonoTime.zero; // MonoTime.zero is a sentinel: not yet parked
        WaitNode q = null;
        bool queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q !is null)
                    q.thread = null; // mark our node as no longer waiting
                return s;
            } else if (s == COMPLETING) {
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            } else if (ThreadEx.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            } else if (q is null) {
                if (timed && timeout <= Duration.zero)
                    return s;
                q = new WaitNode();
            } else if (!queued) {
                // push our node on top of the waiter stack
                queued = AtomicHelper.compareAndSet!(WaitNode)(waiters, q.next = waiters, q);
            } else if (timed) {
                Duration parkDuration;
                if (startTime == MonoTime.zero) { // first time
                    startTime = MonoTime.currTime;
                    // keep the sentinel unambiguous if the clock reads zero
                    if (startTime == MonoTime.zero)
                        startTime = MonoTime(1);
                    parkDuration = timeout;
                } else {
                    Duration elapsed = MonoTime.currTime - startTime;
                    if (elapsed >= timeout) {
                        removeWaiter(q);
                        return state;
                    }
                    parkDuration = timeout - elapsed;
                }
                // reading the clock may be slow; recheck state before parking
                if (state < COMPLETING) {
                    LockSupport.park(this, parkDuration);
                }
            } else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage. Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers. To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race. This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; a null node is ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node !is null) {
            node.thread = null; // a null thread marks the node as removable
            retry:
            for (;;) { // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q !is null; q = s) {
                    s = q.next;
                    if (q.thread !is null)
                        pred = q;
                    else if (pred !is null) {
                        pred.next = s;
                        if (pred.thread is null) // check for race
                            continue retry;
                    }
                    else if (!AtomicHelper.compareAndSet(waiters, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * @implSpec
     * The result is {@code super.toString()} followed by the completion
     * state in brackets, one of:
     * {@code "[Completed normally]"},
     * {@code "[Completed exceptionally: <exception>]"},
     * {@code "[Cancelled]"} (for CANCELLED, INTERRUPTING and INTERRUPTED),
     * {@code "[Not completed]"}, or
     * {@code "[Not completed, task = <callable>]"} while the callable is
     * still set.
     *
     * @return a string representation of this FutureTask
     */
    override string toString() {
        string status;
        switch (state) {
            case NORMAL:
                status = "[Completed normally]";
                break;
            case EXCEPTIONAL:
                // NOTE(review): assumes exception is non-null; a subclass
                // calling setException(null) would make this dereference null.
                status = "[Completed exceptionally: " ~ exception.toString() ~ "]";
                break;
            case CANCELLED:
            case INTERRUPTING:
            case INTERRUPTED:
                status = "[Cancelled]";
                break;
            default:
                // NEW or COMPLETING; snapshot the field since
                // finishCompletion() may null it concurrently
                Callable!V callable = this.callable;
                status = (callable is null)
                    ? "[Not completed]"
                    : "[Not completed, task = " ~ (cast(Object)callable).toString() ~ "]";
        }
        return super.toString() ~ status;
    }

}