1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 module hunt.concurrency.ForkJoinTask; 37 38 import hunt.concurrency.atomic; 39 import hunt.concurrency.Future; 40 import hunt.concurrency.thread; 41 42 import hunt.concurrency.ForkJoinPool; 43 import hunt.concurrency.ForkJoinTaskHelper; 44 45 import hunt.collection.Collection; 46 import hunt.Exceptions; 47 import hunt.util.Common; 48 import hunt.util.DateTime; 49 // import java.util.List; 50 // import java.util.RandomAccess; 51 // import hunt.concurrency.locks.ReentrantLock; 52 53 import core.time; 54 import core.sync.condition; 55 import core.sync.mutex; 56 import core.thread; 57 58 59 /** 60 * Abstract base class for tasks that run within a {@link ForkJoinPool}. 61 * A {@code ForkJoinTask} is a thread-like entity that is much 62 * lighter weight than a normal thread. Huge numbers of tasks and 63 * subtasks may be hosted by a small number of actual threads in a 64 * ForkJoinPool, at the price of some usage limitations. 65 * 66 * <p>A "main" {@code ForkJoinTask} begins execution when it is 67 * explicitly submitted to a {@link ForkJoinPool}, or, if not already 68 * engaged in a ForkJoin computation, commenced in the {@link 69 * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or 70 * related methods. Once started, it will usually in turn start other 71 * subtasks. As indicated by the name of this class, many programs 72 * using {@code ForkJoinTask} employ only methods {@link #fork} and 73 * {@link #join}, or derivatives such as {@link 74 * #invokeAll(ForkJoinTask...) invokeAll}. 
However, this class also 75 * provides a number of other methods that can come into play in 76 * advanced usages, as well as extension mechanics that allow support 77 * of new forms of fork/join processing. 78 * 79 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}. 80 * The efficiency of {@code ForkJoinTask}s stems from a set of 81 * restrictions (that are only partially statically enforceable) 82 * reflecting their main use as computational tasks calculating pure 83 * functions or operating on purely isolated objects. The primary 84 * coordination mechanisms are {@link #fork}, that arranges 85 * asynchronous execution, and {@link #join}, that doesn't proceed 86 * until the task's result has been computed. Computations should 87 * ideally avoid {@code synchronized} methods or blocks, and should 88 * minimize other blocking synchronization apart from joining other 89 * tasks or using synchronizers such as Phasers that are advertised to 90 * cooperate with fork/join scheduling. Subdividable tasks should also 91 * not perform blocking I/O, and should ideally access variables that 92 * are completely independent of those accessed by other running 93 * tasks. These guidelines are loosely enforced by not permitting 94 * checked exceptions such as {@code IOExceptions} to be 95 * thrown. However, computations may still encounter unchecked 96 * exceptions, that are rethrown to callers attempting to join 97 * them. These exceptions may additionally include {@link 98 * RejectedExecutionException} stemming from internal resource 99 * exhaustion, such as failure to allocate internal task 100 * queues. Rethrown exceptions behave in the same way as regular 101 * exceptions, but, when possible, contain stack traces (as displayed 102 * for example using {@code ex.printStackTrace()}) of both the thread 103 * that initiated the computation as well as the thread actually 104 * encountering the exception; minimally only the latter. 
105 * 106 * <p>It is possible to define and use ForkJoinTasks that may block, 107 * but doing so requires three further considerations: (1) Completion 108 * of few if any <em>other</em> tasks should be dependent on a task 109 * that blocks on external synchronization or I/O. Event-style async 110 * tasks that are never joined (for example, those subclassing {@link 111 * CountedCompleter}) often fall into this category. (2) To minimize 112 * resource impact, tasks should be small; ideally performing only the 113 * (possibly) blocking action. (3) Unless the {@link 114 * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly 115 * blocked tasks is known to be less than the pool's {@link 116 * ForkJoinPool#getParallelism} level, the pool cannot guarantee that 117 * enough threads will be available to ensure progress or good 118 * performance. 119 * 120 * <p>The primary method for awaiting completion and extracting 121 * results of a task is {@link #join}, but there are several variants: 122 * The {@link Future#get} methods support interruptible and/or timed 123 * waits for completion and report results using {@code Future} 124 * conventions. Method {@link #invoke} is semantically 125 * equivalent to {@code fork(); join()} but always attempts to begin 126 * execution in the current thread. The "<em>quiet</em>" forms of 127 * these methods do not extract results or report exceptions. These 128 * may be useful when a set of tasks are being executed, and you need 129 * to delay processing of results or exceptions until all complete. 130 * Method {@code invokeAll} (available in multiple versions) 131 * performs the most common form of parallel invocation: forking a set 132 * of tasks and joining them all. 133 * 134 * <p>In the most typical usages, a fork-join pair act like a call 135 * (fork) and return (join) from a parallel recursive function. As is 136 * the case with other forms of recursive calls, returns (joins) 137 * should be performed innermost-first. 
For example, {@code a.fork(); 138 * b.fork(); b.join(); a.join();} is likely to be substantially more 139 * efficient than joining {@code a} before {@code b}. 140 * 141 * <p>The execution status of tasks may be queried at several levels 142 * of detail: {@link #isDone} is true if a task completed in any way 143 * (including the case where a task was cancelled without executing); 144 * {@link #isCompletedNormally} is true if a task completed without 145 * cancellation or encountering an exception; {@link #isCancelled} is 146 * true if the task was cancelled (in which case {@link #getException} 147 * returns a {@link CancellationException}); and 148 * {@link #isCompletedAbnormally} is true if a task was either 149 * cancelled or encountered an exception, in which case {@link 150 * #getException} will return either the encountered exception or 151 * {@link CancellationException}. 152 * 153 * <p>The ForkJoinTask class is not usually directly subclassed. 154 * Instead, you subclass one of the abstract classes that support a 155 * particular style of fork/join processing, typically {@link 156 * RecursiveAction} for most computations that do not return results, 157 * {@link RecursiveTask} for those that do, and {@link 158 * CountedCompleter} for those in which completed actions trigger 159 * other actions. Normally, a concrete ForkJoinTask subclass declares 160 * fields comprising its parameters, established in a constructor, and 161 * then defines a {@code compute} method that somehow uses the control 162 * methods supplied by this base class. 163 * 164 * <p>Method {@link #join} and its variants are appropriate for use 165 * only when completion dependencies are acyclic; that is, the 166 * parallel computation can be described as a directed acyclic graph 167 * (DAG). Otherwise, executions may encounter a form of deadlock as 168 * tasks cyclically wait for each other. 
However, this framework 169 * supports other methods and techniques (for example the use of 170 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that 171 * may be of use in constructing custom subclasses for problems that 172 * are not statically structured as DAGs. To support such usages, a 173 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short} 174 * value using {@link #setForkJoinTaskTag} or {@link 175 * #compareAndSetForkJoinTaskTag} and checked using {@link 176 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use 177 * these {@code protected} methods or tags for any purpose, but they 178 * may be of use in the construction of specialized subclasses. For 179 * example, parallel graph traversals can use the supplied methods to 180 * avoid revisiting nodes/tasks that have already been processed. 181 * (Method names for tagging are bulky in part to encourage definition 182 * of methods that reflect their usage patterns.) 183 * 184 * <p>Most base support methods are {@code final}, to prevent 185 * overriding of implementations that are intrinsically tied to the 186 * underlying lightweight task scheduling framework. Developers 187 * creating new basic styles of fork/join processing should minimally 188 * implement {@code protected} methods {@link #exec}, {@link 189 * #setRawResult}, and {@link #getRawResult}, while also introducing 190 * an abstract computational method that can be implemented in its 191 * subclasses, possibly relying on other {@code protected} methods 192 * provided by this class. 193 * 194 * <p>ForkJoinTasks should perform relatively small amounts of 195 * computation. Large tasks should be split into smaller subtasks, 196 * usually via recursive decomposition. As a very rough rule of thumb, 197 * a task should perform more than 100 and less than 10000 basic 198 * computational steps, and should avoid indefinite looping. If tasks 199 * are too big, then parallelism cannot improve throughput. 
If too 200 * small, then memory and internal task maintenance overhead may 201 * overwhelm processing. 202 * 203 * <p>This class provides {@code adapt} methods for {@link Runnable} 204 * and {@link Callable}, that may be of use when mixing execution of 205 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are 206 * of this form, consider using a pool constructed in <em>asyncMode</em>. 207 * 208 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be 209 * used in extensions such as remote execution frameworks. It is 210 * sensible to serialize tasks only before or after, but not during, 211 * execution. Serialization is not relied on during execution itself. 212 * 213 * @since 1.7 214 * @author Doug Lea 215 */ 216 abstract class ForkJoinTask(V) : Future!(V), IForkJoinTask { 217 218 /* 219 * See the internal documentation of class ForkJoinPool for a 220 * general implementation overview. ForkJoinTasks are mainly 221 * responsible for maintaining their "status" field amidst relays 222 * to methods in ForkJoinWorkerThread and ForkJoinPool. 223 * 224 * The methods of this class are more-or-less layered into 225 * (1) basic status maintenance 226 * (2) execution and awaiting completion 227 * (3) user-level methods that additionally report results. 228 * This is sometimes hard to see because this file orders exported 229 * methods in a way that flows well in javadocs. 230 */ 231 232 /** 233 * The status field holds run control status bits packed into a 234 * single int to ensure atomicity. Status is initially zero, and 235 * takes on nonnegative values until completed, upon which it 236 * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or 237 * exceptional) and THROWN (in which case an exception has been 238 * stored). Tasks with dependent blocked waiting joiners have the 239 * SIGNAL bit set. Completion of a task with SIGNAL set awakens 240 * any waiters via notifyAll. (Waiters also help signal others 241 * upon completion.) 
*
     * These control bits occupy only (some of) the upper half (16
     * bits) of status field. The lower bits are used for user-defined
     * tags.
     */
    int status; // accessed directly by pool and workers

    // Mutex/condition pair that joiners block on. The mutex is created
    // with `new Mutex(this)`, i.e. it is bound to this object, and every
    // wait/notify in this class happens inside `synchronized (this)`.
    Mutex thisMutex;
    Condition thisLocker;

    private enum int DONE     = 1 << 31; // must be negative
    private enum int ABNORMAL = 1 << 18; // set atomically with DONE
    private enum int THROWN   = 1 << 17; // set atomically with ABNORMAL
    private enum int SIGNAL   = 1 << 16; // true if joiner waiting
    private enum int SMASK    = 0xffff;  // short bits for tags

    /**
     * Creates the mutex bound to this task and the condition variable
     * used to park and wake joiners.
     */
    this() {
        thisMutex = new Mutex(this);
        thisLocker = new Condition(thisMutex);
    }

    /**
     * Tells whether a status word carries the THROWN bit.
     *
     * @param s a status value
     * @return {@code true} if THROWN is set in {@code s}
     */
    static bool isExceptionalStatus(int s) {  // needed by subclasses
        return (s & THROWN) != 0;
    }

    /**
     * Sets DONE status and wakes up threads waiting to join this task.
     *
     * The DONE bit is OR-ed into {@code status} with a compare-and-swap
     * loop (the D equivalent of Java's
     * {@code STATUS.getAndBitwiseOr(this, DONE)}), so the update is
     * actually stored and cannot clobber a concurrently set SIGNAL bit.
     *
     * @return status on exit
     */
    private int setDone() {
        import core.atomic : atomicLoad, cas;

        shared(int)* statusRef = cast(shared(int)*) &status;
        int prev;
        do {
            prev = atomicLoad(*statusRef);
        } while (!cas(statusRef, prev, prev | DONE));

        // Only pay for the monitor when a joiner announced itself.
        if ((prev & SIGNAL) != 0) {
            synchronized (this) {
                thisLocker.notifyAll();
            }
        }
        return prev | DONE;
    }

    /**
     * Marks cancelled or exceptional completion unless already done.
*
     * The transition is performed with a compare-and-swap so that a
     * concurrent status change (for example a joiner setting SIGNAL)
     * is never overwritten.
     *
     * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional
     * @return status on exit
     */
    private int abnormalCompletion(int completion) {
        import core.atomic : atomicLoad, cas;

        shared(int)* statusRef = cast(shared(int)*) &status;
        for (;;) {
            int s = atomicLoad(*statusRef);
            if (s < 0)
                return s;                   // already completed: leave as is
            int ns = s | completion;
            if (cas(statusRef, s, ns)) {
                if ((s & SIGNAL) != 0) {
                    synchronized (this) {
                        thisLocker.notifyAll();
                    }
                }
                return ns;
            }
            // CAS lost a race: re-read and retry.
        }
    }

    /**
     * Returns the raw status word.
     *
     * @return the current value of {@code status}
     */
    int getStatus() {
        return status;
    }

    /**
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     *
     * @return status on exit from this method
     */
    final int doExec() {
        int s = status;
        if (s >= 0) {
            bool completed;
            try {
                completed = exec();
            } catch (Throwable rex) {
                completed = false;
                s = setExceptionalCompletion(rex);
            }
            if (completed)
                s = setDone();
        }
        return s;
    }

    /**
     * If not done, sets SIGNAL status and waits on this task's
     * condition. This task may or may not be done on exit. Ignores
     * interrupts.
     *
     * SIGNAL is published with a compare-and-swap so that completers
     * can see that a waiter exists and notify it.
     *
     * @param timeout milliseconds to wait, using Object.wait
     *        conventions: zero means wait until notified.
     */
    final void internalWait(long timeout) {
        import core.atomic : atomicLoad, cas;

        shared(int)* statusRef = cast(shared(int)*) &status;
        int prev;
        do {
            prev = atomicLoad(*statusRef);
        } while (!cas(statusRef, prev, prev | SIGNAL));

        if (prev >= 0) {
            synchronized (this) {
                if (status >= 0) {
                    try {
                        // Condition.wait(Duration.zero) would time out
                        // immediately, so map 0 to the untimed wait.
                        if (timeout == 0L)
                            thisLocker.wait();
                        else
                            thisLocker.wait(dur!(TimeUnit.Millisecond)(timeout));
                    } catch (InterruptedException ie) {
                        // deliberately ignored: this method ignores interrupts
                    }
                } else {
                    thisLocker.notifyAll();
                }
            }
        }
    }

    /**
     * Blocks a non-worker-thread until completion.
*
     * First tries to help run the task; otherwise publishes SIGNAL and
     * parks on this task's condition until the status turns negative.
     * An InterruptedException caught while waiting is remembered and
     * the interrupt is re-asserted on the current thread before
     * returning.
     *
     * @return status upon completion
     */
    private int externalAwaitDone() {
        int s = tryExternalHelp();
        if (s < 0)
            return s;

        s = fetchAndSetSignal();
        if (s < 0)
            return s;

        bool interrupted = false;
        synchronized (this) {
            for (;;) {
                if ((s = status) >= 0) {
                    try {
                        // Untimed wait: a zero Duration would return at
                        // once and turn this loop into a busy-spin.
                        thisLocker.wait();
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                } else {
                    thisLocker.notifyAll();
                    break;
                }
            }
        }
        if (interrupted) {
            ThreadEx th = cast(ThreadEx) Thread.getThis();
            if (th !is null)
                th.interrupt();
        }
        return s;
    }

    /**
     * Blocks a non-worker-thread until completion or interruption.
     *
     * @return status upon completion
     * @throws InterruptedException if the task is already done and
     *         {@code ThreadEx.interrupted()} reports an interrupt
     */
    private int externalInterruptibleAwaitDone() {
        int s = tryExternalHelp();
        if (s < 0) {
            if (ThreadEx.interrupted())
                throw new InterruptedException();
            return s;
        }

        s = fetchAndSetSignal();
        if (s >= 0) {
            synchronized (this) {
                for (;;) {
                    if ((s = status) >= 0) {
                        thisLocker.wait();  // untimed, see externalAwaitDone
                    } else {
                        thisLocker.notifyAll();
                        break;
                    }
                }
            }
        } else if (ThreadEx.interrupted()) {
            throw new InterruptedException();
        }
        return s;
    }

    /**
     * Atomically ORs SIGNAL into {@code status} (compare-and-swap loop).
     *
     * @return the status value observed before SIGNAL was added
     */
    private int fetchAndSetSignal() {
        import core.atomic : atomicLoad, cas;

        shared(int)* statusRef = cast(shared(int)*) &status;
        int prev;
        do {
            prev = atomicLoad(*statusRef);
        } while (!cas(statusRef, prev, prev | SIGNAL));
        return prev;
    }

    /**
     * Tries to help with tasks allowed for external callers.
     *
     * If already done, returns the status. A task that is an
     * ICountedCompleter is handed to the common pool's
     * externalHelpComplete; any other task is executed here when the
     * common pool manages to un-push it. Otherwise returns 0.
     *
     * @return current status
     */
    private int tryExternalHelp() {
        int s = status;
        if (s < 0)
            return s;

        ICountedCompleter cc = cast(ICountedCompleter) this;
        if (cc !is null)
            return ForkJoinPool.common.externalHelpComplete(cc, 0);
        if (ForkJoinPool.common.tryExternalUnpush(this))
            return doExec();
        return 0;
    }

    /**
     * Implementation for join, get, quietlyJoin. Directly handles
     * only the already-completed and external-wait cases; when called
     * on a ForkJoinWorkerThread the wait is relayed to that thread's
     * awaitJoin.
     *
     * @return status upon completion
     */
    private int doJoin() {
        int s = status;
        if (s < 0)
            return s;

        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread) Thread.getThis();
        if (wt is null)
            return externalAwaitDone();
        return wt.awaitJoin(this);
    }

    /**
     * Implementation for invoke, quietlyInvoke.
     *
     * Runs the task in the calling thread first and only waits when
     * that did not complete it.
     *
     * @return status upon completion
     */
    private int doInvoke() {
        int s = doExec();
        if (s < 0)
            return s;

        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread) Thread.getThis();
        if (wt is null)
            return externalAwaitDone();
        return wt.pool.awaitJoin(wt.workQueue, this, MonoTime.zero());
    }

    /**
     * Records exception and sets status.
*
     * Unless this task is already done, stores {@code ex} in the
     * shared exception table (guarded by its lock) and then marks the
     * task DONE | ABNORMAL | THROWN. Table entries are matched by
     * reference identity ({@code is}), not by {@code opEquals}.
     *
     * @param ex the exception to record
     * @return status on exit
     */
    final int recordExceptionalCompletion(Throwable ex) {
        int s = status;
        if (s < 0)
            return s;

        size_t h = this.toHash();
        ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
        lock.lock();
        try {
            ForkJoinTaskHelper.expungeStaleExceptions();
            ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
            size_t i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                if (e is null) {
                    // not found: prepend a new node to the bucket
                    t[i] = new ExceptionNode(this, ex, t[i]);
                    break;
                }
                if (cast(Object) e.get() is cast(Object) this) // already present
                    break;
            }
        } finally {
            lock.unlock();
        }
        return abnormalCompletion(DONE | ABNORMAL | THROWN);
    }

    /**
     * Records exception and possibly propagates.
     *
     * @param ex the exception to record
     * @return status on exit
     */
    private int setExceptionalCompletion(Throwable ex) {
        int s = recordExceptionalCompletion(ex);
        if ((s & THROWN) != 0)
            internalPropagateException(ex);
        return s;
    }

    /**
     * Hook for exception propagation support for tasks with completers.
     * The base implementation does nothing.
     */
    void internalPropagateException(Throwable ex) {
    }

    /**
     * Removes exception node and clears status.
     *
     * The node belonging to this task (matched by reference identity)
     * is unlinked from its bucket, stale entries are expunged, and
     * {@code status} is reset to 0, all while holding the table lock.
     */
    private void clearExceptionalCompletion() {
        size_t h = this.toHash();
        ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
        lock.lock();
        try {
            ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
            size_t i = h & (t.length - 1);
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            while (e !is null) {
                ExceptionNode next = e.next;
                if (cast(Object) e.get() is cast(Object) this) {
                    if (pred is null)
                        t[i] = next;
                    else
                        pred.next = next;
                    break;
                }
                pred = e;
                e = next;
            }
            ForkJoinTaskHelper.expungeStaleExceptions();
            status = 0;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a rethrowable exception for this task, if available.
* The exception is looked up in the shared exception table under its
     * lock; entries are matched by reference identity ({@code is}).
     *
     * NOTE: the Java original additionally tries to wrap the recorded
     * exception in a fresh exception of the same type when it was
     * thrown by another thread, to provide accurate stack traces. That
     * step is still pending in this port (see the TODO below), so the
     * recorded exception itself is returned, which is still correct,
     * although it may contain a misleading stack trace.
     *
     * @return the exception, or null if none
     */
    private Throwable getThrowableException() {
        size_t h = this.toHash();
        ExceptionNode e;
        ReentrantLock lock = ForkJoinTaskHelper.exceptionTableLock;
        lock.lock();
        try {
            ForkJoinTaskHelper.expungeStaleExceptions();
            ExceptionNode[] t = ForkJoinTaskHelper.exceptionTable;
            e = t[h & ($ - 1)];
            while (e !is null && cast(Object) e.get() !is cast(Object) this)
                e = e.next;
        } finally {
            lock.unlock();
        }

        if (e is null)
            return null;
        Throwable ex = e.ex;
        if (ex is null)
            return null;

        // TODO: Tasks pending completion -@zxp at 12/21/2018, 10:28:06 PM
        //
        // if (e.thrower != Thread.getThis().getId()) {
        //     try {
        //         Constructor<?> noArgCtor = null;
        //         // public ctors only
        //         for (Constructor<?> c : ex.getClass().getConstructors()) {
        //             Class<?>[] ps = c.getParameterTypes();
        //             if (ps.length == 0)
        //                 noArgCtor = c;
        //             else if (ps.length == 1 && ps[0] == Throwable.class)
        //                 return (Throwable)c.newInstance(ex);
        //         }
        //         if (noArgCtor !is null) {
        //             Throwable wx = (Throwable)noArgCtor.newInstance();
        //             wx.initCause(ex);
        //             return wx;
        //         }
        //     } catch (Exception ignore) {
        //     }
        // }
        return ex;
    }


    /**
     * Throws exception, if any, associated with the given status.
*
     * When THROWN is set in {@code s} the recorded exception is passed
     * to ForkJoinTaskHelper.rethrow, otherwise a new
     * CancellationException is.
     *
     * @param s a completion status
     */
    private void reportException(int s) {
        Throwable pending;
        if ((s & THROWN) != 0)
            pending = getThrowableException();
        else
            pending = new CancellationException();
        ForkJoinTaskHelper.rethrow(pending);
    }

    // public methods

    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized. Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    final ForkJoinTask!(V) fork() {
        ForkJoinWorkerThread worker = cast(ForkJoinWorkerThread) Thread.getThis();
        if (worker is null) {
            // caller is not a pool worker: hand the task to the common pool
            ForkJoinPool.common.externalPush(this);
        } else {
            worker.workQueue.push(this);
        }
        return this;
    }

    /**
     * Returns the result of the computation when it
     * {@linkplain #isDone is done}.
     * This method differs from {@link #get()} in that abnormal
     * completion results in {@code RuntimeException} or {@code Error},
     * not {@code ExecutionException}, and that interrupts of the
     * calling thread do <em>not</em> cause the method to abruptly
     * return by throwing {@code InterruptedException}.
*
     * @return the computed result
     */
    final V join() {
        immutable int outcome = doJoin();
        if ((outcome & ABNORMAL) != 0)
            reportException(outcome);
        static if (!is(V == void)) {
            return getRawResult();
        }
    }

    /**
     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.
     *
     * @return the computed result
     */
    final V invoke() {
        immutable int outcome = doInvoke();
        if ((outcome & ABNORMAL) != 0)
            reportException(outcome);
        static if (!is(V == void)) {
            return getRawResult();
        }
    }

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown. If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, the
     * other may be cancelled. However, the execution status of
     * individual tasks is not guaranteed upon exceptional return. The
     * status of each task may be obtained using {@link
     * #getException()} and related methods to check if they have been
     * cancelled, completed normally or exceptionally, or left
     * unprocessed.
     *
     * NOTE: not ported yet. The body only calls
     * {@code implementationMissing(false)}; the intended logic is kept
     * below as a comment.
     *
     * @param t1 the first task
     * @param t2 the second task
     * @throws NullPointerException if any task is null
     */
    static void invokeAll(IForkJoinTask t1, IForkJoinTask t2) {
        implementationMissing(false);
        // int s1, s2;
        // t2.fork();
        // if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
        //     t1.reportException(s1);
        // if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
        //     t2.reportException(s2);
    }

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown.
* If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, others
     * may be cancelled. However, the execution status of individual
     * tasks is not guaranteed upon exceptional return. The status of
     * each task may be obtained using {@link #getException()} and
     * related methods to check if they have been cancelled, completed
     * normally or exceptionally, or left unprocessed.
     *
     * NOTE: not ported yet. The fork/join loops are commented out, so
     * no exception is ever collected and the body effectively only
     * calls {@code implementationMissing(false)}.
     *
     * @param tasks the tasks
     * @throws NullPointerException if any task is null
     */
    static void invokeAll(IForkJoinTask[] tasks...) {
        Throwable failure = null;
        int lastIndex = cast(int) tasks.length - 1;
        // for (int i = lastIndex; i >= 0; --i) {
        //     IForkJoinTask t = tasks[i];
        //     if (t is null) {
        //         if (failure is null)
        //             failure = new NullPointerException();
        //     }
        //     else if (i != 0)
        //         t.fork();
        //     else if ((t.doInvoke() & ABNORMAL) != 0 && failure is null)
        //         failure = t.getException();
        // }
        // for (int i = 1; i <= lastIndex; ++i) {
        //     IForkJoinTask t = tasks[i];
        //     if (t !is null) {
        //         if (failure !is null)
        //             t.cancel(false);
        //         else if ((t.doJoin() & ABNORMAL) != 0)
        //             failure = t.getException();
        //     }
        // }
        implementationMissing(false);
        if (failure !is null) {
            ForkJoinTaskHelper.rethrow(failure);
        }
    }

    /**
     * Forks all tasks in the specified collection, returning when
     * {@code isDone} holds for each task or an (unchecked) exception
     * is encountered, in which case the exception is rethrown. If
     * more than one task encounters an exception, then this method
     * throws any one of these exceptions. If any task encounters an
     * exception, others may be cancelled. However, the execution
     * status of individual tasks is not guaranteed upon exceptional
     * return.
* The status of each task may be obtained using {@link
     * #getException()} and related methods to check if they have been
     * cancelled, completed normally or exceptionally, or left
     * unprocessed.
     *
     * NOTE: not ported yet. The body calls
     * {@code implementationMissing(false)} and hands the collection
     * back untouched; the intended logic is kept below as a comment.
     *
     * @param tasks the collection of tasks
     * @param (T) the type of the values returned from the tasks
     * @return the tasks argument, to simplify usage
     * @throws NullPointerException if tasks or any element are null
     */
    static Collection!(T) invokeAll(T)(Collection!(T) tasks)
            if (is(T : IForkJoinTask)) {
        // TODO: Tasks pending completion -@zxp at 12/21/2018, 10:36:15 PM
        //
        implementationMissing(false);

        // if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
        //     invokeAll(tasks.toArray(new IForkJoinTask[0]));
        //     return tasks;
        // }
        //
        // List!(IForkJoinTask) ts = cast(List!(IForkJoinTask)) tasks;
        // Throwable ex = null;
        // int last = ts.size() - 1;
        // for (int i = last; i >= 0; --i) {
        //     IForkJoinTask t = ts.get(i);
        //     if (t is null) {
        //         if (ex is null)
        //             ex = new NullPointerException();
        //     }
        //     else if (i != 0)
        //         t.fork();
        //     else if ((t.doInvoke() & ABNORMAL) != 0 && ex is null)
        //         ex = t.getException();
        // }
        // for (int i = 1; i <= last; ++i) {
        //     IForkJoinTask t = ts.get(i);
        //     if (t !is null) {
        //         if (ex !is null)
        //             t.cancel(false);
        //         else if ((t.doJoin() & ABNORMAL) != 0)
        //             ex = t.getException();
        //     }
        // }
        // if (ex !is null)
        //     rethrow(ex);

        return tasks;
    }

    /**
     * Attempts to cancel execution of this task. This attempt will
     * fail if the task has already completed or could not be
     * cancelled for some other reason. If successful, and this task
     * has not started when {@code cancel} is called, execution of
     * this task is suppressed.
* After this method returns
     * successfully, unless there is an intervening call to {@link
     * #reinitialize}, subsequent calls to {@link #isCancelled},
     * {@link #isDone}, and {@code cancel} will return {@code true}
     * and calls to {@link #join} and related methods will result in
     * {@code CancellationException}.
     *
     * <p>This method may be overridden in subclasses, but if so, must
     * still ensure that these properties hold. In particular, the
     * {@code cancel} method itself must not throw exceptions.
     *
     * <p>This method is designed to be invoked by <em>other</em>
     * tasks. To terminate the current task, you can just return or
     * throw an unchecked exception from its computation method, or
     * invoke {@link #completeExceptionally(Throwable)}.
     *
     * @param mayInterruptIfRunning this value has no effect in the
     * default implementation because interrupts are not used to
     * control cancellation.
     *
     * @return {@code true} if this task is now cancelled
     */
    bool cancel(bool mayInterruptIfRunning) {
        immutable int outcome = abnormalCompletion(DONE | ABNORMAL);
        immutable int abnormalBits = outcome & (ABNORMAL | THROWN);
        // cancelled means ABNORMAL without THROWN
        return abnormalBits == ABNORMAL;
    }

    /**
     * Returns {@code true} once the status is negative, i.e. the DONE
     * bit is set.
     *
     * @return {@code true} if this task completed
     */
    final bool isDone() {
        return status < 0;
    }

    /**
     * Returns {@code true} if the status has ABNORMAL set but not
     * THROWN.
     *
     * @return {@code true} if this task was cancelled
     */
    final bool isCancelled() {
        immutable int abnormalBits = status & (ABNORMAL | THROWN);
        return abnormalBits == ABNORMAL;
    }

    /**
     * Returns {@code true} if this task threw an exception or was cancelled.
     *
     * @return {@code true} if this task threw an exception or was cancelled
     */
    final bool isCompletedAbnormally() {
        immutable int abnormalBit = status & ABNORMAL;
        return abnormalBit != 0;
    }

    /**
     * Returns {@code true} if this task completed without throwing an
     * exception and was not cancelled.
*
     * @return {@code true} if this task completed without throwing an
     * exception and was not cancelled
     */
    final bool isCompletedNormally() {
        int completionBits = status & (DONE | ABNORMAL);
        return completionBits == DONE;
    }

    /**
     * Returns the exception thrown by the base computation, or a
     * {@code CancellationException} if cancelled, or {@code null} if
     * none or if the method has not yet completed.
     *
     * @return the exception, or {@code null} if none
     */
    final Throwable getException() {
        // Work from a single read of status so both bit tests agree.
        int snapshot = status;
        if ((snapshot & ABNORMAL) == 0)
            return null;
        if ((snapshot & THROWN) == 0)
            return new CancellationException();
        return getThrowableException();
    }

    /**
     * Completes this task abnormally, and if not already aborted or
     * cancelled, causes it to throw the given exception upon
     * {@code join} and related operations. This method may be used
     * to induce exceptions in asynchronous tasks, or to force
     * completion of tasks that would not otherwise complete. Its use
     * in other situations is discouraged. This method is
     * overridable, but overridden versions must invoke {@code super}
     * implementation to maintain guarantees.
     *
     * @param ex the exception to throw. If this exception is not a
     * {@code RuntimeException} or {@code Error}, the actual exception
     * thrown will be a {@code RuntimeException} with cause {@code ex}.
     */
    void completeExceptionally(Exception ex) {
        // NOTE(review): the parameter is typed Exception, so the Error test
        // presumably can never succeed in D - confirm before removing it.
        bool usableAsIs = (cast(RuntimeException) ex) !is null
                || (cast(Error) ex) !is null;
        if (usableAsIs)
            setExceptionalCompletion(ex);
        else
            setExceptionalCompletion(new RuntimeException(ex));
    }

    /**
     * Completes this task, and if not already aborted or cancelled,
     * returning the given value as the result of subsequent
     * invocations of {@code join} and related operations.
This method
     * may be used to provide results for asynchronous tasks, or to
     * provide alternative handling for tasks that would not otherwise
     * complete normally. Its use in other situations is
     * discouraged. This method is overridable, but overridden
     * versions must invoke {@code super} implementation to maintain
     * guarantees.
     *
     * @param value the result value for this task
     */
    static if(is(V == void)) {
        // Result-less tasks have nothing to store; just mark completion.
        void complete() {
            // try {
            //     setRawResult();
            // } catch (Throwable rex) {
            //     setExceptionalCompletion(rex);
            //     return;
            // }
            setDone();
        }
    } else {
        void complete(V value) {
            try {
                setRawResult(value);
            } catch (Throwable rex) {
                // A failing setRawResult turns into exceptional completion.
                setExceptionalCompletion(rex);
                return;
            }
            setDone();
        }
    }

    /**
     * Completes this task normally without setting a value. The most
     * recent value established by {@link #setRawResult} (or {@code
     * null} by default) will be returned as the result of subsequent
     * invocations of {@code join} and related operations.
     *
     * @since 1.8
     */
    final void quietlyComplete() {
        setDone();
    }

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread is not a
     * member of a ForkJoinPool and was interrupted while waiting
     */
    final V get() {
        // Worker threads join in-pool; any other thread takes the external wait path.
        ForkJoinWorkerThread ft = cast(ForkJoinWorkerThread)Thread.getThis();
        int s = ft !is null ? doJoin() : externalInterruptibleAwaitDone();
        if ((s & THROWN) != 0)
            throw new ExecutionException(getThrowableException());
        else if ((s & ABNORMAL) != 0)
            throw new CancellationException();
        else {
            static if(!is(V == void)) {
                return getRawResult();
            }
        }
    }

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread is not a
     * member of a ForkJoinPool and was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    final V get(Duration timeout) {
        int s;
        // TODO: Tasks pending completion -@zxp at 12/21/2018, 10:55:12 PM
        //
        // if (Thread.interrupted())
        //     throw new InterruptedException();

        if ((s = status) >= 0 && timeout > Duration.zero) {
            MonoTime deadline = MonoTime.currTime + timeout;
            // long deadline = (d == 0L) ? 1L : d; // avoid 0
            ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)Thread.getThis();
            if (wt !is null) {
                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
            }
            else {
                // External caller: first try to help or run the task directly.
                ICountedCompleter ic = cast(ICountedCompleter)this;
                if(ic !is null) {
                    s = ForkJoinPool.common.externalHelpComplete(ic, 0);
                } else if(ForkJoinPool.common.tryExternalUnpush(this)){
                    s = doExec();
                } else
                    s = 0;

                if (s >= 0) {
                    Duration ns; // measure in nanosecs, but wait in millisecs
                    long ms;
                    while ((s = status) >= 0 &&
                           (ns = deadline - MonoTime.currTime) > Duration.zero) {
                        if ((ms = ns.total!(TimeUnit.Millisecond)()) > 0L) {
                            // s = cast(int)STATUS.getAndBitwiseOr(this, SIGNAL);
                            // Emulate the atomic get-and-OR with a CAS loop so the
                            // SIGNAL bit is really stored in status; a plain
                            // `status | SIGNAL` only computes a local value.
                            // s ends up holding the status seen before the update.
                            do {
                                s = status;
                            } while (s >= 0 &&
                                     !AtomicHelper.compareAndSet(this.status, s, s | SIGNAL));
                            if (s >= 0) {
                                synchronized (this) {
                                    if (status >= 0) // OK to throw InterruptedException
                                        thisLocker.wait(dur!(TimeUnit.Millisecond)(ms));
                                    else
                                        thisLocker.notifyAll();
                                }
                            }
                        }
                    }
                }
            }
        }
        if (s >= 0)
            throw new TimeoutException();
        else if ((s & THROWN) != 0)
            throw new ExecutionException(getThrowableException());
        else if ((s & ABNORMAL) != 0)
            throw new CancellationException();
        else {
            static if(!is(V == void)) {
                return getRawResult();
            }
        }
    }

    /**
     * Joins this task, without returning its result or throwing its
     * exception. This method may be useful when processing
     * collections of tasks when some have been cancelled or otherwise
     * known to have aborted.
     */
    final void quietlyJoin() {
        doJoin();
    }

    /**
     * Commences performing this task and awaits its completion if
     * necessary, without returning its result or throwing its
     * exception.
*/
    final void quietlyInvoke() {
        doInvoke();
    }

    /**
     * Possibly executes tasks until the pool hosting the current task
     * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
     * method may be of use in designs in which many tasks are forked,
     * but none are explicitly joined, instead executing them until
     * all are processed.
     */
    // static void helpQuiesce() {
    //     Thread t;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) {
    //         ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
    //         wt.pool.helpQuiescePool(wt.workQueue);
    //     }
    //     else
    //         ForkJoinPool.quiesceCommonPool();
    // }

    /**
     * Resets the internal bookkeeping state of this task, allowing a
     * subsequent {@code fork}. This method allows repeated reuse of
     * this task, but only if reuse occurs when this task has either
     * never been forked, or has been forked, then completed and all
     * outstanding joins of this task have also completed. Effects
     * under any other usage conditions are not guaranteed.
     * This method may be useful when executing
     * pre-constructed trees of subtasks in loops.
     *
     * <p>Upon completion of this method, {@code isDone()} reports
     * {@code false}, and {@code getException()} reports {@code
     * null}. However, the value returned by {@code getRawResult} is
     * unaffected. To clear this value, you can invoke {@code
     * setRawResult(null)}.
     */
    void reinitialize() {
        bool threw = (status & THROWN) != 0;
        if (!threw) {
            // No exception was recorded: clearing the status word is enough.
            status = 0;
            return;
        }
        clearExceptionalCompletion();
    }

    /**
     * Returns the pool hosting the current thread, or {@code null}
     * if the current thread is executing outside of any ForkJoinPool.
     *
     * <p>This method returns {@code null} if and only if {@link
     * #inForkJoinPool} returns {@code false}.
*
     * @return the pool, or {@code null} if none
     */
    // static ForkJoinPool getPool() {
    //     Thread t = Thread.getThis();
    //     return (t instanceof ForkJoinWorkerThread) ?
    //         ((ForkJoinWorkerThread) t).pool : null;
    // }

    /**
     * Returns {@code true} if the current thread is a {@link
     * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
     *
     * @return {@code true} if the current thread is a {@link
     * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
     * or {@code false} otherwise
     */
    static bool inForkJoinPool() {
        // The dynamic cast yields null for any non-worker thread.
        return (cast(ForkJoinWorkerThread) Thread.getThis()) !is null;
    }

    /**
     * Tries to unschedule this task for execution. This method will
     * typically (but is not guaranteed to) succeed if this task is
     * the most recently forked task by the current thread, and has
     * not commenced executing in another thread. This method may be
     * useful when arranging alternative local processing of tasks
     * that could have been, but were not, stolen.
     *
     * @return {@code true} if unforked
     */
    bool tryUnfork() {
        ForkJoinWorkerThread worker = cast(ForkJoinWorkerThread) Thread.getThis();
        if (worker is null) {
            // Not a pool worker: ask the common pool to unpush the task.
            return ForkJoinPool.common.tryExternalUnpush(this);
        }
        return worker.workQueue.tryUnpush(this);
    }

    /**
     * Returns an estimate of the number of tasks that have been
     * forked by the current worker thread but not yet executed. This
     * value may be useful for heuristic decisions about whether to
     * fork other tasks.
     *
     * @return the number of tasks
     */
    // static int getQueuedTaskCount() {
    //     Thread t; ForkJoinPool.WorkQueue q;
    //     if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread)
    //         q = ((ForkJoinWorkerThread)t).workQueue;
    //     else
    //         q = ForkJoinPool.commonSubmitterQueue();
    //     return (q is null) ? 0 : q.queueSize();
    // }

    /**
     * Returns an estimate of how many more locally queued tasks are
     * held by the current worker thread than there are other worker
     * threads that might steal them, or zero if this thread is not
     * operating in a ForkJoinPool. This value may be useful for
     * heuristic decisions about whether to fork other tasks. In many
     * usages of ForkJoinTasks, at steady state, each worker should
     * aim to maintain a small constant surplus (for example, 3) of
     * tasks, and to process computations locally if this threshold is
     * exceeded.
     *
     * @return the surplus number of tasks, which may be negative
     */
    // static int getSurplusQueuedTaskCount() {
    //     return ForkJoinPool.getSurplusQueuedTaskCount();
    // }

    // Extension methods
    static if(is(V == void)) {
        // protected abstract void setRawResult();
    } else {

    /**
     * Returns the result that would be returned by {@link #join}, even
     * if this task completed abnormally, or {@code null} if this task
     * is not known to have been completed. This method is designed
     * to aid debugging, as well as to support extensions. Its use in
     * any other context is discouraged.
     *
     * @return the result, or {@code null} if not completed
     */
    abstract V getRawResult();

    /**
     * Forces the given value to be returned as a result. This method
     * is designed to support extensions, and should not in general be
     * called otherwise.
     *
     * @param value the value
     */
    protected abstract void setRawResult(V value);
    }

    /**
     * Immediately performs the base action of this task and returns
     * true if, upon return from this method, this task is guaranteed
     * to have completed normally.
This method may return false 1239 * otherwise, to indicate that this task is not necessarily 1240 * complete (or is not known to be complete), for example in 1241 * asynchronous actions that require explicit invocations of 1242 * completion methods. This method may also throw an (unchecked) 1243 * exception to indicate abnormal exit. This method is designed to 1244 * support extensions, and should not in general be called 1245 * otherwise. 1246 * 1247 * @return {@code true} if this task is known to have completed normally 1248 */ 1249 protected abstract bool exec(); 1250 1251 /** 1252 * Returns, but does not unschedule or execute, a task queued by 1253 * the current thread but not yet executed, if one is immediately 1254 * available. There is no guarantee that this task will actually 1255 * be polled or executed next. Conversely, this method may return 1256 * null even if a task exists but cannot be accessed without 1257 * contention with other threads. This method is designed 1258 * primarily to support extensions, and is unlikely to be useful 1259 * otherwise. 1260 * 1261 * @return the next task, or {@code null} if none are available 1262 */ 1263 // protected static IForkJoinTask peekNextLocalTask() { 1264 // Thread t; ForkJoinPool.WorkQueue q; 1265 // if ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) 1266 // q = ((ForkJoinWorkerThread)t).workQueue; 1267 // else 1268 // q = ForkJoinPool.commonSubmitterQueue(); 1269 // return (q is null) ? null : q.peek(); 1270 // } 1271 1272 /** 1273 * Unschedules and returns, without executing, the next task 1274 * queued by the current thread but not yet executed, if the 1275 * current thread is operating in a ForkJoinPool. This method is 1276 * designed primarily to support extensions, and is unlikely to be 1277 * useful otherwise. 
1278 * 1279 * @return the next task, or {@code null} if none are available 1280 */ 1281 // protected static IForkJoinTask pollNextLocalTask() { 1282 // Thread t; 1283 // return ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) ? 1284 // ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : 1285 // null; 1286 // } 1287 1288 /** 1289 * If the current thread is operating in a ForkJoinPool, 1290 * unschedules and returns, without executing, the next task 1291 * queued by the current thread but not yet executed, if one is 1292 * available, or if not available, a task that was forked by some 1293 * other thread, if available. Availability may be transient, so a 1294 * {@code null} result does not necessarily imply quiescence of 1295 * the pool this task is operating in. This method is designed 1296 * primarily to support extensions, and is unlikely to be useful 1297 * otherwise. 1298 * 1299 * @return a task, or {@code null} if none are available 1300 */ 1301 // protected static IForkJoinTask pollTask() { 1302 // Thread t; ForkJoinWorkerThread wt; 1303 // return ((t = Thread.getThis()) instanceof ForkJoinWorkerThread) ? 1304 // (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : 1305 // null; 1306 // } 1307 1308 // /** 1309 // * If the current thread is operating in a ForkJoinPool, 1310 // * unschedules and returns, without executing, a task externally 1311 // * submitted to the pool, if one is available. Availability may be 1312 // * transient, so a {@code null} result does not necessarily imply 1313 // * quiescence of the pool. This method is designed primarily to 1314 // * support extensions, and is unlikely to be useful otherwise. 1315 // * 1316 // * @return a task, or {@code null} if none are available 1317 // * @since 9 1318 // */ 1319 // protected static IForkJoinTask pollSubmission() { 1320 // ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis(); 1321 // return t !is null ? 
//         t.pool.pollSubmission() : null;
    // }

    // tag operations

    /**
     * Returns the tag for this task.
     *
     * @return the tag for this task
     * @since 1.8
     */
    final short getForkJoinTaskTag() {
        // The tag is whatever survives truncating status to a short.
        return cast(short) status;
    }

    /**
     * Atomically sets the tag value for this task and returns the old value.
     *
     * @param newValue the new tag value
     * @return the previous value of the tag
     * @since 1.8
     */
    final short setForkJoinTaskTag(short newValue) {
        int observed;
        // Retry until the SMASK bits are swapped against an unchanged status.
        do {
            observed = status;
        } while (!AtomicHelper.compareAndSet(this.status, observed,
                    (observed & ~SMASK) | (newValue & SMASK)));
        return cast(short) observed;
    }

    /**
     * Atomically conditionally sets the tag value for this task.
     * Among other applications, tags can be used as visit markers
     * in tasks operating on graphs, as in methods that check: {@code
     * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
     * before processing, otherwise exiting because the node has
     * already been visited.
     *
     * @param expect the expected tag value
     * @param update the new tag value
     * @return {@code true} if successful; i.e., the current value was
     * equal to {@code expect} and was changed to {@code update}.
     * @since 1.8
     */
    final bool compareAndSetForkJoinTaskTag(short expect, short update) {
        while (true) {
            int observed = status;
            if (cast(short) observed != expect)
                return false;
            // Keep every non-tag bit and replace only the SMASK portion.
            int replacement = (observed & ~SMASK) | (update & SMASK);
            if (AtomicHelper.compareAndSet(this.status, observed, replacement))
                return true;
        }
    }


    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code run}
     * method of the given {@code Runnable} as its action, and returns
     * a null result upon {@link #join}.
*
     * @param runnable the runnable action
     * @return the task
     */
    // static IForkJoinTask adapt(Runnable runnable) {
    //     return new AdaptedRunnableAction(runnable);
    // }

    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code run}
     * method of the given {@code Runnable} as its action, and returns
     * the given result upon {@link #join}.
     *
     * @param runnable the runnable action
     * @param result the result upon completion
     * @param (T) the type of the result
     * @return the task
     */
    static ForkJoinTask!(T) adapt(T)(Runnable runnable, T result) {
        auto adapted = new AdaptedRunnable!(T)(runnable, result);
        return adapted;
    }

    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code call}
     * method of the given {@code Callable} as its action, and returns
     * its result upon {@link #join}, translating any checked exceptions
     * encountered into {@code RuntimeException}.
     *
     * @param callable the callable action
     * @param (T) the type of the callable's result
     * @return the task
     */
    static ForkJoinTask!(T) adapt(T)(Callable!(T) callable) {
        auto adapted = new AdaptedCallable!(T)(callable);
        return adapted;
    }
}



/**
 * Adapter for Runnables. This implements RunnableFuture
 * to be compliant with AbstractExecutorService constraints
 * when used in ForkJoinPool.
*/
final class AdaptedRunnable(T) : ForkJoinTask!(T), RunnableFuture!(T) {
    // Plain field, as in AdaptedRunnableAction: `final` is not a valid
    // storage class for a D variable.
    Runnable runnable;
    T result;

    /**
     * Params:
     *   runnable = the action to run; must not be null
     *   result = the value handed back by getRawResult()
     * Throws: NullPointerException if runnable is null
     */
    this(Runnable runnable, T result) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
        this.result = result; // OK to set this even before completion
    }

    final override T getRawResult() { return result; }
    final override void setRawResult(T v) { result = v; }

    /// Runs the wrapped action and reports normal completion.
    final override bool exec() { runnable.run(); return true; }

    /// RunnableFuture entry point: runs this task via invoke().
    final void run() { invoke(); }

    override string toString() {
        // An interface reference cannot be appended to a string directly;
        // go through Object.toString() as AdaptedRunnableAction does.
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)runnable).toString() ~ "]";
    }
}

/**
 * Adapter for Runnables without results.
 */
final class AdaptedRunnableAction : ForkJoinTask!(void), Runnable {
    Runnable runnable;

    /**
     * Params:
     *   runnable = the action to run; must not be null
     * Throws: NullPointerException if runnable is null
     */
    this(Runnable runnable) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // final Void getRawResult() { return null; }
    // final void setRawResult(Void v) { }

    /// Runs the wrapped action and reports normal completion.
    final override bool exec() { runnable.run(); return true; }

    /// Runnable entry point: runs this task via invoke().
    final void run() { invoke(); }

    /// Forwards unchanged to the base class implementation.
    override bool cancel(bool mayInterruptIfRunning) {
        return super.cancel(mayInterruptIfRunning);
    }

    // override bool isCancelled() {
    //     return super.isCancelled();
    // }

    // override bool isDone() {
    //     return super.isDone();
    // }

    // override void get() {
    //     super.get();
    // }

    // override void get(Duration timeout) {
    //     super.get(timeout);
    // }

    override string toString() {
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)runnable).toString() ~ "]";
    }
}

