/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

module hunt.concurrency.ForkJoinTask;

import hunt.concurrency.atomic;
import hunt.concurrency.Future;
import hunt.concurrency.thread;

import hunt.concurrency.ForkJoinPool;
import hunt.concurrency.ForkJoinTaskHelper;

import hunt.collection.Collection;
import hunt.logging.ConsoleLogger;
import hunt.Exceptions;
import hunt.util.Common;
import hunt.util.DateTime;

import core.time;
import core.sync.condition;
import core.sync.mutex;
import core.thread;


/**
 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
 * A {@code ForkJoinTask} is a thread-like entity that is much
 * lighter weight than a normal thread. Huge numbers of tasks and
 * subtasks may be hosted by a small number of actual threads in a
 * ForkJoinPool, at the price of some usage limitations.
 *
 * <p>A "main" {@code ForkJoinTask} begins execution when it is
 * explicitly submitted to a {@link ForkJoinPool}, or, if not already
 * engaged in a ForkJoin computation, commenced in the {@link
 * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
 * related methods. Once started, it will usually in turn start other
 * subtasks. As indicated by the name of this class, many programs
 * using {@code ForkJoinTask} employ only methods {@link #fork} and
 * {@link #join}, or derivatives such as {@link
 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
 * provides a number of other methods that can come into play in
 * advanced usages, as well as extension mechanics that allow support
 * of new forms of fork/join processing.
 *
 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
 * The efficiency of {@code ForkJoinTask}s stems from a set of
 * restrictions (that are only partially statically enforceable)
 * reflecting their main use as computational tasks calculating pure
 * functions or operating on purely isolated objects. The primary
 * coordination mechanisms are {@link #fork}, that arranges
 * asynchronous execution, and {@link #join}, that doesn't proceed
 * until the task's result has been computed. Computations should
 * ideally avoid {@code synchronized} methods or blocks, and should
 * minimize other blocking synchronization apart from joining other
 * tasks or using synchronizers such as Phasers that are advertised to
 * cooperate with fork/join scheduling. Subdividable tasks should also
 * not perform blocking I/O, and should ideally access variables that
 * are completely independent of those accessed by other running
 * tasks. These guidelines are loosely enforced by not permitting
 * checked exceptions such as {@code IOExceptions} to be
 * thrown. However, computations may still encounter unchecked
 * exceptions, that are rethrown to callers attempting to join
 * them. These exceptions may additionally include {@link
 * RejectedExecutionException} stemming from internal resource
 * exhaustion, such as failure to allocate internal task
 * queues. Rethrown exceptions behave in the same way as regular
 * exceptions, but, when possible, contain stack traces (as displayed
 * for example using {@code ex.printStackTrace()}) of both the thread
 * that initiated the computation as well as the thread actually
 * encountering the exception; minimally only the latter.
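 *
 * <p>As a rough illustration of the fork/join coordination described
 * above, the sketch below uses Phobos' {@code std.parallelism} as a
 * stand-in rather than this module, so it stays self-contained. The
 * function name {@code fib} is hypothetical; {@code taskPool.put} plays
 * the role of {@code fork} and {@code yieldForce} the role of
 * {@code join}. It is an analogy under those assumptions, not the API of
 * this class.
 *
 * ```d
 * import std.parallelism : task, taskPool;
 * import std.stdio : writeln;
 *
 * // Hypothetical example function: naive parallel Fibonacci.
 * long fib(int n) {
 *     if (n < 2)
 *         return n;
 *     auto left = task!fib(n - 1);
 *     taskPool.put(left);              // "fork": schedule asynchronously
 *     long right = fib(n - 2);         // keep working in this thread
 *     return left.yieldForce + right;  // "join": wait for (or run) the task
 * }
 *
 * void main() {
 *     writeln(fib(20));
 * }
 * ```
 *
 * <p>The subtask is pure and touches no shared state, which is the kind
 * of computation the restrictions above are meant to encourage.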
 *
 * <p>It is possible to define and use ForkJoinTasks that may block,
 * but doing so requires three further considerations: (1) Completion
 * of few if any <em>other</em> tasks should be dependent on a task
 * that blocks on external synchronization or I/O. Event-style async
 * tasks that are never joined (for example, those subclassing {@link
 * CountedCompleter}) often fall into this category. (2) To minimize
 * resource impact, tasks should be small; ideally performing only the
 * (possibly) blocking action. (3) Unless the {@link
 * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
 * blocked tasks is known to be less than the pool's {@link
 * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
 * enough threads will be available to ensure progress or good
 * performance.
 *
 * <p>The primary method for awaiting completion and extracting
 * results of a task is {@link #join}, but there are several variants:
 * The {@link Future#get} methods support interruptible and/or timed
 * waits for completion and report results using {@code Future}
 * conventions. Method {@link #invoke} is semantically
 * equivalent to {@code fork(); join()} but always attempts to begin
 * execution in the current thread. The "<em>quiet</em>" forms of
 * these methods do not extract results or report exceptions. These
 * may be useful when a set of tasks are being executed, and you need
 * to delay processing of results or exceptions until all complete.
 * Method {@code invokeAll} (available in multiple versions)
 * performs the most common form of parallel invocation: forking a set
 * of tasks and joining them all.
 *
 * <p>In the most typical usages, a fork-join pair act like a call
 * (fork) and return (join) from a parallel recursive function. As is
 * the case with other forms of recursive calls, returns (joins)
 * should be performed innermost-first.
 * For example, {@code a.fork();
 * b.fork(); b.join(); a.join();} is likely to be substantially more
 * efficient than joining {@code a} before {@code b}.
 *
 * <p>The execution status of tasks may be queried at several levels
 * of detail: {@link #isDone} is true if a task completed in any way
 * (including the case where a task was cancelled without executing);
 * {@link #isCompletedNormally} is true if a task completed without
 * cancellation or encountering an exception; {@link #isCancelled} is
 * true if the task was cancelled (in which case {@link #getException}
 * returns a {@link CancellationException}); and
 * {@link #isCompletedAbnormally} is true if a task was either
 * cancelled or encountered an exception, in which case {@link
 * #getException} will return either the encountered exception or
 * {@link CancellationException}.
 *
 * <p>The ForkJoinTask class is not usually directly subclassed.
 * Instead, you subclass one of the abstract classes that support a
 * particular style of fork/join processing, typically {@link
 * RecursiveAction} for most computations that do not return results,
 * {@link RecursiveTask} for those that do, and {@link
 * CountedCompleter} for those in which completed actions trigger
 * other actions. Normally, a concrete ForkJoinTask subclass declares
 * fields comprising its parameters, established in a constructor, and
 * then defines a {@code compute} method that somehow uses the control
 * methods supplied by this base class.
 *
 * <p>Method {@link #join} and its variants are appropriate for use
 * only when completion dependencies are acyclic; that is, the
 * parallel computation can be described as a directed acyclic graph
 * (DAG). Otherwise, executions may encounter a form of deadlock as
 * tasks cyclically wait for each other.
 * However, this framework
 * supports other methods and techniques (for example the use of
 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
 * may be of use in constructing custom subclasses for problems that
 * are not statically structured as DAGs. To support such usages, a
 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
 * value using {@link #setForkJoinTaskTag} or {@link
 * #compareAndSetForkJoinTaskTag} and checked using {@link
 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
 * these {@code protected} methods or tags for any purpose, but they
 * may be of use in the construction of specialized subclasses. For
 * example, parallel graph traversals can use the supplied methods to
 * avoid revisiting nodes/tasks that have already been processed.
 * (Method names for tagging are bulky in part to encourage definition
 * of methods that reflect their usage patterns.)
 *
 * <p>Most base support methods are {@code final}, to prevent
 * overriding of implementations that are intrinsically tied to the
 * underlying lightweight task scheduling framework. Developers
 * creating new basic styles of fork/join processing should minimally
 * implement {@code protected} methods {@link #exec}, {@link
 * #setRawResult}, and {@link #getRawResult}, while also introducing
 * an abstract computational method that can be implemented in its
 * subclasses, possibly relying on other {@code protected} methods
 * provided by this class.
 *
 * <p>ForkJoinTasks should perform relatively small amounts of
 * computation. Large tasks should be split into smaller subtasks,
 * usually via recursive decomposition. As a very rough rule of thumb,
 * a task should perform more than 100 and less than 10000 basic
 * computational steps, and should avoid indefinite looping. If tasks
 * are too big, then parallelism cannot improve throughput.
 * If too small, then memory and internal task maintenance overhead may
 * overwhelm processing.
 *
 * <p>This class provides {@code adapt} methods for {@link Runnable}
 * and {@link Callable}, that may be of use when mixing execution of
 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
 * of this form, consider using a pool constructed in <em>asyncMode</em>.
 *
 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
 * used in extensions such as remote execution frameworks. It is
 * sensible to serialize tasks only before or after, but not during,
 * execution. Serialization is not relied on during execution itself.
 *
 * @author Doug Lea
 */
abstract class ForkJoinTask(V) : Future!(V), IForkJoinTask {

    /*
     * See the internal documentation of class ForkJoinPool for a
     * general implementation overview. ForkJoinTasks are mainly
     * responsible for maintaining their "status" field amidst relays
     * to methods in ForkJoinWorkerThread and ForkJoinPool.
     *
     * The methods of this class are more-or-less layered into
     * (1) basic status maintenance
     * (2) execution and awaiting completion
     * (3) user-level methods that additionally report results.
     * This is sometimes hard to see because this file orders exported
     * methods in a way that flows well in javadocs.
     */

    /**
     * The status field holds run control status bits packed into a
     * single int to ensure atomicity. Status is initially zero, and
     * takes on nonnegative values until completed, upon which it
     * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or
     * exceptional) and THROWN (in which case an exception has been
     * stored). Tasks with dependent blocked waiting joiners have the
     * SIGNAL bit set. Completion of a task with SIGNAL set awakens
     * any waiters via notifyAll. (Waiters also help signal others
     * upon completion.)
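     *
     * As a minimal sketch of how such a packed status can be decoded,
     * the snippet below copies the bit values declared in this class
     * into standalone constants. The helper name {@code describe} is
     * hypothetical and exists only for illustration.
     *
     * ```d
     * import std.stdio : writeln;
     *
     * // Illustrative copies of the private control bits.
     * enum int DONE     = 1 << 31; // sign bit: negative once completed
     * enum int ABNORMAL = 1 << 18; // cancelled or exceptional
     * enum int THROWN   = 1 << 17; // an exception has been stored
     *
     * // Hypothetical helper: maps a status word to a readable state.
     * string describe(int s) {
     *     if (s >= 0)
     *         return "not done";
     *     if ((s & ABNORMAL) == 0)
     *         return "completed normally";
     *     return (s & THROWN) != 0 ? "threw an exception" : "cancelled";
     * }
     *
     * void main() {
     *     writeln(describe(0));
     *     writeln(describe(DONE));
     *     writeln(describe(DONE | ABNORMAL));
     *     writeln(describe(DONE | ABNORMAL | THROWN));
     * }
     * ```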
     *
     * These control bits occupy only (some of) the upper half (16
     * bits) of status field. The lower bits are used for user-defined
     * tags.
     */
    shared int status; // accessed directly by pool and workers

    Mutex thisMutex;
    Condition thisLocker;

    private enum int DONE     = 1 << 31; // must be negative
    private enum int ABNORMAL = 1 << 18; // set atomically with DONE
    private enum int THROWN   = 1 << 17; // set atomically with ABNORMAL
    private enum int SIGNAL   = 1 << 16; // true if joiner waiting
    private enum int SMASK    = 0xffff;  // short bits for tags

    this() {
        thisMutex = new Mutex(this);
        thisLocker = new Condition(thisMutex);
    }

    static bool isExceptionalStatus(int s) { // needed by subclasses
        return (s & THROWN) != 0;
    }

    /**
     * Sets DONE status and wakes up threads waiting to join this task.
     *
     * @return status on exit
     */
    private int setDone() {
        int s = AtomicHelper.getAndBitwiseOr(this.status, DONE);
        debug(HUNT_CONCURRENCY_DEBUG) {
            tracef("status: last=%d, new=%d", s, status);
        }
        if ((s & SIGNAL) != 0) {
            synchronized (this) {
                debug(HUNT_CONCURRENCY_DEBUG) info("notifying on done .....");
                thisLocker.notifyAll();
            }
        }
        return s | DONE;
    }

    /**
     * Marks cancelled or exceptional completion unless already done.
     *
     * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
     * @return status on exit
     */
    private int abnormalCompletion(int completion) {
        import core.atomic : cas;

        for (int s, ns;;) {
            if ((s = status) < 0)
                return s;
            ns = s | completion;
            // Publish the completion bits only if no other thread changed
            // the status since it was read; otherwise re-read and retry.
            if (cas(&this.status, s, ns)) {
                if ((s & SIGNAL) != 0)
                    synchronized (this) {
                        thisLocker.notifyAll();
                    }
                return ns;
            }
        }
    }

    int getStatus() {
        return status;
    }

    /**
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     *
     * @return status on exit from this method
     */
    final int doExec() {
        int s; bool completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                completed = false;
                s = setExceptionalCompletion(rex);
            }
            debug(HUNT_CONCURRENCY_DEBUG) tracef("completed: %s", completed);
            if (completed) {
                s = setDone();
            }
        }
        return s;
    }

    /**
     * If not done, sets SIGNAL status and performs Object.wait(timeout).
     * This task may or may not be done on exit. Ignores interrupts.
     *
     * @param timeout using Object.wait conventions.
     */
    final void internalWait(long timeout) {
        // Set SIGNAL atomically so that the completing thread knows it
        // must notify waiters.
        int s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
        if (s >= 0) {
            synchronized (this) {
                if (status >= 0)
                    try {
                        thisLocker.wait(dur!(TimeUnit.Millisecond)(timeout));
                    } catch (InterruptedException ie) { }
                else
                    thisLocker.notifyAll();
            }
        }
    }

    /**
     * Blocks a non-worker-thread until completion.
     * @return status upon completion
     */
    private int externalAwaitDone() {
        int s = tryExternalHelp();
        if (s < 0)
            return s;

        s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
        debug(HUNT_CONCURRENCY_DEBUG) {
            infof("status: last=%d, new=%d", s, status);
        }
        if (s < 0)
            return s;

        bool interrupted = false;
        synchronized (this) {
            for (;;) {
                if ((s = status) >= 0) {
                    try {
                        thisLocker.wait(Duration.zero);
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                }
                else {
                    thisLocker.notifyAll();
                    break;
                }
            }
        }
        if (interrupted) {
            ThreadEx th = cast(ThreadEx) Thread.getThis();
            if (th !is null)
                th.interrupt();
        }
        return s;
    }

    /**
     * Blocks a non-worker-thread until completion or interruption.
     */
    private int externalInterruptibleAwaitDone() {
        int s = tryExternalHelp();
        if (s < 0) {
            if (ThreadEx.interrupted())
                throw new InterruptedException();
            return s;
        }

        s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
        debug(HUNT_CONCURRENCY_DEBUG) {
            infof("status: last=%d, new=%d", s, status);
        }
        if (s >= 0) {
            synchronized (this) {
                for (;;) {
                    if ((s = status) >= 0)
                        thisLocker.wait(Duration.zero);
                    else {
                        thisLocker.notifyAll();
                        break;
                    }
                }
            }
        }
        else if (ThreadEx.interrupted())
            throw new InterruptedException();
        return s;
    }

    /**
     * Tries to help with tasks allowed for external callers.
     *
     * @return current status
     */
    private int tryExternalHelp() {
        int s = status;
        if (s < 0) return s;
        ICountedCompleter cc = cast(ICountedCompleter)this;
        if (cc !is null) {
            return ForkJoinPool.common.externalHelpComplete(
                    cc, 0);
        } else if (ForkJoinPool.common.tryExternalUnpush(this)) {
            return doExec();
        } else
            return 0;
        // return ((s = status) < 0 ? s:
        //         (this instanceof CountedCompleter) ?
        //         ForkJoinPool.common.externalHelpComplete(
        //             (ICountedCompleter)this, 0) :
        //         ForkJoinPool.common.tryExternalUnpush(this) ?
        //         doExec() : 0);
    }

    /**
     * Implementation for join, get, quietlyJoin. Directly handles
     * only cases of already-completed, external wait, and
     * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin.
     *
     * @return status upon completion
     */
    private int doJoin() {
        int s = status;
        if (s < 0) return s;

        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)Thread.getThis();
        if (wt !is null) {
            WorkQueue w = wt.workQueue;
            if (w.tryUnpush(this) && (s = doExec()) < 0)
                return s;
            else
                return wt.pool.awaitJoin(w, this, MonoTime.zero);
        } else {
            return externalAwaitDone();
        }
    }

    /**
     * Implementation for invoke, quietlyInvoke.
     *
     * @return status upon completion
     */
    private int doInvoke() {
        int s = doExec();
        if (s < 0)
            return s;
        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)Thread.getThis();
        if (wt !is null) {
            return wt.pool.awaitJoin(wt.workQueue, this, MonoTime.zero());
        } else {
            return externalAwaitDone();
        }
    }

    /**
     * Records exception and sets status.
     *
     * @return status on exit
     */
    final int recordExceptionalCompletion(Throwable ex) {
        int s;
        if ((s = status) >= 0) {
            size_t h = this.toHash();
            ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
            lock.lock();
            try {
                ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
                size_t i = h & (t.length - 1);
                for (ExceptionNode e = t[i]; ; e = e.next) {
                    if (e is null) {
                        t[i] = new ExceptionNode(this, ex, t[i]);
                        break;
                    }
                    if (e.get() == this) // already present
                        break;
                }
            } finally {
                lock.unlock();
            }
            s = abnormalCompletion(DONE | ABNORMAL | THROWN);
        }
        return s;
    }

    /**
     * Records exception and possibly propagates.
     *
     * @return status on exit
     */
    private int setExceptionalCompletion(Throwable ex) {
        int s = recordExceptionalCompletion(ex);
        if ((s & THROWN) != 0)
            internalPropagateException(ex);
        return s;
    }

    /**
     * Hook for exception propagation support for tasks with completers.
     */
    void internalPropagateException(Throwable ex) {
    }

    /**
     * Removes exception node and clears status.
     */
    private void clearExceptionalCompletion() {
        size_t h = this.toHash();
        ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
        lock.lock();
        try {
            ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
            size_t i = h & (t.length - 1);
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            while (e !is null) {
                ExceptionNode next = e.next;
                if (e.get() == this) {
                    if (pred is null)
                        t[i] = next;
                    else
                        pred.next = next;
                    break;
                }
                pred = e;
                e = next;
            }
            status = 0;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a rethrowable exception for this task, if available.
     * To provide accurate stack traces, if the exception was not
     * thrown by the current thread, we try to create a new exception
     * of the same type as the one thrown, but with the recorded
     * exception as its cause. If there is no such constructor, we
     * instead try to use a no-arg constructor, followed by initCause,
     * to the same effect. If none of these apply, or any fail due to
     * other exceptions, we return the recorded exception, which is
     * still correct, although it may contain a misleading stack
     * trace.
     *
     * @return the exception, or null if none
     */
    private Throwable getThrowableException() {
        size_t h = this.toHash();
        ExceptionNode e;
        ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
        lock.lock();
        try {
            ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
            e = t[h & ($ - 1)];
            while (e !is null && e.get() !is this)
                e = e.next;
        } finally {
            lock.unlock();
        }
        Throwable ex;
        if (e is null || (ex = e.ex) is null)
            return null;
        return ex;
    }


    /**
     * Throws exception, if any, associated with the given status.
     */
    private void reportException(int s) {
        ForkJoinTaskHelper.rethrow((s & THROWN) != 0 ? getThrowableException() :
                new CancellationException());
    }

    // methods

    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized. Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    final ForkJoinTask!(V) fork() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
        if (t !is null)
            t.workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

    /**
     * Returns the result of the computation when it
     * {@linkplain #isDone is done}.
     * This method differs from {@link #get()} in that abnormal
     * completion results in {@code RuntimeException} or {@code Error},
     * not {@code ExecutionException}, and that interrupts of the
     * calling thread do <em>not</em> cause the method to abruptly
     * return by throwing {@code InterruptedException}.
     *
     * @return the computed result
     */
    final V join() {
        int s;
        if (((s = doJoin()) & ABNORMAL) != 0) {
            reportException(s);
        }

        static if (!is(V == void)) {
            return getRawResult();
        }
    }

    /**
     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.
     *
     * @return the computed result
     */
    final V invoke() {
        int s;
        if (((s = doInvoke()) & ABNORMAL) != 0)
            reportException(s);

        static if (!is(V == void)) {
            return getRawResult();
        }
    }

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown. If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, the
     * other may be cancelled. However, the execution status of
     * individual tasks is not guaranteed upon exceptional return. The
     * status of each task may be obtained using {@link
     * #getException()} and related methods to check if they have been
     * cancelled, completed normally or exceptionally, or left
     * unprocessed.
     *
     * @param t1 the first task
     * @param t2 the second task
     * @throws NullPointerException if any task is null
     */
    static void invokeAll(IForkJoinTask t1, IForkJoinTask t2) {
        int s1, s2;
        implementationMissing(false);
        // t2.fork();
        // if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
        //     t1.reportException(s1);
        // if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
        //     t2.reportException(s2);
    }

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown. If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, others
     * may be cancelled. However, the execution status of individual
     * tasks is not guaranteed upon exceptional return. The status of
     * each task may be obtained using {@link #getException()} and
     * related methods to check if they have been cancelled, completed
     * normally or exceptionally, or left unprocessed.
     *
     * @param tasks the tasks
     * @throws NullPointerException if any task is null
     */
    static void invokeAll(IForkJoinTask[] tasks...)
    {
        Throwable ex = null;
        int last = cast(int)tasks.length - 1;
        // for (int i = last; i >= 0; --i) {
        //     IForkJoinTask t = tasks[i];
        //     if (t is null) {
        //         if (ex is null)
        //             ex = new NullPointerException();
        //     }
        //     else if (i != 0)
        //         t.fork();
        //     else if ((t.doInvoke() & ABNORMAL) != 0 && ex is null)
        //         ex = t.getException();
        // }
        // for (int i = 1; i <= last; ++i) {
        //     IForkJoinTask t = tasks[i];
        //     if (t !is null) {
        //         if (ex !is null)
        //             t.cancel(false);
        //         else if ((t.doJoin() & ABNORMAL) != 0)
        //             ex = t.getException();
        //     }
        // }
        implementationMissing(false);
        if (ex !is null)
            ForkJoinTaskHelper.rethrow(ex);
    }

    /**
     * Forks all tasks in the specified collection, returning when
     * {@code isDone} holds for each task or an (unchecked) exception
     * is encountered, in which case the exception is rethrown. If
     * more than one task encounters an exception, then this method
     * throws any one of these exceptions. If any task encounters an
     * exception, others may be cancelled. However, the execution
     * status of individual tasks is not guaranteed upon exceptional
     * return. The status of each task may be obtained using {@link
     * #getException()} and related methods to check if they have been
     * cancelled, completed normally or exceptionally, or left
     * unprocessed.
     *
     * @param tasks the collection of tasks
     * @param (T) the type of the values returned from the tasks
     * @return the tasks argument, to simplify usage
     * @throws NullPointerException if tasks or any element are null
     */
    static Collection!(T) invokeAll(T)(Collection!(T) tasks) if (is(T : IForkJoinTask)) {
        // TODO: Tasks pending completion -@zxp at 12/21/2018, 10:36:15 PM
        //
        implementationMissing(false);
        // if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
        //     invokeAll(tasks.toArray(new IForkJoinTask[0]));
        //     return tasks;
        // }

        // List!(IForkJoinTask) ts = cast(List!(IForkJoinTask)) tasks;
        // Throwable ex = null;
        // int last = ts.size() - 1;
        // for (int i = last; i >= 0; --i) {
        //     IForkJoinTask t = ts.get(i);
        //     if (t is null) {
        //         if (ex is null)
        //             ex = new NullPointerException();
        //     }
        //     else if (i != 0)
        //         t.fork();
        //     else if ((t.doInvoke() & ABNORMAL) != 0 && ex is null)
        //         ex = t.getException();
        // }
        // for (int i = 1; i <= last; ++i) {
        //     IForkJoinTask t = ts.get(i);
        //     if (t !is null) {
        //         if (ex !is null)
        //             t.cancel(false);
        //         else if ((t.doJoin() & ABNORMAL) != 0)
        //             ex = t.getException();
        //     }
        // }
        // if (ex !is null)
        //     rethrow(ex);
        return tasks;
    }

    /**
     * Attempts to cancel execution of this task. This attempt will
     * fail if the task has already completed or could not be
     * cancelled for some other reason. If successful, and this task
     * has not started when {@code cancel} is called, execution of
     * this task is suppressed. After this method returns
     * successfully, unless there is an intervening call to {@link
     * #reinitialize}, subsequent calls to {@link #isCancelled},
     * {@link #isDone}, and {@code cancel} will return {@code true}
     * and calls to {@link #join} and related methods will result in
     * {@code CancellationException}.
     *
     * <p>This method may be overridden in subclasses, but if so, must
     * still ensure that these properties hold. In particular, the
     * {@code cancel} method itself must not throw exceptions.
     *
     * <p>This method is designed to be invoked by <em>other</em>
     * tasks. To terminate the current task, you can just return or
     * throw an unchecked exception from its computation method, or
     * invoke {@link #completeExceptionally(Throwable)}.
     *
     * @param mayInterruptIfRunning this value has no effect in the
     * default implementation because interrupts are not used to
     * control cancellation.
     *
     * @return {@code true} if this task is now cancelled
     */
    bool cancel(bool mayInterruptIfRunning) {
        int s = abnormalCompletion(DONE | ABNORMAL);
        return (s & (ABNORMAL | THROWN)) == ABNORMAL;
    }

    final bool isDone() {
        return status < 0;
    }

    final bool isCancelled() {
        return (status & (ABNORMAL | THROWN)) == ABNORMAL;
    }

    /**
     * Returns {@code true} if this task threw an exception or was cancelled.
     *
     * @return {@code true} if this task threw an exception or was cancelled
     */
    final bool isCompletedAbnormally() {
        return (status & ABNORMAL) != 0;
    }

    /**
     * Returns {@code true} if this task completed without throwing an
     * exception and was not cancelled.
     *
     * @return {@code true} if this task completed without throwing an
     * exception and was not cancelled
     */
    final bool isCompletedNormally() {
        return (status & (DONE | ABNORMAL)) == DONE;
    }

    /**
     * Returns the exception thrown by the base computation, or a
     * {@code CancellationException} if cancelled, or {@code null} if
     * none or if the method has not yet completed.
     *
     * @return the exception, or {@code null} if none
     */
    final Throwable getException() {
        int s = status;
        return ((s & ABNORMAL) == 0 ? null :
                (s & THROWN) == 0 ?
                new CancellationException() :
                getThrowableException());
    }

    /**
     * Completes this task abnormally, and if not already aborted or
     * cancelled, causes it to throw the given exception upon
     * {@code join} and related operations. This method may be used
     * to induce exceptions in asynchronous tasks, or to force
     * completion of tasks that would not otherwise complete. Its use
     * in other situations is discouraged. This method is
     * overridable, but overridden versions must invoke {@code super}
     * implementation to maintain guarantees.
     *
     * @param ex the exception to throw. If this exception is not a
     * {@code RuntimeException} or {@code Error}, the actual exception
     * thrown will be a {@code RuntimeException} with cause {@code ex}.
     */
    void completeExceptionally(Exception ex) {
        RuntimeException re = cast(RuntimeException)ex;
        if(re !is null) {
            setExceptionalCompletion(ex);
        } else {
            Error er = cast(Error)ex;
            if(er is null) {
                setExceptionalCompletion(new RuntimeException(ex));
            } else {
                setExceptionalCompletion(ex);
            }
        }
    }

    /**
     * Completes this task, and if not already aborted or cancelled,
     * returning the given value as the result of subsequent
     * invocations of {@code join} and related operations. This method
     * may be used to provide results for asynchronous tasks, or to
     * provide alternative handling for tasks that would not otherwise
     * complete normally. Its use in other situations is
     * discouraged. This method is overridable, but overridden
     * versions must invoke {@code super} implementation to maintain
     * guarantees.
     *
     * @param value the result value for this task
     */
    static if(is(V == void)) {
        void complete() {
            // try {
            //     setRawResult();
            // } catch (Throwable rex) {
            //     setExceptionalCompletion(rex);
            //     return;
            // }
            setDone();
        }
    } else {
        void complete(V value) {
            try {
                setRawResult(value);
            } catch (Throwable rex) {
                setExceptionalCompletion(rex);
                return;
            }
            setDone();
        }
    }

    /**
     * Completes this task normally without setting a value. The most
     * recent value established by {@link #setRawResult} (or {@code
     * null} by default) will be returned as the result of subsequent
     * invocations of {@code join} and related operations.
     *
     */
    final void quietlyComplete() {
        setDone();
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread is not a
     * member of a ForkJoinPool and was interrupted while waiting
     */
    final V get() {
        ForkJoinWorkerThread ft = cast(ForkJoinWorkerThread)Thread.getThis();
        int s = ft !is null ? doJoin() : externalInterruptibleAwaitDone();
        if ((s & THROWN) != 0)
            throw new ExecutionException(getThrowableException());
        else if ((s & ABNORMAL) != 0)
            throw new CancellationException();
        else {
            static if(!is(V == void)) {
                return getRawResult();
            }
        }
    }

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread is not a
     * member of a ForkJoinPool and was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    final V get(Duration timeout) {
        int s;
        // TODO: Tasks pending completion -@zxp at 12/21/2018, 10:55:12 PM
        //
        // if (Thread.interrupted())
        //     throw new InterruptedException();

        if ((s = status) >= 0 && timeout > Duration.zero) {
            MonoTime deadline = MonoTime.currTime + timeout;
            // long deadline = (d == 0L) ? 1L : d; // avoid 0
            ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)Thread.getThis();
            if (wt !is null) {
                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
            }
            else {
                ICountedCompleter ic = cast(ICountedCompleter)this;
                if(ic !is null) {
                    s = ForkJoinPool.common.externalHelpComplete(ic, 0);
                } else if(ForkJoinPool.common.tryExternalUnpush(this)){
                    s = doExec();
                } else
                    s = 0;

                if (s >= 0) {
                    Duration ns; // time remaining until the deadline; the wait below uses whole milliseconds
                    long ms;
                    while ((s = status) >= 0 &&
                           (ns = deadline - MonoTime.currTime) > Duration.zero) {
                        if ((ms = ns.total!(TimeUnit.Millisecond)()) > 0L) {
                            s = AtomicHelper.getAndBitwiseOr(this.status, SIGNAL);
                            if( s >= 0) {
                                synchronized (this) {
                                    if (status >= 0) // OK to throw InterruptedException
                                        thisLocker.wait(dur!(TimeUnit.Millisecond)(ms));
                                    else
                                        thisLocker.notifyAll();
                                }
                            }
                        }
                    }
                }
            }
        }
        if (s >= 0)
            throw new TimeoutException();
        else if ((s & THROWN) != 0)
            throw new ExecutionException(getThrowableException());
        else if ((s &
                  ABNORMAL) != 0)
            throw new CancellationException();
        else {
            static if(!is(V == void)) {
                return getRawResult();
            }
        }
    }

    /**
     * Joins this task, without returning its result or throwing its
     * exception. This method may be useful when processing
     * collections of tasks when some have been cancelled or otherwise
     * known to have aborted.
     */
    final void quietlyJoin() {
        doJoin();
    }

    /**
     * Commences performing this task and awaits its completion if
     * necessary, without returning its result or throwing its
     * exception.
     */
    final void quietlyInvoke() {
        doInvoke();
    }

    /**
     * Possibly executes tasks until the pool hosting the current task
     * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
     * method may be of use in designs in which many tasks are forked,
     * but none are explicitly joined, instead executing them until
     * all are processed.
     */
    // static void helpQuiesce() {
    //     Thread t;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) {
    //         ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
    //         wt.pool.helpQuiescePool(wt.workQueue);
    //     }
    //     else
    //         ForkJoinPool.quiesceCommonPool();
    // }

    /**
     * Resets the internal bookkeeping state of this task, allowing a
     * subsequent {@code fork}. This method allows repeated reuse of
     * this task, but only if reuse occurs when this task has either
     * never been forked, or has been forked, then completed and all
     * outstanding joins of this task have also completed. Effects
     * under any other usage conditions are not guaranteed.
     * This method may be useful when executing
     * pre-constructed trees of subtasks in loops.
     *
     * <p>Upon completion of this method, {@code isDone()} reports
     * {@code false}, and {@code getException()} reports {@code
     * null}.
     * However, the value returned by {@code getRawResult} is
     * unaffected. To clear this value, you can invoke {@code
     * setRawResult(null)}.
     */
    void reinitialize() {
        if ((status & THROWN) != 0)
            clearExceptionalCompletion();
        else
            status = 0;
    }

    /**
     * Returns the pool hosting the current thread, or {@code null}
     * if the current thread is executing outside of any ForkJoinPool.
     *
     * <p>This method returns {@code null} if and only if {@link
     * #inForkJoinPool} returns {@code false}.
     *
     * @return the pool, or {@code null} if none
     */
    // static ForkJoinPool getPool() {
    //     Thread t = Thread.getThis();
    //     return (t instanceof ForkJoinWorkerThread) ?
    //         ((ForkJoinWorkerThread) t).pool : null;
    // }

    /**
     * Returns {@code true} if the current thread is a {@link
     * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
     *
     * @return {@code true} if the current thread is a {@link
     * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
     * or {@code false} otherwise
     */
    static bool inForkJoinPool() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
        return t !is null;
    }

    /**
     * Tries to unschedule this task for execution. This method will
     * typically (but is not guaranteed to) succeed if this task is
     * the most recently forked task by the current thread, and has
     * not commenced executing in another thread. This method may be
     * useful when arranging alternative local processing of tasks
     * that could have been, but were not, stolen.
     *
     * @return {@code true} if unforked
     */
    bool tryUnfork() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
        return t !is null?
                t.workQueue.tryUnpush(this) :
                ForkJoinPool.common.tryExternalUnpush(this);
    }

    /**
     * Returns an estimate of the number of tasks that have been
     * forked by the current worker thread but not yet executed. This
     * value may be useful for heuristic decisions about whether to
     * fork other tasks.
     *
     * @return the number of tasks
     */
    // static int getQueuedTaskCount() {
    //     Thread t; ForkJoinPool.WorkQueue q;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread)
    //         q = ((ForkJoinWorkerThread)t).workQueue;
    //     else
    //         q = ForkJoinPool.commonSubmitterQueue();
    //     return (q is null) ? 0 : q.queueSize();
    // }

    /**
     * Returns an estimate of how many more locally queued tasks are
     * held by the current worker thread than there are other worker
     * threads that might steal them, or zero if this thread is not
     * operating in a ForkJoinPool. This value may be useful for
     * heuristic decisions about whether to fork other tasks. In many
     * usages of ForkJoinTasks, at steady state, each worker should
     * aim to maintain a small constant surplus (for example, 3) of
     * tasks, and to process computations locally if this threshold is
     * exceeded.
     *
     * @return the surplus number of tasks, which may be negative
     */
    // static int getSurplusQueuedTaskCount() {
    //     return ForkJoinPool.getSurplusQueuedTaskCount();
    // }

    // Extension methods
    static if(is(V == void)) {
        // protected abstract void setRawResult();
    } else {

        /**
         * Returns the result that would be returned by {@link #join}, even
         * if this task completed abnormally, or {@code null} if this task
         * is not known to have been completed. This method is designed
         * to aid debugging, as well as to support extensions. Its use in
         * any other context is discouraged.
         *
         * @return the result, or {@code null} if not completed
         */
        abstract V getRawResult();

        /**
         * Forces the given value to be returned as a result. This method
         * is designed to support extensions, and should not in general be
         * called otherwise.
         *
         * @param value the value
         */
        protected abstract void setRawResult(V value);
    }

    /**
     * Immediately performs the base action of this task and returns
     * true if, upon return from this method, this task is guaranteed
     * to have completed normally. This method may return false
     * otherwise, to indicate that this task is not necessarily
     * complete (or is not known to be complete), for example in
     * asynchronous actions that require explicit invocations of
     * completion methods. This method may also throw an (unchecked)
     * exception to indicate abnormal exit. This method is designed to
     * support extensions, and should not in general be called
     * otherwise.
     *
     * @return {@code true} if this task is known to have completed normally
     */
    protected abstract bool exec();

    /**
     * Returns, but does not unschedule or execute, a task queued by
     * the current thread but not yet executed, if one is immediately
     * available. There is no guarantee that this task will actually
     * be polled or executed next. Conversely, this method may return
     * null even if a task exists but cannot be accessed without
     * contention with other threads. This method is designed
     * primarily to support extensions, and is unlikely to be useful
     * otherwise.
     *
     * @return the next task, or {@code null} if none are available
     */
    // protected static IForkJoinTask peekNextLocalTask() {
    //     Thread t; ForkJoinPool.WorkQueue q;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread)
    //         q = ((ForkJoinWorkerThread)t).workQueue;
    //     else
    //         q = ForkJoinPool.commonSubmitterQueue();
    //     return (q is null) ? null : q.peek();
    // }

    /**
     * Unschedules and returns, without executing, the next task
     * queued by the current thread but not yet executed, if the
     * current thread is operating in a ForkJoinPool. This method is
     * designed primarily to support extensions, and is unlikely to be
     * useful otherwise.
     *
     * @return the next task, or {@code null} if none are available
     */
    // protected static IForkJoinTask pollNextLocalTask() {
    //     Thread t;
    //     return ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) ?
    //         ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
    //         null;
    // }

    /**
     * If the current thread is operating in a ForkJoinPool,
     * unschedules and returns, without executing, the next task
     * queued by the current thread but not yet executed, if one is
     * available, or if not available, a task that was forked by some
     * other thread, if available. Availability may be transient, so a
     * {@code null} result does not necessarily imply quiescence of
     * the pool this task is operating in. This method is designed
     * primarily to support extensions, and is unlikely to be useful
     * otherwise.
     *
     * @return a task, or {@code null} if none are available
     */
    // protected static IForkJoinTask pollTask() {
    //     Thread t; ForkJoinWorkerThread wt;
    //     return ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) ?
    //         (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
    //         null;
    // }

    // /**
    //  * If the current thread is operating in a ForkJoinPool,
    //  * unschedules and returns, without executing, a task externally
    //  * submitted to the pool, if one is available. Availability may be
    //  * transient, so a {@code null} result does not necessarily imply
    //  * quiescence of the pool. This method is designed primarily to
    //  * support extensions, and is unlikely to be useful otherwise.
    //  *
    //  * @return a task, or {@code null} if none are available
    //  */
    // protected static IForkJoinTask pollSubmission() {
    //     ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
    //     return t !is null ? t.pool.pollSubmission() : null;
    // }

    // tag operations

    /**
     * Returns the tag for this task.
     *
     * @return the tag for this task
     */
    final short getForkJoinTaskTag() {
        return cast(short)status;
    }

    /**
     * Atomically sets the tag value for this task and returns the old value.
     *
     * @param newValue the new tag value
     * @return the previous value of the tag
     */
    final short setForkJoinTaskTag(short newValue) {
        while(true) {
            int s = status;
            if(AtomicHelper.compareAndSet(this.status, s, (s & ~SMASK) | (newValue & SMASK)))
                return cast(short)s;
        }
        // return 0;
    }

    /**
     * Atomically conditionally sets the tag value for this task.
     * Among other applications, tags can be used as visit markers
     * in tasks operating on graphs, as in methods that check: {@code
     * if (task.compareAndSetForkJoinTaskTag(cast(short)0, cast(short)1))}
     * before processing, otherwise exiting because the node has
     * already been visited.
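     *
     * <p>The visit-marker check can be sketched without the rest of this
     * class. The example below is a simplified stand-in, not this class's
     * implementation: {@code GraphNode} and its {@code tryMark} method are
     * hypothetical names, and {@code SMASK = 0xffff} (tag held in the low
     * 16 bits) is an assumption, since the constant is defined outside this
     * section. It uses {@code core.atomic.cas} in place of
     * {@code AtomicHelper.compareAndSet}.
     *
     * <pre> {@code
     * import core.atomic : atomicLoad, cas;
     *
     * enum int SMASK = 0xffff; // assumed: low 16 bits of the status word hold the tag
     *
     * final class GraphNode {
     *     private shared int status; // upper bits would carry completion state
     *
     *     // Returns true only for the caller that flips the tag from expect to update.
     *     bool tryMark(short expect, short update) {
     *         while (true) {
     *             int s = atomicLoad(status);
     *             if (cast(short) s != expect)
     *                 return false; // another visitor already marked this node
     *             if (cas(&status, s, (s & ~SMASK) | (update & SMASK)))
     *                 return true;
     *             // the CAS lost a race on the status word; reload and retry
     *         }
     *     }
     * }
     *
     * void main() {
     *     auto node = new GraphNode;
     *     assert(node.tryMark(0, 1));  // first visitor claims the node
     *     assert(!node.tryMark(0, 1)); // later visitors see the marker and skip it
     * }
     * }</pre>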
     *
     * @param expect the expected tag value
     * @param update the new tag value
     * @return {@code true} if successful; i.e., the current value was
     * equal to {@code expect} and was changed to {@code update}.
     */
    final bool compareAndSetForkJoinTaskTag(short expect, short update) {
        for (int s;;) {
            if (cast(short)(s = status) != expect)
                return false;
            if (AtomicHelper.compareAndSet(this.status, s,
                                         (s & ~SMASK) | (update & SMASK)))
                return true;
        }
    }


    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code run}
     * method of the given {@code Runnable} as its action, and returns
     * a null result upon {@link #join}.
     *
     * @param runnable the runnable action
     * @return the task
     */
    // static IForkJoinTask adapt(Runnable runnable) {
    //     return new AdaptedRunnableAction(runnable);
    // }

    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code run}
     * method of the given {@code Runnable} as its action, and returns
     * the given result upon {@link #join}.
     *
     * @param runnable the runnable action
     * @param result the result upon completion
     * @param (T) the type of the result
     * @return the task
     */
    static ForkJoinTask!(T) adapt(T)(Runnable runnable, T result) {
        return new AdaptedRunnable!(T)(runnable, result);
    }

    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code call}
     * method of the given {@code Callable} as its action, and returns
     * its result upon {@link #join}, translating any checked exceptions
     * encountered into {@code RuntimeException}.
     *
     * @param callable the callable action
     * @param (T) the type of the callable's result
     * @return the task
     */
    static ForkJoinTask!(T) adapt(T)(Callable!(T) callable) {
        return new AdaptedCallable!(T)(callable);
    }
}



/**
 * Adapter for Runnables. This implements RunnableFuture
 * to be compliant with AbstractExecutorService constraints
 * when used in ForkJoinPool.
 */
final class AdaptedRunnable(T) : ForkJoinTask!(T), RunnableFuture!(T) {
    final Runnable runnable;
    T result;
    this(Runnable runnable, T result) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
        this.result = result; // OK to set this even before completion
    }
    final override T getRawResult() { return result; }
    final override void setRawResult(T v) { result = v; }
    final override bool exec() { runnable.run(); return true; }
    final void run() { invoke(); }
    override string toString() {
        // Runnable is an interface, so go through Object to obtain a string.
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)runnable).toString() ~ "]";
    }
}

/**
 * Adapter for Runnables without results.
 */
final class AdaptedRunnableAction : ForkJoinTask!(void), Runnable {
    Runnable runnable;
    this(Runnable runnable) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // final Void getRawResult() { return null; }
    // final void setRawResult(Void v) { }
    final override bool exec() { runnable.run(); return true; }
    final void run() { invoke(); }
    override bool cancel(bool mayInterruptIfRunning) {
        return super.cancel(mayInterruptIfRunning);
    }

    // override bool isCancelled() {
    //     return super.isCancelled();
    // }

    // override bool isDone() {
    //     return super.isDone();
    // }

    // override void get() {
    //     super.get();
    // }

    // override void get(Duration timeout) {
    //     super.get(timeout);
    // }

    override string toString() {
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)runnable).toString() ~ "]";
    }
}

/**
 * Adapter for Runnables in which failure forces worker exception.
 */
final class RunnableExecuteAction : ForkJoinTask!(void) {
    Runnable runnable;
    this(Runnable runnable) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // final Void getRawResult() { return null; }
    // final void setRawResult(Void v) { }
    final override bool exec() { runnable.run(); return true; }
    override void internalPropagateException(Throwable ex) {
        ForkJoinTaskHelper.rethrow(ex); // rethrow outside exec() catches.
    }
}

/**
 * Adapter for Callables.
 */
final class AdaptedCallable(T) : ForkJoinTask!(T), RunnableFuture!(T) {
    final Callable!(T) callable;
    T result;
    this(Callable!(T) callable) {
        if (callable is null) throw new NullPointerException();
        this.callable = callable;
    }
    final override T getRawResult() { return result; }
    final override void setRawResult(T v) { result = v; }
    final override bool exec() {
        try {
            result = callable.call();
            return true;
        } catch (RuntimeException rex) {
            throw rex;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
    final void run() { invoke(); }
    override string toString() {
        // Callable is not a string, so go through Object to obtain one.
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)callable).toString() ~ "]";
    }
}


/*************************************************/
// CountedCompleter
/*************************************************/

interface ICountedCompleter : IForkJoinTask {
    ICountedCompleter getCompleter();
}

/**
 * A {@link ForkJoinTask} with a completion action performed when
 * triggered and there are no remaining pending actions.
 * CountedCompleters are in general more robust in the
 * presence of subtask stalls and blockage than are other forms of
 * ForkJoinTasks, but are less intuitive to program. Uses of
 * CountedCompleter are similar to those of other completion based
 * components (such as {@link java.nio.channels.CompletionHandler})
 * except that multiple <em>pending</em> completions may be necessary
 * to trigger the completion action {@link #onCompletion(CountedCompleter)},
 * not just one.
 * Unless initialized otherwise, the {@linkplain #getPendingCount pending
 * count} starts at zero, but may be (atomically) changed using
 * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
 * {@link #compareAndSetPendingCount}.
 * Upon invocation of {@link
 * #tryComplete}, if the pending action count is nonzero, it is
 * decremented; otherwise, the completion action is performed, and if
 * this completer itself has a completer, the process is continued
 * with its completer. As is the case with related synchronization
 * components such as {@link Phaser} and {@link Semaphore}, these methods
 * affect only internal counts; they do not establish any further
 * internal bookkeeping. In particular, the identities of pending
 * tasks are not maintained. As illustrated below, you can create
 * subclasses that do record some or all pending tasks or their
 * results when needed. As illustrated below, utility methods
 * supporting customization of completion traversals are also
 * provided. However, because CountedCompleters provide only basic
 * synchronization mechanisms, it may be useful to create further
 * abstract subclasses that maintain linkages, fields, and additional
 * support methods appropriate for a set of related usages.
 *
 * <p>A concrete CountedCompleter class must define method {@link
 * #compute}, that should in most cases (as illustrated below), invoke
 * {@code tryComplete()} once before returning. The class may also
 * optionally override method {@link #onCompletion(CountedCompleter)}
 * to perform an action upon normal completion, and method
 * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
 * perform an action upon any exception.
 *
 * <p>CountedCompleters most often do not bear results, in which case
 * they are normally declared as {@code CountedCompleter!(void)}, and
 * will always return {@code null} as a result value. In other cases,
 * you should override method {@link #getRawResult} to provide a
 * result from {@code join(), invoke()}, and related methods.
 * In general, this method should return the value of a field (or a
 * function of one or more fields) of the CountedCompleter object that
 * holds the result upon completion. Method {@link #setRawResult} by
 * default plays no role in CountedCompleters. It is possible, but
 * rarely applicable, to override this method to maintain other
 * objects or fields holding result data.
 *
 * <p>A CountedCompleter that does not itself have a completer (i.e.,
 * one for which {@link #getCompleter} returns {@code null}) can be
 * used as a regular ForkJoinTask with this added functionality.
 * However, any completer that in turn has another completer serves
 * only as an internal helper for other computations, so its own task
 * status (as reported in methods such as {@link ForkJoinTask#isDone})
 * is arbitrary; this status changes only upon explicit invocations of
 * {@link #complete}, {@link ForkJoinTask#cancel},
 * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
 * exceptional completion of method {@code compute}. Upon any
 * exceptional completion, the exception may be relayed to a task's
 * completer (and its completer, and so on), if one exists and it has
 * not otherwise already completed. Similarly, cancelling an internal
 * CountedCompleter has only a local effect on that completer, so is
 * not often useful.
 *
 * <p><b>Sample Usages.</b>
 *
 * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
 * be arranged in trees similar to those often used with {@link
 * RecursiveAction}s, although the constructions involved in setting
 * them up typically vary. Here, the completer of each task is its
 * parent in the computation tree.
 * Even though they entail a bit more
 * bookkeeping, CountedCompleters may be better choices when applying
 * a possibly time-consuming operation (that cannot be further
 * subdivided) to each element of an array or collection; especially
 * when the operation takes a significantly different amount of time
 * to complete for some elements than others, either because of
 * intrinsic variation (for example I/O) or auxiliary effects such as
 * garbage collection. Because CountedCompleters provide their own
 * continuations, other tasks need not block waiting to perform them.
 *
 * <p>For example, here is an initial version of a utility method that
 * uses divide-by-two recursive decomposition to divide work into
 * single pieces (leaf tasks). Even when work is split into individual
 * calls, tree-based techniques are usually preferable to directly
 * forking leaf tasks, because they reduce inter-thread communication
 * and improve load balancing. In the recursive case, the second of
 * each pair of subtasks to finish triggers completion of their parent
 * (because no result combination is performed, the default no-op
 * implementation of method {@code onCompletion} is not overridden).
 * The utility method sets up the root task and invokes it (here,
 * implicitly using the {@link ForkJoinPool#commonPool()}). It is
 * straightforward and reliable (but not optimal) to always set the
 * pending count to the number of child tasks and call {@code
 * tryComplete()} immediately before returning.
 *
 * <pre> {@code
 * static <E> void forEach(E[] array, Consumer<E> action) {
 *   class Task extends CountedCompleter!(void) {
 *     final int lo, hi;
 *     Task(Task parent, int lo, int hi) {
 *       super(parent); this.lo = lo; this.hi = hi;
 *     }
 *
 *     void compute() {
 *       if (hi - lo >= 2) {
 *         int mid = (lo + hi) >>> 1;
 *         // must set pending count before fork
 *         setPendingCount(2);
 *         new Task(this, mid, hi).fork(); // right child
 *         new Task(this, lo, mid).fork(); // left child
 *       }
 *       else if (hi > lo)
 *         action.accept(array[lo]);
 *       tryComplete();
 *     }
 *   }
 *   new Task(null, 0, array.length).invoke();
 * }}</pre>
 *
 * This design can be improved by noticing that in the recursive case,
 * the task has nothing to do after forking its right task, so can
 * directly invoke its left task before returning. (This is an analog
 * of tail recursion removal.) Also, when the last action in a task
 * is to fork or invoke a subtask (a "tail call"), the call to {@code
 * tryComplete()} can be optimized away, at the cost of making the
 * pending count look "off by one".
 *
 * <pre> {@code
 *     void compute() {
 *       if (hi - lo >= 2) {
 *         int mid = (lo + hi) >>> 1;
 *         setPendingCount(1); // looks off by one, but correct!
 *         new Task(this, mid, hi).fork(); // right child
 *         new Task(this, lo, mid).compute(); // direct invoke
 *       } else {
 *         if (hi > lo)
 *           action.accept(array[lo]);
 *         tryComplete();
 *       }
 *     }}</pre>
 *
 * As a further optimization, notice that the left task need not even exist.
 * Instead of creating a new one, we can continue using the original task,
 * and add a pending count for each fork.
 * Additionally, because no task
 * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
 * {@code tryComplete} can be replaced with {@link #propagateCompletion}.
 *
 * <pre> {@code
 *     void compute() {
 *       int n = hi - lo;
 *       for (; n >= 2; n /= 2) {
 *         addToPendingCount(1);
 *         new Task(this, lo + n/2, lo + n).fork();
 *       }
 *       if (n > 0)
 *         action.accept(array[lo]);
 *       propagateCompletion();
 *     }}</pre>
 *
 * When pending counts can be precomputed, they can be established in
 * the constructor:
 *
 * <pre> {@code
 * static <E> void forEach(E[] array, Consumer<E> action) {
 *   class Task extends CountedCompleter!(void) {
 *     final int lo, hi;
 *     Task(Task parent, int lo, int hi) {
 *       super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo));
 *       this.lo = lo; this.hi = hi;
 *     }
 *
 *     void compute() {
 *       for (int n = hi - lo; n >= 2; n /= 2)
 *         new Task(this, lo + n/2, lo + n).fork();
 *       action.accept(array[lo]);
 *       propagateCompletion();
 *     }
 *   }
 *   if (array.length > 0)
 *     new Task(null, 0, array.length).invoke();
 * }}</pre>
 *
 * Additional optimizations of such classes might entail specializing
 * classes for leaf steps, subdividing by say, four, instead of two
 * per iteration, and using an adaptive threshold instead of always
 * subdividing down to single elements.
 *
 * <p><b>Searching.</b> A tree of CountedCompleters can search for a
 * value or property in different parts of a data structure, and
 * report a result in an {@link
 * hunt.concurrency.atomic.AtomicReference AtomicReference} as
 * soon as one is found. The others can poll the result to avoid
 * unnecessary work.
 * (You could additionally {@linkplain #cancel
 * cancel} other tasks, but it is usually simpler and more efficient
 * to just let them notice that the result is set and if so skip
 * further processing.) Illustrating again with an array using full
 * partitioning (again, in practice, leaf tasks will almost always
 * process more than one element):
 *
 * <pre> {@code
 * class Searcher<E> extends CountedCompleter<E> {
 *   final E[] array; final AtomicReference<E> result; final int lo, hi;
 *   Searcher(ICountedCompleter p, E[] array, AtomicReference<E> result, int lo, int hi) {
 *     super(p);
 *     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
 *   }
 *   E getRawResult() { return result.get(); }
 *   void compute() { // similar to ForEach version 3
 *     int l = lo, h = hi;
 *     while (result.get() is null && h >= l) {
 *       if (h - l >= 2) {
 *         int mid = (l + h) >>> 1;
 *         addToPendingCount(1);
 *         new Searcher(this, array, result, mid, h).fork();
 *         h = mid;
 *       }
 *       else {
 *         E x = array[l];
 *         if (matches(x) && result.compareAndSet(null, x))
 *           quietlyCompleteRoot(); // root task is now joinable
 *         break;
 *       }
 *     }
 *     tryComplete(); // normally complete whether or not found
 *   }
 *   bool matches(E e) { ... } // return true if found
 *
 *   static <E> E search(E[] array) {
 *     return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
 *   }
 * }}</pre>
 *
 * In this example, as well as others in which tasks have no other
 * effects except to {@code compareAndSet} a common result, the
 * trailing unconditional invocation of {@code tryComplete} could be
 * made conditional ({@code if (result.get() is null) tryComplete();})
 * because no further bookkeeping is required to manage completions
 * once the root task completes.
 *
 * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
 * results of multiple subtasks usually need to access these results
 * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
 * class (that performs a simplified form of map-reduce where mappings
 * and reductions are all of type {@code E}), one way to do this in
 * divide and conquer designs is to have each subtask record its
 * sibling, so that it can be accessed in method {@code onCompletion}.
 * This technique applies to reductions in which the order of
 * combining left and right results does not matter; ordered
 * reductions require explicit left/right designations. Variants of
 * other streamlinings seen in the above examples may also apply.
 *
 * <pre> {@code
 * class MyMapper<E> { E apply(E v) { ... } }
 * class MyReducer<E> { E apply(E x, E y) { ... } }
 * class MapReducer<E> extends CountedCompleter<E> {
 *   final E[] array; final MyMapper<E> mapper;
 *   final MyReducer<E> reducer; final int lo, hi;
 *   MapReducer<E> sibling;
 *   E result;
 *   MapReducer(ICountedCompleter p, E[] array, MyMapper<E> mapper,
 *              MyReducer<E> reducer, int lo, int hi) {
 *     super(p);
 *     this.array = array; this.mapper = mapper;
 *     this.reducer = reducer; this.lo = lo; this.hi = hi;
 *   }
 *   void compute() {
 *     if (hi - lo >= 2) {
 *       int mid = (lo + hi) >>> 1;
 *       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
 *       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
 *       left.sibling = right;
 *       right.sibling = left;
 *       setPendingCount(1); // only right is pending
 *       right.fork();
 *       left.compute(); // directly execute left
 *     }
 *     else {
 *       if (hi > lo)
 *         result = mapper.apply(array[lo]);
 *       tryComplete();
 *     }
 *   }
 *   void onCompletion(ICountedCompleter caller) {
 *     if (caller != this) {
 *       MapReducer<E> child = (MapReducer<E>)caller;
 *       MapReducer<E> sib = child.sibling;
 *       if (sib is null || sib.result is null)
 *         result = child.result;
 *       else
 *         result = reducer.apply(child.result, sib.result);
 *     }
 *   }
 *   E getRawResult() { return result; }
 *
 *   static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
 *     return new MapReducer<E>(null, array, mapper, reducer,
 *                              0, array.length).invoke();
 *   }
 * }}</pre>
 *
 * Here, method {@code onCompletion} takes a form common to many
 * completion designs that combine results. This callback-style method
 * is triggered once per task, in either of the two different contexts
 * in which the pending count is, or becomes, zero: (1) by a task
 * itself, if its pending count is zero upon invocation of {@code
 * tryComplete}, or (2) by any of its subtasks when they complete and
 * decrement the pending count to zero. The {@code caller} argument
 * distinguishes cases. Most often, when the caller is {@code this},
 * no action is necessary. Otherwise the caller argument can be used
 * (usually via a cast) to supply a value (and/or links to other
 * values) to be combined. Assuming proper use of pending counts, the
 * actions inside {@code onCompletion} occur (once) upon completion of
 * a task and its subtasks. No additional synchronization is required
 * within this method to ensure thread safety of accesses to fields of
 * this task or other completed tasks.
 *
 * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
 * process completions is inapplicable or inconvenient, you can use
 * methods {@link #firstComplete} and {@link #nextComplete} to create
 * custom traversals.
 * For example, to define a MapReducer that only
 * splits out right-hand tasks in the form of the third ForEach
 * example, the completions must cooperatively reduce along
 * unexhausted subtask links, which can be done as follows:
 *
 * <pre> {@code
 * class MapReducer<E> extends CountedCompleter<E> { // version 2
 *   final E[] array; final MyMapper<E> mapper;
 *   final MyReducer<E> reducer; final int lo, hi;
 *   MapReducer<E> forks, next; // record subtask forks in list
 *   E result;
 *   MapReducer(ICountedCompleter p, E[] array, MyMapper<E> mapper,
 *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
 *     super(p);
 *     this.array = array; this.mapper = mapper;
 *     this.reducer = reducer; this.lo = lo; this.hi = hi;
 *     this.next = next;
 *   }
 *   void compute() {
 *     int l = lo, h = hi;
 *     while (h - l >= 2) {
 *       int mid = (l + h) >>> 1;
 *       addToPendingCount(1);
 *       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
 *       h = mid;
 *     }
 *     if (h > l)
 *       result = mapper.apply(array[l]);
 *     // process completions by reducing along and advancing subtask links
 *     for (ICountedCompleter c = firstComplete(); c !is null; c = c.nextComplete()) {
 *       for (MapReducer t = (MapReducer)c, s = t.forks; s !is null; s = t.forks = s.next)
 *         t.result = reducer.apply(t.result, s.result);
 *     }
 *   }
 *   E getRawResult() { return result; }
 *
 *   static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
 *     return new MapReducer<E>(null, array, mapper, reducer,
 *                              0, array.length, null).invoke();
 *   }
 * }}</pre>
 *
 * <p><b>Triggers.</b> Some CountedCompleters are themselves never
 * forked, but instead serve as bits of plumbing in other designs;
 * including those in which the completion of one or more async tasks
 * triggers another async task.
 * For example:
 *
 * <pre> {@code
 * class HeaderBuilder extends CountedCompleter<...> { ... }
 * class BodyBuilder extends CountedCompleter<...> { ... }
 * class PacketSender extends CountedCompleter<...> {
 *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
 *   void compute() { } // never called
 *   void onCompletion(ICountedCompleter caller) { sendPacket(); }
 * }
 * // sample use:
 * PacketSender p = new PacketSender();
 * new HeaderBuilder(p, ...).fork();
 * new BodyBuilder(p, ...).fork();}</pre>
 *
 * @author Doug Lea
 */
abstract class CountedCompleter(T) : ForkJoinTask!(T), ICountedCompleter {

    /** This task's completer, or null if none */
    ICountedCompleter completer;
    /** The number of pending tasks until completion */
    int pending;

    /**
     * Creates a new CountedCompleter with the given completer
     * and initial pending count.
     *
     * @param completer this task's completer, or {@code null} if none
     * @param initialPendingCount the initial pending count
     */
    protected this(ICountedCompleter completer,
                   int initialPendingCount) {
        this.completer = completer;
        this.pending = initialPendingCount;
    }

    /**
     * Creates a new CountedCompleter with the given completer
     * and an initial pending count of zero.
     *
     * @param completer this task's completer, or {@code null} if none
     */
    protected this(ICountedCompleter completer) {
        this.completer = completer;
    }

    /**
     * Creates a new CountedCompleter with no completer
     * and an initial pending count of zero.
     */
    protected this() {
        this.completer = null;
    }

    /**
     * The main computation performed by this task.
     */
    abstract void compute();

    /**
     * Performs an action when method {@link #tryComplete} is invoked
     * and the pending count is zero, or when the unconditional
     * method {@link #complete} is invoked. By default, this method
     * does nothing. You can distinguish cases by checking the
     * identity of the given caller argument. If not equal to {@code
     * this}, then it is typically a subtask that may contain results
     * (and/or links to other results) to combine.
     *
     * @param caller the task invoking this method (which may
     * be this task itself)
     */
    void onCompletion(ICountedCompleter caller) {
    }

    /**
     * Performs an action when method {@link
     * #completeExceptionally(Throwable)} is invoked or method {@link
     * #compute} throws an exception, and this task has not already
     * otherwise completed normally. On entry to this method, this task
     * {@link ForkJoinTask#isCompletedAbnormally}. The return value
     * of this method controls further propagation: If {@code true}
     * and this task has a completer that has not completed, then that
     * completer is also completed exceptionally, with the same
     * exception as this completer. The default implementation of
     * this method does nothing except return {@code true}.
     *
     * @param ex the exception
     * @param caller the task invoking this method (which may
     * be this task itself)
     * @return {@code true} if this exception should be propagated to this
     * task's completer, if one exists
     */
    bool onExceptionalCompletion(Throwable ex, ICountedCompleter caller) {
        return true;
    }

    /**
     * Returns the completer established in this task's constructor,
     * or {@code null} if none.
     *
     * @return the completer
     */
    final ICountedCompleter getCompleter() {
        return completer;
    }

    /**
     * Returns the current pending count.
     *
     * @return the current pending count
     */
    final int getPendingCount() {
        return pending;
    }

    /**
     * Sets the pending count to the given value.
     *
     * @param count the count
     */
    final void setPendingCount(int count) {
        pending = count;
    }

    /**
     * Adds (atomically) the given value to the pending count.
     *
     * @param delta the value to add
     */
    final void addToPendingCount(int delta) {
        PENDING.getAndAdd(this, delta);
    }

    /**
     * Sets (atomically) the pending count to the given count only if
     * it currently holds the given expected value.
     *
     * @param expected the expected value
     * @param count the new value
     * @return {@code true} if successful
     */
    final bool compareAndSetPendingCount(int expected, int count) {
        return PENDING.compareAndSet(this, expected, count);
    }

    /**
     * If the pending count is nonzero, (atomically) decrements it.
     *
     * @return the initial (undecremented) pending count holding on entry
     * to this method
     */
    final int decrementPendingCountUnlessZero() {
        int c;
        do {} while ((c = pending) != 0 &&
                     !PENDING.weakCompareAndSet(this, c, c - 1));
        return c;
    }

    /**
     * Returns the root of the current computation; i.e., this
     * task if it has no completer, else its completer's root.
     *
     * @return the root of the current computation
     */
    final ICountedCompleter getRoot() {
        ICountedCompleter a = this, p;
        while ((p = a.completer) !is null)
            a = p;
        return a;
    }

    /**
     * If the pending count is nonzero, decrements the count;
     * otherwise invokes {@link #onCompletion(CountedCompleter)}
     * and then similarly tries to complete this task's completer,
     * if one exists, else marks this task as complete.
     */
    final void tryComplete() {
        ICountedCompleter a = this, s = a;
        for (int c;;) {
            if ((c = a.pending) == 0) {
                a.onCompletion(s);
                if ((a = (s = a).completer) is null) {
                    s.quietlyComplete();
                    return;
                }
            }
            else if (PENDING.weakCompareAndSet(a, c, c - 1))
                return;
        }
    }

    /**
     * Equivalent to {@link #tryComplete} but does not invoke {@link
     * #onCompletion(CountedCompleter)} along the completion path:
     * If the pending count is nonzero, decrements the count;
     * otherwise, similarly tries to complete this task's completer, if
     * one exists, else marks this task as complete. This method may be
     * useful in cases where {@code onCompletion} should not, or need
     * not, be invoked for each completer in a computation.
     */
    final void propagateCompletion() {
        ICountedCompleter a = this, s;
        for (int c;;) {
            if ((c = a.pending) == 0) {
                if ((a = (s = a).completer) is null) {
                    s.quietlyComplete();
                    return;
                }
            }
            else if (PENDING.weakCompareAndSet(a, c, c - 1))
                return;
        }
    }

    /**
     * Regardless of pending count, invokes
     * {@link #onCompletion(CountedCompleter)}, marks this task as
     * complete and further triggers {@link #tryComplete} on this
     * task's completer, if one exists. The given rawResult is
     * used as an argument to {@link #setRawResult} before invoking
     * {@link #onCompletion(CountedCompleter)} or marking this task
     * as complete; its value is meaningful only for classes
     * overriding {@code setRawResult}. This method does not modify
     * the pending count.
     *
     * <p>This method may be useful when forcing completion as soon as
     * any one (versus all) of several subtask results are obtained.
     * However, in the common (and recommended) case in which {@code
     * setRawResult} is not overridden, this effect can be obtained
     * more simply using {@link #quietlyCompleteRoot()}.
     *
     * @param rawResult the raw result
     */
    void complete(T rawResult) {
        ICountedCompleter p;
        setRawResult(rawResult);
        onCompletion(this);
        quietlyComplete();
        if ((p = completer) !is null)
            p.tryComplete();
    }

    /**
     * If this task's pending count is zero, returns this task;
     * otherwise decrements its pending count and returns {@code null}.
     * This method is designed to be used with {@link #nextComplete} in
     * completion traversal loops.
     *
     * @return this task, if pending count was zero, else {@code null}
     */
    final ICountedCompleter firstComplete() {
        for (int c;;) {
            if ((c = pending) == 0)
                return this;
            else if (PENDING.weakCompareAndSet(this, c, c - 1))
                return null;
        }
    }

    /**
     * If this task does not have a completer, invokes {@link
     * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
     * the completer's pending count is non-zero, decrements that
     * pending count and returns {@code null}. Otherwise, returns the
     * completer. This method can be used as part of a completion
     * traversal loop for homogeneous task hierarchies:
     *
     * <pre> {@code
     * for (ICountedCompleter c = firstComplete();
     *      c !is null;
     *      c = c.nextComplete()) {
     *   // ... process c ...
     * }}</pre>
     *
     * @return the completer, or {@code null} if none
     */
    final ICountedCompleter nextComplete() {
        ICountedCompleter p;
        if ((p = completer) !is null)
            return p.firstComplete();
        else {
            quietlyComplete();
            return null;
        }
    }

    /**
     * Equivalent to {@code getRoot().quietlyComplete()}.
     */
    final void quietlyCompleteRoot() {
        for (ICountedCompleter a = this, p;;) {
            if ((p = a.completer) is null) {
                a.quietlyComplete();
                return;
            }
            a = p;
        }
    }

    /**
     * If this task has not completed, attempts to process at most the
     * given number of other unprocessed tasks for which this task is
     * on the completion path, if any are known to exist.
     *
     * @param maxTasks the maximum number of tasks to process. If
     *                 less than or equal to zero, then no tasks are
     *                 processed.
     */
    final void helpComplete(int maxTasks) {
        Thread t = Thread.getThis();
        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t;
        if (maxTasks > 0 && status >= 0) {
            if (wt !is null)
                wt.pool.helpComplete(wt.workQueue, this, maxTasks);
            else
                ForkJoinPool.common.externalHelpComplete(this, maxTasks);
        }
    }

    /**
     * Supports ForkJoinTask exception propagation.
     */
    void internalPropagateException(Throwable ex) {
        ICountedCompleter a = this, s = a;
        while (a.onExceptionalCompletion(ex, s) &&
               (a = (s = a).completer) !is null && a.status >= 0 &&
               isExceptionalStatus(a.recordExceptionalCompletion(ex))) {

        }
    }

    /**
     * Implements execution conventions for CountedCompleters.
     */
    protected final override bool exec() {
        compute();
        return false;
    }

    /**
     * Returns the result of the computation. By default,
     * returns {@code null}, which is appropriate for {@code Void}
     * actions, but in other cases should be overridden, almost
     * always to return a field or function of a field that
     * holds the result upon completion.
     *
     * @return the result of the computation
     */
    override T getRawResult() { return null; }

    /**
     * A method that result-bearing CountedCompleters may optionally
     * use to help maintain result data. By default, does nothing.
     * Overrides are not recommended. However, if this method is
     * overridden to update existing objects or fields, then it must
     * in general be defined to be thread-safe.
     */
    protected override void setRawResult(T t) { }

    // VarHandle mechanics
    // private static final VarHandle PENDING;
    // static {
    //     try {
    //         MethodHandles.Lookup l = MethodHandles.lookup();
    //         PENDING = l.findVarHandle(CountedCompleter.class, "pending", int.class);

    //     } catch (ReflectiveOperationException e) {
    //         throw new ExceptionInInitializerError(e);
    //     }
    // }
}


/**
 * A recursive result-bearing {@link ForkJoinTask}.
 *
 * <p>For a classic example, here is a task computing Fibonacci numbers:
 *
 * <pre> {@code
 * class Fibonacci extends RecursiveTask<Integer> {
 *   final int n;
 *   Fibonacci(int n) { this.n = n; }
 *   protected Integer compute() {
 *     if (n <= 1)
 *       return n;
 *     Fibonacci f1 = new Fibonacci(n - 1);
 *     f1.fork();
 *     Fibonacci f2 = new Fibonacci(n - 2);
 *     return f2.compute() + f1.join();
 *   }
 * }}</pre>
 *
 * However, besides being a dumb way to compute Fibonacci functions
 * (there is a simple fast linear algorithm that you'd use in
 * practice), this is likely to perform poorly because the smallest
 * subtasks are too small to be worthwhile splitting up. Instead, as
 * is the case for nearly all fork/join applications, you'd pick some
 * minimum granularity size (for example 10 here) for which you always
 * sequentially solve rather than subdividing.
 *
 * @author Doug Lea
 */
abstract class RecursiveTask(V) : ForkJoinTask!V {

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

    final override V getRawResult() {
        return result;
    }

    protected final override void setRawResult(V value) {
        result = value;
    }

    /**
     * Implements execution conventions for RecursiveTask.
     */
    protected final override bool exec() {
        result = compute();
        return true;
    }

}
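
// A minimal sketch of the granularity advice in the RecursiveTask
// documentation above: below a cutoff, solve sequentially instead of
// forking. ThresholdFibonacci, THRESHOLD, and sequentialFib are
// hypothetical names introduced only for illustration; the cutoff of 10
// is the value that documentation suggests. The sketch assumes that
// ForkJoinTask in this module exposes fork() and join() as its
// documentation describes. It is guarded by version (none), so it is
// never compiled into the library.
version (none)
final class ThresholdFibonacci : RecursiveTask!int {

    /// Hypothetical cutoff: inputs at or below it are solved sequentially.
    private enum int THRESHOLD = 10;

    private int n;

    this(int n) {
        this.n = n;
    }

    /// Simple linear-time loop used for small inputs.
    private static int sequentialFib(int n) {
        int a = 0, b = 1;
        foreach (_; 0 .. n) {
            immutable next = a + b;
            a = b;
            b = next;
        }
        return a;
    }

    protected override int compute() {
        // Small problem: splitting further would cost more than it saves.
        if (n <= THRESHOLD)
            return sequentialFib(n);

        // Large problem: fork one half, compute the other in this thread,
        // then join the forked half.
        auto f1 = new ThresholdFibonacci(n - 1);
        f1.fork();
        auto f2 = new ThresholdFibonacci(n - 2);
        return f2.compute() + f1.join();
    }
}

// Sample use, assuming invoke() is available as the ForkJoinTask
// documentation describes:
//
//     int value = new ThresholdFibonacci(30).invoke();
//
// A real task would tune the cutoff by measurement rather than fix it
// at 10.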