1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
* However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

module hunt.concurrency.CompletableFuture;

import hunt.concurrency.CompletionStage;
import hunt.concurrency.Delayed;
import hunt.concurrency.Exceptions;
import hunt.concurrency.ForkJoinPool;
import hunt.concurrency.ForkJoinTask;
import hunt.concurrency.Future;
import hunt.concurrency.Promise;
import hunt.concurrency.ScheduledThreadPoolExecutor;
import hunt.concurrency.thread;
import hunt.concurrency.ThreadFactory;
import hunt.concurrency.atomic.AtomicHelper;

import hunt.Exceptions;
import hunt.Functions;
import hunt.util.Common;
import hunt.util.DateTime;
import hunt.util.ObjectUtils;

import hunt.logging.ConsoleLogger;

import core.thread;
import core.time;
import std.conv;
import std.concurrency : initOnce;


/**
 * Modes handed to Completion.tryFire. The sign is significant:
 * callers in this module test {@code mode >= 0} and {@code mode < 0}.
 */
enum : int {
    SYNC = 0,
    ASYNC = 1,
    NESTED = -1
}


/** Shared sentinel that encodes a null result; assigned in the module constructor. */
__gshared AltResult NIL;

/* ------------- Async task preliminaries -------------- */

/** Reports whether the common ForkJoinPool offers a parallelism level above one. */
private static bool USE_COMMON_POOL() {
    const parallelism = ForkJoinPool.getCommonPoolParallelism();
    return parallelism > 1;
}

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
*/
private static Executor ASYNC_POOL() {
    // Created once on first use (std.concurrency.initOnce) and shared afterwards.
    __gshared Executor inst;
    return initOnce!inst({
        Executor e;
        if(USE_COMMON_POOL){
            e = ForkJoinPool.commonPool();
        } else {
            e = new ThreadPerTaskExecutor();
        }
        return e;
    }());
}


/** Module constructor: creates the shared NIL sentinel (an AltResult with a null exception). */
shared static this() {
    NIL = new AltResult(null);
}


/**
 * Type-independent part of a CompletableFuture: the completion flag, the
 * alternative (null / exceptional) outcome, and the stack of dependent
 * completions, together with the status queries that only need those fields.
 */
abstract class AbstractCompletableFuture {
    /** Set to true once the future has been completed in any fashion. */
    shared bool _isDone = false;
    bool _isNull = true;

    /** NIL for a null completion, an exception holder for a failure, otherwise null. */
    AltResult altResult;

    Completion stack;    // Top of Treiber stack of dependent actions

    abstract void bipush(AbstractCompletableFuture b, BiCompletion c);
    abstract void cleanStack();
    abstract bool completeExceptionally(Throwable ex);
    abstract void postComplete();
    abstract void unipush(Completion c);

    /**
     * Returns {@code true} if completed in any fashion: normally,
     * exceptionally, or via cancellation.
     *
     * @return {@code true} if completed
     */
    bool isDone() {
        return _isDone;
    }


    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    bool isCancelled() {
        AltResult ar = altResult;
        if (ar !is null) {
            CancellationException ce = cast(CancellationException)ar.ex;
            if(ce !is null)
                return true;
        }
        return false;
    }

    /**
     * Returns {@code true} if this CompletableFuture completed
     * exceptionally, in any way. Possible causes include
     * cancellation, explicit invocation of {@code
     * completeExceptionally}, and abrupt termination of a
     * CompletionStage action.
     *
     * @return {@code true} if this CompletableFuture completed
     * exceptionally
     */
    bool isCompletedExceptionally() {
        // Read the field once so both checks apply to the same object.
        AltResult ar = altResult;
        return ar !is null && ar !is NIL;
    }

    /**
     * Returns {@code true} if this future is done and did not complete
     * exceptionally. A completion with the boxed null ({@code altResult is NIL})
     * counts as successful, so for a done future this is the complement of
     * {@link #isCompletedExceptionally}.
     *
     * @return {@code true} if completed without an exception
     */
    bool isCompletedSuccessfully() {
        return _isDone && !isCompletedExceptionally();
    }