/**
 * Adapter for Runnables in which failure forces worker exception.
*/
final class RunnableExecuteAction : ForkJoinTask!(void) {
    Runnable runnable;

    /**
     * Params:
     *   runnable = the action to run; must not be null
     * Throws: NullPointerException if runnable is null
     */
    this(Runnable runnable) {
        if (runnable is null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // final Void getRawResult() { return null; }
    // final void setRawResult(Void v) { }

    /// Runs the wrapped action and reports normal completion.
    final override bool exec() { runnable.run(); return true; }

    override void internalPropagateException(Throwable ex) {
        ForkJoinTaskHelper.rethrow(ex); // rethrow outside exec() catches.
    }
}

/**
 * Adapter for Callables.
 */
final class AdaptedCallable(T) : ForkJoinTask!(T), RunnableFuture!(T) {
    // Plain field, as in the Runnable adapters: `final` is not a valid
    // storage class for a D variable.
    Callable!(T) callable;
    T result;

    /**
     * Params:
     *   callable = the computation to call; must not be null
     * Throws: NullPointerException if callable is null
     */
    this(Callable!(T) callable) {
        if (callable is null) throw new NullPointerException();
        this.callable = callable;
    }

    final override T getRawResult() { return result; }
    final override void setRawResult(T v) { result = v; }

    /**
     * Calls the wrapped Callable and stores its value in result.
     * A RuntimeException passes through unchanged; any other
     * Exception is wrapped in a new RuntimeException.
     */
    final override bool exec() {
        try {
            result = callable.call();
            return true;
        } catch (RuntimeException rex) {
            throw rex;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /// RunnableFuture entry point: runs this task via invoke().
    final void run() { invoke(); }

    override string toString() {
        // An interface reference cannot be appended to a string directly;
        // go through Object.toString() as AdaptedRunnableAction does.
        return super.toString() ~ "[Wrapped task = " ~ (cast(Object)callable).toString() ~ "]";
    }
}


/*************************************************/
// CountedCompleter
/*************************************************/

/// Task interface that additionally exposes its completer via getCompleter().
interface ICountedCompleter : IForkJoinTask {
    ICountedCompleter getCompleter();
}

/**
 * A {@link ForkJoinTask} with a completion action performed when
 * triggered and there are no remaining pending actions.
 * CountedCompleters are in general more robust in the
 * presence of subtask stalls and blockage than are other forms of
 * ForkJoinTasks, but are less intuitive to program.
Uses of 1540 * CountedCompleter are similar to those of other completion based 1541 * components (such as {@link java.nio.channels.CompletionHandler}) 1542 * except that multiple <em>pending</em> completions may be necessary 1543 * to trigger the completion action {@link #onCompletion(CountedCompleter)}, 1544 * not just one. 1545 * Unless initialized otherwise, the {@linkplain #getPendingCount pending 1546 * count} starts at zero, but may be (atomically) changed using 1547 * methods {@link #setPendingCount}, {@link #addToPendingCount}, and 1548 * {@link #compareAndSetPendingCount}. Upon invocation of {@link 1549 * #tryComplete}, if the pending action count is nonzero, it is 1550 * decremented; otherwise, the completion action is performed, and if 1551 * this completer itself has a completer, the process is continued 1552 * with its completer. As is the case with related synchronization 1553 * components such as {@link Phaser} and {@link Semaphore}, these methods 1554 * affect only internal counts; they do not establish any further 1555 * internal bookkeeping. In particular, the identities of pending 1556 * tasks are not maintained. As illustrated below, you can create 1557 * subclasses that do record some or all pending tasks or their 1558 * results when needed. As illustrated below, utility methods 1559 * supporting customization of completion traversals are also 1560 * provided. However, because CountedCompleters provide only basic 1561 * synchronization mechanisms, it may be useful to create further 1562 * abstract subclasses that maintain linkages, fields, and additional 1563 * support methods appropriate for a set of related usages. 1564 * 1565 * <p>A concrete CountedCompleter class must define method {@link 1566 * #compute}, that should in most cases (as illustrated below), invoke 1567 * {@code tryComplete()} once before returning. 
The class may also 1568 * optionally override method {@link #onCompletion(CountedCompleter)} 1569 * to perform an action upon normal completion, and method 1570 * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to 1571 * perform an action upon any exception. 1572 * 1573 * <p>CountedCompleters most often do not bear results, in which case 1574 * they are normally declared as {@code CountedCompleter!(void)}, and 1575 * will always return {@code null} as a result value. In other cases, 1576 * you should override method {@link #getRawResult} to provide a 1577 * result from {@code join(), invoke()}, and related methods. In 1578 * general, this method should return the value of a field (or a 1579 * function of one or more fields) of the CountedCompleter object that 1580 * holds the result upon completion. Method {@link #setRawResult} by 1581 * default plays no role in CountedCompleters. It is possible, but 1582 * rarely applicable, to override this method to maintain other 1583 * objects or fields holding result data. 1584 * 1585 * <p>A CountedCompleter that does not itself have a completer (i.e., 1586 * one for which {@link #getCompleter} returns {@code null}) can be 1587 * used as a regular ForkJoinTask with this added functionality. 1588 * However, any completer that in turn has another completer serves 1589 * only as an internal helper for other computations, so its own task 1590 * status (as reported in methods such as {@link ForkJoinTask#isDone}) 1591 * is arbitrary; this status changes only upon explicit invocations of 1592 * {@link #complete}, {@link ForkJoinTask#cancel}, 1593 * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon 1594 * exceptional completion of method {@code compute}. Upon any 1595 * exceptional completion, the exception may be relayed to a task's 1596 * completer (and its completer, and so on), if one exists and it has 1597 * not otherwise already completed. 
Similarly, cancelling an internal 1598 * CountedCompleter has only a local effect on that completer, so is 1599 * not often useful. 1600 * 1601 * <p><b>Sample Usages.</b> 1602 * 1603 * <p><b>Parallel recursive decomposition.</b> CountedCompleters may 1604 * be arranged in trees similar to those often used with {@link 1605 * RecursiveAction}s, although the constructions involved in setting 1606 * them up typically vary. Here, the completer of each task is its 1607 * parent in the computation tree. Even though they entail a bit more 1608 * bookkeeping, CountedCompleters may be better choices when applying 1609 * a possibly time-consuming operation (that cannot be further 1610 * subdivided) to each element of an array or collection; especially 1611 * when the operation takes a significantly different amount of time 1612 * to complete for some elements than others, either because of 1613 * intrinsic variation (for example I/O) or auxiliary effects such as 1614 * garbage collection. Because CountedCompleters provide their own 1615 * continuations, other tasks need not block waiting to perform them. 1616 * 1617 * <p>For example, here is an initial version of a utility method that 1618 * uses divide-by-two recursive decomposition to divide work into 1619 * single pieces (leaf tasks). Even when work is split into individual 1620 * calls, tree-based techniques are usually preferable to directly 1621 * forking leaf tasks, because they reduce inter-thread communication 1622 * and improve load balancing. In the recursive case, the second of 1623 * each pair of subtasks to finish triggers completion of their parent 1624 * (because no result combination is performed, the default no-op 1625 * implementation of method {@code onCompletion} is not overridden). 1626 * The utility method sets up the root task and invokes it (here, 1627 * implicitly using the {@link ForkJoinPool#commonPool()}). 
It is 1628 * straightforward and reliable (but not optimal) to always set the 1629 * pending count to the number of child tasks and call {@code 1630 * tryComplete()} immediately before returning. 1631 * 1632 * <pre> {@code 1633 * static <E> void forEach(E[] array, Consumer<E> action) { 1634 * class Task extends CountedCompleter!(void) { 1635 * final int lo, hi; 1636 * Task(Task parent, int lo, int hi) { 1637 * super(parent); this.lo = lo; this.hi = hi; 1638 * } 1639 * 1640 * void compute() { 1641 * if (hi - lo >= 2) { 1642 * int mid = (lo + hi) >>> 1; 1643 * // must set pending count before fork 1644 * setPendingCount(2); 1645 * new Task(this, mid, hi).fork(); // right child 1646 * new Task(this, lo, mid).fork(); // left child 1647 * } 1648 * else if (hi > lo) 1649 * action.accept(array[lo]); 1650 * tryComplete(); 1651 * } 1652 * } 1653 * new Task(null, 0, array.length).invoke(); 1654 * }}</pre> 1655 * 1656 * This design can be improved by noticing that in the recursive case, 1657 * the task has nothing to do after forking its right task, so can 1658 * directly invoke its left task before returning. (This is an analog 1659 * of tail recursion removal.) Also, when the last action in a task 1660 * is to fork or invoke a subtask (a "tail call"), the call to {@code 1661 * tryComplete()} can be optimized away, at the cost of making the 1662 * pending count look "off by one". 1663 * 1664 * <pre> {@code 1665 * void compute() { 1666 * if (hi - lo >= 2) { 1667 * int mid = (lo + hi) >>> 1; 1668 * setPendingCount(1); // looks off by one, but correct! 1669 * new Task(this, mid, hi).fork(); // right child 1670 * new Task(this, lo, mid).compute(); // direct invoke 1671 * } else { 1672 * if (hi > lo) 1673 * action.accept(array[lo]); 1674 * tryComplete(); 1675 * } 1676 * }}</pre> 1677 * 1678 * As a further optimization, notice that the left task need not even exist. 
1679 * Instead of creating a new one, we can continue using the original task, 1680 * and add a pending count for each fork. Additionally, because no task 1681 * in this tree implements an {@link #onCompletion(CountedCompleter)} method, 1682 * {@code tryComplete} can be replaced with {@link #propagateCompletion}. 1683 * 1684 * <pre> {@code 1685 * void compute() { 1686 * int n = hi - lo; 1687 * for (; n >= 2; n /= 2) { 1688 * addToPendingCount(1); 1689 * new Task(this, lo + n/2, lo + n).fork(); 1690 * } 1691 * if (n > 0) 1692 * action.accept(array[lo]); 1693 * propagateCompletion(); 1694 * }}</pre> 1695 * 1696 * When pending counts can be precomputed, they can be established in 1697 * the constructor: 1698 * 1699 * <pre> {@code 1700 * static <E> void forEach(E[] array, Consumer<E> action) { 1701 * class Task extends CountedCompleter!(void) { 1702 * final int lo, hi; 1703 * Task(Task parent, int lo, int hi) { 1704 * super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo)); 1705 * this.lo = lo; this.hi = hi; 1706 * } 1707 * 1708 * void compute() { 1709 * for (int n = hi - lo; n >= 2; n /= 2) 1710 * new Task(this, lo + n/2, lo + n).fork(); 1711 * action.accept(array[lo]); 1712 * propagateCompletion(); 1713 * } 1714 * } 1715 * if (array.length > 0) 1716 * new Task(null, 0, array.length).invoke(); 1717 * }}</pre> 1718 * 1719 * Additional optimizations of such classes might entail specializing 1720 * classes for leaf steps, subdividing by say, four, instead of two 1721 * per iteration, and using an adaptive threshold instead of always 1722 * subdividing down to single elements. 1723 * 1724 * <p><b>Searching.</b> A tree of CountedCompleters can search for a 1725 * value or property in different parts of a data structure, and 1726 * report a result in an {@link 1727 * hunt.concurrency.atomic.AtomicReference AtomicReference} as 1728 * soon as one is found. The others can poll the result to avoid 1729 * unnecessary work. 
(You could additionally {@linkplain #cancel 1730 * cancel} other tasks, but it is usually simpler and more efficient 1731 * to just let them notice that the result is set and if so skip 1732 * further processing.) Illustrating again with an array using full 1733 * partitioning (again, in practice, leaf tasks will almost always 1734 * process more than one element): 1735 * 1736 * <pre> {@code 1737 * class Searcher<E> extends CountedCompleter<E> { 1738 * final E[] array; final AtomicReference<E> result; final int lo, hi; 1739 * Searcher(ICountedCompleter p, E[] array, AtomicReference<E> result, int lo, int hi) { 1740 * super(p); 1741 * this.array = array; this.result = result; this.lo = lo; this.hi = hi; 1742 * } 1743 * E getRawResult() { return result.get(); } 1744 * void compute() { // similar to ForEach version 3 1745 * int l = lo, h = hi; 1746 * while (result.get() is null && h >= l) { 1747 * if (h - l >= 2) { 1748 * int mid = (l + h) >>> 1; 1749 * addToPendingCount(1); 1750 * new Searcher(this, array, result, mid, h).fork(); 1751 * h = mid; 1752 * } 1753 * else { 1754 * E x = array[l]; 1755 * if (matches(x) && result.compareAndSet(null, x)) 1756 * quietlyCompleteRoot(); // root task is now joinable 1757 * break; 1758 * } 1759 * } 1760 * tryComplete(); // normally complete whether or not found 1761 * } 1762 * bool matches(E e) { ... } // return true if found 1763 * 1764 * static <E> E search(E[] array) { 1765 * return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke(); 1766 * } 1767 * }}</pre> 1768 * 1769 * In this example, as well as others in which tasks have no other 1770 * effects except to {@code compareAndSet} a common result, the 1771 * trailing unconditional invocation of {@code tryComplete} could be 1772 * made conditional ({@code if (result.get() is null) tryComplete();}) 1773 * because no further bookkeeping is required to manage completions 1774 * once the root task completes. 
1775 * 1776 * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine 1777 * results of multiple subtasks usually need to access these results 1778 * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following 1779 * class (that performs a simplified form of map-reduce where mappings 1780 * and reductions are all of type {@code E}), one way to do this in 1781 * divide and conquer designs is to have each subtask record its 1782 * sibling, so that it can be accessed in method {@code onCompletion}. 1783 * This technique applies to reductions in which the order of 1784 * combining left and right results does not matter; ordered 1785 * reductions require explicit left/right designations. Variants of 1786 * other streamlinings seen in the above examples may also apply. 1787 * 1788 * <pre> {@code 1789 * class MyMapper<E> { E apply(E v) { ... } } 1790 * class MyReducer<E> { E apply(E x, E y) { ... } } 1791 * class MapReducer<E> extends CountedCompleter<E> { 1792 * final E[] array; final MyMapper<E> mapper; 1793 * final MyReducer<E> reducer; final int lo, hi; 1794 * MapReducer<E> sibling; 1795 * E result; 1796 * MapReducer(ICountedCompleter p, E[] array, MyMapper<E> mapper, 1797 * MyReducer<E> reducer, int lo, int hi) { 1798 * super(p); 1799 * this.array = array; this.mapper = mapper; 1800 * this.reducer = reducer; this.lo = lo; this.hi = hi; 1801 * } 1802 * void compute() { 1803 * if (hi - lo >= 2) { 1804 * int mid = (lo + hi) >>> 1; 1805 * MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid); 1806 * MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi); 1807 * left.sibling = right; 1808 * right.sibling = left; 1809 * setPendingCount(1); // only right is pending 1810 * right.fork(); 1811 * left.compute(); // directly execute left 1812 * } 1813 * else { 1814 * if (hi > lo) 1815 * result = mapper.apply(array[lo]); 1816 * tryComplete(); 1817 * } 1818 * } 1819 * void onCompletion(ICountedCompleter 
caller) { 1820 * if (caller != this) { 1821 * MapReducer<E> child = (MapReducer<E>)caller; 1822 * MapReducer<E> sib = child.sibling; 1823 * if (sib is null || sib.result is null) 1824 * result = child.result; 1825 * else 1826 * result = reducer.apply(child.result, sib.result); 1827 * } 1828 * } 1829 * E getRawResult() { return result; } 1830 * 1831 * static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { 1832 * return new MapReducer<E>(null, array, mapper, reducer, 1833 * 0, array.length).invoke(); 1834 * } 1835 * }}</pre> 1836 * 1837 * Here, method {@code onCompletion} takes a form common to many 1838 * completion designs that combine results. This callback-style method 1839 * is triggered once per task, in either of the two different contexts 1840 * in which the pending count is, or becomes, zero: (1) by a task 1841 * itself, if its pending count is zero upon invocation of {@code 1842 * tryComplete}, or (2) by any of its subtasks when they complete and 1843 * decrement the pending count to zero. The {@code caller} argument 1844 * distinguishes cases. Most often, when the caller is {@code this}, 1845 * no action is necessary. Otherwise the caller argument can be used 1846 * (usually via a cast) to supply a value (and/or links to other 1847 * values) to be combined. Assuming proper use of pending counts, the 1848 * actions inside {@code onCompletion} occur (once) upon completion of 1849 * a task and its subtasks. No additional synchronization is required 1850 * within this method to ensure thread safety of accesses to fields of 1851 * this task or other completed tasks. 1852 * 1853 * <p><b>Completion Traversals</b>. If using {@code onCompletion} to 1854 * process completions is inapplicable or inconvenient, you can use 1855 * methods {@link #firstComplete} and {@link #nextComplete} to create 1856 * custom traversals. 
For example, to define a MapReducer that only 1857 * splits out right-hand tasks in the form of the third ForEach 1858 * example, the completions must cooperatively reduce along 1859 * unexhausted subtask links, which can be done as follows: 1860 * 1861 * <pre> {@code 1862 * class MapReducer<E> extends CountedCompleter<E> { // version 2 1863 * final E[] array; final MyMapper<E> mapper; 1864 * final MyReducer<E> reducer; final int lo, hi; 1865 * MapReducer<E> forks, next; // record subtask forks in list 1866 * E result; 1867 * MapReducer(ICountedCompleter p, E[] array, MyMapper<E> mapper, 1868 * MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) { 1869 * super(p); 1870 * this.array = array; this.mapper = mapper; 1871 * this.reducer = reducer; this.lo = lo; this.hi = hi; 1872 * this.next = next; 1873 * } 1874 * void compute() { 1875 * int l = lo, h = hi; 1876 * while (h - l >= 2) { 1877 * int mid = (l + h) >>> 1; 1878 * addToPendingCount(1); 1879 * (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork(); 1880 * h = mid; 1881 * } 1882 * if (h > l) 1883 * result = mapper.apply(array[l]); 1884 * // process completions by reducing along and advancing subtask links 1885 * for (ICountedCompleter c = firstComplete(); c !is null; c = c.nextComplete()) { 1886 * for (MapReducer t = (MapReducer)c, s = t.forks; s !is null; s = t.forks = s.next) 1887 * t.result = reducer.apply(t.result, s.result); 1888 * } 1889 * } 1890 * E getRawResult() { return result; } 1891 * 1892 * static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { 1893 * return new MapReducer<E>(null, array, mapper, reducer, 1894 * 0, array.length, null).invoke(); 1895 * } 1896 * }}</pre> 1897 * 1898 * <p><b>Triggers.</b> Some CountedCompleters are themselves never 1899 * forked, but instead serve as bits of plumbing in other designs; 1900 * including those in which the completion of one or more async tasks 1901 * triggers another async task. 
For example:
 *
 * <pre> {@code
 * class HeaderBuilder extends CountedCompleter<...> { ... }
 * class BodyBuilder extends CountedCompleter<...> { ... }
 * class PacketSender extends CountedCompleter<...> {
 *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
 *   void compute() { } // never called
 *   void onCompletion(ICountedCompleter caller) { sendPacket(); }
 * }
 * // sample use:
 * PacketSender p = new PacketSender();
 * new HeaderBuilder(p, ...).fork();
 * new BodyBuilder(p, ...).fork();}</pre>
 *
 * @since 1.8
 * @author Doug Lea
 */
abstract class CountedCompleter(T) : ForkJoinTask!(T), ICountedCompleter {

    /** This task's completer, or null if none */
    ICountedCompleter completer;
    /** The number of pending tasks until completion */
    int pending;

    /**
     * Creates a new CountedCompleter with the given completer
     * and initial pending count.
     *
     * @param completer this task's completer, or {@code null} if none
     * @param initialPendingCount the initial pending count
     */
    protected this(ICountedCompleter completer,
                   int initialPendingCount) {
        this.completer = completer;
        this.pending = initialPendingCount;
    }

    /**
     * Creates a new CountedCompleter with the given completer
     * and an initial pending count of zero.
     *
     * @param completer this task's completer, or {@code null} if none
     */
    protected this(ICountedCompleter completer) {
        this.completer = completer;
    }

    /**
     * Creates a new CountedCompleter with no completer
     * and an initial pending count of zero.
     */
    protected this() {
        this.completer = null;
    }

    /**
     * The main computation performed by this task.
     */
    abstract void compute();

    /**
     * Performs an action when method {@link #tryComplete} is invoked
     * and the pending count is zero, or when the unconditional
     * method {@link #complete} is invoked.  By default, this method
     * does nothing. You can distinguish cases by checking the
     * identity of the given caller argument. If not equal to {@code
     * this}, then it is typically a subtask that may contain results
     * (and/or links to other results) to combine.
     *
     * @param caller the task invoking this method (which may
     * be this task itself)
     */
    void onCompletion(ICountedCompleter caller) {
    }

    /**
     * Performs an action when method {@link
     * #completeExceptionally(Throwable)} is invoked or method {@link
     * #compute} throws an exception, and this task has not already
     * otherwise completed normally. On entry to this method, this task
     * {@link ForkJoinTask#isCompletedAbnormally}.  The return value
     * of this method controls further propagation: If {@code true}
     * and this task has a completer that has not completed, then that
     * completer is also completed exceptionally, with the same
     * exception as this completer.  The default implementation of
     * this method does nothing except return {@code true}.
     *
     * @param ex the exception
     * @param caller the task invoking this method (which may
     * be this task itself)
     * @return {@code true} if this exception should be propagated to this
     * task's completer, if one exists
     */
    bool onExceptionalCompletion(Throwable ex, ICountedCompleter caller) {
        return true;
    }

    /**
     * Returns the completer established in this task's constructor,
     * or {@code null} if none.
     *
     * This is the only declaration of {@code getCompleter}; an earlier
     * undocumented duplicate with the same signature was removed.
     *
     * @return the completer
     */
    final ICountedCompleter getCompleter() {
        return completer;
    }

    /**
     * Returns the current pending count.
     *
     * @return the current pending count
     */
    final int getPendingCount() {
        return pending;
    }

    /**
     * Sets the pending count to the given value.
     *
     * @param count the count
     */
    final void setPendingCount(int count) {
        pending = count;
    }

    /**
     * Adds (atomically) the given value to the pending count.
     *
     * @param delta the value to add
     */
    final void addToPendingCount(int delta) {
        PENDING.getAndAdd(this, delta);
    }

    /**
     * Sets (atomically) the pending count to the given count only if
     * it currently holds the given expected value.
     *
     * @param expected the expected value
     * @param count the new value
     * @return {@code true} if successful
     */
    final bool compareAndSetPendingCount(int expected, int count) {
        return PENDING.compareAndSet(this, expected, count);
    }

    /**
     * If the pending count is nonzero, (atomically) decrements it.
     *
     * @return the initial (undecremented) pending count holding on entry
     * to this method
     */
    final int decrementPendingCountUnlessZero() {
        int c;
        // Retry until the count is observed as zero or the decrement wins.
        do {} while ((c = pending) != 0 &&
                     !PENDING.weakCompareAndSet(this, c, c - 1));
        return c;
    }

    /**
     * Returns the root of the current computation; i.e., this
     * task if it has no completer, else its completer's root.
     *
     * @return the root of the current computation
     */
    final ICountedCompleter getRoot() {
        ICountedCompleter a = this, p;
        while ((p = a.completer) !is null)
            a = p;
        return a;
    }

    /**
     * If the pending count is nonzero, decrements the count;
     * otherwise invokes {@link #onCompletion(ICountedCompleter)}
     * and then similarly tries to complete this task's completer,
     * if one exists, else marks this task as complete.
     */
    final void tryComplete() {
        // a: task currently being examined; s: the task that led to it,
        // passed to onCompletion as the caller.
        ICountedCompleter a = this, s = a;
        for (int c;;) {
            if ((c = a.pending) == 0) {
                a.onCompletion(s);
                if ((a = (s = a).completer) is null) {
                    s.quietlyComplete();
                    return;
                }
            }
            else if (PENDING.weakCompareAndSet(a, c, c - 1))
                return;
        }
    }

    /**
     * Equivalent to {@link #tryComplete} but does not invoke {@link
     * #onCompletion(ICountedCompleter)} along the completion path:
     * If the pending count is nonzero, decrements the count;
     * otherwise, similarly tries to complete this task's completer, if
     * one exists, else marks this task as complete. This method may be
     * useful in cases where {@code onCompletion} should not, or need
     * not, be invoked for each completer in a computation.
     */
    final void propagateCompletion() {
        ICountedCompleter a = this, s;
        for (int c;;) {
            if ((c = a.pending) == 0) {
                if ((a = (s = a).completer) is null) {
                    s.quietlyComplete();
                    return;
                }
            }
            else if (PENDING.weakCompareAndSet(a, c, c - 1))
                return;
        }
    }

    /**
     * Regardless of pending count, invokes
     * {@link #onCompletion(ICountedCompleter)}, marks this task as
     * complete and further triggers {@link #tryComplete} on this
     * task's completer, if one exists.  The given rawResult is
     * used as an argument to {@link #setRawResult} before invoking
     * {@link #onCompletion(ICountedCompleter)} or marking this task
     * as complete; its value is meaningful only for classes
     * overriding {@code setRawResult}.  This method does not modify
     * the pending count.
     *
     * <p>This method may be useful when forcing completion as soon as
     * any one (versus all) of several subtask results are obtained.
     * However, in the common (and recommended) case in which {@code
     * setRawResult} is not overridden, this effect can be obtained
     * more simply using {@link #quietlyCompleteRoot()}.
     *
     * @param rawResult the raw result
     */
    void complete(T rawResult) {
        ICountedCompleter p;
        setRawResult(rawResult);
        onCompletion(this);
        quietlyComplete();
        if ((p = completer) !is null)
            p.tryComplete();
    }

    /**
     * If this task's pending count is zero, returns this task;
     * otherwise decrements its pending count and returns {@code null}.
     * This method is designed to be used with {@link #nextComplete} in
     * completion traversal loops.
     *
     * @return this task, if pending count was zero, else {@code null}
     */
    final ICountedCompleter firstComplete() {
        for (int c;;) {
            if ((c = pending) == 0)
                return this;
            else if (PENDING.weakCompareAndSet(this, c, c - 1))
                return null;
        }
    }

    /**
     * If this task does not have a completer, invokes {@link
     * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
     * the completer's pending count is non-zero, decrements that
     * pending count and returns {@code null}.  Otherwise, returns the
     * completer.  This method can be used as part of a completion
     * traversal loop for homogeneous task hierarchies:
     *
     * <pre> {@code
     * for (ICountedCompleter c = firstComplete();
     *      c !is null;
     *      c = c.nextComplete()) {
     *   // ... process c ...
     * }}</pre>
     *
     * @return the completer, or {@code null} if none
     */
    final ICountedCompleter nextComplete() {
        ICountedCompleter p;
        if ((p = completer) !is null)
            return p.firstComplete();
        else {
            quietlyComplete();
            return null;
        }
    }

    /**
     * Equivalent to {@code getRoot().quietlyComplete()}.
     */
    final void quietlyCompleteRoot() {
        for (ICountedCompleter a = this, p;;) {
            if ((p = a.completer) is null) {
                a.quietlyComplete();
                return;
            }
            a = p;
        }
    }

    /**
     * If this task has not completed, attempts to process at most the
     * given number of other unprocessed tasks for which this task is
     * on the completion path, if any are known to exist.
     *
     * @param maxTasks the maximum number of tasks to process.  If
     *                 less than or equal to zero, then no tasks are
     *                 processed.
     */
    final void helpComplete(int maxTasks) {
        Thread t = Thread.getThis();
        // The cast yields null when the current thread is not a pool worker.
        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t;
        if (maxTasks > 0 && status >= 0) {
            if (wt !is null)
                wt.pool.helpComplete(wt.workQueue, this, maxTasks);
            else
                ForkJoinPool.common.externalHelpComplete(this, maxTasks);
        }
    }

    /**
     * Supports ForkJoinTask exception propagation.
     *
     * Walks up the completer chain for as long as each task's
     * {@code onExceptionalCompletion} returns {@code true}, the next
     * completer exists, its status is non-negative, and recording the
     * exception on it yields an exceptional status.
     */
    void internalPropagateException(Throwable ex) {
        ICountedCompleter a = this, s = a;
        while (a.onExceptionalCompletion(ex, s) &&
               (a = (s = a).completer) !is null && a.status >= 0 &&
               isExceptionalStatus(a.recordExceptionalCompletion(ex))) {
            // all work happens in the loop condition
        }
    }

    /**
     * Implements execution conventions for CountedCompleters.
     *
     * Runs {@link #compute} and always returns {@code false}.
     */
    protected final bool exec() {
        compute();
        return false;
    }

    /**
     * Returns the result of the computation.  By default, returns
     * {@code T.init} (which is {@code null} for class and other
     * reference types), but in other cases should be overridden, almost
     * always to return a field or function of a field that
     * holds the result upon completion.
     *
     * @return the result of the computation
     */
    T getRawResult() { return T.init; }

    /**
     * A method that result-bearing CountedCompleters may optionally
     * use to help maintain result data.  By default, does nothing.
     * Overrides are not recommended. However, if this method is
     * overridden to update existing objects or fields, then it must
     * in general be defined to be thread-safe.
     */
    protected void setRawResult(T t) { }

    /**
     * Atomic access to a task's {@code pending} field.
     *
     * Stands in for the Java {@code VarHandle PENDING} that the original
     * port left commented out, so every {@code PENDING.xxx(task, ...)}
     * call in this class resolves.
     *
     * NOTE(review): each helper takes the address of {@code owner.pending},
     * so the static type of {@code owner} must expose {@code pending} as an
     * int field. Some callers pass an {@code ICountedCompleter}, which is
     * declared outside this section of the file -- confirm it satisfies this.
     */
    private static struct PendingHandle {

        /** Atomically adds {@code delta}; returns the value held before the add. */
        static int getAndAdd(C)(C owner, int delta) {
            import core.atomic : atomicOp;
            // atomicOp returns the updated value; subtract to get the old one.
            return atomicOp!"+="(*(cast(shared(int)*) &owner.pending), delta) - delta;
        }

        /** Atomically sets the count to {@code value} if it equals {@code expected}. */
        static bool compareAndSet(C)(C owner, int expected, int value) {
            import core.atomic : cas;
            return cas(cast(shared(int)*) &owner.pending, expected, value);
        }

        /**
         * Same as {@link #compareAndSet}. A strong CAS is used in place of a
         * weak one; every caller in this class retries in a loop.
         */
        static bool weakCompareAndSet(C)(C owner, int expected, int value) {
            return compareAndSet(owner, expected, value);
        }
    }

    private alias PENDING = PendingHandle;
}



/*************************************************/
// ForkJoinPool
/*************************************************/