    alias isFaulted = isCompletedExceptionally;
    alias isCompleted = isDone;
}

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may be used as a {@link CompletionStage},
 * supporting dependent functions and actions that trigger upon its
 * completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>In addition to these and related methods for directly
 * manipulating status and results, CompletableFuture implements
 * interface {@link CompletionStage} with the following policies: <ul>
 *
 * <li>Actions supplied for dependent completions of
 * <em>non-async</em> methods may be performed by the thread that
 * completes the current CompletableFuture, or by any other caller of
 * a completion method.
 *
 * <li>All <em>async</em> methods without an explicit Executor
 * argument are performed using the {@link ForkJoinPool#commonPool()}
 * (unless it does not support a parallelism level of at least two, in
 * which case, a new Thread is created to run each task).  This may be
 * overridden for non-static methods in subclasses by defining method
 * {@link #defaultExecutor()}. To simplify monitoring, debugging,
 * and tracking, all generated asynchronous tasks are instances of the
 * marker interface {@link AsynchronousCompletionTask}.
Operations 201 * with time-delays can use adapter methods defined in this class, for 202 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 203 * timeUnit))}. To support methods with delays and timeouts, this 204 * class maintains at most one daemon thread for triggering and 205 * cancelling actions, not for running them. 206 * 207 * <li>All CompletionStage methods are implemented independently of 208 * other methods, so the behavior of one method is not impacted 209 * by overrides of others in subclasses. 210 * 211 * <li>All CompletionStage methods return CompletableFutures. To 212 * restrict usages to only those methods defined in interface 213 * CompletionStage, use method {@link #minimalCompletionStage}. Or to 214 * ensure only that clients do not themselves modify a future, use 215 * method {@link #copy}. 216 * </ul> 217 * 218 * <p>CompletableFuture also implements {@link Future} with the following 219 * policies: <ul> 220 * 221 * <li>Since (unlike {@link FutureTask}) this class has no direct 222 * control over the computation that causes it to be completed, 223 * cancellation is treated as just another form of exceptional 224 * completion. Method {@link #cancel cancel} has the same effect as 225 * {@code completeExceptionally(new CancellationException())}. Method 226 * {@link #isCompletedExceptionally} can be used to determine if a 227 * CompletableFuture completed in any exceptional fashion. 228 * 229 * <li>In case of exceptional completion with a CompletionException, 230 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 231 * {@link ExecutionException} with the same cause as held in the 232 * corresponding CompletionException. To simplify usage in most 233 * contexts, this class also defines methods {@link #join()} and 234 * {@link #getNow} that instead throw the CompletionException directly 235 * in these cases. 
236 * </ul> 237 * 238 * <p>Arguments used to pass a completion result (that is, for 239 * parameters of type {@code T}) for methods accepting them may be 240 * null, but passing a null value for any other parameter will result 241 * in a {@link NullPointerException} being thrown. 242 * 243 * <p>Subclasses of this class should normally override the "virtual 244 * constructor" method {@link #newIncompleteFuture}, which establishes 245 * the concrete type returned by CompletionStage methods. For example, 246 * here is a class that substitutes a different default Executor and 247 * disables the {@code obtrude} methods: 248 * 249 * <pre> {@code 250 * class MyCompletableFuture(T) : CompletableFuture!(T) { 251 * final Executor myExecutor = ...; 252 * MyCompletableFuture() { } 253 * <U> CompletableFuture!(U) newIncompleteFuture() { 254 * return new MyCompletableFuture!(U)(); } 255 * Executor defaultExecutor() { 256 * return myExecutor; } 257 * void obtrudeValue(T value) { 258 * throw new UnsupportedOperationException(); } 259 * void obtrudeException(Throwable ex) { 260 * throw new UnsupportedOperationException(); } 261 * }}</pre> 262 * 263 * @author Doug Lea 264 * @param <T> The result type returned by this future's {@code join} 265 * and {@code get} methods 266 */ 267 class CompletableFuture(T) : 268 AbstractCompletableFuture, Future!(T), Promise!(T), CompletionStage!(T) { 269 270 // A CompletableFuture is also a Promise. 
// https://www.eclipse.org/jetty/javadoc/9.4.8.v20171121/org/eclipse/jetty/util/Promise.Completable.html

    // The callback aliases and the value-carrying members depend on whether
    // this future produces a value (T) or nothing (void).
    static if(is(T == void)) {

        alias ConsumerT = Action;
        alias BiConsumerT = Action1!(Throwable);
        alias FunctionT(V) = Func!(V);

        /**
         * Creates a void future; when {@code completed} is true it is
         * completed immediately (without CAS) via completeValue.
         */
        this(bool completed = false) {
            if(completed) completeValue!false();
        }

    } else {

        alias ConsumerT = Consumer!(T);
        alias BiConsumerT = Action2!(T, Throwable);
        alias FunctionT(V) = Func1!(T, V);

        // Value of a normal completion. In this port the null / exceptional
        // outcomes are kept in the inherited altResult field, not boxed here.
        private T result;

        /**
         * Creates a new complete CompletableFuture with the given result.
         */
        this(T r) {
            completeValue!false(r);
        }
    }

    /**
     * Creates a new incomplete CompletableFuture.
     */
    this() {
    }

    /**
     * Creates a future that is already completed with the given
     * alternative result (stored via completeValue without CAS).
     */
    this(AltResult r) {
        completeValue!false(r);
    }

    /*
     * Overview:
     *
     * A CompletableFuture may have dependent completion actions,
     * collected in a linked stack. It atomically completes by CASing
     * a result field, and then pops off and runs those actions. This
     * applies across normal vs exceptional outcomes, sync vs async
     * actions, binary triggers, and various forms of completions.
     *
     * Non-nullness of field "result" indicates done.  It may
     * be set directly if known to be thread-confined, else via CAS.
     * An AltResult is used to box null as a result, as well as to
     * hold exceptions.  Using a single field makes completion simple
     * to detect and trigger.  Result encoding and decoding is
     * straightforward but tedious and adds to the sprawl of trapping
     * and associating exceptions with targets.  Minor simplifications
     * rely on (static) NIL (to box null results) being the only
     * AltResult with a null exception field, so we don't usually need
     * explicit comparisons.
Even though some of the generics casts 328 * are unchecked (see SuppressWarnings annotations), they are 329 * placed to be appropriate even if checked. 330 * 331 * Dependent actions are represented by Completion objects linked 332 * as Treiber stacks headed by field "stack". There are Completion 333 * classes for each kind of action, grouped into: 334 * - single-input (UniCompletion), 335 * - two-input (BiCompletion), 336 * - projected (BiCompletions using exactly one of two inputs), 337 * - shared (CoCompletion, used by the second of two sources), 338 * - zero-input source actions, 339 * - Signallers that unblock waiters. 340 * class Completion : ForkJoinTask to enable async execution 341 * (adding no space overhead because we exploit its "tag" methods 342 * to maintain claims). It is also declared as Runnable to allow 343 * usage with arbitrary executors. 344 * 345 * Support for each kind of CompletionStage relies on a separate 346 * class, along with two CompletableFuture methods: 347 * 348 * * A Completion class with name X corresponding to function, 349 * prefaced with "Uni", "Bi", or "Or". Each class contains 350 * fields for source(s), actions, and dependent. They are 351 * boringly similar, differing from others only with respect to 352 * underlying functional forms. We do this so that users don't 353 * encounter layers of adapters in common usages. 354 * 355 * * Boolean CompletableFuture method x(...) (for example 356 * biApply) takes all of the arguments needed to check that an 357 * action is triggerable, and then either runs the action or 358 * arranges its async execution by executing its Completion 359 * argument, if present. The method returns true if known to be 360 * complete. 361 * 362 * * Completion method tryFire(int mode) invokes the associated x 363 * method with its held arguments, and on success cleans up. 
364 * The mode argument allows tryFire to be called twice (SYNC, 365 * then ASYNC); the first to screen and trap exceptions while 366 * arranging to execute, and the second when called from a task. 367 * (A few classes are not used async so take slightly different 368 * forms.) The claim() callback suppresses function invocation 369 * if already claimed by another thread. 370 * 371 * * Some classes (for example UniApply) have separate handling 372 * code for when known to be thread-confined ("now" methods) and 373 * for when shared (in tryFire), for efficiency. 374 * 375 * * CompletableFuture method xStage(...) is called from a public 376 * stage method of CompletableFuture f. It screens user 377 * arguments and invokes and/or creates the stage object. If 378 * not async and already triggerable, the action is run 379 * immediately. Otherwise a Completion c is created, and 380 * submitted to the executor if triggerable, or pushed onto f's 381 * stack if not. Completion actions are started via c.tryFire. 382 * We recheck after pushing to a source future's stack to cover 383 * possible races if the source completes while pushing. 384 * Classes with two inputs (for example BiApply) deal with races 385 * across both while pushing actions. The second completion is 386 * a CoCompletion pointing to the first, shared so that at most 387 * one performs the action. The multiple-arity methods allOf 388 * does this pairwise to form trees of completions. Method 389 * anyOf is handled differently from allOf because completion of 390 * any source should trigger a cleanStack of other sources. 391 * Each AnyOf completion can reach others via a shared array. 392 * 393 * Note that the generic type parameters of methods vary according 394 * to whether "this" is a source, dependent, or completion. 395 * 396 * Method postComplete is called upon completion unless the target 397 * is guaranteed not to be observable (i.e., not yet returned or 398 * linked). 
Multiple threads can call postComplete, which 399 * atomically pops each dependent action, and tries to trigger it 400 * via method tryFire, in NESTED mode. Triggering can propagate 401 * recursively, so NESTED mode returns its completed dependent (if 402 * one exists) for further processing by its caller (see method 403 * postFire). 404 * 405 * Blocking methods get() and join() rely on Signaller Completions 406 * that wake up waiting threads. The mechanics are similar to 407 * Treiber stack wait-nodes used in FutureTask, Phaser, and 408 * SynchronousQueue. See their internal documentation for 409 * algorithmic details. 410 * 411 * Without precautions, CompletableFutures would be prone to 412 * garbage accumulation as chains of Completions build up, each 413 * pointing back to its sources. So we null out fields as soon as 414 * possible. The screening checks needed anyway harmlessly ignore 415 * null arguments that may have been obtained during races with 416 * threads nulling out fields. We also try to unlink non-isLive 417 * (fired or cancelled) Completions from stacks that might 418 * otherwise never be popped: Method cleanStack always unlinks non 419 * isLive completions from the head of stack; others may 420 * occasionally remain if racing with other cancellations or 421 * removals. 422 * 423 * Completion fields need not be declared as final or volatile 424 * because they are only visible to other threads upon safe 425 * publication. 426 */ 427 428 429 // final bool internalComplete(T r) { // CAS from null to r 430 // // return AtomicHelper.compareAndSet(this.result, null, r); 431 // if(AtomicHelper.compareAndSet(_isDone, false, true)) { 432 // this.result = r; 433 // _isNull = false; 434 // return true; 435 // } 436 // return false; 437 // } 438 439 /** Returns true if successfully pushed c onto stack. 
*/
    final bool tryPushStack(Completion c) {
        Completion head = stack;
        // Link c to the observed head first; the CAS below then installs c
        // as the new head only if the head is still the one we linked to.
        AtomicHelper.store(c.next, head);
        return AtomicHelper.compareAndSet(this.stack, head, c);
    }

    /** Pushes c onto the stack, looping until a tryPushStack attempt succeeds. */
    final void pushStack(Completion c) {
        while (!tryPushStack(c)) {
            // head changed underneath us; try again
        }
    }

    /* ------------- Encoding and decoding outcomes -------------- */

    /**
     * Completes this future with the null value (NIL) unless it is already done.
     * With {@code useCas} the done flag is claimed atomically; without it the
     * flag is checked and written directly.
     *
     * @return true if this call performed the completion
     */
    final bool completeNull(bool useCas = true)() {
        static if (useCas) {
            if (!AtomicHelper.compareAndSet(_isDone, false, true))
                return false;
            _isNull = true;
            altResult = NIL;
            return true;
        } else {
            if (_isDone)
                return false;
            _isNull = true;
            altResult = NIL;
            _isDone = true;
            return true;
        }
    }

    /*
     * Completion with a non-exceptional result, unless already completed.
     * The void instantiation has no value to store; the other stores t
     * into the result field.
     */
    static if (is(T == void)) {
        final bool completeValue(bool useCas = true)() {
            static if (useCas) {
                if (!AtomicHelper.compareAndSet(_isDone, false, true))
                    return false;
                _isNull = false;
                return true;
            } else {
                if (_isDone)
                    return false;
                _isDone = true;
                _isNull = false;
                return true;
            }
        }
    } else {
        final bool completeValue(bool useCas = true)(T t) {
            static if (useCas) {
                if (!AtomicHelper.compareAndSet(_isDone, false, true))
                    return false;
                this.result = t;
                _isNull = false;
                return true;
            } else {
                if (_isDone)
                    return false;
                this.result = t;
                _isDone = true;
                _isNull = false;
                return true;
            }
        }
    }

    /**
     * Completes this future with the given alternative result unless it is
     * already done.
     *
     * @return true if this call performed the completion
     */
    final bool completeValue(bool useCas = true)(AltResult r) {
        static if (useCas) {
            if (!AtomicHelper.compareAndSet(_isDone, false, true))
                return false;
            altResult = r;
            return true;
        } else {
            if (_isDone)
                return false;
            _isDone = true;
            altResult = r;
            return true;
        }
    }

    /**
     * Completes this future with the encoding of x (encodeThrowable) unless
     * it is already done.
     *
     * @return true if this call performed the completion
     */
    private final bool completeThrowable(bool useCas = true)(Throwable x) {
        static if (useCas) {
            if (!AtomicHelper.compareAndSet(_isDone, false, true))
                return false;
            altResult = encodeThrowable(x);
            return true;
        } else {
            if (_isDone)
                return false;
            _isDone = true;
            altResult = encodeThrowable(x);
            return true;
        }
    }


    /**
     * Completes with the given (non-null) exceptional result as a
     * wrapped CompletionException unless it is one already, unless
     * already completed.  May complete with the given Object r
     * (which must have been the result of a source future) if it is
     * equivalent, i.e. if this is a simple propagation of an
     * existing CompletionException.
*
     * @return true if this call performed the completion, false if the
     *         future was already done
     */
    private final bool completeThrowable(bool useCas = true)(Throwable x, AltResult r) {
        // return AtomicHelper.compareAndSet(this.result, null, encodeThrowable(x, r));
        static if(useCas) {
            if(AtomicHelper.compareAndSet(_isDone, false, true)) {
                // NOTE(review): this store repeats what the successful CAS
                // already did; the sibling overloads do not have it.
                _isDone = true;
                altResult = encodeThrowable(x, r);
                return true;
            }
        } else {
            // Non-CAS path: plain check-then-set of the done flag.
            if(!_isDone) {
                _isDone = true;
                altResult = encodeThrowable(x, r);
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the encoding of the given arguments: if the exception
     * is non-null, encodes as AltResult.  Otherwise uses the given
     * value, boxed as NIL if null.
     */
    // Object encodeOutcome(T t, Throwable x) {
    //     return (x is null) ? (t is null) ? NIL : cast(Object)t : encodeThrowable(x);
    // }


    /**
     * Completes with r or a copy of r, unless already completed.
     * If exceptional, r is first coerced to a CompletionException.
     */
    // final bool completeRelay(T r, AltResult ar) {
    //     // return AtomicHelper.compareAndSet(this.result, null, encodeRelay(r));

    //     if(AtomicHelper.compareAndSet(_isDone, false, true)) {
    //         // altResult = encodeThrowable(x, r);
    //         // this.result = encodeRelay(r);
    //         implementationMissing(false);
    //         return true;
    //     }
    //     return false;
    // }


    /* ------------- Base Completion classes and operations -------------- */


    /**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    protected final override void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */

        AbstractCompletableFuture f = this; Completion h;
        // Keep going while f has a stack head; when f's stack is empty and
        // f is a dependent, fall back to this future's own stack.
        while ((h = f.stack) !is null ||
               (f !is this && (h = (f = this).stack) !is null)) {
            AbstractCompletableFuture d; Completion t;
            t = h.next;
            // infof("this: %s, h: %s", cast(Object*)this, typeid(h).name);

            // Pop h from f's stack; if the CAS fails the loop re-reads the head.
            if(AtomicHelper.compareAndSet(f.stack, h, t)) {
                if (t !is null) {
                    if (f !is this) {
                        // h came from a dependent that has more entries:
                        // move it onto this future's stack instead of firing now.
                        pushStack(h);
                        continue;
                    }
                    AtomicHelper.compareAndSet(h.next, t, null); // try to detach
                }
                // infof("Completion: %s, this: %s", typeid(h).name, typeid(this).name);
                // tryFire(NESTED) hands back a future to continue with, or null.
                d = h.tryFire(NESTED);
                f = (d is null) ? this : d;
            }
        }
    }

    /** Traverses stack and unlinks one or more dead Completions, if found. */
    protected final override void cleanStack() {
        Completion p = stack;
        // ensure head of stack live
        for (bool unlinked = false;;) {
            if (p is null)
                return;
            else if (p.isLive()) {
                if (unlinked)
                    return;
                else
                    break;
            }
            // NOTE(review): p is passed as the expected value and reassigned in
            // the same argument list; presumably relies on left-to-right
            // evaluation with the expected value taken by value -- confirm
            // against AtomicHelper.compareAndSet.
            else if (AtomicHelper.compareAndSet(this.stack, p, (p = p.next)))
                unlinked = true;
            else
                p = stack;
        }
        // try to unlink first non-live
        for (Completion q = p.next; q !is null;) {
            Completion s = q.next;
            if (q.isLive()) {
                p = q;
                q = s;
            } else if (AtomicHelper.compareAndSet(p.next, q, s))
                break;
            else
                q = p.next;
        }
    }

    /* ------------- One-input Completions -------------- */

    /**
     * Pushes the given completion unless it completes while trying.
     * Caller should first check that result is null.
     */
    protected final override void unipush(Completion c) {
        if (c !is null) {
            while (!tryPushStack(c)) {
                if (_isDone) {
                    // Completed while we were retrying: clear the link that the
                    // failed push attempt stored, and stop trying to push.
                    AtomicHelper.store(c.next, null);
                    break;
                }
            }
            // Re-check after the push loop; if done by now, fire c here.
            if (_isDone)
                c.tryFire(SYNC);
        }
    }

    /**
     * Post-processing by dependent after successful UniCompletion tryFire.
* Tries to clean stack of source a, and then either runs postComplete
     * or returns this to caller, depending on mode.
     *
     * @param a    the source future (may be null)
     * @param mode tryFire mode; negative (NESTED) returns this instead of
     *             running postComplete here
     * @return this when mode is negative and this future is done with
     *         pending dependents, otherwise null
     */
    private final CompletableFuture!(T) postFire(AbstractCompletableFuture a, int mode) {

        if (a !is null && a.stack !is null) {
            bool done = a._isDone;
            if (!done)
                a.cleanStack();
            if (mode >= 0 && (done || a._isDone))
                a.postComplete();
        }

        if (_isDone && stack !is null) {
            if (mode < 0)
                return this;
            else
                postComplete();
        }
        return null;
    }

    /**
     * Creates the dependent stage for an "apply" function: runs it right away
     * when this future is already done, otherwise pushes a UniApply completion.
     *
     * @throws NullPointerException if f is null
     */
    private CompletableFuture!(V) uniApplyStage(V)(Executor e, FunctionT!(V) f) {
        if (f is null) throw new NullPointerException();
        if (_isDone) {
            return uniApplyNow!(V)(e, f);
        }
        CompletableFuture!(V) d = newIncompleteFuture!(V)();
        unipush(new UniApply!(T,V)(e, d, this, f));
        return d;
    }


    /**
     * "Apply" for an already completed source: propagates an exceptional
     * outcome, otherwise runs f inline (e is null) or submits a UniApply
     * task to e. Anything thrown completes the dependent exceptionally.
     */
    private CompletableFuture!(V) uniApplyNow(V)(Executor e, FunctionT!(V) f) {

        CompletableFuture!(V) d = newIncompleteFuture!(V)();

        AltResult ar = this.altResult;
        if(ar !is null) {
            Throwable x = ar.ex;
            if (x !is null) {
                d.completeThrowable!false(x, ar);
                return d;
            }
        }

        try {
            if (e !is null) {
                e.execute(new UniApply!(T, V)(null, d, this, f));
            } else {

                static if(is(T == void)) {
                    d.completeValue!false(f());
                } else {
                    d.completeValue!false(f(this.result));
                }
            }
        } catch (Throwable ex) {
            d.completeThrowable!false(ex);
        }
        return d;
    }


    /**
     * Creates the dependent stage for a consumer: runs it right away when
     * this future is already done, otherwise pushes a UniAccept completion.
     *
     * @throws NullPointerException if f is null
     */
    private CompletableFuture!(void) uniAcceptStage(Executor e,
                                                   ConsumerT f) {
        if (f is null) throw new NullPointerException();
        if (isDone)
            return uniAcceptNow(e, f);
        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        unipush(new UniAccept!(T)(e, d, this, f));
        return d;
    }

    /**
     * "Accept" for an already completed source: propagates an exceptional
     * outcome, otherwise runs f inline (e is null) and completes the
     * dependent with NIL, or submits a UniAccept task to e.
     */
    private CompletableFuture!(void) uniAcceptNow(Executor e, ConsumerT f) {
        Throwable x;
        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        AltResult ar = altResult;
        if (ar !is null) {
            if ((x = ar.ex) !is null) {
                d.completeValue!false(encodeThrowable(x, ar));
                return d;
            }
        }

        try {
            if (e !is null) {
                e.execute(new UniAccept!(T)(null, d, this, f));
            } else {

                static if(is(T == void)) {
                    f();
                } else {
                    T t = this.result;
                    f(t);
                }
                d.completeValue!false(NIL);
            }
        } catch (Throwable ex) {
            d.completeThrowable!false(ex);
        }
        return d;
    }


    /**
     * Creates the dependent stage for a Runnable: runs it right away when
     * this future is already done, otherwise pushes a UniRun completion so
     * the action only fires after this future completes.
     *
     * @throws NullPointerException if f is null
     */
    private CompletableFuture!(void) uniRunStage(Executor e, Runnable f) {
        if (f is null) throw new NullPointerException();
        // Fixed: the test was inverted, which ran f immediately on a source
        // that had NOT completed yet. Now consistent with uniApplyStage and
        // uniAcceptStage.
        if (isDone())
            return uniRunNow(altResult, e, f);
        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        unipush(new UniRun!(T)(e, d, this, f));
        return d;
    }

    /**
     * "Run" for an already completed source whose alternative result is ar:
     * propagates an exceptional outcome, otherwise runs f inline (e is null)
     * and completes the dependent with null, or submits a UniRun task to e.
     */
    private CompletableFuture!(void) uniRunNow(AltResult ar, Executor e, Runnable f) {
        Throwable x;
        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        if (ar !is null && (x = ar.ex) !is null)
            d.completeValue!false(encodeThrowable(x, ar));
        else
            try {
                if (e !is null) {
                    e.execute(new UniRun!(T)(null, d, this, f));
                } else {
                    f.run();
                    d.completeNull();
                }
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        return d;
    }


    /**
     * Runs the whenComplete action f with the outcome of source r and then
     * completes this future with the same outcome. If the source failed, or f
     * throws, this future completes exceptionally.
     *
     * @param c the completion to claim before running f, or null when
     *          called directly
     * @return false only when the claim on c fails; true otherwise
     */
    final bool uniWhenComplete(CompletableFuture!(T) r,
                                  BiConsumerT f,
                                  UniWhenComplete!(T) c) {
        Throwable x = null;
        if (!_isDone) {
            try {
                if (c !is null && !c.claim())
                    return false;
                AltResult ar = r.altResult;
                if (ar !is null) {
                    x = ar.ex;
                    warning("Need to check");

                    static if(is(T == void)) {
                        f(x);
                    } else {
                        f(T.init, x);
                    }
                    if (x is null) {
                        completeValue(ar);
                    }
                } else {

                    static if(is(T == void)) {
                        f(x);
                        completeValue();
                    } else {
                        T t = r.result;
                        f(t, x);
                        completeValue(t);
                    }
                }

                if (x is null) {
                    return true;
                }
            } catch (Throwable ex) {
                // Keep the source's exception as primary; chain the action's
                // exception behind it when both exist.
                if (x is null)
                    x = ex;
                else if (x !is ex)
                    x.next = ex;
            }

            completeThrowable(x, r.altResult);
        }
        return true;
    }

    /**
     * Creates the dependent stage for a whenComplete action: pushes a
     * completion when this future is not done, runs inline when done and
     * e is null, otherwise submits the completion to e.
     *
     * @throws NullPointerException if f is null
     */
    private CompletableFuture!(T) uniWhenCompleteStage(
        Executor e, BiConsumerT f) {
        if (f is null) throw new NullPointerException();
        CompletableFuture!(T) d = newIncompleteFuture!(T)();
        if (!isDone)
            unipush(new UniWhenComplete!(T)(e, d, this, f));
        else if (e is null)
            d.uniWhenComplete(this, f, null);
        else {
            try {
                e.execute(new UniWhenComplete!(T)(null, d, this, f));
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        }
        return d;
    }

    /**
     * Completes this future with f(value, exception) computed from the
     * outcome of source r; anything f throws completes this future
     * exceptionally.
     *
     * @param c the completion to claim before running f, or null when
     *          called directly
     * @return false only when the claim on c fails; true otherwise
     */
    final bool uniHandle(S)(CompletableFuture!(S) r, BiFunction!(S, Throwable, T) f,
                            UniHandle!(S, T) c) {
        Throwable x;
        if (!isDone()) {
            try {
                if (c !is null && !c.claim())
                    return false;
                S s;
                AltResult ar = r.altResult;
                if (ar !is null) {
                    x = ar.ex;
                    static if(is(S == class) || is(S == interface)) {
                        s = null;
                    } else {
                        warning("to check");
                        s = S.init;
                    }
                } else {
                    x = null;
                    s = r.result;
                }
                completeValue(f(s, x));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    /**
     * Creates the dependent stage for a handle function: pushes a completion
     * when this future is not done, runs inline when done and e is null,
     * otherwise submits the completion to e.
     *
     * @throws NullPointerException if f is null
     */
    private CompletableFuture!(V) uniHandleStage(V)(Executor e,
                BiFunction!(T, Throwable, V) f) {

        if (f is null) throw new NullPointerException();
        CompletableFuture!(V) d = newIncompleteFuture!(V)();

        if (!isDone())
            unipush(new UniHandle!(T,V)(e, d, this, f));
        else if (e is null)
            d.uniHandle!(T)(this, f, null);
        else {
            try {
                e.execute(new UniHandle!(T,V)(null, d, this, f));
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        }
        return d;
    }


    /**
     * Completes this future from source r: with f(exception) when r failed,
     * otherwise with r's own value. Anything f throws completes this future
     * exceptionally.
     *
     * @param c the completion to claim before running f, or null when
     *          called directly
     * @return false only when the claim on c fails; true otherwise
     */
    final bool uniExceptionally(CompletableFuture!(T) r,
                                   Function!(Throwable, T) f,
                                   UniExceptionally!(T) c) {
        Throwable x;
        if (!isDone()) {
            try {
                AltResult ar = r.altResult;
                if (ar !is null && (x = ar.ex) !is null) {
                    if (c !is null && !c.claim())
                        return false;

                    static if(is(T == void)) {
                        f(x);
                        completeValue();
                    } else {
                        completeValue(f(x));
                    }
                } else {

                    static if(is(T == void)) {
                        completeValue();
                    } else {
                        completeValue(r.result);
                    }
                }
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    /**
     * Creates the dependent stage for an exceptionally function: pushes a
     * completion when this future is not done, otherwise runs inline.
     *
     * @throws NullPointerException if f is null
     */
    private CompletableFuture!(T) uniExceptionallyStage(Function!(Throwable, T) f) {
        if (f is null) throw new NullPointerException();
        CompletableFuture!(T) d = newIncompleteFuture!(T)();
        if (!isDone())
            unipush(new UniExceptionally!(T)(d, this, f));
        else
            d.uniExceptionally(this, f, null);
        return d;
    }

    /**
     * Returns a MinimalStage mirroring this future: already completed with
     * this future's outcome when done, otherwise fed through a UniRelay.
     */
    private MinimalStage!(T) uniAsMinimalStage() {
        if (isDone()) {
            if(isFaulted())
                return new MinimalStage!(T)(this.altResult);
            else {

                static if(is(T == void)) {
                    return new MinimalStage!(T)(true);
                } else {
                    return new MinimalStage!(T)(this.result);
                }
            }
        }
        MinimalStage!(T) d = new MinimalStage!(T)();
        unipush(new UniRelay!(T,T)(d, this));
        return d;
    }

    /**
     * Creates the dependent stage for a compose function. When this future
     * is done and e is null, f is applied inline and the returned stage's
     * outcome (value or exception) is relayed to the dependent; otherwise a
     * UniCompose completion is pushed or submitted to e.
     *
     * @throws NullPointerException if f is null
     */
    private CompletableFuture!(V) uniComposeStage(V)(Executor e,
                FunctionT!(CompletionStage!(V)) f) {

        if (f is null) throw new NullPointerException();
        CompletableFuture!(V) d = newIncompleteFuture!(V)();
        Throwable x;
        if (!isDone())
            unipush(new UniCompose!(T,V)(e, d, this, f));
        else if (e is null) {
            T t = this.result;
            AltResult ar = this.altResult;
            if (ar !is null) {
                if ((x = ar.ex) !is null) {
                    d.completeThrowable!false(x, ar);
                    return d;
                }
                warning("To check");
                t = T.init;
            }

            try {
                // A null stage (or a stage that is not a CompletableFuture!(V))
                // must surface as an exceptional completion of d. Dereferencing
                // null in D is an access violation, not a catchable exception.
                CompletionStage!(V) stage = f(t);
                if (stage is null)
                    throw new NullPointerException();
                auto gg = stage.toCompletableFuture();
                CompletableFuture!(V) g = cast(CompletableFuture!(V))gg;
                if(g is null && gg !is null) {
                    warningf("bad cast");
                }
                if (g is null)
                    throw new NullPointerException();

                if (g.isDone()) {
                    // Fixed: relay an exceptional outcome of g instead of
                    // completing d normally with g's (unset) value.
                    AltResult gr = g.altResult;
                    Throwable gx = (gr is null) ? null : gr.ex;
                    if (gx !is null)
                        d.completeThrowable!false(gx, gr);
                    else
                        d.completeValue!false(g.result);
                } else {
                    g.unipush(new UniRelay!(V,V)(d, g));
                }
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        }
        else
            try {
                e.execute(new UniCompose!(T,V)(null, d, this, f));
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        return d;
    }

    /* ------------- Two-input Completions -------------- */

    /** A Completion for an action with two sources */

    /**
     * Pushes completion to this and b unless both done.
     * Caller should first check that either result or b.result is null.
     */
    final override void bipush(AbstractCompletableFuture b, BiCompletion c) {
        if (c !is null) {
            while (!_isDone) {
                if (tryPushStack(c)) {
                    // c is on this stack; register it with b as well, or fire
                    // it here if both sources turned out to be done.
                    if (!b._isDone)
                        b.unipush(new CoCompletion(c));
                    else if (_isDone)
                        c.tryFire(SYNC);
                    return;
                }
            }
            // This future is already done, so only b needs to hold c.
            b.unipush(c);
        }
    }

    /** Post-processing after successful BiCompletion tryFire.
*/
    final CompletableFuture!(T) postFire(AbstractCompletableFuture a,
            AbstractCompletableFuture b, int mode) {
        if (b !is null && b.stack !is null) { // clean second source
            if (!b.isDone())
                b.cleanStack();
            if (mode >= 0 && b.isDone())
                b.postComplete();
        }
        // Remaining work is shared with the single-source postFire.
        return postFire(a, mode);
    }


    /**
     * Completes this future with f(r's value, s's value) unless it is
     * already done. If either source holds an exception, this future is
     * completed with that exception instead and f is not called.
     *
     * @param r the first source future
     * @param s the second source future
     * @param f the combining function
     * @param c the completion to claim before running f, or null
     * @return false if c was given and could not be claimed, else true
     */
    final bool biApply(R, S)(CompletableFuture!R r, CompletableFuture!S s,
                BiFunction!(R, S, T) f,
                BiApply!(R,S,T) c) {
        Throwable x;

        AltResult ar = r.altResult;
        AltResult ars = s.altResult;
        R rr = r.result;
        S ss = s.result;

        // The label lets the exceptional paths jump back and re-test the
        // done flag after completeThrowable.
        tryComplete: if (!isDone) {
            if (ar !is null) {
                if ((x = ar.ex) !is null) {
                    completeThrowable(x, ar);
                    goto tryComplete;
                }
                warning("To check");
                rr = R.init;
            }
            if (ars !is null) {
                if ((x = ars.ex) !is null) {
                    completeThrowable(x, ars);
                    goto tryComplete;
                }
                warning("To check");
                ss = S.init;
            }

            try {
                if (c !is null && !c.claim())
                    return false;
                completeValue(f(rr, ss));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    /**
     * Creates the dependent future for the thenCombine family.
     *
     * A BiApply is pushed with bipush while either source is not done,
     * applied at once when both are done and no executor is given, and
     * otherwise handed to the executor. A Throwable raised by the
     * submission completes the dependent exceptionally.
     *
     * @param e the executor to use, or null for none
     * @param o the other stage
     * @param f the combining function; must not be null
     * @return the new dependent future
     * @throws NullPointerException if f is null or o yields no
     *         CompletableFuture!(U)
     */
    private CompletableFuture!(V) biApplyStage(U,V)(
        Executor e, CompletionStage!(U) o, BiFunction!(T, U, V) f) {

        if (f is null)
            throw new NullPointerException();
        CompletableFuture!(U) b = cast(CompletableFuture!(U))o.toCompletableFuture();
        if (b is null)
            throw new NullPointerException();

        CompletableFuture!(V) d = newIncompleteFuture!(V)();

        if (!isDone() || !b.isDone())
            bipush(b, new BiApply!(T,U,V)(e, d, this, b, f));
        else if (e is null)
            d.biApply!(T, U)(this, b, f, null);
        else
            try {
                e.execute(new BiApply!(T,U,V)(null, d, this, b, f));
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        return d;
    }


    /**
     * Runs f(r's value, s's value) and then completes this future via
     * completeNull, unless it is already done. If either source holds an
     * exception, this future is completed with that exception instead.
     *
     * @param r the first source future
     * @param s the second source future
     * @param f the consuming action
     * @param c the completion to claim before running f, or null
     * @return false if c was given and could not be claimed, else true
     */
    final bool biAccept(R, S)(CompletableFuture!R r, CompletableFuture!S s,
                BiConsumer!(R,S) f,
                BiAccept!(R, S) c) {
        Throwable x;

        AltResult ar = r.altResult;
        AltResult ars = s.altResult;
        R rr = r.result;
        S ss = s.result;

        // Same re-test-after-failure structure as biApply.
        tryComplete: if (!isDone()) {
            if (ar !is null) {
                if ((x = ar.ex) !is null) {
                    completeThrowable(x, ar);
                    goto tryComplete;
                }
                warning("To check");
                rr = R.init;
            }
            if (ars !is null) {
                if ((x = ars.ex) !is null) {
                    completeThrowable(x, ars);
                    goto tryComplete;
                }
                warning("To check");
                ss = S.init;
            }

            try {
                if (c !is null && !c.claim())
                    return false;
                f(rr, ss);
                completeNull();
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    /**
     * Creates the dependent future for the thenAcceptBoth family, using
     * the same push / run-now / submit scheme as biApplyStage.
     *
     * @param e the executor to use, or null for none
     * @param o the other stage
     * @param f the consuming action; must not be null
     * @return the new dependent future
     * @throws NullPointerException if f is null or o yields no
     *         CompletableFuture!(U)
     */
    private CompletableFuture!(void) biAcceptStage(U)(Executor e,
        CompletionStage!(U) o, BiConsumer!(T, U) f) {

        if (f is null)
            throw new NullPointerException();

        CompletableFuture!(U) b = cast(CompletableFuture!(U))o.toCompletableFuture();
        if (b is null)
            throw new NullPointerException();

        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        if (!isDone() || !b.isDone())
            bipush(b, new BiAccept!(T,U)(e, d, this, b, f));
        else if (e is null)
            d.biAccept!(T, U)(this, b, f, null);
        else
            try {
                e.execute(new BiAccept!(T,U)(null, d, this, b, f));
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        return d;
    }


    /**
     * Runs f and then completes this future via completeNull, unless it
     * is already done. If either source holds an exception, this future
     * is completed with that exception instead and f is not run.
     *
     * @param r the first source future
     * @param s the second source future
     * @param f the action to run
     * @param c the completion to claim before running f, or null
     * @return false if c was given and could not be claimed, else true
     */
    final bool biRun(AbstractCompletableFuture r, AbstractCompletableFuture s,
                        Runnable f, IUniCompletion c) {
        Throwable x;
        if (!isDone()) {
            AltResult ar = r.altResult;
            AltResult ars = s.altResult;
            if(ar !is null && (x = ar.ex) !is null){
                completeThrowable(x, ar);
            } else if(ars !is null && (x = ars.ex) !is null){
                completeThrowable(x, ars);
            } else {
                try {
                    if (c !is null && !c.claim())
                        return false;
                    f.run();
                    completeNull();
                } catch (Throwable ex) {
                    completeThrowable(ex);
                }
            }
        }
        return true;
    }

    /**
     * Creates the dependent future for the runAfterBoth family, using the
     * same push / run-now / submit scheme as biApplyStage.
     *
     * @param e the executor to use, or null for none
     * @param o the other stage
     * @param f the action to run; must not be null
     * @return the new dependent future
     * @throws NullPointerException if f is null or o yields no
     *         CompletableFuture!U
     */
    private CompletableFuture!(void) biRunStage(U)(Executor e, CompletionStage!U o,
                                            Runnable f) {
        CompletableFuture!U b = cast(CompletableFuture!U)o.toCompletableFuture();
        if (f is null || b is null)
            throw new NullPointerException();
        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        // if ((r = result) is null || (s = b.result) is null)
        if (!isDone() || !b.isDone())
            bipush(b, new BiRun!(T, U)(e, d, this, b, f));
        else if (e is null)
            d.biRun(this, b, f, null);
        else
            try {
                e.execute(new BiRun!(T, U)(null, d, this, b, f));
            } catch (Throwable ex) {
                d.completeThrowable!false(ex);
            }
        return d;
    }


    /* ------------- Projected (Ored) BiCompletions -------------- */

    /**
     * Pushes completion to this and b unless either done.
     * Caller should first check that result and b.result are both null.
*/
    final void orpush(AbstractCompletableFuture b, BiCompletion c) {
        if (c !is null) {
            while (!tryPushStack(c)) {
                if (_isDone) {
                    // Push lost the race and we are done: detach c.
                    AtomicHelper.store(c.next, null);
                    break;
                }
            }
            if (_isDone)
                c.tryFire(SYNC);
            else
                b.unipush(new CoCompletion(c));
        }
    }

    /**
     * Creates the dependent future for the applyToEither family.
     *
     * If this future or b is already done, f is applied to that one via
     * uniApplyNow; otherwise an OrApply is registered with orpush.
     *
     * @param e the executor to use, or null for none
     * @param o the other stage
     * @param f the function to apply; must not be null
     * @return the new dependent future
     * @throws NullPointerException if f is null or o yields no
     *         CompletableFuture!(U)
     */
    private CompletableFuture!(V) orApplyStage(U, V)( // U : T,V
        Executor e, CompletionStage!(U) o, FunctionT!(V) f) {

        CompletableFuture!(U) b = cast(CompletableFuture!(U))o.toCompletableFuture();
        if (f is null || b is null)
            throw new NullPointerException();

        if(this._isDone)
            return uniApplyNow!(V)(e, f);
        else if(b._isDone) {
            return b.uniApplyNow!(V)(e, f);
        }
        // T r; CompletableFuture!T z;
        // if ((r = (z = this).result) !is null ||
        //     (r = (z = b).result) !is null)
        //     return z.uniApplyNow!(V)(e, f);

        CompletableFuture!(V) d = newIncompleteFuture!(V)();
        orpush(b, new OrApply!(T,U,V)(e, d, this, b, f));
        return d;
    }


    /**
     * Creates the dependent future for the acceptEither family.
     *
     * The other stage is resolved through toCompletableFuture(), like the
     * sibling or- and bi-stage builders, rather than cast directly.
     *
     * @param e the executor to use, or null for none
     * @param o the other stage; must not be null
     * @param f the action to run; must not be null
     * @return the new dependent future
     * @throws NullPointerException if f or o is null, or o yields no
     *         CompletableFuture!(U)
     */
    private CompletableFuture!(void) orAcceptStage(U)( // U : T
        Executor e, CompletionStage!(U) o, ConsumerT f) {
        if (f is null || o is null)
            throw new NullPointerException();

        CompletableFuture!(U) b = cast(CompletableFuture!(U))o.toCompletableFuture();
        if (b is null)
            throw new NullPointerException();

        CompletableFuture!T z;
        if ((z = this).isDone() || (z = b).isDone())
            return z.uniAcceptNow(e, f);

        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        orpush(b, new OrAccept!(T,U)(e, d, this, b, f));
        return d;
    }

    /**
     * Creates the dependent future for the runAfterEither family.
     *
     * If this future or b is already done, f is run via uniRunNow on that
     * one; otherwise an OrRun is registered with orpush.
     *
     * @param e the executor to use, or null for none
     * @param o the other stage
     * @param f the action to run; must not be null
     * @return the new dependent future
     * @throws NullPointerException if f is null or o yields no future
     */
    private CompletableFuture!(void) orRunStage(U)(Executor e, CompletionStage!U o,
                                            Runnable f) {
        AbstractCompletableFuture b;
        if (f is null || (b = o.toCompletableFuture()) is null)
            throw new NullPointerException();

        AbstractCompletableFuture z;
        if ((z = this).isDone() || (z = b).isDone())
            return z.uniRunNow(e, f);

        CompletableFuture!(void) d = newIncompleteFuture!(void)();
        orpush(b, new OrRun(e, d, this, b, f));
        return d;
    }


    /* ------------- Signallers -------------- */

    /**
     * Waits until this future is done, or, when interruptible, until the
     * waiting Signaller is marked interrupted. Callers read the outcome
     * from the future afterwards; nothing is returned.
     *
     * @param interruptible whether an interrupt ends the wait early
     */
    private void waitingGet(bool interruptible) {
        Signaller q = null;
        bool queued = false;
        while (!isDone()) {
            if (q is null) {
                q = new Signaller(interruptible, Duration.zero, MonoTime.zero);
                ForkJoinWorkerThread th = cast(ForkJoinWorkerThread)Thread.getThis();
                if (th !is null)
                    ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
            }
            else if (!queued) {
                queued = tryPushStack(q);
            }
            else {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) { // currently cannot happen
                    q.interrupted = true;
                }
                if (q.interrupted && interruptible)
                    break;
            }
        }
        if (q !is null && queued) {
            q.thread = null;
            // An uninterruptible wait re-asserts the interrupt on exit.
            if (!interruptible && q.interrupted)
                ThreadEx.currentThread().interrupt();
            if (!isDone())
                cleanStack();
        }
        if (isDone())
            postComplete();
    }

    /**
     * Waits until this future is done, the waiter is interrupted, or the
     * timeout elapses. Returns without waiting if the calling thread is
     * already interrupted. Nothing is returned.
     *
     * @param timeout the maximum time to wait
     * @throws TimeoutException if timeout is not positive, or the wait
     *         ended without completion or interruption
     */
    private void timedGet(Duration timeout) {
        if (ThreadEx.interrupted())
            return;

        if (timeout <= Duration.zero)
            throw new TimeoutException();

        MonoTime d = MonoTime.currTime + timeout;
        MonoTime deadline = (d == MonoTime.zero) ? MonoTime(1) : d; // avoid 0
        Signaller q = null;
        bool queued = false;
        while (!isDone()) {     // similar to untimed
            if (q is null) {
                q = new Signaller(true, timeout, deadline);
                ForkJoinWorkerThread th = cast(ForkJoinWorkerThread)ThreadEx.currentThread();
                if (th !is null)
                    ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
            }
            else if (!queued)
                queued = tryPushStack(q);
            else if (q.remaining <= Duration.zero)
                break;
            else {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interrupted = true;
                }
                if (q.interrupted)
                    break;
            }
        }

        bool r = isDone();
        if (q !is null && queued) {
            q.thread = null;
            if (!r)
                cleanStack();
        }
        if (r)
            postComplete();
        if (r || (q !is null && q.interrupted))
            return ;

        throw new TimeoutException();
    }

    /* ------------- methods -------------- */

    static if(is(T == void)) {
        /**
         * Waits if necessary for this future to complete, then reports
         * its outcome through reportGet.
         */
        void get() {
            if (!isDone()) waitingGet(true);
            reportGet(this.altResult);
        }

        /**
         * Reports the outcome through reportJoin if this future is
         * already done; does nothing otherwise.
         */
        void getNow() {
            if(isDone()) {
                reportJoin(this.altResult);
            }
        }

        /**
         * Completes this future (no value) if not already completed.
         *
         * @return the value returned by completeValue()
         */
        bool complete() {
            bool triggered = completeValue();
            postComplete();
            return triggered;
        }

        /**
         * Callback invoked when the operation completes.
         */
        void succeeded() {
            complete();
        }

        /**
         * Waits if necessary for at most the given time for this future
         * to complete, then reports its outcome through reportGet.
         *
         * @param timeout the maximum time to wait
         */
        void get(Duration timeout) {
            if (!isDone()) timedGet(timeout);
            reportGet(this.altResult);
        }

        /**
         * Waits (not interruptibly) for this future to complete, then
         * reports its outcome through reportJoin.
         */
        void join() {
            if (!isDone()) waitingGet(false);
            reportJoin(this.altResult);
        }

    } else {

        /**
         * Waits if necessary for this future to complete, and then
         * returns its result.
*
         * @return the result value
         * @throws CancellationException if this future was cancelled
         * @throws ExecutionException if this future completed exceptionally
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         */
        T get() {
            if (!isDone()) {
                waitingGet(true);
            }
            return reportGet!(T)(this.result, this.altResult);
        }

        /**
         * Waits if necessary for at most the given time for this future
         * to complete, and then returns its result, if available.
         *
         * @param timeout the maximum time to wait
         * @return the result value
         * @throws CancellationException if this future was cancelled
         * @throws ExecutionException if this future completed exceptionally
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         * @throws TimeoutException if the wait timed out
         */
        T get(Duration timeout) {
            if (!isDone()) {
                timedGet(timeout);
            }
            return reportGet!T(this.result, this.altResult);
        }


        /**
         * Returns the result value when complete, or throws an
         * (unchecked) exception if completed exceptionally. To better
         * conform with the use of common functional forms, if a
         * computation involved in the completion of this
         * CompletableFuture threw an exception, this method throws an
         * (unchecked) {@link CompletionException} with the underlying
         * exception as its cause.
*
         * @return the result value
         * @throws CancellationException if the computation was cancelled
         * @throws CompletionException if this future completed
         * exceptionally or a completion computation threw an exception
         */
        T join() {
            if (!isDone()) {
                waitingGet(false);
            }
            return reportJoin!T(this.result, this.altResult);
        }

        /**
         * Returns the result value (or throws any encountered exception)
         * if completed, else returns the given valueIfAbsent.
         *
         * @param valueIfAbsent the value to return if not completed
         * @return the result value, if completed, else the given valueIfAbsent
         * @throws CancellationException if the computation was cancelled
         * @throws CompletionException if this future completed
         * exceptionally or a completion computation threw an exception
         */
        T getNow(T valueIfAbsent) {
            if (!isDone()) {
                return valueIfAbsent;
            }
            return reportJoin!T(this.result, this.altResult);
        }

        /**
         * If not already completed, sets the value returned by {@link
         * #get()} and related methods to the given value.
         *
         * @param value the result value
         * @return {@code true} if this invocation caused this CompletableFuture
         * to transition to a completed state, else {@code false}
         */
        bool complete(T value) {
            const bool transitioned = completeValue(value);
            postComplete();
            return transitioned;
        }

        /**
         * Callback invoked when the operation completes; forwards to
         * complete(result).
         */
        void succeeded(T result) {
            complete(result);
        }
    }

    /**
     * If not already completed, causes invocations of {@link #get()}
     * and related methods to throw the given exception.
*
     * @param ex the exception
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    override bool completeExceptionally(Throwable ex) {
        if (ex is null) {
            throw new NullPointerException();
        }
        const bool transitioned = completeValue(new AltResult(ex));
        postComplete();
        return transitioned;
    }

    /**
     * Callback invoked when the operation fails; forwards to
     * completeExceptionally(x).
     */
    void failed(Exception x) {
        completeExceptionally(x);
    }


    /**
     * Dependent stage built by uniApplyStage with a null executor.
     */
    CompletableFuture!(U) thenApply(U)(FunctionT!(U) fn) {
        return uniApplyStage!(U)(cast(Executor)null, fn);
    }

    /** Like thenApply, using defaultExecutor(). */
    CompletableFuture!(U) thenApplyAsync(U)(FunctionT!(U) fn) {
        return uniApplyStage!(U)(defaultExecutor(), fn);
    }

    /** Like thenApply, using the given executor after screenExecutor. */
    CompletableFuture!(U) thenApplyAsync(U)(FunctionT!(U) fn, Executor executor) {
        return uniApplyStage!(U)(screenExecutor(executor), fn);
    }

    /** Dependent stage built by uniAcceptStage with a null executor. */
    CompletableFuture!(void) thenAccept(ConsumerT action) {
        return uniAcceptStage(cast(Executor)null, action);
    }

    /** Like thenAccept, using defaultExecutor(). */
    CompletableFuture!(void) thenAcceptAsync(ConsumerT action) {
        return uniAcceptStage(defaultExecutor(), action);
    }

    /** Like thenAccept, using the given executor after screenExecutor. */
    CompletableFuture!(void) thenAcceptAsync(ConsumerT action,
            Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

    /** Dependent stage built by uniRunStage with a null executor. */
    CompletableFuture!(void) thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    /** Like thenRun, using defaultExecutor(). */
    CompletableFuture!(void) thenRunAsync(Runnable action) {
        return uniRunStage(defaultExecutor(), action);
    }

    /** Like thenRun, using the given executor after screenExecutor. */
    CompletableFuture!(void) thenRunAsync(Runnable action,
            Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

    /** Dependent stage built by biApplyStage with a null executor. */
    CompletableFuture!(V) thenCombine(U, V)(
            CompletionStage!(U) other,
            BiFunction!(T, U, V) fn) {
        return biApplyStage!(U, V)(null, other, fn);
    }

CompletableFuture!(V) thenCombineAsync(U, V)(
            CompletionStage!(U) other,
            BiFunction!(T, U, V) fn) {
        return biApplyStage!(U, V)(defaultExecutor(), other, fn);
    }

    /** Like thenCombine, using the given executor after screenExecutor. */
    CompletableFuture!(V) thenCombineAsync(U, V)(
            CompletionStage!(U) other,
            BiFunction!(T, U, V) fn, Executor executor) {
        return biApplyStage!(U, V)(screenExecutor(executor), other, fn);
    }

    /** Dependent stage built by biAcceptStage with a null executor. */
    CompletableFuture!(void) thenAcceptBoth(U)(CompletionStage!(U) other,
            BiConsumer!(T, U) action) {
        return biAcceptStage!(U)(null, other, action);
    }

    /** Like thenAcceptBoth, using defaultExecutor(). */
    CompletableFuture!(void) thenAcceptBothAsync(U)(
            CompletionStage!(U) other,
            BiConsumer!(T, U) action) {
        return biAcceptStage!(U)(defaultExecutor(), other, action);
    }

    /** Like thenAcceptBoth, using the given executor after screenExecutor. */
    CompletableFuture!(void) thenAcceptBothAsync(U)(
            CompletionStage!(U) other,
            BiConsumer!(T, U) action, Executor executor) {
        return biAcceptStage!(U)(screenExecutor(executor), other, action);
    }

    /**
     * Delegate-taking overload of runAfterBoth: wraps the Action in a
     * Runnable and builds the stage with biRunStage and a null executor.
     */
    CompletableFuture!(void) runAfterBoth(U)(CompletionStage!U other,
            Action action) {
        return biRunStage(null, other, new class Runnable {
            void run() {
                action();
            }
        } );
    }

    /** Dependent stage built by biRunStage with a null executor. */
    CompletableFuture!(void) runAfterBoth(U)(CompletionStage!U other,
            Runnable action) {
        return biRunStage(null, other, action);
    }

    /** Like runAfterBoth, using defaultExecutor(). */
    CompletableFuture!(void) runAfterBothAsync(U)(CompletionStage!U other,
            Runnable action) {
        return biRunStage(defaultExecutor(), other, action);
    }

    /** Like runAfterBoth, using the given executor after screenExecutor. */
    CompletableFuture!(void) runAfterBothAsync(U)(CompletionStage!U other,
            Runnable action,
            Executor executor) {
        return biRunStage(screenExecutor(executor), other, action);
    }

    /** Dependent stage built by orApplyStage with a null executor. */
    CompletableFuture!(U) applyToEither(U)(
            CompletionStage!(T) other, FunctionT!(U) fn) {
        return orApplyStage!(T, U)(null, other, fn);
    }

    /** Like applyToEither, using defaultExecutor(). */
    CompletableFuture!(U) applyToEitherAsync(U)(
            CompletionStage!(T) other, FunctionT!(U) fn) {
        return orApplyStage!(T, U)(defaultExecutor(), other, fn);
    }

    /** Like applyToEither, using the given executor after screenExecutor. */
    CompletableFuture!(U) applyToEitherAsync(U)(
            CompletionStage!(T) other, FunctionT!(U) fn,
            Executor executor) {
        return orApplyStage!(T, U)(screenExecutor(executor), other, fn);
    }

    /** Dependent stage built by orAcceptStage with a null executor. */
    CompletableFuture!(void) acceptEither(
            CompletionStage!(T) other, ConsumerT action) {
        return orAcceptStage(null, other, action);
    }

    /** Like acceptEither, using defaultExecutor(). */
    CompletableFuture!(void) acceptEitherAsync(
            CompletionStage!(T) other, ConsumerT action) {
        return orAcceptStage(defaultExecutor(), other, action);
    }

    /** Like acceptEither, using the given executor after screenExecutor. */
    CompletableFuture!(void) acceptEitherAsync(
            CompletionStage!(T) other, ConsumerT action,
            Executor executor) {
        return orAcceptStage(screenExecutor(executor), other, action);
    }

    /**
     * Dependent stage built by orRunStage with a null executor.
     *
     * The parameter type is CompletionStage!U; the earlier misspelling
     * made this template fail to compile as soon as it was instantiated.
     */
    CompletableFuture!(void) runAfterEither(U)(CompletionStage!U other,
            Runnable action) {
        return orRunStage(null, other, action);
    }

    /** Like runAfterEither, using defaultExecutor(). */
    CompletableFuture!(void) runAfterEitherAsync(U)(CompletionStage!U other,
            Runnable action) {
        return orRunStage(defaultExecutor(), other, action);
    }

    /** Like runAfterEither, using the given executor after screenExecutor. */
    CompletableFuture!(void) runAfterEitherAsync(U)(CompletionStage!U other,
            Runnable action,
            Executor executor) {
        return orRunStage(screenExecutor(executor), other, action);
    }

    /** Dependent stage built by uniComposeStage with a null executor. */
    CompletableFuture!(U) thenCompose(U)(
            FunctionT!(CompletionStage!(U)) fn) {
        return uniComposeStage!(U)(null, fn);
    }

    /** Like thenCompose, using defaultExecutor(). */
    CompletableFuture!(U) thenComposeAsync(U)(
            FunctionT!(CompletionStage!(U)) fn) {
        return uniComposeStage!(U)(defaultExecutor(), fn);
    }

    /** Like thenCompose, using the given executor after screenExecutor. */
    CompletableFuture!(U) thenComposeAsync(U)(
            FunctionT!(CompletionStage!(U)) fn,
            Executor executor) {
        return uniComposeStage!(U)(screenExecutor(executor), fn);
    }

    /** Dependent stage built by uniWhenCompleteStage with a null executor. */
    CompletableFuture!(T) whenComplete(BiConsumerT action) {
        return uniWhenCompleteStage(null, action);
    }

    /** Like whenComplete, using defaultExecutor(). */
    CompletableFuture!(T) whenCompleteAsync(BiConsumerT action) {
        return uniWhenCompleteStage(defaultExecutor(), action);
    }

    /** Like whenComplete, using the given executor after screenExecutor. */
    CompletableFuture!(T) whenCompleteAsync(
            BiConsumerT action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }

    /** Dependent stage built by uniHandleStage with a null executor. */
    CompletableFuture!(U) handle(U)(BiFunction!(T, Throwable, U) fn) {
        return uniHandleStage!(U)(null, fn);
    }

    /** Like handle, using defaultExecutor(). */
    CompletableFuture!(U) handleAsync(U)(
            BiFunction!(T, Throwable, U) fn) {
        return uniHandleStage!(U)(defaultExecutor(), fn);
    }

    /** Like handle, using the given executor after screenExecutor. */
    CompletableFuture!(U) handleAsync(U)(
            BiFunction!(T, Throwable, U) fn, Executor executor) {
        return uniHandleStage!(U)(screenExecutor(executor), fn);
    }

    /**
     * Returns this CompletableFuture.
     *
     * @return this CompletableFuture
     */
    CompletableFuture!(T) toCompletableFuture() {
        return this;
    }

    // not in interface CompletionStage

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the exception triggering this CompletableFuture's
     * completion when it completes exceptionally; otherwise, if this
     * CompletableFuture completes normally, then the returned
     * CompletableFuture also completes normally with the same value.
     * Note: More flexible versions of this functionality are
     * available using methods {@code whenComplete} and {@code handle}.
*
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     */
    CompletableFuture!(T) exceptionally(Function!(Throwable, T) fn) {
        return uniExceptionallyStage(fn);
    }


    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    bool cancel(bool mayInterruptIfRunning) {
        bool wonRace = false;
        if (!_isDone) {
            wonRace = completeValue(new AltResult(new CancellationException()));
        }
        postComplete();
        if (wonRace) {
            return true;
        }
        // Completed elsewhere: report whether that was a cancellation.
        return isCancelled();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
*
     * @param ex the exception
     * @throws NullPointerException if the exception is null
     */
    void obtrudeException(Throwable ex) {
        if (ex is null) {
            throw new NullPointerException();
        }
        altResult = new AltResult(ex);
        _isDone = true;
        postComplete();
    }

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    int getNumberOfDependents() {
        int dependents = 0;
        Completion node = stack;
        while (node !is null) {
            ++dependents;
            node = node.next;
        }
        return dependents;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, is one of
     * {@code "Completed normally"}, {@code "Completed exceptionally"}
     * followed by the exception message, or {@code "Not completed"}
     * followed by the number of dependents, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    override string toString() {
        // Counted inline to avoid calling getNumberOfDependents in case disabled.
        int dependents = 0;
        Completion node = stack;
        while (node !is null) {
            ++dependents;
            node = node.next;
        }

        string state;
        if (!isDone) {
            if (dependents == 0) {
                state = "[Not completed]";
            } else {
                state = "[Not completed, " ~ dependents.to!string ~ " dependents]";
            }
        } else {
            state = "[Completed normally]";
            AltResult ar = cast(AltResult)altResult;
            if (ar !is null && ar.ex !is null) {
                state = "[Completed exceptionally: " ~ ar.ex.msg ~ "]";
            }
        }
        return super.toString() ~ state;
    }

    // jdk9 additions

    /**
     * Returns the default Executor used for async methods that do not
     * specify an Executor.
This class uses the {@link
     * ForkJoinPool#commonPool()} if it supports more than one
     * parallel thread, or else an Executor using one thread per async
     * task.  This method may be overridden in subclasses to return
     * an Executor that provides at least one independent thread.
     *
     * @return the executor
     */
    Executor defaultExecutor() {
        Executor pool = ASYNC_POOL;
        return pool;
    }

    /**
     * Returns a new CompletableFuture that is completed normally with
     * the same value as this CompletableFuture when it completes
     * normally. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException with this exception as cause. The
     * behavior is equivalent to {@code thenApply(x -> x)}. This
     * method may be useful as a form of "defensive copying", to
     * prevent clients from completing, while still being able to
     * arrange dependent actions.
     *
     * @return the new CompletableFuture
     */
    CompletableFuture!(T) copy() {
        CompletableFuture!(T) duplicate = uniCopyStage!(T, T)(this);
        return duplicate;
    }

    /**
     * Returns a new CompletionStage that is completed normally with
     * the same value as this CompletableFuture when it completes
     * normally, and cannot be independently completed or otherwise
     * used in ways not defined by the methods of interface {@link
     * CompletionStage}.  If this CompletableFuture completes
     * exceptionally, then the returned CompletionStage completes
     * exceptionally with a CompletionException with this exception as
     * cause.
     *
     * <p>Unless overridden by a subclass, a new non-minimal
     * CompletableFuture with all methods available can be obtained from
     * a minimal CompletionStage via {@link #toCompletableFuture()}.
* For example, completion of a minimal stage can be awaited by
     *
     * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre>
     *
     * @return the new CompletionStage
     */
    CompletionStage!(T) minimalCompletionStage() {
        return uniAsMinimalStage();
    }

    /**
     * Completes this CompletableFuture with the result of
     * the given Supplier function invoked from an asynchronous
     * task using the given executor.
     *
     * @param supplier a function returning the value to be used
     * to complete this CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return this CompletableFuture
     * @throws NullPointerException if supplier or executor is null
     */
    CompletableFuture!(T) completeAsync(Supplier!(T) supplier,
            Executor executor) {
        if (supplier is null) {
            throw new NullPointerException();
        }
        if (executor is null) {
            throw new NullPointerException();
        }
        executor.execute(new AsyncSupply!(T)(this, supplier));
        return this;
    }

    /**
     * Completes this CompletableFuture with the result of the given
     * Supplier function invoked from an asynchronous task using the
     * default executor.
*
     * @param supplier a function returning the value to be used
     * to complete this CompletableFuture
     * @return this CompletableFuture
     */
    CompletableFuture!(T) completeAsync(Supplier!(T) supplier) {
        return completeAsync(supplier, defaultExecutor());
    }



    static if(is(T == void)) {

        /**
         * Forcibly marks this future as normally completed, whether or
         * not already completed. Any previously stored alternative
         * result is discarded so it is no longer reported.
         */
        void obtrudeValue() {
            altResult = null;
            _isNull = false;
            _isDone = true;
            postComplete();
        }

        /**
         * Schedules a Timeout for this future after the given delay,
         * unless it is already done, and registers a whenComplete
         * action that cancels the scheduled task once this future
         * completes without an exception.
         *
         * @param timeout how long to wait before the Timeout task runs
         * @return this CompletableFuture
         */
        CompletableFuture!(T) orTimeout(Duration timeout) {
            if (!_isDone) {
                ScheduledFuture!(void) f = Delayer.delay(new Timeout(this), timeout);
                whenComplete((Throwable ex) {
                    if (ex is null && f !is null && !f.isDone())
                        f.cancel(false);
                });
            }
            return this;
        }

        /**
         * Schedules a DelayedCompleter for this future after the given
         * delay, unless it is already done, and registers a
         * whenComplete action that cancels the scheduled task once
         * this future completes without an exception.
         *
         * @param timeout how long to wait before the completer runs
         * @return this CompletableFuture
         */
        CompletableFuture!(T) completeOnTimeout(Duration timeout) {
            if (!_isDone) {
                ScheduledFuture!(void) f =
                    Delayer.delay(new DelayedCompleter!(T)(this), timeout);

                whenComplete((Throwable ex) {
                    if (ex is null && f !is null && !f.isDone())
                        f.cancel(false);
                });
            }
            return this;
        }

    } else {
        /**
         * Forcibly sets or resets the value subsequently returned by
         * method {@link #get()} and related methods, whether or not
         * already completed. This method is designed for use only in
         * error recovery actions, and even in such situations may result
         * in ongoing dependent completions using established versus
         * overwritten outcomes.
         *
         * A previously stored alternative result is cleared when a
         * non-null value is set, so the new value is no longer shadowed
         * by it.
         *
         * @param value the completion value
         */
        void obtrudeValue(T value) {
            // result = (value is null) ? NIL : value;

            static if(is(T == class) || is(T == interface)) {
                if(value is null) {
                    this.altResult = NIL;
                    this.result = null;
                    _isNull = true;
                } else {
                    this.altResult = null;
                    this.result = value;
                    _isNull = false;
                }
            } else {
                this.altResult = null;
                this.result = value;
                _isNull = false;
            }
            _isDone = true;

            postComplete();
        }

        /**
         * Exceptionally completes this CompletableFuture with
         * a {@link TimeoutException} if not otherwise completed
         * before the given timeout.
         *
         * @param timeout how long to wait before completing exceptionally
         *        with a TimeoutException
         * @return this CompletableFuture
         */
        CompletableFuture!(T) orTimeout(Duration timeout) {
            if (!_isDone) {
                ScheduledFuture!(void) f = Delayer.delay(new Timeout(this), timeout);
                whenComplete((T ignore, Throwable ex) {
                    if (ex is null && f !is null && !f.isDone())
                        f.cancel(false);
                });
            }
            return this;
        }

        /**
         * Completes this CompletableFuture with the given value if not
         * otherwise completed before the given timeout.
*
         * @param value the value to use upon timeout
         * @param timeout how long to wait before completing normally
         *        with the given value, in units of {@code unit}
         * @param unit a {@code TimeUnit} determining how to interpret the
         *        {@code timeout} parameter
         * @return this CompletableFuture
         */
        CompletableFuture!(T) completeOnTimeout(T value, Duration timeout) {
            if (_isDone) {
                return this;
            }

            ScheduledFuture!(void) f =
                Delayer.delay(new DelayedCompleter!(T)(this, value), timeout);

            // Cancel the pending completer once this future finishes normally.
            whenComplete((T ignore, Throwable ex) {
                if (ex is null && f !is null && !f.isDone())
                    f.cancel(false);
            });
            return this;
        }

    }
}



/**
 * Wrapper for an alternative result: carries the Throwable of an
 * exceptional completion.
 */
private final class AltResult { // See above
    Throwable ex;        // null only for NIL

    this(Throwable x) {
        this.ex = x;
    }
}



/**
 * Base class of the completion actions linked on a future's stack.
 * Running or executing a Completion calls tryFire(ASYNC).
 */
abstract class Completion : ForkJoinTask!(void),
    Runnable, AsynchronousCompletionTask {
    Completion next;      // Treiber stack link

    /**
     * Performs completion action if triggered, returning a
     * dependent that may need propagation, if one exists.
     *
     * @param mode SYNC, ASYNC, or NESTED
     */
    abstract AbstractCompletableFuture tryFire(int mode);

    /** Returns true if possibly still triggerable. Used by cleanStack. */
    abstract bool isLive();

    /** Runnable entry point. */
    final void run() {
        tryFire(ASYNC);
    }

    /** ForkJoinTask entry point; always returns false. */
    final override bool exec() {
        tryFire(ASYNC);
        return false;
    }
    // final override void getRawResult()       { return null; }
    // final override void setRawResult(void v) {}
}


/**
 * A marker interface identifying asynchronous tasks produced by
 * {@code async} methods. This may be useful for monitoring,
 * debugging, and tracking asynchronous activities.
*
 */
private interface AsynchronousCompletionTask {
}



/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
private final class ThreadPerTaskExecutor : Executor {
    /** Starts a new ThreadEx for every submitted task. */
    void execute(Runnable r) {
        new ThreadEx(r).start();
    }
}


/** The claim/liveness operations implemented by UniCompletion. */
interface IUniCompletion {
    bool claim();
    bool isLive();
}

/** A Completion with a source, dependent, and executor. */
abstract class UniCompletion : Completion, IUniCompletion {
    Executor executor;                      // executor to use (null if none)
    AbstractCompletableFuture dep;          // the dependent to complete
    AbstractCompletableFuture src;          // source for action

    this(Executor executor, AbstractCompletableFuture dep,
            AbstractCompletableFuture src) {
        this.executor = executor;
        this.dep = dep;
        this.src = src;
    }

    /**
     * Returns true if action can be run. Call only when known to
     * be triggerable. Uses FJ tag bit to ensure that only one
     * thread claims ownership. If async, starts as task -- a
     * later call to tryFire will run action.
     */
    final bool claim() {
        Executor e = executor;
        if (!compareAndSetForkJoinTaskTag(cast(short)0, cast(short)1))
            return false;           // somebody else already owns this action
        if (e is null)
            return true;            // no executor: caller runs the action itself
        executor = null;            // disable
        e.execute(this);
        return false;
    }

    final override bool isLive() { return dep !is null; }
}


/**
 * Completes the dependent with the value of a function applied to the
 * source's result; an exceptional source is propagated without calling
 * the function.
 */
final class UniApply(T, V) : UniCompletion {

    // A void source has no value to hand over, so the function takes no argument.
    static if(is(T == void)) {
        alias FunctionT(V) = Func!(V);
    } else {
        alias FunctionT(V) = Func1!(T, V);
    }

    FunctionT!(V) fn;

    this(Executor executor, CompletableFuture!(V) dep,
            CompletableFuture!(T) src, FunctionT!(V) fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    /**
     * @param mode SYNC, ASYNC, or NESTED
     * @return the result of {@code postFire}, or null if not ready, already
     *         fired, or the claim was not obtained
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        FunctionT!(V) f = fn;
        if (d is null || f is null || a is null || !a.isDone())
            return null;

        if (!d.isDone()) {
            AltResult ar = a.altResult;
            Throwable x = (ar is null) ? null : ar.ex;
            if (x !is null) {
                d.completeThrowable(x, ar);
            } else {
                try {
                    if (mode <= 0 && !claim())
                        return null;
                    static if(is(T == void)) {
                        d.completeValue(f());
                    } else {
                        T t = a.result;
                        d.completeValue(f(t));
                    }
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/**
 * Runs a consumer on the source's result and then completes the dependent
 * with null; an exceptional source is propagated without calling the consumer.
 */
final class UniAccept(T) : UniCompletion {

    static if(is(T == void)) {
        alias ConsumerT = Action;
    } else {
        alias ConsumerT = Consumer!(T);
    }

    ConsumerT fn;

    this(Executor executor, CompletableFuture!(void) dep,
            CompletableFuture!(T) src, ConsumerT fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    /**
     * @param mode SYNC, ASYNC, or NESTED
     * @return the result of {@code postFire}, or null if not ready, already
     *         fired, or the claim was not obtained
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        ConsumerT f = fn;
        if (d is null || f is null || a is null || !a.isDone())
            return null;

        if (!d._isDone) {
            AltResult ar = a.altResult;
            Throwable x = (ar is null) ? null : ar.ex;
            if (x !is null) {
                d.completeThrowable(x, ar);
            } else {
                try {
                    if (mode <= 0 && !claim())
                        return null;
                    static if(is(T == void)) {
                        f();
                    } else {
                        T t = a.result;
                        f(t);
                    }
                    d.completeNull();
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/**
 * Runs a Runnable once the source is done and then completes the dependent
 * with null; an exceptional source is propagated without running it.
 */
final class UniRun(T) : UniCompletion {
    Runnable fn;

    this(Executor executor, CompletableFuture!(void) dep,
            CompletableFuture!(T) src, Runnable fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    /**
     * @param mode SYNC, ASYNC, or NESTED
     * @return the result of {@code postFire}, or null if not ready, already
     *         fired, or the claim was not obtained
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        Runnable f = fn;
        if (d is null || f is null || a is null || !a.isDone())
            return null;

        if (!d.isDone()) {
            AltResult ar = a.altResult;
            Throwable x = (ar is null) ? null : ar.ex;
            if (x !is null) {
                d.completeThrowable(x, ar);
            } else {
                try {
                    if (mode <= 0 && !claim())
                        return null;
                    f.run();
                    d.completeNull();
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/** Delegates to {@code uniWhenComplete} on the dependent once the source is done. */
final class UniWhenComplete(T) : UniCompletion {

    static if(is(T == void)) {
        alias BiConsumerT = Action1!(Throwable);
    } else {
        alias BiConsumerT = BiConsumer!(T, Throwable);
    }

    BiConsumerT fn;

    this(Executor executor, CompletableFuture!(T) dep,
            CompletableFuture!(T) src, BiConsumerT fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    final override CompletableFuture!(T) tryFire(int mode) {
        CompletableFuture!(T) d = cast(CompletableFuture!(T)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        BiConsumerT f = fn;

        if (d is null || f is null || a is null || !a.isDone())
            return null;
        // For mode > 0 (ASYNC) no completion is handed over for claiming.
        if (!d.uniWhenComplete(a, f, mode > 0 ? null : this))
            return null;

        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}



/** Delegates to {@code uniHandle} on the dependent once the source is done. */
final class UniHandle(T, V) : UniCompletion {
    BiFunction!(T, Throwable, V) fn;

    this(Executor executor, CompletableFuture!(V) dep,
            CompletableFuture!(T) src, BiFunction!(T, Throwable, V) fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        BiFunction!(T, Throwable, V) f = fn;

        if (d is null || f is null || a is null || !a.isDone())
            return null;
        // For mode > 0 (ASYNC) no completion is handed over for claiming.
        if (!d.uniHandle!(T)(a, f, mode > 0 ? null : this))
            return null;

        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/** Delegates to {@code uniExceptionally} on the dependent; constructed without an executor. */
final class UniExceptionally(T) : UniCompletion {
    Function!(Throwable, T) fn;

    this(CompletableFuture!(T) dep, CompletableFuture!(T) src,
            Function!(Throwable, T) fn) {
        super(null, dep, src);
        this.fn = fn;
    }

    final override CompletableFuture!(T) tryFire(int mode) { // never ASYNC
        CompletableFuture!(T) d = cast(CompletableFuture!(T)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        Function!(Throwable, T) f = fn;

        if (d is null || f is null || a is null || !a.isDone())
            return null;
        if (!d.uniExceptionally(a, f, this))
            return null;

        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/** Copies the outcome (value or AltResult) of the source into the dependent. */
final class UniRelay(U, T : U) : UniCompletion {
    this(CompletableFuture!(U) dep, CompletableFuture!(T) src) {
        super(null, dep, src);
    }

    final override CompletableFuture!(U) tryFire(int mode) {
        CompletableFuture!(U) d = cast(CompletableFuture!(U)) dep;
        if (d is null)
            return null;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        if (a is null || !a.isDone())
            return null;

        if (!d.isDone()) {
            if (a.isFaulted()) {
                d.completeValue(a.altResult);
            } else {
                static if(is(T == void)) {
                    d.completeValue();
                } else {
                    d.completeValue(a.result);
                }
            }
        }
        src = null; dep = null;
        return d.postFire(a, mode);
    }
}


/**
 * Applies a stage-returning function to the source's result and relays the
 * outcome of the returned stage into the dependent.
 */
final class UniCompose(T, V) : UniCompletion {

    // A void source has no value to hand over, so the function takes no argument.
    static if(is(T == void)) {
        alias FunctionT(V) = Func!(V);
    } else {
        alias FunctionT(V) = Func1!(T, V);
    }

    FunctionT!(CompletionStage!(V)) fn;

    this(Executor executor, CompletableFuture!(V) dep,
            CompletableFuture!(T) src, FunctionT!(CompletionStage!(V)) fn) {
        super(executor, dep, src);
        this.fn = fn;
    }

    /**
     * @param mode SYNC, ASYNC, or NESTED
     * @return the result of {@code postFire}, or null if not ready, already
     *         fired, the claim was not obtained, or the composed stage is
     *         still pending (a UniRelay then finishes the dependent later)
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        FunctionT!(CompletionStage!(V)) f = fn;

        if (d is null || f is null || a is null || !a.isDone())
            return null;

        if (!d.isDone()) {
            AltResult ar = a.altResult;
            Throwable x = (ar is null) ? null : ar.ex;
            if (x !is null) {
                d.completeThrowable(x, ar);
            } else {
                try {
                    if (mode <= 0 && !claim())
                        return null;

                    CompletionStage!(V) stage;
                    static if(is(T == void)) {
                        stage = f();
                    } else {
                        // A non-null AltResult without an exception is the NIL
                        // marker: the source holds no value, so pass T.init.
                        T t = (ar is null) ? a.result : T.init;
                        stage = f(t);
                    }

                    // A null stage must fail the dependent (handled by the
                    // catch below) instead of being dereferenced.
                    if (stage is null)
                        throw new NullPointerException();
                    CompletableFuture!(V) g =
                        cast(CompletableFuture!(V)) stage.toCompletableFuture();
                    if (g is null)
                        throw new NullPointerException();

                    if (g.isDone()) {
                        if (g.isFaulted()) {
                            d.completeValue(g.altResult);
                        } else {
                            static if(is(V == void)) {
                                d.completeValue();
                            } else {
                                d.completeValue(g.result);
                            }
                        }
                    } else {
                        g.unipush(new UniRelay!(V,V)(d, g));
                        if (!d.isDone())
                            return null;
                    }
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
        }
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}


/** A Completion for an action with two sources */
abstract class BiCompletion : UniCompletion {
    AbstractCompletableFuture snd; // second source for action

    this(Executor executor, AbstractCompletableFuture dep,
            AbstractCompletableFuture src, AbstractCompletableFuture snd) {
        super(executor, dep, src);
        this.snd = snd;
    }
}

/** A Completion delegating to a BiCompletion */
final class CoCompletion : Completion {
    BiCompletion base;

    this(BiCompletion base) { this.base = base; }

    final override AbstractCompletableFuture tryFire(int mode) {
        BiCompletion c = base;
        if (c is null)
            return null;
        AbstractCompletableFuture d = c.tryFire(mode);
        if (d is null)
            return null;
        base = null; // detach
        return d;
    }

    final override bool isLive() {
        BiCompletion c = base;
        return c !is null && c.dep !is null;
    }
}


/** Delegates to {@code biApply} on the dependent once both sources are done. */
final class BiApply(T, U, V) : BiCompletion {
    BiFunction!(T, U, V) fn;

    this(Executor executor, CompletableFuture!(V) dep,
            CompletableFuture!(T) src, CompletableFuture!(U) snd,
            BiFunction!(T, U, V) fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U)) snd;
        BiFunction!(T, U, V) f = fn;

        if (d is null || f is null || a is null || !a.isDone()
                || b is null || !b.isDone())
            return null;
        if (!d.biApply!(T, U)(a, b, f, mode > 0 ? null : this))
            return null;

        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/** Delegates to {@code biAccept} on the dependent once both sources are done. */
final class BiAccept(T, U) : BiCompletion {
    BiConsumer!(T, U) fn;

    this(Executor executor, CompletableFuture!(void) dep,
            CompletableFuture!T src, CompletableFuture!U snd,
            BiConsumer!(T, U) fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void)) dep;
        CompletableFuture!T a = cast(CompletableFuture!(T)) src;
        CompletableFuture!U b = cast(CompletableFuture!(U)) snd;
        BiConsumer!(T, U) f = fn;

        if (d is null || f is null || a is null || !a.isDone()
                || b is null || !b.isDone())
            return null;
        if (!d.biAccept!(T, U)(a, b, f, mode > 0 ? null : this))
            return null;

        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/** Delegates to {@code biRun} on the dependent once both sources are done. */
final class BiRun(T, U) : BiCompletion {
    Runnable fn;

    this(Executor executor, CompletableFuture!(void) dep,
            CompletableFuture!(T) src, CompletableFuture!(U) snd,
            Runnable fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U)) snd;
        Runnable f = fn;

        if (d is null || f is null || a is null || !a.isDone()
                || b is null || !b.isDone())
            return null;
        if (!d.biRun(a, b, f, mode > 0 ? null : this))
            return null;

        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Completes a void dependent once both sources are done: with the first
 * exception found (first source checked before second), otherwise with null.
 */
final class BiRelay : BiCompletion { // for And
    this(CompletableFuture!(void) dep,
            AbstractCompletableFuture src, AbstractCompletableFuture snd) {
        super(null, dep, src, snd);
    }

    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void)) dep;
        if (d is null)
            return null;
        AbstractCompletableFuture a = src;
        if (a is null || !a.isDone())
            return null;
        AbstractCompletableFuture b = snd;
        if (b is null || !b.isDone())
            return null;

        if (!d.isDone()) {
            AltResult first = a.altResult;
            AltResult second = b.altResult;
            if (first !is null && first.ex !is null) {
                d.completeThrowable(first.ex, first);
            } else if (second !is null && second.ex !is null) {
                d.completeThrowable(second.ex, second);
            } else {
                d.completeNull();
            }
        }
        src = null; snd = null; dep = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Completes the dependent with a function applied to the result of
 * whichever source is done (the first source is checked first).
 */
final class OrApply(T, U : T, V) : BiCompletion {

    // A void source has no value to hand over, so the function takes no argument.
    static if(is(T == void)) {
        alias FunctionT(V) = Func!(V);
    } else {
        alias FunctionT(V) = Func1!(T, V);
    }

    FunctionT!(V) fn;

    this(Executor executor, CompletableFuture!(V) dep,
            CompletableFuture!(T) src, CompletableFuture!(U) snd,
            FunctionT!(V) fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    /**
     * @param mode SYNC, ASYNC, or NESTED
     * @return the result of {@code postFire}, or null if neither source is
     *         done, already fired, or the claim was not obtained
     */
    final override CompletableFuture!(V) tryFire(int mode) {
        CompletableFuture!(V) d = cast(CompletableFuture!(V)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U)) snd;
        FunctionT!(V) f = fn;

        if (d is null || f is null || a is null || b is null
                || (!a.isDone() && !b.isDone()))
            return null;

        // static if does not open a scope, so t stays visible below.
        static if(!is(T == void)) {
            T t;
        }
        AltResult ar;
        if (a.isDone()) {
            static if(!is(T == void)) {
                t = a.result;
            }
            ar = a.altResult;
        } else if (b.isDone()) {
            static if(!is(T == void)) {
                t = b.result;
            }
            ar = b.altResult;
        } else {
            warning("unhandled status");
        }

        if (!d.isDone()) {
            try {
                if (mode <= 0 && !claim())
                    return null;

                Throwable x = (ar is null) ? null : ar.ex;
                if (x !is null) {
                    d.completeThrowable(x, ar);
                } else {
                    static if(is(T == void)) {
                        d.completeValue(f());
                    } else {
                        d.completeValue(f(t));
                    }
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Runs a consumer on the result of whichever source is done (the first
 * source is checked first) and then completes the dependent with null.
 */
final class OrAccept(T, U : T) : BiCompletion {

    static if(is(T == void)) {
        alias ConsumerT = Action;
    } else {
        alias ConsumerT = Consumer!(T);
    }

    ConsumerT fn;

    this(Executor executor, CompletableFuture!(void) dep,
            CompletableFuture!(T) src, CompletableFuture!(U) snd,
            ConsumerT fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    /**
     * @param mode SYNC, ASYNC, or NESTED
     * @return the result of {@code postFire}, or null if neither source is
     *         done, already fired, or the claim was not obtained
     */
    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void)) dep;
        CompletableFuture!(T) a = cast(CompletableFuture!(T)) src;
        CompletableFuture!(U) b = cast(CompletableFuture!(U)) snd;
        ConsumerT f = fn;

        if (d is null || f is null || a is null || b is null
                || (!a.isDone() && !b.isDone()))
            return null;

        // static if does not open a scope, so t stays visible below.
        static if(!is(T == void)) {
            T t;
        }
        AltResult ar;
        if (a.isDone()) {
            static if(!is(T == void)) {
                t = a.result;
            }
            ar = a.altResult;
        } else if (b.isDone()) {
            static if(!is(T == void)) {
                t = b.result;
            }
            ar = b.altResult;
        } else {
            warning("unhandled status");
        }

        if (!d.isDone()) {
            try {
                if (mode <= 0 && !claim())
                    return null;

                Throwable x = (ar is null) ? null : ar.ex;
                if (x !is null) {
                    d.completeThrowable(x, ar);
                } else {
                    static if(is(T == void)) {
                        f();
                    } else {
                        f(t);
                    }
                    d.completeNull();
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/**
 * Runs a Runnable once either source is done and then completes the
 * dependent with null; an exception in the chosen source is propagated.
 */
final class OrRun : BiCompletion {
    Runnable fn;

    this(Executor executor, CompletableFuture!(void) dep,
            AbstractCompletableFuture src, AbstractCompletableFuture snd,
            Runnable fn) {
        super(executor, dep, src, snd);
        this.fn = fn;
    }

    final override CompletableFuture!(void) tryFire(int mode) {
        CompletableFuture!(void) d = cast(CompletableFuture!(void)) dep;
        Runnable f = fn;
        if (d is null || f is null)
            return null;
        AbstractCompletableFuture a = src;
        if (a is null)
            return null;
        AbstractCompletableFuture b = snd;
        if (b is null || (!a.isDone() && !b.isDone()))
            return null;

        if (!d._isDone) {
            try {
                if (mode <= 0 && !claim())
                    return null;

                AltResult ar;
                if (a.isDone())
                    ar = a.altResult;
                else if (b.isDone)
                    ar = b.altResult;

                Throwable x = (ar is null) ? null : ar.ex;
                if (x !is null) {
                    d.completeThrowable(x, ar);
                } else {
                    f.run();
                    d.completeNull();
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        dep = null; src = null; snd = null; fn = null;
        return d.postFire(a, b, mode);
    }
}


/** Completion for an anyOf input future. */
static class AnyOf(T) : Completion {
    CompletableFuture!(T) dep;
    CompletableFuture!(T) src;
    CompletableFuture!(T)[] srcs;

    this(CompletableFuture!(T) dep, CompletableFuture!(T) src,
            CompletableFuture!(T)[] srcs) {
        this.dep = dep;
        this.src = src;
        this.srcs = srcs;
    }

    /**
     * Relays the outcome of {@code src} into the dependent. When this call is
     * the one that completes it, the other inputs get their stacks cleaned.
     *
     * @return the dependent when it was completed here in NESTED mode,
     *         otherwise null
     */
    final override CompletableFuture!(T) tryFire(int mode) {
        CompletableFuture!(T) d = dep;
        CompletableFuture!(T) a = src;
        CompletableFuture!(T)[] as = srcs;
        if (d is null || a is null || !a.isDone() || as is null)
            return null;

        dep = null; src = null; srcs = null;

        bool completedHere = a.isFaulted()
            ? d.completeValue(a.altResult)
            : d.completeValue(a.result);
        if (!completedHere)
            return null;

        foreach (AbstractCompletableFuture b; as) {
            if (b !is a)
                b.cleanStack();
        }
        if (mode < 0)
            return d;
        d.postComplete();
        return null;
    }

    final override bool isLive() {
        CompletableFuture!(T) d = dep;
        return d !is null && !d.isDone();
    }
}


/* ------------- Zero-input Async forms -------------- */


/** Task that completes a future with the value produced by a supplier. */
final class AsyncSupply(T) : ForkJoinTask!(void), Runnable,
        AsynchronousCompletionTask {

    CompletableFuture!(T) dep;
    Supplier!(T) fn;

    this(CompletableFuture!(T) dep, Supplier!(T) fn) {
        this.dep = dep;
        this.fn = fn;
    }

    final override bool exec() { run(); return false; }

    /**
     * Calls the supplier unless the future is already done, completing the
     * future with its value or with whatever it throws, then runs
     * {@code postComplete}. Does nothing on a second invocation because the
     * fields are cleared on the first.
     */
    void run() {
        CompletableFuture!(T) d = dep;
        if (d is null)
            return;
        Supplier!(T) f = fn;
        if (f is null)
            return;

        dep = null; fn = null;
        if (!d._isDone) {
            try {
                static if(is(T == void)) {
                    f();
                    d.completeValue();
                } else {
                    d.completeValue(f());
                }
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        d.postComplete();
    }
}



/** Task that runs an action and then completes a void future with null. */
final class AsyncRun : ForkJoinTask!(void), Runnable, AsynchronousCompletionTask {
    CompletableFuture!(void) dep;
    Action fn;

    this(CompletableFuture!(void) dep, Action fn) {
        this.dep = dep;
        this.fn = fn;
    }

    final override bool exec() { run(); return false; }

    /**
     * Runs the action unless the future is already done; a thrown Throwable
     * is logged and completes the future exceptionally. Ends with
     * {@code postComplete}. Does nothing on a second invocation because the
     * fields are cleared on the first.
     */
    void run() {
        CompletableFuture!(void) d = dep;
        if (d is null)
            return;
        Action f = fn;
        if (f is null)
            return;

        dep = null; fn = null;
        if (!d.isDone) {
            try {
                f();
                d.completeNull();
            } catch (Throwable ex) {
                warning(ex);
                d.completeThrowable(ex);
            }
        }
        d.postComplete();
    }
}


/* ------------- Signallers -------------- */

/**
 * Completion for recording and releasing a waiting thread.  This
 * class implements ManagedBlocker to avoid starvation when
 * blocking actions pile up in ForkJoinPools.
*/

final class Signaller : Completion, ManagedBlocker {
    Duration remaining;          // remaining wait time if timed
    MonoTime deadline;           // non-zero if timed
    bool interruptible;
    bool interrupted;
    Thread thread;               // the waiter; cleared once it has been released

    /** Records the calling thread as the waiter. */
    this(bool interruptible, Duration remaining, MonoTime deadline) {
        this.thread = Thread.getThis();
        this.interruptible = interruptible;
        this.remaining = remaining;
        this.deadline = deadline;
    }

    /** Unparks the recorded waiter, if there still is one. Always returns null. */
    final override AbstractCompletableFuture tryFire(int ignore) {
        Thread waiter = thread; // no need to atomically claim
        if (waiter is null)
            return null;
        thread = null;
        LockSupport.unpark(waiter);
        return null;
    }

    /**
     * Returns true when waiting should stop: an interrupt was seen on an
     * interruptible wait, a timed wait has run out, or the waiter was released.
     * Refreshes {@code remaining} from the deadline as a side effect.
     */
    bool isReleasable() {
        if (ThreadEx.interrupted())
            interrupted = true;
        if (interrupted && interruptible)
            return true;

        if (deadline != MonoTime.zero) {
            if (remaining <= Duration.zero)
                return true;
            remaining = deadline - MonoTime.currTime;
            if (remaining <= Duration.zero)
                return true;
        }
        return thread is null;
    }

    /** Parks (with the remaining time when timed) until releasable. Always returns true. */
    bool block() {
        while (!isReleasable()) {
            bool untimed = deadline == MonoTime.zero;
            if (untimed)
                LockSupport.park(this);
            else
                LockSupport.park(this, remaining);
        }
        return true;
    }

    final override bool isLive() { return thread !is null; }
}


/**
 * Singleton delay scheduler, used only for starting and
 * cancelling tasks.
*/


final class Delayer {
    /** Schedules {@code command} on the shared scheduler after {@code delay}. */
    static ScheduledFuture!(void) delay(Runnable command, Duration delay) {
        return delayer.schedule(command, delay);
    }

    __gshared ScheduledThreadPoolExecutor delayer;

    // One scheduler thread, created by DaemonThreadFactory; cancelled tasks
    // are removed from the queue.
    shared static this() {
        delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
        delayer.setRemoveOnCancelPolicy(true);
    }
}

/** Creates daemon threads named "CompletableFutureDelayScheduler". */
final class DaemonThreadFactory : ThreadFactory {
    ThreadEx newThread(Runnable runnable) {
        ThreadEx t = new ThreadEx(runnable, "CompletableFutureDelayScheduler");
        t.isDaemon = true;
        return t;
    }
}

// Little class-ified lambdas to better support monitoring

/** Executor that hands each task to the base executor after a fixed delay. */
final class DelayedExecutor : Executor {
    Duration delay;
    Executor executor;

    this(Duration delay, Executor executor) {
        this.delay = delay;
        this.executor = executor;
    }

    void execute(Runnable r) {
        Delayer.delay(new TaskSubmitter(executor, r), delay);
    }
}

/** Action to submit user task */
final class TaskSubmitter : Runnable {
    Executor executor;
    Runnable action;

    this(Executor executor, Runnable action) {
        this.executor = executor;
        this.action = action;
    }

    void run() { executor.execute(action); }
}

/** Action to completeExceptionally on timeout */
final class Timeout : Runnable {
    AbstractCompletableFuture f;

    this(AbstractCompletableFuture f) { this.f = f; }

    void run() {
        if (f !is null && !f.isDone())
            f.completeExceptionally(new TimeoutException());
    }
}

/** Action to complete on timeout */
final class DelayedCompleter(U) : Runnable if(!is(U == void)) {
    CompletableFuture!(U) f;
    U u;

    this(CompletableFuture!(U) f, U u) {
        this.f = f;
        this.u = u;
    }

    void run() {
        if (f !is null) {
            f.complete(u);
        }
    }
}

/** Action to complete a void future on timeout */
final class DelayedCompleter(U) : Runnable if(is(U == void)) {
    CompletableFuture!(U) f;

    this(CompletableFuture!(U) f) { this.f = f; }

    void run() {
        if (f !is null) {
            f.complete();
        }
    }
}


/**
 * A subclass that just throws UOE for most non-CompletionStage methods.
 */
final class MinimalStage(T) : CompletableFuture!(T) {
    this() { }
    this(AltResult r) { super(r); }

    override CompletableFuture!(U) newIncompleteFuture(U)() {
        return new MinimalStage!(U)(); }
    override T get() {
        throw new UnsupportedOperationException(); }
    override T get(Duration timeout) {
        throw new UnsupportedOperationException(); }
    override T join() {
        throw new UnsupportedOperationException(); }
    override bool completeExceptionally(Throwable ex) {
        throw new UnsupportedOperationException(); }
    override bool cancel(bool mayInterruptIfRunning) {
        throw new UnsupportedOperationException(); }
    override void obtrudeException(Throwable ex) {
        throw new UnsupportedOperationException(); }
    override bool isDone() {
        throw new UnsupportedOperationException(); }
    override bool isCancelled() {
        throw new UnsupportedOperationException(); }
    override bool isCompletedExceptionally() {
        throw new UnsupportedOperationException(); }
    override int getNumberOfDependents() {
        throw new UnsupportedOperationException(); }
    override CompletableFuture!(T) completeAsync
        (Supplier!(T) supplier, Executor executor) {
        throw new UnsupportedOperationException(); }
    override CompletableFuture!(T) completeAsync
        (Supplier!(T) supplier) {
        throw new UnsupportedOperationException(); }
    override CompletableFuture!(T) orTimeout
        (Duration timeout) {
        throw new UnsupportedOperationException(); }

    /**
     * Returns a full CompletableFuture mirroring this stage: an already
     * completed copy when this stage is done, otherwise a new future that a
     * UniRelay pushed onto this stage will complete.
     */
    override CompletableFuture!(T) toCompletableFuture() {
        // Read the completion flag directly: isDone() is overridden above to
        // throw UnsupportedOperationException, so calling it here would make
        // this method unusable.
        if (_isDone) {
            if (isFaulted()) {
                return new CompletableFuture!(T)(this.altResult);
            } else {
                static if(is(T == void)) {
                    return new CompletableFuture!(T)(true);
                } else {
                    return new CompletableFuture!(T)(this.result);
                }
            }
        } else {
            CompletableFuture!(T) d = new CompletableFuture!T();
            unipush(new UniRelay!(T,T)(d, this));
            return d;
        }
    }

    static if(is(T == void)) {

        this(bool r) { super(r); }

        override T getNow() {
            throw new UnsupportedOperationException(); }
        override bool complete() {
            throw new UnsupportedOperationException(); }

        override void obtrudeValue() {
            throw new UnsupportedOperationException(); }

        override CompletableFuture!(T) completeOnTimeout(Duration timeout) {
            throw new UnsupportedOperationException(); }

    } else {

        this(T r) { super(r); }

        override T getNow(T valueIfAbsent) {
            throw new UnsupportedOperationException(); }
        override bool complete(T value) {
            throw new UnsupportedOperationException(); }

        override void obtrudeValue(T value) {
            throw new UnsupportedOperationException(); }

        override CompletableFuture!(T) completeOnTimeout(T value, Duration timeout) {
            throw new UnsupportedOperationException(); }
    }

}



/**
 * Null-checks user executor argument, and translates uses of
 * commonPool to ASYNC_POOL in case parallelism disabled.
 *
 * @param e the executor supplied by the caller
 * @return ASYNC_POOL when the common pool was passed but cannot be used,
 *         otherwise {@code e}
 * @throws NullPointerException if {@code e} is null
 */
Executor screenExecutor(Executor e) {
    if (!USE_COMMON_POOL && e is ForkJoinPool.commonPool())
        return ASYNC_POOL;
    if (e is null) throw new NullPointerException();
    return e;
}


/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the {@link ForkJoinPool#commonPool()} with
 * the value obtained by calling the given Supplier.
*
 * @param supplier a function returning the value to be used
 * to complete the returned CompletableFuture
 * @param <U> the function's return type
 * @return the new CompletableFuture
 */
CompletableFuture!(U) supplyAsync(U)(Supplier!(U) supplier) {
    // Runs on the default asynchronous executor.
    return asyncSupplyStage!(U)(ASYNC_POOL, supplier);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the given executor with the value obtained
 * by calling the given Supplier.
 *
 * @param supplier a function returning the value to be used
 * to complete the returned CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @param <U> the function's return type
 * @return the new CompletableFuture
 */
CompletableFuture!(U) supplyAsync(U)(Supplier!(U) supplier, Executor executor) {
    Executor screened = screenExecutor(executor);
    return asyncSupplyStage!(U)(screened, supplier);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the {@link ForkJoinPool#commonPool()} after
 * it runs the given action.
 *
 * @param runnable the action to run before completing the
 * returned CompletableFuture
 * @return the new CompletableFuture
 * @throws NullPointerException if {@code runnable} is null
 */
CompletableFuture!(void) runAsync(Runnable runnable) {
    if (runnable is null) {
        throw new NullPointerException();
    }
    // Adapt the Runnable to the delegate form asyncRunStage expects.
    return asyncRunStage(ASYNC_POOL, { runnable.run(); });
}


/**
 * Delegate-based variant of {@code runAsync}: runs {@code act} on the
 * default asynchronous executor and completes the returned future afterwards.
 *
 * @param act the action to run before completing the returned
 * CompletableFuture
 * @return the new CompletableFuture
 * @throws NullPointerException if {@code act} is null
 */
CompletableFuture!(void) runAsync(Action act) {
    if (act is null) {
        throw new NullPointerException();
    }
    return asyncRunStage(ASYNC_POOL, act);
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the given executor after it runs the given
 * action.
*
 * @param runnable the action to run before completing the
 * returned CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @return the new CompletableFuture
 */
CompletableFuture!(void) runAsync(Runnable runnable, Executor executor) {
    if (runnable is null) {
        throw new NullPointerException();
    }
    // Adapt the Runnable to the delegate form asyncRunStage expects.
    return asyncRunStage(screenExecutor(executor), { runnable.run(); });
}

/**
 * Returns a new CompletableFuture that is already completed with
 * the given value.
 *
 * @param value the value
 * @param <U> the type of the value
 * @return the completed CompletableFuture
 */
CompletableFuture!(U) completedFuture(U)(U value) {
    // Only reference types can be null; a null value is stored as NIL.
    static if(is(U == class) || is(U == interface)) {
        if (value is null) {
            return new CompletableFuture!(U)(NIL);
        }
    }
    return new CompletableFuture!(U)(value);
}

/* ------------- Zero-input Async forms -------------- */

/**
 * Creates an incomplete future and submits an AsyncSupply task for it
 * to the given executor.
 *
 * @param e the executor that runs the task
 * @param f the supplier of the completion value
 * @return the new, not yet completed future
 * @throws NullPointerException if {@code f} is null
 */
CompletableFuture!(U) asyncSupplyStage(U)(Executor e,
                                             Supplier!(U) f) {
    if (f is null) {
        throw new NullPointerException();
    }
    CompletableFuture!(U) future = new CompletableFuture!(U)();
    e.execute(new AsyncSupply!(U)(future, f));
    return future;
}


/**
 * Creates an incomplete void future and submits an AsyncRun task for it
 * to the given executor.
 *
 * @param e the executor that runs the task
 * @param f the action to run
 * @return the new, not yet completed future
 * @throws NullPointerException if {@code f} is null
 */
CompletableFuture!(void) asyncRunStage(Executor e, Action f) {
    if (f is null) {
        throw new NullPointerException();
    }
    CompletableFuture!(void) future = new CompletableFuture!(void)();
    e.execute(new AsyncRun(future, f));
    return future;
}


/**
 * Returns a new Executor that submits a task to the given base
 * executor after the given delay (or no delay if non-positive).
 * Each delay commences upon invocation of the returned executor's
 * {@code execute} method.
3318 * 3319 * @param delay how long to delay, in units of {@code unit} 3320 * @param unit a {@code TimeUnit} determining how to interpret the 3321 * {@code delay} parameter 3322 * @param executor the base executor 3323 * @return the new delayed executor 3324 */ 3325 Executor delayedExecutor(Duration delay, Executor executor) { 3326 if (executor is null) 3327 throw new NullPointerException(); 3328 return new DelayedExecutor(delay, executor); 3329 } 3330 3331 /** 3332 * Returns a new Executor that submits a task to the default 3333 * executor after the given delay (or no delay if non-positive). 3334 * Each delay commences upon invocation of the returned executor's 3335 * {@code execute} method. 3336 * 3337 * @param delay how long to delay, in units of {@code unit} 3338 * @param unit a {@code TimeUnit} determining how to interpret the 3339 * {@code delay} parameter 3340 * @return the new delayed executor 3341 */ 3342 Executor delayedExecutor(Duration delay) { 3343 return new DelayedExecutor(delay, ASYNC_POOL); 3344 } 3345 3346 /** 3347 * Returns a new CompletionStage that is already completed with 3348 * the given value and supports only those methods in 3349 * interface {@link CompletionStage}. 3350 * 3351 * @param value the value 3352 * @param <U> the type of the value 3353 * @return the completed CompletionStage 3354 */ 3355 CompletionStage!(U) completedStage(U)(U value) { 3356 return new MinimalStage!(U)((value is null) ? NIL : value); 3357 } 3358 3359 /** 3360 * Returns a new CompletableFuture that is already completed 3361 * exceptionally with the given exception. 
3362 * 3363 * @param ex the exception 3364 * @param <U> the type of the value 3365 * @return the exceptionally completed CompletableFuture 3366 */ 3367 CompletableFuture!(U) failedFuture(U)(Throwable ex) { 3368 if (ex is null) throw new NullPointerException(); 3369 return new CompletableFuture!(U)(new AltResult(ex)); 3370 } 3371 3372 /** 3373 * Returns a new CompletionStage that is already completed 3374 * exceptionally with the given exception and supports only those 3375 * methods in interface {@link CompletionStage}. 3376 * 3377 * @param ex the exception 3378 * @param <U> the type of the value 3379 * @return the exceptionally completed CompletionStage 3380 */ 3381 CompletionStage!(U) failedStage(U)(Throwable ex) { 3382 if (ex is null) throw new NullPointerException(); 3383 return new MinimalStage!(U)(new AltResult(ex)); 3384 } 3385 3386 3387 /* ------------- Arbitrary-arity constructions -------------- */ 3388 3389 /** 3390 * Returns a new CompletableFuture that is completed when all of 3391 * the given CompletableFutures complete. If any of the given 3392 * CompletableFutures complete exceptionally, then the returned 3393 * CompletableFuture also does so, with a CompletionException 3394 * holding this exception as its cause. Otherwise, the results, 3395 * if any, of the given CompletableFutures are not reflected in 3396 * the returned CompletableFuture, but may be obtained by 3397 * inspecting them individually. If no CompletableFutures are 3398 * provided, returns a CompletableFuture completed with the value 3399 * {@code null}. 3400 * 3401 * <p>Among the applications of this method is to await completion 3402 * of a set of independent CompletableFutures before continuing a 3403 * program, as in: {@code CompletableFuture.allOf(c1, c2, 3404 * c3).join();}. 
3405 * 3406 * @param cfs the CompletableFutures 3407 * @return a new CompletableFuture that is completed when all of the 3408 * given CompletableFutures complete 3409 * @throws NullPointerException if the array or any of its elements are 3410 * {@code null} 3411 */ 3412 CompletableFuture!(void) allOf(T)(CompletableFuture!T[] cfs...) { 3413 return andTree!(T)(cfs, 0, cast(int)cfs.length - 1); 3414 } 3415 3416 /** 3417 * Returns a new CompletableFuture that is completed when any of 3418 * the given CompletableFutures complete, with the same result. 3419 * Otherwise, if it completed exceptionally, the returned 3420 * CompletableFuture also does so, with a CompletionException 3421 * holding this exception as its cause. If no CompletableFutures 3422 * are provided, returns an incomplete CompletableFuture. 3423 * 3424 * @param cfs the CompletableFutures 3425 * @return a new CompletableFuture that is completed with the 3426 * result or exception of any of the given CompletableFutures when 3427 * one completes 3428 * @throws NullPointerException if the array or any of its elements are 3429 * {@code null} 3430 */ 3431 static CompletableFuture!(U) anyOf(U)(CompletableFuture!(U)[] cfs...) { 3432 int n; 3433 if ((n = cast(int)cfs.length) <= 1) 3434 return (n == 0) 3435 ? new CompletableFuture!(U)() 3436 : uniCopyStage!(U, U)(cfs[0]); 3437 3438 foreach(CompletableFuture!(U) cf; cfs) { 3439 if (cf.isDone()) { 3440 if(cf.isFaulted()) 3441 return new CompletableFuture!(U)(encodeRelay(cf.altResult)); 3442 else 3443 return new CompletableFuture!(U)(cf.result); 3444 } 3445 } 3446 3447 cfs = cfs.dup; 3448 CompletableFuture!(U) d = new CompletableFuture!U(); 3449 foreach (CompletableFuture!(U) cf; cfs) 3450 cf.unipush(new AnyOf!(U)(d, cf, cfs)); 3451 // If d was completed while we were adding completions, we should 3452 // clean the stack of any sources that may have had completions 3453 // pushed on their stack after d was completed. 
3454 if (d._isDone) 3455 for (size_t i = 0, len = cfs.length; i < len; i++) 3456 if (cfs[i]._isDone) 3457 for (i++; i < len; i++) 3458 if (!cfs[i]._isDone) 3459 cfs[i].cleanStack(); 3460 return d; 3461 } 3462 3463 3464 /** Recursively constructs a tree of completions. */ 3465 private CompletableFuture!(void) andTree(T)(CompletableFuture!T[] cfs, 3466 int lo, int hi) { 3467 CompletableFuture!(void) d = new CompletableFuture!(void)(); 3468 if (lo > hi) // empty 3469 d.completeNull!false(); 3470 else { 3471 AbstractCompletableFuture a, b; 3472 Object r, s, z; 3473 Throwable x; 3474 3475 int mid = (lo + hi) >>> 1; 3476 if ((a = (lo == mid ? cfs[lo] : 3477 andTree!(T)(cfs, lo, mid))) is null || 3478 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 3479 andTree!(T)(cfs, mid+1, hi))) is null) 3480 throw new NullPointerException(); 3481 3482 if (!a.isDone() || !b.isDone()) 3483 a.bipush(b, new BiRelay(d, a, b)); 3484 else { 3485 AltResult ar = a.altResult; 3486 AltResult ars = b.altResult; 3487 if(ar !is null && (x = ar.ex) !is null){ 3488 d.completeThrowable!false(x, ar); 3489 } else if(ars !is null && (x = ars.ex) !is null){ 3490 d.completeThrowable!false(x, ars); 3491 } else { 3492 d.completeNull!false(); 3493 } 3494 } 3495 } 3496 return d; 3497 } 3498 3499 /** 3500 * Returns the encoding of the given (non-null) exception as a 3501 * wrapped CompletionException unless it is one already. 3502 */ 3503 private AltResult encodeThrowable(Throwable x) { 3504 CompletionException ex = cast(CompletionException)x; 3505 if(ex is null) { 3506 return new AltResult(new CompletionException(x)); 3507 } else { 3508 return new AltResult(x); 3509 } 3510 } 3511 3512 /** 3513 * Returns the encoding of the given (non-null) exception as a 3514 * wrapped CompletionException unless it is one already. May 3515 * return the given Object r (which must have been the result of a 3516 * source future) if it is equivalent, i.e. if this is a simple 3517 * relay of an existing CompletionException. 
3518 */ 3519 private AltResult encodeThrowable(Throwable x, AltResult r) { 3520 CompletionException cex = cast(CompletionException)x; 3521 if (cex is null) 3522 x = new CompletionException(x); 3523 else { 3524 if (r !is null && x is r.ex) 3525 return r; 3526 } 3527 return new AltResult(x); 3528 } 3529 3530 private CompletableFuture!(U) uniCopyStage(U, T : U)(CompletableFuture!(T) src) { 3531 CompletableFuture!(U) d = newIncompleteFuture!(U)();// src.newIncompleteFuture(); 3532 if (src._isDone) { 3533 if(src.isFaulted()) 3534 d.completeValue!false(src.altResult); 3535 else { 3536 static if(is(T == void)) { 3537 d.completeValue!false(); 3538 } else { 3539 d.completeValue!false(src.result); 3540 } 3541 } 3542 } 3543 else 3544 src.unipush(new UniRelay!(U, T)(d, src)); 3545 return d; 3546 } 3547 3548 /** 3549 * Returns the encoding of a copied outcome; if exceptional, 3550 * rewraps as a CompletionException, else returns argument. 3551 */ 3552 private AltResult encodeRelay(AltResult ar) { 3553 Throwable x; 3554 3555 if (ar !is null && (x = ar.ex) !is null) { 3556 CompletionException cex = cast(CompletionException)x; 3557 if(cex is null) { 3558 ar = new AltResult(new CompletionException(x)); 3559 } 3560 } 3561 return ar; 3562 } 3563 3564 /** 3565 * Reports result using Future.get conventions. 
3566 */ 3567 private V reportGet(V)(V r, AltResult ar) if(!is(V == void)) { 3568 if (ar is null) // by convention below, null means interrupted 3569 throw new InterruptedException(); 3570 3571 if (ar !is null) { 3572 Throwable x, cause; 3573 if ((x = ar.ex) is null){ 3574 warning("to check"); 3575 return V.init; 3576 } 3577 CancellationException cex = cast(CancellationException)x; 3578 if (cex !is null) 3579 throw cex; 3580 CompletionException cex2 = cast(CompletionException)x; 3581 if (cex2 !is null && 3582 (cause = x.next) !is null) 3583 x = cause; 3584 throw new ExecutionException(x); 3585 } 3586 return r; 3587 } 3588 3589 private void reportGet(AltResult ar) { 3590 if (ar is null) // by convention below, null means interrupted 3591 throw new InterruptedException(); 3592 3593 if (ar !is null) { 3594 Throwable x, cause; 3595 if ((x = ar.ex) is null){ 3596 warning("to check"); 3597 return; 3598 } 3599 CancellationException cex = cast(CancellationException)x; 3600 if (cex !is null) 3601 throw cex; 3602 CompletionException cex2 = cast(CompletionException)x; 3603 if (cex2 !is null && 3604 (cause = x.next) !is null) 3605 x = cause; 3606 throw new ExecutionException(x); 3607 } 3608 } 3609 3610 3611 /** 3612 * Decodes outcome to return result or throw unchecked exception. 
3613 */ 3614 private V reportJoin(V)(V r, AltResult ar) if(!is(V == void)) { 3615 if (ar !is null) { 3616 Throwable x; 3617 if ((x = ar.ex) is null) { 3618 warning("to check"); 3619 return V.init; 3620 } 3621 CancellationException cex = cast(CancellationException)x; 3622 if (cex !is null) 3623 throw cex; 3624 CompletionException cex2 = cast(CompletionException)x; 3625 if (cex2 !is null) 3626 throw cex2; 3627 throw new CompletionException(x); 3628 } 3629 return r; 3630 } 3631 3632 3633 private void reportJoin(AltResult ar) { 3634 if (ar !is null) { 3635 Throwable x; 3636 if ((x = ar.ex) is null) { 3637 warning("need to check"); 3638 return; 3639 } 3640 CancellationException cex = cast(CancellationException)x; 3641 if (cex !is null) 3642 throw cex; 3643 CompletionException cex2 = cast(CompletionException)x; 3644 if (cex2 !is null) 3645 throw cex2; 3646 throw new CompletionException(x); 3647 } 3648 } 3649 3650 /** 3651 * Returns a new incomplete CompletableFuture of the type to be 3652 * returned by a CompletionStage method. Subclasses should 3653 * normally override this method to return an instance of the same 3654 * class as this CompletableFuture. The default implementation 3655 * returns an instance of class CompletableFuture. 3656 * 3657 * @param <U> the type of the value 3658 * @return a new CompletableFuture 3659 */ 3660 static CompletableFuture!(U) newIncompleteFuture(U)() { 3661 return new CompletableFuture!(U)(); 3662 }