1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 module hunt.concurrency.ForkJoinPool; 37 38 import hunt.concurrency.AbstractExecutorService; 39 import hunt.concurrency.atomic.AtomicHelper; 40 import hunt.concurrency.Exceptions; 41 import hunt.concurrency.ForkJoinTask; 42 import hunt.concurrency.ForkJoinTaskHelper; 43 import hunt.concurrency.ThreadLocalRandom; 44 import hunt.concurrency.thread; 45 46 import hunt.collection.List; 47 import hunt.collection.Collection; 48 import hunt.collection.Collections; 49 import hunt.logging.ConsoleLogger; 50 import hunt.Exceptions; 51 import hunt.Functions; 52 import hunt.system.Environment; 53 import hunt.system.Memory; 54 import hunt.util.Common; 55 import hunt.util.Configuration; 56 import hunt.util.DateTime; 57 58 import core.atomic; 59 import core.sync.mutex; 60 import core.thread; 61 import core.time; 62 63 import std.algorithm; 64 import std.array; 65 import std.conv; 66 import std.datetime; 67 68 alias ReentrantLock = Mutex; 69 70 // import java.lang.Thread.UncaughtExceptionHandler; 71 // import java.lang.invoke.MethodHandles; 72 // import java.lang.invoke.VarHandle; 73 // import java.security.AccessController; 74 // import java.security.AccessControlContext; 75 // import java.security.Permission; 76 // import java.security.Permissions; 77 // import java.security.PrivilegedAction; 78 // import java.security.ProtectionDomain; 79 // import hunt.collection.ArrayList; 80 // import hunt.collection.Collection; 81 // import java.util.Collections; 82 // import java.util.List; 83 // import hunt.util.functional.Predicate; 84 // import hunt.concurrency.locks.LockSupport; 85 86 87 private { 88 89 // Constants shared across ForkJoinPool and WorkQueue 90 91 // Bounds 92 enum int SWIDTH = 16; // width of short 93 enum int SMASK = 0xffff; // short bits == max index 94 enum int MAX_CAP = 0x7fff; // max #workers - 1 95 enum int SQMASK = 0x007e; // max 64 (even) slots 96 97 // Masks and units for WorkQueue.phase and ctl sp subfield 98 enum int UNSIGNALLED = 1 << 31; // must be negative 99 enum int SS_SEQ = 1 << 16; // version count 100 enum int QLOCK = 1; // must be 1 101 102 // Mode bits and sentinels, some also used in WorkQueue id and.source fields 103 enum int OWNED = 1; // queue has owner thread 104 enum int FIFO = 1 << 16; // fifo queue or access mode 105 enum int SHUTDOWN = 1 << 18; 106 enum int TERMINATED = 1 << 19; 107 enum int STOP = 1 << 31; // must be negative 108 enum int QUIET = 1 << 30; // not scanning or working 109 enum int DORMANT = QUIET | UNSIGNALLED; 110 111 /** 112 * The maximum number of top-level polls per worker before 113 * checking other queues, expressed as a bit shift to, in effect, 114 * multiply by pool size, and then use as random value mask, so 115 * average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See 116 * above for rationale. 117 */ 118 enum int TOP_BOUND_SHIFT = 10; 119 120 /** 121 * Initial capacity of work-stealing queue array. 122 * Must be a power of two, at least 2. 123 */ 124 enum int INITIAL_QUEUE_CAPACITY = 1 << 13; 125 126 /** 127 * Maximum capacity for queue arrays. Must be a power of two less 128 * than or equal to 1 << (31 - width of array entry) to ensure 129 * lack of wraparound of index calculations, but defined to a 130 * value a bit less than this to help users trap runaway programs 131 * before saturating systems. 132 */ 133 enum int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 134 135 } 136 137 /** 138 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 139 * A {@code ForkJoinPool} provides the entry point for submissions 140 * from non-{@code ForkJoinTask} clients, as well as management and 141 * monitoring operations. 142 * 143 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 144 * ExecutorService} mainly by virtue of employing 145 * <em>work-stealing</em>: all threads in the pool attempt to find and 146 * execute tasks submitted to the pool and/or created by other active 147 * tasks (eventually blocking waiting for work if none exist). This 148 * enables efficient processing when most tasks spawn other subtasks 149 * (as do most {@code ForkJoinTask}s), as well as when many small 150 * tasks are submitted to the pool from external clients. Especially 151 * when setting <em>asyncMode</em> to true in constructors, {@code 152 * ForkJoinPool}s may also be appropriate for use with event-style 153 * tasks that are never joined. All worker threads are initialized 154 * with {@link Thread#isDaemon} set {@code true}. 155 * 156 * <p>A static {@link #commonPool()} is available and appropriate for 157 * most applications. The common pool is used by any ForkJoinTask that 158 * is not explicitly submitted to a specified pool. Using the common 159 * pool normally reduces resource usage (its threads are slowly 160 * reclaimed during periods of non-use, and reinstated upon subsequent 161 * use). 162 * 163 * <p>For applications that require separate or custom pools, a {@code 164 * ForkJoinPool} may be constructed with a given target parallelism 165 * level; by default, equal to the number of available processors. 166 * The pool attempts to maintain enough active (or available) threads 167 * by dynamically adding, suspending, or resuming internal worker 168 * threads, even if some tasks are stalled waiting to join others. 169 * However, no such adjustments are guaranteed in the face of blocked 170 * I/O or other unmanaged synchronization. The nested {@link 171 * ManagedBlocker} interface enables extension of the kinds of 172 * synchronization accommodated. The default policies may be 173 * overridden using a constructor with parameters corresponding to 174 * those documented in class {@link ThreadPoolExecutor}. 175 * 176 * <p>In addition to execution and lifecycle control methods, this 177 * class provides status check methods (for example 178 * {@link #getStealCount}) that are intended to aid in developing, 179 * tuning, and monitoring fork/join applications. Also, method 180 * {@link #toString} returns indications of pool state in a 181 * convenient form for informal monitoring. 182 * 183 * <p>As is the case with other ExecutorServices, there are three 184 * main task execution methods summarized in the following table. 185 * These are designed to be used primarily by clients not already 186 * engaged in fork/join computations in the current pool. The main 187 * forms of these methods accept instances of {@code ForkJoinTask}, 188 * but overloaded forms also allow mixed execution of plain {@code 189 * Runnable}- or {@code Callable}- based activities as well. However, 190 * tasks that are already executing in a pool should normally instead 191 * use the within-computation forms listed in the table unless using 192 * async event-style tasks that are not usually joined, in which case 193 * there is little difference among choice of methods. 194 * 195 * <table class="plain"> 196 * <caption>Summary of task execution methods</caption> 197 * <tr> 198 * <td></td> 199 * <th scope="col"> Call from non-fork/join clients</th> 200 * <th scope="col"> Call from within fork/join computations</th> 201 * </tr> 202 * <tr> 203 * <th scope="row" style="text-align:left"> Arrange async execution</th> 204 * <td> {@link #execute(ForkJoinTask)}</td> 205 * <td> {@link ForkJoinTask#fork}</td> 206 * </tr> 207 * <tr> 208 * <th scope="row" style="text-align:left"> Await and obtain result</th> 209 * <td> {@link #invoke(ForkJoinTask)}</td> 210 * <td> {@link ForkJoinTask#invoke}</td> 211 * </tr> 212 * <tr> 213 * <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th> 214 * <td> {@link #submit(ForkJoinTask)}</td> 215 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 216 * </tr> 217 * </table> 218 * 219 * <p>The parameters used to construct the common pool may be controlled by 220 * setting the following {@linkplain System#getProperty system properties}: 221 * <ul> 222 * <li>{@code hunt.concurrency.ForkJoinPool.common.parallelism} 223 * - the parallelism level, a non-negative integer 224 * <li>{@code hunt.concurrency.ForkJoinPool.common.threadFactory} 225 * - the class name of a {@link ForkJoinWorkerThreadFactory}. 226 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 227 * is used to load this class. 228 * <li>{@code hunt.concurrency.ForkJoinPool.common.exceptionHandler} 229 * - the class name of a {@link UncaughtExceptionHandler}. 230 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} 231 * is used to load this class. 232 * <li>{@code hunt.concurrency.ForkJoinPool.common.maximumSpares} 233 * - the maximum number of allowed extra threads to maintain target 234 * parallelism (default 256). 235 * </ul> 236 * If no thread factory is supplied via a system property, then the 237 * common pool uses a factory that uses the system class loader as the 238 * {@linkplain Thread#getContextClassLoader() thread context class loader}. 239 * In addition, if a {@link SecurityManager} is present, then 240 * the common pool uses a factory supplying threads that have no 241 * {@link Permissions} enabled. 242 * 243 * Upon any error in establishing these settings, default parameters 244 * are used. It is possible to disable or limit the use of threads in 245 * the common pool by setting the parallelism property to zero, and/or 246 * using a factory that may return {@code null}. However doing so may 247 * cause unjoined tasks to never be executed. 248 * 249 * <p><b>Implementation notes</b>: This implementation restricts the 250 * maximum number of running threads to 32767. Attempts to create 251 * pools with greater than the maximum number result in 252 * {@code IllegalArgumentException}. 253 * 254 * <p>This implementation rejects submitted tasks (that is, by throwing 255 * {@link RejectedExecutionException}) only when the pool is shut down 256 * or internal resources have been exhausted. 257 * 258 * @author Doug Lea 259 */ 260 class ForkJoinPool : AbstractExecutorService { 261 262 /* 263 * Implementation Overview 264 * 265 * This class and its nested classes provide the main 266 * functionality and control for a set of worker threads: 267 * Submissions from non-FJ threads enter into submission queues. 268 * Workers take these tasks and typically split them into subtasks 269 * that may be stolen by other workers. Work-stealing based on 270 * randomized scans generally leads to better throughput than 271 * "work dealing" in which producers assign tasks to idle threads, 272 * in part because threads that have finished other tasks before 273 * the signalled thread wakes up (which can be a long time) can 274 * take the task instead. Preference rules give first priority to 275 * processing tasks from their own queues (LIFO or FIFO, depending 276 * on mode), then to randomized FIFO steals of tasks in other 277 * queues. This framework began as vehicle for supporting 278 * tree-structured parallelism using work-stealing. Over time, 279 * its scalability advantages led to extensions and changes to 280 * better support more diverse usage contexts. Because most 281 * internal methods and nested classes are interrelated, their 282 * main rationale and descriptions are presented here; individual 283 * methods and nested classes contain only brief comments about 284 * details. 285 * 286 * WorkQueues 287 * ========== 288 * 289 * Most operations occur within work-stealing queues (in nested 290 * class WorkQueue). These are special forms of Deques that 291 * support only three of the four possible end-operations -- push, 292 * pop, and poll (aka steal), under the further constraints that 293 * push and pop are called only from the owning thread (or, as 294 * extended here, under a lock), while poll may be called from 295 * other threads. (If you are unfamiliar with them, you probably 296 * want to read Herlihy and Shavit's book "The Art of 297 * Multiprocessor programming", chapter 16 describing these in 298 * more detail before proceeding.) The main work-stealing queue 299 * design is roughly similar to those in the papers "Dynamic 300 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 301 * (http://research.sun.com/scalable/pubs/index.html) and 302 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, 303 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). 304 * The main differences ultimately stem from GC requirements that 305 * we null out taken slots as soon as we can, to maintain as small 306 * a footprint as possible even in programs generating huge 307 * numbers of tasks. To accomplish this, we shift the CAS 308 * arbitrating pop vs poll (steal) from being on the indices 309 * ("base" and "top") to the slots themselves. 310 * 311 * Adding tasks then takes the form of a classic array push(task) 312 * in a circular buffer: 313 * q.array[q.top++ % length] = task; 314 * 315 * (The actual code needs to null-check and size-check the array, 316 * uses masking, not mod, for indexing a power-of-two-sized array, 317 * adds a release fence for publication, and possibly signals 318 * waiting workers to start scanning -- see below.) Both a 319 * successful pop and poll mainly entail a CAS of a slot from 320 * non-null to null. 321 * 322 * The pop operation (always performed by owner) is: 323 * if ((the task at top slot is not null) and 324 * (CAS slot to null)) 325 * decrement top and return task; 326 * 327 * And the poll operation (usually by a stealer) is 328 * if ((the task at base slot is not null) and 329 * (CAS slot to null)) 330 * increment base and return task; 331 * 332 * There are several variants of each of these. Most uses occur 333 * within operations that also interleave contention or emptiness 334 * tracking or inspection of elements before extracting them, so 335 * must interleave these with the above code. When performed by 336 * owner, getAndSet is used instead of CAS (see for example method 337 * nextLocalTask) which is usually more efficient, and possible 338 * because the top index cannot independently change during the 339 * operation. 340 * 341 * Memory ordering. See "Correct and Efficient Work-Stealing for 342 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 343 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an 344 * analysis of memory ordering requirements in work-stealing 345 * algorithms similar to (but different than) the one used here. 346 * Extracting tasks in array slots via (fully fenced) CAS provides 347 * primary synchronization. The base and top indices imprecisely 348 * guide where to extract from. We do not usually require strict 349 * orderings of array and index updates. Many index accesses use 350 * plain mode, with ordering constrained by surrounding context 351 * (usually with respect to element CASes or the two WorkQueue 352 * fields source and phase). When not otherwise already 353 * constrained, reads of "base" by queue owners use acquire-mode, 354 * and some externally callable methods preface accesses with 355 * acquire fences. Additionally, to ensure that index update 356 * writes are not coalesced or postponed in loops etc, "opaque" 357 * mode is used in a few cases where timely writes are not 358 * otherwise ensured. The "locked" versions of push- and pop- 359 * based methods for shared queues differ from owned versions 360 * because locking already forces some of the ordering. 361 * 362 * Because indices and slot contents cannot always be consistent, 363 * a check that base == top indicates (momentary) emptiness, but 364 * otherwise may err on the side of possibly making the queue 365 * appear nonempty when a push, pop, or poll have not fully 366 * committed, or making it appear empty when an update of top has 367 * not yet been visibly written. (Method isEmpty() checks the 368 * case of a partially completed removal of the last element.) 369 * Because of this, the poll operation, considered individually, 370 * is not wait-free. One thief cannot successfully continue until 371 * another in-progress one (or, if previously empty, a push) 372 * visibly completes. This can stall threads when required to 373 * consume from a given queue (see method poll()). However, in 374 * the aggregate, we ensure at least probabilistic 375 * non-blockingness. If an attempted steal fails, a scanning 376 * thief chooses a different random victim target to try next. So, 377 * in order for one thief to progress, it suffices for any 378 * in-progress poll or new push on any empty queue to complete. 379 * 380 * This approach also enables support of a user mode in which 381 * local task processing is in FIFO, not LIFO order, simply by 382 * using poll rather than pop. This can be useful in 383 * message-passing frameworks in which tasks are never joined. 384 * 385 * WorkQueues are also used in a similar way for tasks submitted 386 * to the pool. We cannot mix these tasks in the same queues used 387 * by workers. Instead, we randomly associate submission queues 388 * with submitting threads, using a form of hashing. The 389 * ThreadLocalRandom probe value serves as a hash code for 390 * choosing existing queues, and may be randomly repositioned upon 391 * contention with other submitters. In essence, submitters act 392 * like workers except that they are restricted to executing local 393 * tasks that they submitted. Insertion of tasks in shared mode 394 * requires a lock but we use only a simple spinlock (using field 395 * phase), because submitters encountering a busy queue move to a 396 * different position to use or create other queues -- they block 397 * only when creating and registering new queues. Because it is 398 * used only as a spinlock, unlocking requires only a "releasing" 399 * store (using setRelease) unless otherwise signalling. 400 * 401 * Management 402 * ========== 403 * 404 * The main throughput advantages of work-stealing stem from 405 * decentralized control -- workers mostly take tasks from 406 * themselves or each other, at rates that can exceed a billion 407 * per second. The pool itself creates, activates (enables 408 * scanning for and running tasks), deactivates, blocks, and 409 * terminates threads, all with minimal central information. 410 * There are only a few properties that we can globally track or 411 * maintain, so we pack them into a small number of variables, 412 * often maintaining atomicity without blocking or locking. 413 * Nearly all essentially atomic control state is held in a few 414 * variables that are by far most often read (not 415 * written) as status and consistency checks. We pack as much 416 * information into them as we can. 417 * 418 * Field "ctl" contains 64 bits holding information needed to 419 * atomically decide to add, enqueue (on an event queue), and 420 * dequeue and release workers. To enable this packing, we 421 * restrict maximum parallelism to (1<<15)-1 (which is far in 422 * excess of normal operating range) to allow ids, counts, and 423 * their negations (used for thresholding) to fit into 16bit 424 * subfields. 425 * 426 * Field "mode" holds configuration parameters as well as lifetime 427 * status, atomically and monotonically setting SHUTDOWN, STOP, 428 * and finally TERMINATED bits. 429 * 430 * Field "workQueues" holds references to WorkQueues. It is 431 * updated (only during worker creation and termination) under 432 * lock (using field workerNamePrefix as lock), but is otherwise 433 * concurrently readable, and accessed directly. We also ensure 434 * that uses of the array reference itself never become too stale 435 * in case of resizing, by arranging that (re-)reads are separated 436 * by at least one acquiring read access. To simplify index-based 437 * operations, the array size is always a power of two, and all 438 * readers must tolerate null slots. Worker queues are at odd 439 * indices. Shared (submission) queues are at even indices, up to 440 * a maximum of 64 slots, to limit growth even if the array needs 441 * to expand to add more workers. Grouping them together in this 442 * way simplifies and speeds up task scanning. 443 * 444 * All worker thread creation is on-demand, triggered by task 445 * submissions, replacement of terminated workers, and/or 446 * compensation for blocked workers. However, all other support 447 * code is set up to work with other policies. To ensure that we 448 * do not hold on to worker references that would prevent GC, all 449 * accesses to workQueues are via indices into the workQueues 450 * array (which is one source of some of the messy code 451 * constructions here). In essence, the workQueues array serves as 452 * a weak reference mechanism. Thus for example the stack top 453 * subfield of ctl stores indices, not references. 454 * 455 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we 456 * cannot let workers spin indefinitely scanning for tasks when 457 * none can be found immediately, and we cannot start/resume 458 * workers unless there appear to be tasks available. On the 459 * other hand, we must quickly prod them into action when new 460 * tasks are submitted or generated. In many usages, ramp-up time 461 * is the main limiting factor in overall performance, which is 462 * compounded at program start-up by JIT compilation and 463 * allocation. So we streamline this as much as possible. 464 * 465 * The "ctl" field atomically maintains total worker and 466 * "released" worker counts, plus the head of the available worker 467 * queue (actually stack, represented by the lower 32bit subfield 468 * of ctl). Released workers are those known to be scanning for 469 * and/or running tasks. Unreleased ("available") workers are 470 * recorded in the ctl stack. These workers are made available for 471 * signalling by enqueuing in ctl (see method runWorker). The 472 * "queue" is a form of Treiber stack. This is ideal for 473 * activating threads in most-recently used order, and improves 474 * performance and locality, outweighing the disadvantages of 475 * being prone to contention and inability to release a worker 476 * unless it is topmost on stack. To avoid missed signal problems 477 * inherent in any wait/signal design, available workers rescan 478 * for (and if found run) tasks after enqueuing. Normally their 479 * release status will be updated while doing so, but the released 480 * worker ctl count may underestimate the number of active 481 * threads. (However, it is still possible to determine quiescence 482 * via a validation traversal -- see isQuiescent). After an 483 * unsuccessful rescan, available workers are blocked until 484 * signalled (see signalWork). The top stack state holds the 485 * value of the "phase" field of the worker: its index and status, 486 * plus a version counter that, in addition to the count subfields 487 * (also serving as version stamps) provide protection against 488 * Treiber stack ABA effects. 489 * 490 * Creating workers. To create a worker, we pre-increment counts 491 * (serving as a reservation), and attempt to construct a 492 * ForkJoinWorkerThread via its factory. Upon construction, the 493 * new thread invokes registerWorker, where it constructs a 494 * WorkQueue and is assigned an index in the workQueues array 495 * (expanding the array if necessary). The thread is then started. 496 * Upon any exception across these steps, or null return from 497 * factory, deregisterWorker adjusts counts and records 498 * accordingly. If a null return, the pool continues running with 499 * fewer than the target number workers. If exceptional, the 500 * exception is propagated, generally to some external caller. 501 * Worker index assignment avoids the bias in scanning that would 502 * occur if entries were sequentially packed starting at the front 503 * of the workQueues array. We treat the array as a simple 504 * power-of-two hash table, expanding as needed. The seedIndex 505 * increment ensures no collisions until a resize is needed or a 506 * worker is deregistered and replaced, and thereafter keeps 507 * probability of collision low. We cannot use 508 * ThreadLocalRandom.getProbe() for similar purposes here because 509 * the thread has not started yet, but do so for creating 510 * submission queues for existing external threads (see 511 * externalPush). 512 * 513 * WorkQueue field "phase" is used by both workers and the pool to 514 * manage and track whether a worker is UNSIGNALLED (possibly 515 * blocked waiting for a signal). When a worker is enqueued its 516 * phase field is set. Note that phase field updates lag queue CAS 517 * releases so usage requires care -- seeing a negative phase does 518 * not guarantee that the worker is available. When queued, the 519 * lower 16 bits of scanState must hold its pool index. So we 520 * place the index there upon initialization and otherwise keep it 521 * there or restore it when necessary. 522 * 523 * The ctl field also serves as the basis for memory 524 * synchronization surrounding activation. This uses a more 525 * efficient version of a Dekker-like rule that task producers and 526 * consumers sync with each other by both writing/CASing ctl (even 527 * if to its current value). This would be extremely costly. So 528 * we relax it in several ways: (1) Producers only signal when 529 * their queue is possibly empty at some point during a push 530 * operation (which requires conservatively checking size zero or 531 * one to cover races). (2) Other workers propagate this signal 532 * when they find tasks in a queue with size greater than one. (3) 533 * Workers only enqueue after scanning (see below) and not finding 534 * any tasks. (4) Rather than CASing ctl to its current value in 535 * the common case where no action is required, we reduce write 536 * contention by equivalently prefacing signalWork when called by 537 * an external task producer using a memory access with 538 * full-semantics or a "fullFence". 539 * 540 * Almost always, too many signals are issued, in part because a 541 * task producer cannot tell if some existing worker is in the 542 * midst of finishing one task (or already scanning) and ready to 543 * take another without being signalled. So the producer might 544 * instead activate a different worker that does not find any 545 * work, and then inactivates. This scarcely matters in 546 * steady-state computations involving all workers, but can create 547 * contention and bookkeeping bottlenecks during ramp-up, 548 * ramp-down, and small computations involving only a few workers. 549 * 550 * Scanning. Method scan (from runWorker) performs top-level 551 * scanning for tasks. (Similar scans appear in helpQuiesce and 552 * pollScan.) Each scan traverses and tries to poll from each 553 * queue starting at a random index. Scans are not performed in 554 * ideal random permutation order, to reduce cacheline 555 * contention. The pseudorandom generator need not have 556 * high-quality statistical properties in the long term, but just 557 * within computations; We use Marsaglia XorShifts (often via 558 * ThreadLocalRandom.nextSecondarySeed), which are cheap and 559 * suffice. Scanning also includes contention reduction: When 560 * scanning workers fail to extract an apparently existing task, 561 * they soon restart at a different pseudorandom index. This form 562 * of backoff improves throughput when many threads are trying to 563 * take tasks from few queues, which can be common in some usages. 564 * Scans do not otherwise explicitly take into account core 565 * affinities, loads, cache localities, etc, However, they do 566 * exploit temporal locality (which usually approximates these) by 567 * preferring to re-poll from the same queue after a successful 568 * poll before trying others (see method topLevelExec). However 569 * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard 570 * against infinitely unfair looping under unbounded user task 571 * recursion, and also to reduce long-term contention when many 572 * threads poll few queues holding many small tasks. The bound is 573 * high enough to avoid much impact on locality and scheduling 574 * overhead. 575 * 576 * Trimming workers. To release resources after periods of lack of 577 * use, a worker starting to wait when the pool is quiescent will 578 * time out and terminate (see method runWorker) if the pool has 579 * remained quiescent for period given by field keepAlive. 580 * 581 * Shutdown and Termination. A call to shutdownNow invokes 582 * tryTerminate to atomically set a runState bit. The calling 583 * thread, as well as every other worker thereafter terminating, 584 * helps terminate others by cancelling their unprocessed tasks, 585 * and waking them up, doing so repeatedly until stable. Calls to 586 * non-abrupt shutdown() preface this by checking whether 587 * termination should commence by sweeping through queues (until 588 * stable) to ensure lack of in-flight submissions and workers 589 * about to process them before triggering the "STOP" phase of 590 * termination. 591 * 592 * Joining Tasks 593 * ============= 594 * 595 * Any of several actions may be taken when one worker is waiting 596 * to join a task stolen (or always held) by another. Because we 597 * are multiplexing many tasks on to a pool of workers, we can't 598 * always just let them block (as in Thread.join). We also cannot 599 * just reassign the joiner's run-time stack with another and 600 * replace it later, which would be a form of "continuation", that 601 * even if possible is not necessarily a good idea since we may 602 * need both an unblocked task and its continuation to progress. 603 * Instead we combine two tactics: 604 * 605 * Helping: Arranging for the joiner to execute some task that it 606 * would be running if the steal had not occurred. 607 * 608 * Compensating: Unless there are already enough live threads, 609 * method tryCompensate() may create or re-activate a spare 610 * thread to compensate for blocked joiners until they unblock. 611 * 612 * A third form (implemented in tryRemoveAndExec) amounts to 613 * helping a hypothetical compensator: If we can readily tell that 614 * a possible action of a compensator is to steal and execute the 615 * task being joined, the joining thread can do so directly, 616 * without the need for a compensation thread. 617 * 618 * The ManagedBlocker extension API can't use helping so relies 619 * only on compensation in method awaitBlocker. 620 * 621 * The algorithm in awaitJoin entails a form of "linear helping". 622 * Each worker records (in field source) the id of the queue from 623 * which it last stole a task. The scan in method awaitJoin uses 624 * these markers to try to find a worker to help (i.e., steal back 625 * a task from and execute it) that could hasten completion of the 626 * actively joined task. Thus, the joiner executes a task that 627 * would be on its own local deque if the to-be-joined task had 628 * not been stolen. This is a conservative variant of the approach 629 * described in Wagner & Calder "Leapfrogging: a portable 630 * technique for implementing efficient futures" SIGPLAN Notices, 631 * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs 632 * mainly in that we only record queue ids, not full dependency 633 * links. This requires a linear scan of the workQueues array to 634 * locate stealers, but isolates cost to when it is needed, rather 635 * than adding to per-task overhead. Searches can fail to locate 636 * stealers GC stalls and the like delay recording sources. 637 * Further, even when accurately identified, stealers might not 638 * ever produce a task that the joiner can in turn help with. So, 639 * compensation is tried upon failure to find tasks to run. 640 * 641 * Compensation does not by default aim to keep exactly the target 642 * parallelism number of unblocked threads running at any given 643 * time. Some previous versions of this class employed immediate 644 * compensations for any blocked join. However, in practice, the 645 * vast majority of blockages are byproducts of GC and 646 * other JVM or OS activities that are made worse by replacement 647 * when they cause longer-term oversubscription. Rather than 648 * impose arbitrary policies, we allow users to override the 649 * default of only adding threads upon apparent starvation. The 650 * compensation mechanism may also be bounded. Bounds for the 651 * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope 652 * with programming errors and abuse before running out of 653 * resources to do so. 654 * 655 * Common Pool 656 * =========== 657 * 658 * The static common pool always exists after static 659 * initialization. Since it (or any other created pool) need 660 * never be used, we minimize initial construction overhead and 661 * footprint to the setup of about a dozen fields. 662 * 663 * When external threads submit to the common pool, they can 664 * perform subtask processing (see externalHelpComplete and 665 * related methods) upon joins. This caller-helps policy makes it 666 * sensible to set common pool parallelism level to one (or more) 667 * less than the total number of available cores, or even zero for 668 * pure caller-runs. We do not need to record whether external 669 * submissions are to the common pool -- if not, external help 670 * methods return quickly. These submitters would otherwise be 671 * blocked waiting for completion, so the extra effort (with 672 * liberally sprinkled task status checks) in inapplicable cases 673 * amounts to an odd form of limited spin-wait before blocking in 674 * ForkJoinTask.join. 675 * 676 * As a more appropriate default in managed environments, unless 677 * overridden by system properties, we use workers of subclass 678 * InnocuousForkJoinWorkerThread when there is a SecurityManager 679 * present. These workers have no permissions set, do not belong 680 * to any user-defined ThreadGroupEx, and erase all ThreadLocals 681 * after executing any top-level task (see 682 * WorkQueue.afterTopLevelExec). The associated mechanics (mainly 683 * in ForkJoinWorkerThread) may be JVM-dependent and must access 684 * particular Thread class fields to achieve this effect. 685 * 686 * Memory placement 687 * ================ 688 * 689 * Performance can be very sensitive to placement of instances of 690 * ForkJoinPool and WorkQueues and their queue arrays. To reduce 691 * false-sharing impact, the @Contended annotation isolates 692 * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl 693 * field. WorkQueue arrays are allocated (by their threads) with 694 * larger initial sizes than most ever need, mostly to reduce 695 * false sharing with current garbage collectors that use cardmark 696 * tables. 697 * 698 * Style notes 699 * =========== 700 * 701 * Memory ordering relies mainly on VarHandles. This can be 702 * awkward and ugly, but also reflects the need to control 703 * outcomes across the unusual cases that arise in very racy code 704 * with very few invariants. All fields are read into locals 705 * before use, and null-checked if they are references. Array 706 * accesses using masked indices include checks (that are always 707 * true) that the array length is non-zero to avoid compilers 708 * inserting more expensive traps. This is usually done in a 709 * "C"-like style of listing declarations at the heads of methods 710 * or blocks, and using inline assignments on first encounter. 711 * Nearly all explicit checks lead to bypass/return, not exception 712 * throws, because they may legitimately arise due to 713 * cancellation/revocation during shutdown. 714 * 715 * There is a lot of representation-level coupling among classes 716 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The 717 * fields of WorkQueue maintain data structures managed by 718 * ForkJoinPool, so are directly accessed. There is little point 719 * trying to reduce this, since any associated future changes in 720 * representations will need to be accompanied by algorithmic 721 * changes anyway. Several methods intrinsically sprawl because 722 * they must accumulate sets of consistent reads of fields held in 723 * local variables. Some others are artificially broken up to 724 * reduce producer/consumer imbalances due to dynamic compilation. 725 * There are also other coding oddities (including several 726 * unnecessary-looking hoisted null checks) that help some methods 727 * perform reasonably even when interpreted (not compiled). 728 * 729 * The order of declarations in this file is (with a few exceptions): 730 * (1) Static utility functions 731 * (2) Nested (static) classes 732 * (3) Static fields 733 * (4) Fields, along with constants used when unpacking some of them 734 * (5) Internal control methods 735 * (6) Callbacks and other support for ForkJoinTask methods 736 * (7) Exported methods 737 * (8) Static block initializing statics in minimally dependent order 738 */ 739 740 // Static utilities 741 742 /** 743 * If there is a security manager, makes sure caller has 744 * permission to modify threads. 745 */ 746 // private static void checkPermission() { 747 // SecurityManager security = System.getSecurityManager(); 748 // if (security !is null) 749 // security.checkPermission(modifyThreadPermission); 750 // } 751 752 753 // static fields (initialized in static initializer below) 754 755 /** 756 * Creates a new ForkJoinWorkerThread. This factory is used unless 757 * overridden in ForkJoinPool constructors. 758 */ 759 __gshared ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; 760 761 /** 762 * Permission required for callers of methods that may start or 763 * kill threads. 764 */ 765 // __gshared RuntimePermission modifyThreadPermission; 766 767 /** 768 * Common (static) pool. Non-null for use unless a static 769 * construction exception, but internal usages null-check on use 770 * to paranoically avoid potential initialization circularities 771 * as well as to simplify generated code. 772 */ 773 __gshared ForkJoinPool common; 774 775 /** 776 * Common pool parallelism. To allow simpler use and management 777 * when common pool threads are disabled, we allow the underlying 778 * common.parallelism field to be zero, but in that case still report 779 * parallelism as 1 to reflect resulting caller-runs mechanics. 780 */ 781 __gshared int COMMON_PARALLELISM; 782 783 /** 784 * Limit on spare thread construction in tryCompensate. 785 */ 786 private __gshared int COMMON_MAX_SPARES; 787 788 /** 789 * Sequence number for creating workerNamePrefix. 790 */ 791 private shared static int poolNumberSequence; 792 793 /** 794 * Returns the next sequence number. We don't expect this to 795 * ever contend, so use simple builtin sync. 796 */ 797 private static int nextPoolId() { 798 return AtomicHelper.increment(poolNumberSequence); 799 } 800 801 // static configuration constants 802 803 /** 804 * Default idle timeout value (in milliseconds) for the thread 805 * triggering quiescence to park waiting for new work 806 */ 807 private enum long DEFAULT_KEEPALIVE = 60_000L; 808 809 /** 810 * Undershoot tolerance for idle timeouts 811 */ 812 private enum long TIMEOUT_SLOP = 20L; 813 814 /** 815 * The default value for COMMON_MAX_SPARES. Overridable using the 816 * "hunt.concurrency.ForkJoinPool.common.maximumSpares" system 817 * property. The default value is far in excess of normal 818 * requirements, but also far short of MAX_CAP and typical OS 819 * thread limits, so allows JVMs to catch misuse/abuse before 820 * running out of resources needed to do so. 821 */ 822 private enum int DEFAULT_COMMON_MAX_SPARES = 256; 823 824 /** 825 * Increment for seed generators. See class ThreadLocal for 826 * explanation. 827 */ 828 private enum int SEED_INCREMENT = 0x9e3779b9; 829 830 /* 831 * Bits and masks for field ctl, packed with 4 16 bit subfields: 832 * RC: Number of released (unqueued) workers minus target parallelism 833 * TC: Number of total workers minus target parallelism 834 * SS: version count and status of top waiting thread 835 * ID: poolIndex of top of Treiber stack of waiters 836 * 837 * When convenient, we can extract the lower 32 stack top bits 838 * (including version bits) as sp=(int)ctl. The offsets of counts 839 * by the target parallelism and the positionings of fields makes 840 * it possible to perform the most common checks via sign tests of 841 * fields: When ac is negative, there are not enough unqueued 842 * workers, when tc is negative, there are not enough total 843 * workers. When sp is non-zero, there are waiting workers. To 844 * deal with possibly negative fields, we use casts in and out of 845 * "short" and/or signed shifts to maintain signedness. 846 * 847 * Because it occupies uppermost bits, we can add one release count 848 * using getAndAddLong of RC_UNIT, rather than CAS, when returning 849 * from a blocked join. Other updates entail multiple subfields 850 * and masking, requiring CAS. 851 * 852 * The limits packed in field "bounds" are also offset by the 853 * parallelism level to make them comparable to the ctl rc and tc 854 * fields. 855 */ 856 857 // Lower and upper word masks 858 private enum long SP_MASK = 0xffffffffL; 859 private enum long UC_MASK = ~SP_MASK; 860 861 // Release counts 862 private enum int RC_SHIFT = 48; 863 private enum long RC_UNIT = 0x0001L << RC_SHIFT; 864 private enum long RC_MASK = 0xffffL << RC_SHIFT; 865 866 // Total counts 867 private enum int TC_SHIFT = 32; 868 private enum long TC_UNIT = 0x0001L << TC_SHIFT; 869 private enum long TC_MASK = 0xffffL << TC_SHIFT; 870 private enum long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign 871 872 // Instance fields 873 874 long stealCount; // collects worker nsteals 875 Duration keepAlive; // milliseconds before dropping if idle 876 int indexSeed; // next worker index 877 int bounds; // min, max threads packed as shorts 878 int mode; // parallelism, runstate, queue mode 879 WorkQueue[] workQueues; // main registry 880 string workerNamePrefix; // for worker thread string; sync lock 881 Object workerNameLocker; 882 ForkJoinWorkerThreadFactory factory; 883 UncaughtExceptionHandler ueh; // per-worker UEH 884 Predicate!(ForkJoinPool) saturate; 885 886 // @jdk.internal.vm.annotation.Contended("fjpctl") // segregate 887 shared long ctl; // main pool control 888 889 // Creating, registering and deregistering workers 890 891 /** 892 * Tries to construct and start one worker. Assumes that total 893 * count has already been incremented as a reservation. Invokes 894 * deregisterWorker on any failure. 895 * 896 * @return true if successful 897 */ 898 private bool createWorker() { 899 ForkJoinWorkerThreadFactory fac = factory; 900 Throwable ex = null; 901 ForkJoinWorkerThread wt = null; 902 try { 903 if (fac !is null && (wt = fac.newThread(this)) !is null) { 904 wt.start(); 905 return true; 906 } 907 } catch (Throwable rex) { 908 ex = rex; 909 } 910 deregisterWorker(wt, ex); 911 return false; 912 } 913 914 /** 915 * Tries to add one worker, incrementing ctl counts before doing 916 * so, relying on createWorker to back out on failure. 917 * 918 * @param c incoming ctl value, with total count negative and no 919 * idle workers. On CAS failure, c is refreshed and retried if 920 * this holds (otherwise, a new worker is not needed). 921 */ 922 private void tryAddWorker(long c) { 923 do { 924 long nc = ((RC_MASK & (c + RC_UNIT)) | 925 (TC_MASK & (c + TC_UNIT))); 926 if (ctl == c && AtomicHelper.compareAndSet(this.ctl, c, nc)) { 927 // version(HUNT_CONCURRENCY_DEBUG) tracef("nc=%d, ctl=%d, c=%d", nc, ctl, c); 928 createWorker(); 929 break; 930 } 931 } while (((c = ctl) & ADD_WORKER) != 0L && cast(int)c == 0); 932 } 933 934 /** 935 * Callback from ForkJoinWorkerThread constructor to establish and 936 * record its WorkQueue. 937 * 938 * @param wt the worker thread 939 * @return the worker's queue 940 */ 941 final WorkQueue registerWorker(ForkJoinWorkerThread wt) { 942 UncaughtExceptionHandler handler; 943 wt.isDaemon(true); // configure thread 944 if ((handler = ueh) !is null) 945 wt.setUncaughtExceptionHandler(handler); 946 int tid = 0; // for thread name 947 int idbits = mode & FIFO; 948 string prefix = workerNamePrefix; 949 WorkQueue w = new WorkQueue(this, wt); 950 if (prefix !is null) { 951 synchronized (this) { 952 WorkQueue[] ws = workQueues; 953 int n; 954 int s = indexSeed += SEED_INCREMENT; 955 idbits |= (s & ~(SMASK | FIFO | DORMANT)); 956 if (ws !is null && (n = cast(int)ws.length) > 1) { 957 int m = n - 1; 958 tid = m & ((s << 1) | 1); // odd-numbered indices 959 for (int probes = n >>> 1;;) { // find empty slot 960 WorkQueue q; 961 if ((q = ws[tid]) is null || q.phase == QUIET) 962 break; 963 else if (--probes == 0) { 964 tid = n | 1; // resize below 965 break; 966 } 967 else 968 tid = (tid + 2) & m; 969 } 970 w.phase = w.id = tid | idbits; // now publishable 971 972 if (tid < n) 973 ws[tid] = w; 974 else { // expand array 975 int an = n << 1; 976 WorkQueue[] as = new WorkQueue[an]; 977 as[tid] = w; 978 int am = an - 1; 979 for (int j = 0; j < n; ++j) { 980 WorkQueue v; // copy external queue 981 if ((v = ws[j]) !is null) // position may change 982 as[v.id & am & SQMASK] = v; 983 if (++j >= n) 984 break; 985 as[j] = ws[j]; // copy worker 986 } 987 workQueues = as; 988 } 989 } 990 } 991 wt.name(prefix ~ tid.to!string()); 992 } 993 return w; 994 } 995 996 /** 997 * Final callback from terminating worker, as well as upon failure 998 * to construct or start a worker. Removes record of worker from 999 * array, and adjusts counts. If pool is shutting down, tries to 1000 * complete termination. 1001 * 1002 * @param wt the worker thread, or null if construction failed 1003 * @param ex the exception causing failure, or null if none 1004 */ 1005 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { 1006 WorkQueue w = null; 1007 int phase = 0; 1008 if (wt !is null && (w = wt.workQueue) !is null) { 1009 int wid = w.id; 1010 long ns = cast(long)w.nsteals & 0xffffffffL; 1011 if (!workerNamePrefix.empty()) { 1012 synchronized (this) { 1013 WorkQueue[] ws; size_t n, i; // remove index from array 1014 if ((ws = workQueues) !is null && (n = ws.length) > 0 && 1015 ws[i = wid & (n - 1)] == w) 1016 ws[i] = null; 1017 stealCount += ns; 1018 } 1019 } 1020 phase = w.phase; 1021 } 1022 if (phase != QUIET) { // else pre-adjusted 1023 long c; // decrement counts 1024 do {} while (!AtomicHelper.compareAndSet(this.ctl, c = ctl, 1025 ((RC_MASK & (c - RC_UNIT)) | 1026 (TC_MASK & (c - TC_UNIT)) | 1027 (SP_MASK & c)))); 1028 } 1029 if (w !is null) 1030 w.cancelAll(); // cancel remaining tasks 1031 1032 if (!tryTerminate(false, false) && // possibly replace worker 1033 w !is null && w.array !is null) // avoid repeated failures 1034 signalWork(); 1035 1036 if (ex !is null) // rethrow 1037 ForkJoinTaskHelper.rethrow(ex); 1038 } 1039 1040 /** 1041 * Tries to create or release a worker if too few are running. 1042 */ 1043 final void signalWork() { 1044 for (;;) { 1045 long c; int sp; WorkQueue[] ws; int i; WorkQueue v; 1046 if ((c = ctl) >= 0L) // enough workers 1047 break; 1048 else if ((sp = cast(int)c) == 0) { // no idle workers 1049 if ((c & ADD_WORKER) != 0L) // too few workers 1050 tryAddWorker(c); 1051 break; 1052 } 1053 else if ((ws = workQueues) is null) 1054 break; // unstarted/terminated 1055 else if (ws.length <= (i = sp & SMASK)) 1056 break; // terminated 1057 else if ((v = ws[i]) is null) 1058 break; // terminating 1059 else { 1060 int np = sp & ~UNSIGNALLED; 1061 int vp = v.phase; 1062 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT)); 1063 Thread vt = v.owner; 1064 if (sp == vp && AtomicHelper.compareAndSet(this.ctl, c, nc)) { 1065 v.phase = np; 1066 if (vt !is null && v.source < 0) 1067 LockSupport.unpark(vt); 1068 break; 1069 } 1070 } 1071 } 1072 } 1073 1074 /** 1075 * Tries to decrement counts (sometimes implicitly) and possibly 1076 * arrange for a compensating worker in preparation for blocking: 1077 * If not all core workers yet exist, creates one, else if any are 1078 * unreleased (possibly including caller) releases one, else if 1079 * fewer than the minimum allowed number of workers running, 1080 * checks to see that they are all active, and if so creates an 1081 * extra worker unless over maximum limit and policy is to 1082 * saturate. Most of these steps can fail due to interference, in 1083 * which case 0 is returned so caller will retry. A negative 1084 * return value indicates that the caller doesn't need to 1085 * re-adjust counts when later unblocked. 1086 * 1087 * @return 1: block then adjust, -1: block without adjust, 0 : retry 1088 */ 1089 private int tryCompensate(WorkQueue w) { 1090 int t, n, sp; 1091 long c = ctl; 1092 WorkQueue[] ws = workQueues; 1093 if ((t = cast(short)(c >>> TC_SHIFT)) >= 0) { 1094 if (ws is null || (n = cast(int)ws.length) <= 0 || w is null) 1095 return 0; // disabled 1096 else if ((sp = cast(int)c) != 0) { // replace or release 1097 WorkQueue v = ws[sp & (n - 1)]; 1098 int wp = w.phase; 1099 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c); 1100 int np = sp & ~UNSIGNALLED; 1101 if (v !is null) { 1102 int vp = v.phase; 1103 Thread vt = v.owner; 1104 long nc = (cast(long)v.stackPred & SP_MASK) | uc; 1105 if (vp == sp && AtomicHelper.compareAndSet(this.ctl, c, nc)) { 1106 v.phase = np; 1107 if (vt !is null && v.source < 0) 1108 LockSupport.unpark(vt); 1109 return (wp < 0) ? -1 : 1; 1110 } 1111 } 1112 return 0; 1113 } 1114 else if (cast(int)(c >> RC_SHIFT) - // reduce parallelism 1115 cast(short)(bounds & SMASK) > 0) { 1116 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); 1117 return AtomicHelper.compareAndSet(this.ctl, c, nc) ? 1 : 0; 1118 } 1119 else { // validate 1120 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0; 1121 bool unstable = false; 1122 for (int i = 1; i < n; i += 2) { 1123 WorkQueue q; ThreadEx wt; ThreadState ts; 1124 if ((q = ws[i]) !is null) { 1125 if (q.source == 0) { 1126 unstable = true; 1127 break; 1128 } 1129 else { 1130 --tc; 1131 if ((wt = q.owner) !is null && 1132 ((ts = wt.getState()) == ThreadState.BLOCKED || 1133 ts == ThreadState.WAITING)) 1134 ++bc; // worker is blocking 1135 } 1136 } 1137 } 1138 if (unstable || tc != 0 || ctl != c) 1139 return 0; // inconsistent 1140 else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) { 1141 Predicate!(ForkJoinPool) sat; 1142 if ((sat = saturate) !is null && sat(this)) 1143 return -1; 1144 else if (bc < pc) { // lagging 1145 Thread.yield(); // for retry spins 1146 return 0; 1147 } 1148 else 1149 throw new RejectedExecutionException( 1150 "Thread limit exceeded replacing blocked worker"); 1151 } 1152 } 1153 } 1154 1155 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool 1156 return AtomicHelper.compareAndSet(this.ctl, c, nc) && createWorker() ? 1 : 0; 1157 } 1158 1159 /** 1160 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. 1161 * See above for explanation. 1162 */ 1163 final void runWorker(WorkQueue w) { 1164 int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng 1165 w.array = new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; // initialize 1166 while(true) { 1167 int phase; 1168 if (scan(w, r)) { // scan until apparently empty 1169 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift) 1170 } 1171 else if ((phase = w.phase) >= 0) { // enqueue, then rescan 1172 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK; 1173 long c, nc; 1174 do { 1175 w.stackPred = cast(int)(c = ctl); 1176 nc = ((c - RC_UNIT) & UC_MASK) | np; 1177 } while (!AtomicHelper.compareAndSet(this.ctl, c, nc)); 1178 1179 version(HUNT_CONCURRENCY_DEBUG) { 1180 // infof("ctl=%d, c=%d, nc=%d, stackPred=%d", ctl, c, nc, w.stackPred); 1181 } 1182 } 1183 else { // already queued 1184 1185 int pred = w.stackPred; 1186 ThreadEx.interrupted(); // clear before park 1187 w.source = DORMANT; // enable signal 1188 long c = ctl; 1189 int md = mode, rc = (md & SMASK) + cast(int)(c >> RC_SHIFT); 1190 1191 version(HUNT_CONCURRENCY_DEBUG) { 1192 // tracef("md=%d, rc=%d, c=%d, pred=%d, phase=%d", md, rc, c, pred, phase); 1193 } 1194 1195 if (md < 0) { // terminating 1196 break; 1197 } else if (rc <= 0 && (md & SHUTDOWN) != 0 && tryTerminate(false, false)) { 1198 break; // quiescent shutdown 1199 } else if (rc <= 0 && pred != 0 && phase == cast(int)c) { 1200 long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); 1201 MonoTime d = MonoTime.currTime + keepAlive; // DateTime.currentTimeMillis(); 1202 LockSupport.parkUntil(this, d); 1203 if (ctl == c && // drop on timeout if all idle 1204 d - MonoTime.currTime <= TIMEOUT_SLOP.msecs && 1205 AtomicHelper.compareAndSet(this.ctl, c, nc)) { 1206 w.phase = QUIET; 1207 break; 1208 } 1209 } else if (w.phase < 0) { 1210 LockSupport.park(this); // OK if spuriously woken 1211 break; 1212 } 1213 w.source = 0; // disable signal 1214 } 1215 } 1216 } 1217 1218 /** 1219 * Scans for and if found executes one or more top-level tasks from a queue. 1220 * 1221 * @return true if found an apparently non-empty queue, and 1222 * possibly ran task(s). 1223 */ 1224 private bool scan(WorkQueue w, int r) { 1225 WorkQueue[] ws; int n; 1226 if ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && w !is null) { 1227 for (int m = n - 1, j = r & m;;) { 1228 WorkQueue q; int b; 1229 if ((q = ws[j]) !is null && q.top != (b = q.base)) { 1230 int qid = q.id; 1231 IForkJoinTask[] a; 1232 size_t cap, k; 1233 IForkJoinTask t; 1234 1235 if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) { 1236 k = (cap - 1) & b; 1237 // import core.atomic; 1238 // auto ss = core.atomic.atomicLoad((cast(shared)a[k])); 1239 // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:12:19 PM 1240 // IForkJoinTask tt = a[k]; 1241 // t = AtomicHelper.load(tt); 1242 t = a[k]; 1243 // tracef("k=%d, t is null: %s", k, t is null); 1244 if (q.base == b++ && t !is null && 1245 AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) { 1246 q.base = b; 1247 w.source = qid; 1248 if (q.top - b > 0) signalWork(); 1249 1250 // infof("IForkJoinTask: %s", typeid(cast(Object)t)); 1251 w.topLevelExec(t, q, // random fairness bound 1252 r & ((n << TOP_BOUND_SHIFT) - 1)); 1253 } 1254 } 1255 return true; 1256 } 1257 else if (--n > 0) { 1258 j = (j + 1) & m; 1259 } 1260 else { 1261 break; 1262 } 1263 } 1264 } 1265 return false; 1266 } 1267 1268 /** 1269 * Helps and/or blocks until the given task is done or timeout. 1270 * First tries locally helping, then scans other queues for a task 1271 * produced by one of w's stealers; compensating and blocking if 1272 * none are found (rescanning if tryCompensate fails). 1273 * 1274 * @param w caller 1275 * @param task the task 1276 * @param deadline for timed waits, if nonzero 1277 * @return task status on exit 1278 */ 1279 final int awaitJoin(WorkQueue w, IForkJoinTask task, MonoTime deadline) { 1280 if(w is null || task is null) { 1281 return 0; 1282 } 1283 int s = 0; 1284 int seed = ThreadLocalRandom.nextSecondarySeed(); 1285 ICountedCompleter cc = cast(ICountedCompleter)task; 1286 if(cc !is null) { 1287 s = w.helpCC(cc, 0, false); 1288 if(s<0) 1289 return 0; 1290 } 1291 1292 w.tryRemoveAndExec(task); 1293 int src = w.source, id = w.id; 1294 int r = (seed >>> 16) | 1, step = (seed & ~1) | 2; 1295 s = task.getStatus(); 1296 while (s >= 0) { 1297 WorkQueue[] ws; 1298 int n = (ws = workQueues) is null ? 0 : cast(int)ws.length, m = n - 1; 1299 while (n > 0) { 1300 WorkQueue q; int b; 1301 if ((q = ws[r & m]) !is null && q.source == id && 1302 q.top != (b = q.base)) { 1303 IForkJoinTask[] a; int cap, k; 1304 int qid = q.id; 1305 if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) { 1306 k = (cap - 1) & b; 1307 // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:13:08 PM 1308 // 1309 // IForkJoinTask t = AtomicHelper.load(a[k]); 1310 IForkJoinTask t = a[k]; 1311 if (q.source == id && q.base == b++ && 1312 t !is null && AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) { 1313 q.base = b; 1314 w.source = qid; 1315 t.doExec(); 1316 w.source = src; 1317 } 1318 } 1319 break; 1320 } 1321 else { 1322 r += step; 1323 --n; 1324 } 1325 } 1326 1327 if ((s = task.getStatus()) < 0) 1328 break; 1329 else if (n == 0) { // empty scan 1330 long ms; int block; 1331 Duration ns; 1332 if (deadline == MonoTime.zero()) 1333 ms = 0L; // untimed 1334 else if ((ns = deadline - MonoTime.currTime) <= Duration.zero()) 1335 break; // timeout 1336 else if ((ms = ns.total!(TimeUnit.Millisecond)()) <= 0L) 1337 ms = 1L; // avoid 0 for timed wait 1338 if ((block = tryCompensate(w)) != 0) { 1339 task.internalWait(ms); 1340 AtomicHelper.getAndAdd(this.ctl, (block > 0) ? RC_UNIT : 0L); 1341 } 1342 s = task.getStatus(); 1343 } 1344 } 1345 return s; 1346 } 1347 1348 /** 1349 * Runs tasks until {@code isQuiescent()}. Rather than blocking 1350 * when tasks cannot be found, rescans until all others cannot 1351 * find tasks either. 1352 */ 1353 final void helpQuiescePool(WorkQueue w) { 1354 int prevSrc = w.source; 1355 int seed = ThreadLocalRandom.nextSecondarySeed(); 1356 int r = seed >>> 16, step = r | 1; 1357 for (int source = prevSrc, released = -1;;) { // -1 until known 1358 IForkJoinTask localTask; WorkQueue[] ws; 1359 while ((localTask = w.nextLocalTask()) !is null) 1360 localTask.doExec(); 1361 if (w.phase >= 0 && released == -1) 1362 released = 1; 1363 bool quiet = true, empty = true; 1364 int n = (ws = workQueues) is null ? 0 : cast(int)ws.length; 1365 for (int m = n - 1; n > 0; r += step, --n) { 1366 WorkQueue q; int b; 1367 if ((q = ws[r & m]) !is null) { 1368 int qs = q.source; 1369 if (q.top != (b = q.base)) { 1370 quiet = empty = false; 1371 IForkJoinTask[] a; int cap, k; 1372 int qid = q.id; 1373 if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) { 1374 if (released == 0) { // increment 1375 released = 1; 1376 AtomicHelper.getAndAdd(this.ctl, RC_UNIT); 1377 } 1378 k = (cap - 1) & b; 1379 // IForkJoinTask t = AtomicHelper.load(a[k]); 1380 // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 9:32:07 PM 1381 // 1382 IForkJoinTask t = a[k]; 1383 if (q.base == b++ && t !is null) { 1384 if(AtomicHelper.compareAndSet(a[k], t, null)) { 1385 q.base = b; 1386 w.source = qid; 1387 t.doExec(); 1388 w.source = source = prevSrc; 1389 } 1390 } 1391 } 1392 break; 1393 } 1394 else if ((qs & QUIET) == 0) 1395 quiet = false; 1396 } 1397 } 1398 if (quiet) { 1399 if (released == 0) { 1400 AtomicHelper.getAndAdd(this.ctl, RC_UNIT); 1401 } 1402 w.source = prevSrc; 1403 break; 1404 } 1405 else if (empty) { 1406 if (source != QUIET) 1407 w.source = source = QUIET; 1408 if (released == 1) { // decrement 1409 released = 0; 1410 AtomicHelper.getAndAdd(this.ctl, RC_MASK & -RC_UNIT); 1411 } 1412 } 1413 } 1414 } 1415 1416 /** 1417 * Scans for and returns a polled task, if available. 1418 * Used only for untracked polls. 1419 * 1420 * @param submissionsOnly if true, only scan submission queues 1421 */ 1422 private IForkJoinTask pollScan(bool submissionsOnly) { 1423 WorkQueue[] ws; int n; 1424 rescan: while ((mode & STOP) == 0 && (ws = workQueues) !is null && 1425 (n = cast(int)ws.length) > 0) { 1426 int m = n - 1; 1427 int r = ThreadLocalRandom.nextSecondarySeed(); 1428 int h = r >>> 16; 1429 int origin, step; 1430 if (submissionsOnly) { 1431 origin = (r & ~1) & m; // even indices and steps 1432 step = (h & ~1) | 2; 1433 } 1434 else { 1435 origin = r & m; 1436 step = h | 1; 1437 } 1438 bool nonempty = false; 1439 for (int i = origin, oldSum = 0, checkSum = 0;;) { 1440 WorkQueue q; 1441 if ((q = ws[i]) !is null) { 1442 int b; IForkJoinTask t; 1443 if (q.top - (b = q.base) > 0) { 1444 nonempty = true; 1445 if ((t = q.poll()) !is null) 1446 return t; 1447 } 1448 else 1449 checkSum += b + q.id; 1450 } 1451 if ((i = (i + step) & m) == origin) { 1452 if (!nonempty && oldSum == (oldSum = checkSum)) 1453 break rescan; 1454 checkSum = 0; 1455 nonempty = false; 1456 } 1457 } 1458 } 1459 return null; 1460 } 1461 1462 /** 1463 * Gets and removes a local or stolen task for the given worker. 1464 * 1465 * @return a task, if available 1466 */ 1467 final IForkJoinTask nextTaskFor(WorkQueue w) { 1468 IForkJoinTask t; 1469 if (w is null || (t = w.nextLocalTask()) is null) 1470 t = pollScan(false); 1471 return t; 1472 } 1473 1474 // External operations 1475 1476 /** 1477 * Adds the given task to a submission queue at submitter's 1478 * current queue, creating one if null or contended. 1479 * 1480 * @param task the task. Caller must ensure non-null. 1481 */ 1482 final void externalPush(IForkJoinTask task) { 1483 // initialize caller's probe 1484 int r = ThreadLocalRandom.getProbe(); 1485 if (r == 0) { 1486 ThreadLocalRandom.localInit(); 1487 r = ThreadLocalRandom.getProbe(); 1488 } 1489 1490 for (;;) { 1491 WorkQueue q; 1492 int md = mode, n; 1493 WorkQueue[] ws = workQueues; 1494 if ((md & SHUTDOWN) != 0 || ws is null || (n = cast(int)ws.length) <= 0) 1495 throw new RejectedExecutionException(); 1496 else if ((q = ws[(n - 1) & r & SQMASK]) is null) { // add queue 1497 int qid = (r | QUIET) & ~(FIFO | OWNED); 1498 Object lock = workerNameLocker; 1499 IForkJoinTask[] qa = 1500 new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; 1501 q = new WorkQueue(this, null); 1502 q.array = qa; 1503 q.id = qid; 1504 q.source = QUIET; 1505 if (lock !is null) { // unless disabled, lock pool to install 1506 synchronized (lock) { 1507 WorkQueue[] vs; int i, vn; 1508 if ((vs = workQueues) !is null && (vn = cast(int)vs.length) > 0 && 1509 vs[i = qid & (vn - 1) & SQMASK] is null) 1510 vs[i] = q; // else another thread already installed 1511 } 1512 } 1513 } 1514 else if (!q.tryLockPhase()) // move if busy 1515 r = ThreadLocalRandom.advanceProbe(r); 1516 else { 1517 if (q.lockedPush(task)) 1518 signalWork(); 1519 return; 1520 } 1521 } 1522 } 1523 1524 /** 1525 * Pushes a possibly-external submission. 1526 */ 1527 private IForkJoinTask externalSubmit(IForkJoinTask task) { 1528 if (task is null) 1529 throw new NullPointerException(); 1530 ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 1531 WorkQueue q; 1532 if ( w !is null && w.pool is this && 1533 (q = w.workQueue) !is null) 1534 q.push(task); 1535 else 1536 externalPush(task); 1537 return task; 1538 } 1539 1540 private ForkJoinTask!(T) externalSubmit(T)(ForkJoinTask!(T) task) { 1541 if (task is null) 1542 throw new NullPointerException(); 1543 ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 1544 WorkQueue q; 1545 if ( w !is null && w.pool is this && 1546 (q = w.workQueue) !is null) 1547 q.push(task); 1548 else 1549 externalPush(task); 1550 return task; 1551 } 1552 1553 /** 1554 * Returns common pool queue for an external thread. 1555 */ 1556 static WorkQueue commonSubmitterQueue() { 1557 ForkJoinPool p = common; 1558 int r = ThreadLocalRandom.getProbe(); 1559 WorkQueue[] ws; int n; 1560 return (p !is null && (ws = p.workQueues) !is null && 1561 (n = cast(int)ws.length) > 0) ? 1562 ws[(n - 1) & r & SQMASK] : null; 1563 } 1564 1565 /** 1566 * Performs tryUnpush for an external submitter. 1567 */ 1568 final bool tryExternalUnpush(IForkJoinTask task) { 1569 int r = ThreadLocalRandom.getProbe(); 1570 WorkQueue[] ws; WorkQueue w; int n; 1571 return ((ws = workQueues) !is null && 1572 (n = cast(int)ws.length) > 0 && 1573 (w = ws[(n - 1) & r & SQMASK]) !is null && 1574 w.tryLockedUnpush(task)); 1575 } 1576 1577 /** 1578 * Performs helpComplete for an external submitter. 1579 */ 1580 final int externalHelpComplete(ICountedCompleter task, int maxTasks) { 1581 int r = ThreadLocalRandom.getProbe(); 1582 WorkQueue[] ws; WorkQueue w; int n; 1583 return ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && 1584 (w = ws[(n - 1) & r & SQMASK]) !is null) ? 1585 w.helpCC(task, maxTasks, true) : 0; 1586 } 1587 1588 /** 1589 * Tries to steal and run tasks within the target's computation. 1590 * The maxTasks argument supports external usages; internal calls 1591 * use zero, allowing unbounded steps (external calls trap 1592 * non-positive values). 1593 * 1594 * @param w caller 1595 * @param maxTasks if non-zero, the maximum number of other tasks to run 1596 * @return task status on exit 1597 */ 1598 final int helpComplete(WorkQueue w, ICountedCompleter task, 1599 int maxTasks) { 1600 return (w is null) ? 0 : w.helpCC(task, maxTasks, false); 1601 } 1602 1603 /** 1604 * Returns a cheap heuristic guide for task partitioning when 1605 * programmers, frameworks, tools, or languages have little or no 1606 * idea about task granularity. In essence, by offering this 1607 * method, we ask users only about tradeoffs in overhead vs 1608 * expected throughput and its variance, rather than how finely to 1609 * partition tasks. 1610 * 1611 * In a steady state strict (tree-structured) computation, each 1612 * thread makes available for stealing enough tasks for other 1613 * threads to remain active. Inductively, if all threads play by 1614 * the same rules, each thread should make available only a 1615 * constant number of tasks. 1616 * 1617 * The minimum useful constant is just 1. But using a value of 1 1618 * would require immediate replenishment upon each steal to 1619 * maintain enough tasks, which is infeasible. Further, 1620 * partitionings/granularities of offered tasks should minimize 1621 * steal rates, which in general means that threads nearer the top 1622 * of computation tree should generate more than those nearer the 1623 * bottom. In perfect steady state, each thread is at 1624 * approximately the same level of computation tree. However, 1625 * producing extra tasks amortizes the uncertainty of progress and 1626 * diffusion assumptions. 1627 * 1628 * So, users will want to use values larger (but not much larger) 1629 * than 1 to both smooth over shortages and hedge 1630 * against uneven progress; as traded off against the cost of 1631 * extra task overhead. We leave the user to pick a threshold 1632 * value to compare with the results of this call to guide 1633 * decisions, but recommend values such as 3. 1634 * 1635 * When all threads are active, it is on average OK to estimate 1636 * surplus strictly locally. In steady-state, if one thread is 1637 * maintaining say 2 surplus tasks, then so are others. So we can 1638 * just use estimated queue length. However, this strategy alone 1639 * leads to serious mis-estimates in some non-steady-state 1640 * conditions (ramp-up, ramp-down, other stalls). We can detect 1641 * many of these by further considering the number of "idle" 1642 * threads, that are known to have zero queued tasks, so 1643 * compensate by a factor of (#idle/#active) threads. 1644 */ 1645 static int getSurplusQueuedTaskCount() { 1646 Thread t = Thread.getThis(); 1647 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 1648 ForkJoinPool pool; WorkQueue q; 1649 1650 if (wt !is null && (pool = wt.pool) !is null && 1651 (q = wt.workQueue) !is null) { 1652 int p = pool.mode & SMASK; 1653 int a = p + cast(int)(pool.ctl >> RC_SHIFT); 1654 int n = q.top - q.base; 1655 return n - (a > (p >>>= 1) ? 0 : 1656 a > (p >>>= 1) ? 1 : 1657 a > (p >>>= 1) ? 2 : 1658 a > (p >>>= 1) ? 4 : 1659 8); 1660 } 1661 return 0; 1662 } 1663 1664 // Termination 1665 1666 /** 1667 * Possibly initiates and/or completes termination. 1668 * 1669 * @param now if true, unconditionally terminate, else only 1670 * if no work and no active workers 1671 * @param enable if true, terminate when next possible 1672 * @return true if terminating or terminated 1673 */ 1674 private bool tryTerminate(bool now, bool enable) { 1675 int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED 1676 1677 while (((md = mode) & SHUTDOWN) == 0) { 1678 if (!enable || this == common) // cannot shutdown 1679 return false; 1680 else { 1681 AtomicHelper.compareAndSet(this.mode, md, md | SHUTDOWN); 1682 } 1683 1684 } 1685 1686 while (((md = mode) & STOP) == 0) { // try to initiate termination 1687 if (!now) { // check if quiescent & empty 1688 for (long oldSum = 0L;;) { // repeat until stable 1689 bool running = false; 1690 long checkSum = ctl; 1691 WorkQueue[] ws = workQueues; 1692 if ((md & SMASK) + cast(int)(checkSum >> RC_SHIFT) > 0) 1693 running = true; 1694 else if (ws !is null) { 1695 WorkQueue w; 1696 for (int i = 0; i < ws.length; ++i) { 1697 if ((w = ws[i]) !is null) { 1698 int s = w.source, p = w.phase; 1699 int d = w.id, b = w.base; 1700 if (b != w.top || 1701 ((d & 1) == 1 && (s >= 0 || p >= 0))) { 1702 running = true; 1703 break; // working, scanning, or have work 1704 } 1705 checkSum += ((cast(long)s << 48) + (cast(long)p << 32) + 1706 (cast(long)b << 16) + cast(long)d); 1707 } 1708 } 1709 } 1710 if (((md = mode) & STOP) != 0) 1711 break; // already triggered 1712 else if (running) 1713 return false; 1714 else if (workQueues == ws && oldSum == (oldSum = checkSum)) 1715 break; 1716 } 1717 } 1718 if ((md & STOP) == 0) 1719 AtomicHelper.compareAndSet(this.mode, md, md | STOP); 1720 } 1721 1722 while (((md = mode) & TERMINATED) == 0) { // help terminate others 1723 for (long oldSum = 0L;;) { // repeat until stable 1724 WorkQueue[] ws; WorkQueue w; 1725 long checkSum = ctl; 1726 if ((ws = workQueues) !is null) { 1727 for (int i = 0; i < ws.length; ++i) { 1728 if ((w = ws[i]) !is null) { 1729 ForkJoinWorkerThread wt = w.owner; 1730 w.cancelAll(); // clear queues 1731 if (wt !is null) { 1732 try { // unblock join or park 1733 wt.interrupt(); 1734 } catch (Throwable ignore) { 1735 } 1736 } 1737 checkSum += (cast(long)w.phase << 32) + w.base; 1738 } 1739 } 1740 } 1741 if (((md = mode) & TERMINATED) != 0 || 1742 (workQueues == ws && oldSum == (oldSum = checkSum))) 1743 break; 1744 } 1745 if ((md & TERMINATED) != 0) 1746 break; 1747 else if ((md & SMASK) + cast(short)(ctl >>> TC_SHIFT) > 0) 1748 break; 1749 else if (AtomicHelper.compareAndSet(this.mode, md, md | TERMINATED)) { 1750 synchronized (this) { 1751 // notifyAll(); // for awaitTermination 1752 // TODO: Tasks pending completion -@zxp at 2/4/2019, 11:03:21 AM 1753 // 1754 } 1755 break; 1756 } 1757 } 1758 return true; 1759 } 1760 1761 // Exported methods 1762 1763 // Constructors 1764 1765 /** 1766 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 1767 * java.lang.Runtime#availableProcessors}, using defaults for all 1768 * other parameters (see {@link #ForkJoinPool(int, 1769 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool, 1770 * int, int, int, Predicate, long, TimeUnit)}). 1771 * 1772 * @throws SecurityException if a security manager exists and 1773 * the caller is not permitted to modify threads 1774 * because it does not hold {@link 1775 * java.lang.RuntimePermission}{@code ("modifyThread")} 1776 */ 1777 this() { 1778 this(min(MAX_CAP, totalCPUs), 1779 defaultForkJoinWorkerThreadFactory, null, false, 1780 0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE)); 1781 } 1782 1783 /** 1784 * Creates a {@code ForkJoinPool} with the indicated parallelism 1785 * level, using defaults for all other parameters (see {@link 1786 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory, 1787 * UncaughtExceptionHandler, bool, int, int, int, Predicate, 1788 * long, TimeUnit)}). 1789 * 1790 * @param parallelism the parallelism level 1791 * @throws IllegalArgumentException if parallelism less than or 1792 * equal to zero, or greater than implementation limit 1793 * @throws SecurityException if a security manager exists and 1794 * the caller is not permitted to modify threads 1795 * because it does not hold {@link 1796 * java.lang.RuntimePermission}{@code ("modifyThread")} 1797 */ 1798 this(int parallelism) { 1799 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 1800 0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE)); 1801 } 1802 1803 /** 1804 * Creates a {@code ForkJoinPool} with the given parameters (using 1805 * defaults for others -- see {@link #ForkJoinPool(int, 1806 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool, 1807 * int, int, int, Predicate, long, TimeUnit)}). 1808 * 1809 * @param parallelism the parallelism level. For default value, 1810 * use {@link java.lang.Runtime#availableProcessors}. 1811 * @param factory the factory for creating new threads. For default value, 1812 * use {@link #defaultForkJoinWorkerThreadFactory}. 1813 * @param handler the handler for internal worker threads that 1814 * terminate due to unrecoverable errors encountered while executing 1815 * tasks. For default value, use {@code null}. 1816 * @param asyncMode if true, 1817 * establishes local first-in-first-out scheduling mode for forked 1818 * tasks that are never joined. This mode may be more appropriate 1819 * than default locally stack-based mode in applications in which 1820 * worker threads only process event-style asynchronous tasks. 1821 * For default value, use {@code false}. 1822 * @throws IllegalArgumentException if parallelism less than or 1823 * equal to zero, or greater than implementation limit 1824 * @throws NullPointerException if the factory is null 1825 * @throws SecurityException if a security manager exists and 1826 * the caller is not permitted to modify threads 1827 * because it does not hold {@link 1828 * java.lang.RuntimePermission}{@code ("modifyThread")} 1829 */ 1830 this(int parallelism, 1831 ForkJoinWorkerThreadFactory factory, 1832 UncaughtExceptionHandler handler, 1833 bool asyncMode) { 1834 this(parallelism, factory, handler, asyncMode, 1835 0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE)); 1836 } 1837 1838 /** 1839 * Creates a {@code ForkJoinPool} with the given parameters. 1840 * 1841 * @param parallelism the parallelism level. For default value, 1842 * use {@link java.lang.Runtime#availableProcessors}. 1843 * 1844 * @param factory the factory for creating new threads. For 1845 * default value, use {@link #defaultForkJoinWorkerThreadFactory}. 1846 * 1847 * @param handler the handler for internal worker threads that 1848 * terminate due to unrecoverable errors encountered while 1849 * executing tasks. For default value, use {@code null}. 1850 * 1851 * @param asyncMode if true, establishes local first-in-first-out 1852 * scheduling mode for forked tasks that are never joined. This 1853 * mode may be more appropriate than default locally stack-based 1854 * mode in applications in which worker threads only process 1855 * event-style asynchronous tasks. For default value, use {@code 1856 * false}. 1857 * 1858 * @param corePoolSize the number of threads to keep in the pool 1859 * (unless timed out after an elapsed keep-alive). Normally (and 1860 * by default) this is the same value as the parallelism level, 1861 * but may be set to a larger value to reduce dynamic overhead if 1862 * tasks regularly block. Using a smaller value (for example 1863 * {@code 0}) has the same effect as the default. 1864 * 1865 * @param maximumPoolSize the maximum number of threads allowed. 1866 * When the maximum is reached, attempts to replace blocked 1867 * threads fail. (However, because creation and termination of 1868 * different threads may overlap, and may be managed by the given 1869 * thread factory, this value may be transiently exceeded.) To 1870 * arrange the same value as is used by default for the common 1871 * pool, use {@code 256} plus the {@code parallelism} level. (By 1872 * default, the common pool allows a maximum of 256 spare 1873 * threads.) Using a value (for example {@code 1874 * Integer.MAX_VALUE}) larger than the implementation's total 1875 * thread limit has the same effect as using this limit (which is 1876 * the default). 1877 * 1878 * @param minimumRunnable the minimum allowed number of core 1879 * threads not blocked by a join or {@link ManagedBlocker}. To 1880 * ensure progress, when too few unblocked threads exist and 1881 * unexecuted tasks may exist, new threads are constructed, up to 1882 * the given maximumPoolSize. For the default value, use {@code 1883 * 1}, that ensures liveness. A larger value might improve 1884 * throughput in the presence of blocked activities, but might 1885 * not, due to increased overhead. A value of zero may be 1886 * acceptable when submitted tasks cannot have dependencies 1887 * requiring additional threads. 1888 * 1889 * @param saturate if non-null, a predicate invoked upon attempts 1890 * to create more than the maximum total allowed threads. By 1891 * default, when a thread is about to block on a join or {@link 1892 * ManagedBlocker}, but cannot be replaced because the 1893 * maximumPoolSize would be exceeded, a {@link 1894 * RejectedExecutionException} is thrown. But if this predicate 1895 * returns {@code true}, then no exception is thrown, so the pool 1896 * continues to operate with fewer than the target number of 1897 * runnable threads, which might not ensure progress. 1898 * 1899 * @param keepAliveTime the elapsed time since last use before 1900 * a thread is terminated (and then later replaced if needed). 1901 * For the default value, use {@code 60, TimeUnit.SECONDS}. 1902 * 1903 * @param unit the time unit for the {@code keepAliveTime} argument 1904 * 1905 * @throws IllegalArgumentException if parallelism is less than or 1906 * equal to zero, or is greater than implementation limit, 1907 * or if maximumPoolSize is less than parallelism, 1908 * of if the keepAliveTime is less than or equal to zero. 1909 * @throws NullPointerException if the factory is null 1910 * @throws SecurityException if a security manager exists and 1911 * the caller is not permitted to modify threads 1912 * because it does not hold {@link 1913 * java.lang.RuntimePermission}{@code ("modifyThread")} 1914 */ 1915 this(int parallelism, 1916 ForkJoinWorkerThreadFactory factory, 1917 UncaughtExceptionHandler handler, 1918 bool asyncMode, 1919 int corePoolSize, 1920 int maximumPoolSize, 1921 int minimumRunnable, 1922 Predicate!(ForkJoinPool) saturate, 1923 Duration keepAliveTime) { 1924 // check, encode, pack parameters 1925 if (parallelism <= 0 || parallelism > MAX_CAP || 1926 maximumPoolSize < parallelism || keepAliveTime <= Duration.zero) 1927 throw new IllegalArgumentException(); 1928 if (factory is null) 1929 throw new NullPointerException(); 1930 long ms = max(keepAliveTime.total!(TimeUnit.Millisecond), TIMEOUT_SLOP); 1931 1932 int corep = min(max(corePoolSize, parallelism), MAX_CAP); 1933 long c = (((cast(long)(-corep) << TC_SHIFT) & TC_MASK) | 1934 ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK)); 1935 int m = parallelism | (asyncMode ? FIFO : 0); 1936 int maxSpares = min(maximumPoolSize, MAX_CAP) - parallelism; 1937 int minAvail = min(max(minimumRunnable, 0), MAX_CAP); 1938 int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH); 1939 int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots 1940 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 1941 n = (n + 1) << 1; // power of two, including space for submission queues 1942 1943 this.workerNamePrefix = "ForkJoinPool-" ~ nextPoolId().to!string() ~ "-worker-"; 1944 this.workQueues = new WorkQueue[n]; 1945 this.factory = factory; 1946 this.ueh = handler; 1947 this.saturate = saturate; 1948 this.keepAlive = ms.msecs; 1949 this.bounds = b; 1950 this.mode = m; 1951 this.ctl = c; 1952 // checkPermission(); 1953 workerNameLocker = new Object(); 1954 } 1955 1956 private static Object newInstanceFrom(string className) { 1957 return (className.empty) ? null : Object.factory(className); 1958 } 1959 1960 /** 1961 * Constructor for common pool using parameters possibly 1962 * overridden by system properties 1963 */ 1964 private this(byte forCommonPoolOnly) { 1965 int parallelism = -1; 1966 ForkJoinWorkerThreadFactory fac = null; 1967 UncaughtExceptionHandler handler = null; 1968 try { // ignore exceptions in accessing/parsing properties 1969 ConfigBuilder config = Environment.getProperties(); 1970 string pp = config.getProperty("hunt.concurrency.ForkJoinPool.common.parallelism"); 1971 if (!pp.empty) { 1972 parallelism = pp.to!int(); 1973 } 1974 1975 string className = config.getProperty("hunt.concurrency.ForkJoinPool.common.threadFactory"); 1976 fac = cast(ForkJoinWorkerThreadFactory) newInstanceFrom(className); 1977 1978 className = config.getProperty("hunt.concurrency.ForkJoinPool.common.exceptionHandler"); 1979 handler = cast(UncaughtExceptionHandler) newInstanceFrom(className); 1980 } catch (Exception ignore) { 1981 version(HUNT_DEBUG) { 1982 warning(ignore); 1983 } 1984 } 1985 1986 if (fac is null) { 1987 // if (System.getSecurityManager() is null) 1988 fac = defaultForkJoinWorkerThreadFactory; 1989 // else // use security-managed default 1990 // fac = new InnocuousForkJoinWorkerThreadFactory(); 1991 } 1992 if (parallelism < 0 && // default 1 less than #cores 1993 (parallelism = totalCPUs - 1) <= 0) 1994 parallelism = 1; 1995 if (parallelism > MAX_CAP) 1996 parallelism = MAX_CAP; 1997 1998 long c = (((cast(long)(-parallelism) << TC_SHIFT) & TC_MASK) | 1999 ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK)); 2000 int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); 2001 int n = (parallelism > 1) ? parallelism - 1 : 1; 2002 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 2003 n = (n + 1) << 1; 2004 2005 this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; 2006 this.workerNameLocker = new Object(); 2007 this.workQueues = new WorkQueue[n]; 2008 this.factory = fac; 2009 this.ueh = handler; 2010 this.saturate = null; 2011 this.keepAlive = DEFAULT_KEEPALIVE.msecs; 2012 this.bounds = b; 2013 this.mode = parallelism; 2014 this.ctl = c; 2015 } 2016 2017 /** 2018 * Returns the common pool instance. This pool is statically 2019 * constructed; its run state is unaffected by attempts to {@link 2020 * #shutdown} or {@link #shutdownNow}. However this pool and any 2021 * ongoing processing are automatically terminated upon program 2022 * {@link System#exit}. Any program that relies on asynchronous 2023 * task processing to complete before program termination should 2024 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2025 * before exit. 2026 * 2027 * @return the common pool instance 2028 */ 2029 static ForkJoinPool commonPool() { 2030 return common; 2031 } 2032 2033 // Execution methods 2034 2035 /** 2036 * Performs the given task, returning its result upon completion. 2037 * If the computation encounters an unchecked Exception or Error, 2038 * it is rethrown as the outcome of this invocation. Rethrown 2039 * exceptions behave in the same way as regular exceptions, but, 2040 * when possible, contain stack traces (as displayed for example 2041 * using {@code ex.printStackTrace()}) of both the current thread 2042 * as well as the thread actually encountering the exception; 2043 * minimally only the latter. 2044 * 2045 * @param task the task 2046 * @param (T) the type of the task's result 2047 * @return the task's result 2048 * @throws NullPointerException if the task is null 2049 * @throws RejectedExecutionException if the task cannot be 2050 * scheduled for execution 2051 */ 2052 T invoke(T)(ForkJoinTask!(T) task) { 2053 if (task is null) 2054 throw new NullPointerException(); 2055 externalSubmit!(T)(task); 2056 version(HUNT_CONCURRENCY_DEBUG) { 2057 infof("waiting the result..."); 2058 T c = task.join(); 2059 infof("final result: %s", c); 2060 return c; 2061 } else { 2062 return task.join(); 2063 } 2064 } 2065 2066 /** 2067 * Arranges for (asynchronous) execution of the given task. 2068 * 2069 * @param task the task 2070 * @throws NullPointerException if the task is null 2071 * @throws RejectedExecutionException if the task cannot be 2072 * scheduled for execution 2073 */ 2074 void execute(IForkJoinTask task) { 2075 externalSubmit(task); 2076 } 2077 2078 // AbstractExecutorService methods 2079 2080 /** 2081 * @throws NullPointerException if the task is null 2082 * @throws RejectedExecutionException if the task cannot be 2083 * scheduled for execution 2084 */ 2085 void execute(Runnable task) { 2086 if (task is null) 2087 throw new NullPointerException(); 2088 IForkJoinTask job = cast(IForkJoinTask) task; 2089 if (job is null) { // avoid re-wrap 2090 warning("job is null"); 2091 job = new RunnableExecuteAction(task); 2092 } 2093 externalSubmit(job); 2094 } 2095 2096 /** 2097 * Submits a ForkJoinTask for execution. 2098 * 2099 * @param task the task to submit 2100 * @param (T) the type of the task's result 2101 * @return the task 2102 * @throws NullPointerException if the task is null 2103 * @throws RejectedExecutionException if the task cannot be 2104 * scheduled for execution 2105 */ 2106 ForkJoinTask!(T) submitTask(T)(ForkJoinTask!(T) task) { 2107 return externalSubmit(task); 2108 } 2109 2110 /** 2111 * @throws NullPointerException if the task is null 2112 * @throws RejectedExecutionException if the task cannot be 2113 * scheduled for execution 2114 */ 2115 ForkJoinTask!(T) submitTask(T)(Callable!(T) task) { 2116 return externalSubmit(new AdaptedCallable!(T)(task)); 2117 } 2118 2119 /** 2120 * @throws NullPointerException if the task is null 2121 * @throws RejectedExecutionException if the task cannot be 2122 * scheduled for execution 2123 */ 2124 ForkJoinTask!(T) submitTask(T)(Runnable task, T result) { 2125 return externalSubmit(new AdaptedRunnable!(T)(task, result)); 2126 } 2127 2128 /** 2129 * @throws NullPointerException if the task is null 2130 * @throws RejectedExecutionException if the task cannot be 2131 * scheduled for execution 2132 */ 2133 2134 IForkJoinTask submitTask(Runnable task) { 2135 if (task is null) 2136 throw new NullPointerException(); 2137 IForkJoinTask t = cast(IForkJoinTask)task; 2138 return externalSubmit(t !is null 2139 ? cast(ForkJoinTask!(void)) task // avoid re-wrap 2140 : new AdaptedRunnableAction(task)); 2141 } 2142 2143 /** 2144 * @throws NullPointerException {@inheritDoc} 2145 * @throws RejectedExecutionException {@inheritDoc} 2146 */ 2147 List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) { 2148 // In previous versions of this class, this method constructed 2149 // a task to run ForkJoinTask.invokeAll, but now external 2150 // invocation of multiple tasks is at least as efficient. 2151 ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size()); 2152 2153 try { 2154 foreach (Callable!(T) t ; tasks) { 2155 ForkJoinTask!(T) f = new AdaptedCallable!(T)(t); 2156 futures.add(f); 2157 externalSubmit(f); 2158 } 2159 for (int i = 0, size = futures.size(); i < size; i++) 2160 (cast(IForkJoinTask)(futures.get(i))).quietlyJoin(); 2161 return futures; 2162 } catch (Throwable t) { 2163 for (int i = 0, size = futures.size(); i < size; i++) 2164 futures.get(i).cancel(false); 2165 throw t; 2166 } 2167 } 2168 2169 /** 2170 * Returns the factory used for constructing new workers. 2171 * 2172 * @return the factory used for constructing new workers 2173 */ 2174 ForkJoinWorkerThreadFactory getFactory() { 2175 return factory; 2176 } 2177 2178 /** 2179 * Returns the handler for internal worker threads that terminate 2180 * due to unrecoverable errors encountered while executing tasks. 2181 * 2182 * @return the handler, or {@code null} if none 2183 */ 2184 UncaughtExceptionHandler getUncaughtExceptionHandler() { 2185 return ueh; 2186 } 2187 2188 /** 2189 * Returns the targeted parallelism level of this pool. 2190 * 2191 * @return the targeted parallelism level of this pool 2192 */ 2193 int getParallelism() { 2194 int par = mode & SMASK; 2195 return (par > 0) ? par : 1; 2196 } 2197 2198 /** 2199 * Returns the targeted parallelism level of the common pool. 2200 * 2201 * @return the targeted parallelism level of the common pool 2202 */ 2203 static int getCommonPoolParallelism() { 2204 return COMMON_PARALLELISM; 2205 } 2206 2207 /** 2208 * Returns the number of worker threads that have started but not 2209 * yet terminated. The result returned by this method may differ 2210 * from {@link #getParallelism} when threads are created to 2211 * maintain parallelism when others are cooperatively blocked. 2212 * 2213 * @return the number of worker threads 2214 */ 2215 int getPoolSize() { 2216 return ((mode & SMASK) + cast(short)(ctl >>> TC_SHIFT)); 2217 } 2218 2219 /** 2220 * Returns {@code true} if this pool uses local first-in-first-out 2221 * scheduling mode for forked tasks that are never joined. 2222 * 2223 * @return {@code true} if this pool uses async mode 2224 */ 2225 bool getAsyncMode() { 2226 return (mode & FIFO) != 0; 2227 } 2228 2229 /** 2230 * Returns an estimate of the number of worker threads that are 2231 * not blocked waiting to join tasks or for other managed 2232 * synchronization. This method may overestimate the 2233 * number of running threads. 2234 * 2235 * @return the number of worker threads 2236 */ 2237 int getRunningThreadCount() { 2238 WorkQueue[] ws; WorkQueue w; 2239 // VarHandle.acquireFence(); 2240 int rc = 0; 2241 if ((ws = workQueues) !is null) { 2242 for (int i = 1; i < cast(int)ws.length; i += 2) { 2243 if ((w = ws[i]) !is null && w.isApparentlyUnblocked()) 2244 ++rc; 2245 } 2246 } 2247 return rc; 2248 } 2249 2250 /** 2251 * Returns an estimate of the number of threads that are currently 2252 * stealing or executing tasks. This method may overestimate the 2253 * number of active threads. 2254 * 2255 * @return the number of active threads 2256 */ 2257 int getActiveThreadCount() { 2258 int r = (mode & SMASK) + cast(int)(ctl >> RC_SHIFT); 2259 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2260 } 2261 2262 /** 2263 * Returns {@code true} if all worker threads are currently idle. 2264 * An idle worker is one that cannot obtain a task to execute 2265 * because none are available to steal from other threads, and 2266 * there are no pending submissions to the pool. This method is 2267 * conservative; it might not return {@code true} immediately upon 2268 * idleness of all threads, but will eventually become true if 2269 * threads remain inactive. 2270 * 2271 * @return {@code true} if all threads are currently idle 2272 */ 2273 bool isQuiescent() { 2274 for (;;) { 2275 long c = ctl; 2276 int md = mode, pc = md & SMASK; 2277 int tc = pc + cast(short)(c >>> TC_SHIFT); 2278 int rc = pc + cast(int)(c >> RC_SHIFT); 2279 if ((md & (STOP | TERMINATED)) != 0) 2280 return true; 2281 else if (rc > 0) 2282 return false; 2283 else { 2284 WorkQueue[] ws; WorkQueue v; 2285 if ((ws = workQueues) !is null) { 2286 for (int i = 1; i < ws.length; i += 2) { 2287 if ((v = ws[i]) !is null) { 2288 if (v.source > 0) 2289 return false; 2290 --tc; 2291 } 2292 } 2293 } 2294 if (tc == 0 && ctl == c) 2295 return true; 2296 } 2297 } 2298 } 2299 2300 /** 2301 * Returns an estimate of the total number of tasks stolen from 2302 * one thread's work queue by another. The reported value 2303 * underestimates the actual total number of steals when the pool 2304 * is not quiescent. This value may be useful for monitoring and 2305 * tuning fork/join programs: in general, steal counts should be 2306 * high enough to keep threads busy, but low enough to avoid 2307 * overhead and contention across threads. 2308 * 2309 * @return the number of steals 2310 */ 2311 long getStealCount() { 2312 long count = stealCount; 2313 WorkQueue[] ws; WorkQueue w; 2314 if ((ws = workQueues) !is null) { 2315 for (int i = 1; i < ws.length; i += 2) { 2316 if ((w = ws[i]) !is null) 2317 count += cast(long)w.nsteals & 0xffffffffL; 2318 } 2319 } 2320 return count; 2321 } 2322 2323 /** 2324 * Returns an estimate of the total number of tasks currently held 2325 * in queues by worker threads (but not including tasks submitted 2326 * to the pool that have not begun executing). This value is only 2327 * an approximation, obtained by iterating across all threads in 2328 * the pool. This method may be useful for tuning task 2329 * granularities. 2330 * 2331 * @return the number of queued tasks 2332 */ 2333 long getQueuedTaskCount() { 2334 WorkQueue[] ws; WorkQueue w; 2335 // VarHandle.acquireFence(); 2336 int count = 0; 2337 if ((ws = workQueues) !is null) { 2338 for (int i = 1; i < cast(int)ws.length; i += 2) { 2339 if ((w = ws[i]) !is null) 2340 count += w.queueSize(); 2341 } 2342 } 2343 return count; 2344 } 2345 2346 /** 2347 * Returns an estimate of the number of tasks submitted to this 2348 * pool that have not yet begun executing. This method may take 2349 * time proportional to the number of submissions. 2350 * 2351 * @return the number of queued submissions 2352 */ 2353 int getQueuedSubmissionCount() { 2354 WorkQueue[] ws; WorkQueue w; 2355 // VarHandle.acquireFence(); 2356 int count = 0; 2357 if ((ws = workQueues) !is null) { 2358 for (int i = 0; i < cast(int)ws.length; i += 2) { 2359 if ((w = ws[i]) !is null) 2360 count += w.queueSize(); 2361 } 2362 } 2363 return count; 2364 } 2365 2366 /** 2367 * Returns {@code true} if there are any tasks submitted to this 2368 * pool that have not yet begun executing. 2369 * 2370 * @return {@code true} if there are any queued submissions 2371 */ 2372 bool hasQueuedSubmissions() { 2373 WorkQueue[] ws; WorkQueue w; 2374 // VarHandle.acquireFence(); 2375 if ((ws = workQueues) !is null) { 2376 for (int i = 0; i < cast(int)ws.length; i += 2) { 2377 if ((w = ws[i]) !is null && !w.isEmpty()) 2378 return true; 2379 } 2380 } 2381 return false; 2382 } 2383 2384 /** 2385 * Removes and returns the next unexecuted submission if one is 2386 * available. This method may be useful in extensions to this 2387 * class that re-assign work in systems with multiple pools. 2388 * 2389 * @return the next submission, or {@code null} if none 2390 */ 2391 protected IForkJoinTask pollSubmission() { 2392 return pollScan(true); 2393 } 2394 2395 /** 2396 * Removes all available unexecuted submitted and forked tasks 2397 * from scheduling queues and adds them to the given collection, 2398 * without altering their execution status. These may include 2399 * artificially generated or wrapped tasks. This method is 2400 * designed to be invoked only when the pool is known to be 2401 * quiescent. Invocations at other times may not remove all 2402 * tasks. A failure encountered while attempting to add elements 2403 * to collection {@code c} may result in elements being in 2404 * neither, either or both collections when the associated 2405 * exception is thrown. The behavior of this operation is 2406 * undefined if the specified collection is modified while the 2407 * operation is in progress. 2408 * 2409 * @param c the collection to transfer elements into 2410 * @return the number of elements transferred 2411 */ 2412 protected int drainTasksTo(Collection!IForkJoinTask c) { 2413 WorkQueue[] ws; WorkQueue w; IForkJoinTask t; 2414 // VarHandle.acquireFence(); 2415 int count = 0; 2416 if ((ws = workQueues) !is null) { 2417 for (int i = 0; i < ws.length; ++i) { 2418 if ((w = ws[i]) !is null) { 2419 while ((t = w.poll()) !is null) { 2420 c.add(t); 2421 ++count; 2422 } 2423 } 2424 } 2425 } 2426 return count; 2427 } 2428 2429 /** 2430 * Returns a string identifying this pool, as well as its state, 2431 * including indications of run state, parallelism level, and 2432 * worker and task counts. 2433 * 2434 * @return a string identifying this pool, as well as its state 2435 */ 2436 override string toString() { 2437 // Use a single pass through workQueues to collect counts 2438 int md = mode; // read fields first 2439 long c = ctl; 2440 long st = stealCount; 2441 long qt = 0L, qs = 0L; int rc = 0; 2442 WorkQueue[] ws; WorkQueue w; 2443 if ((ws = workQueues) !is null) { 2444 for (int i = 0; i < ws.length; ++i) { 2445 if ((w = ws[i]) !is null) { 2446 int size = w.queueSize(); 2447 if ((i & 1) == 0) 2448 qs += size; 2449 else { 2450 qt += size; 2451 st += cast(long)w.nsteals & 0xffffffffL; 2452 if (w.isApparentlyUnblocked()) 2453 ++rc; 2454 } 2455 } 2456 } 2457 } 2458 2459 int pc = (md & SMASK); 2460 int tc = pc + cast(short)(c >>> TC_SHIFT); 2461 int ac = pc + cast(int)(c >> RC_SHIFT); 2462 if (ac < 0) // ignore negative 2463 ac = 0; 2464 string level = ((md & TERMINATED) != 0 ? "Terminated" : 2465 (md & STOP) != 0 ? "Terminating" : 2466 (md & SHUTDOWN) != 0 ? "Shutting down" : 2467 "Running"); 2468 return super.toString() ~ 2469 "[" ~ level ~ 2470 ", parallelism = " ~ pc.to!string() ~ 2471 ", size = " ~ tc.to!string() ~ 2472 ", active = " ~ ac.to!string() ~ 2473 ", running = " ~ rc.to!string() ~ 2474 ", steals = " ~ st.to!string() ~ 2475 ", tasks = " ~ qt.to!string() ~ 2476 ", submissions = " ~ qs.to!string() ~ 2477 "]"; 2478 } 2479 2480 /** 2481 * Possibly initiates an orderly shutdown in which previously 2482 * submitted tasks are executed, but no new tasks will be 2483 * accepted. Invocation has no effect on execution state if this 2484 * is the {@link #commonPool()}, and no additional effect if 2485 * already shut down. Tasks that are in the process of being 2486 * submitted concurrently during the course of this method may or 2487 * may not be rejected. 2488 * 2489 * @throws SecurityException if a security manager exists and 2490 * the caller is not permitted to modify threads 2491 * because it does not hold {@link 2492 * java.lang.RuntimePermission}{@code ("modifyThread")} 2493 */ 2494 void shutdown() { 2495 // checkPermission(); 2496 tryTerminate(false, true); 2497 } 2498 2499 /** 2500 * Possibly attempts to cancel and/or stop all tasks, and reject 2501 * all subsequently submitted tasks. Invocation has no effect on 2502 * execution state if this is the {@link #commonPool()}, and no 2503 * additional effect if already shut down. Otherwise, tasks that 2504 * are in the process of being submitted or executed concurrently 2505 * during the course of this method may or may not be 2506 * rejected. This method cancels both existing and unexecuted 2507 * tasks, in order to permit termination in the presence of task 2508 * dependencies. So the method always returns an empty list 2509 * (unlike the case for some other Executors). 2510 * 2511 * @return an empty list 2512 * @throws SecurityException if a security manager exists and 2513 * the caller is not permitted to modify threads 2514 * because it does not hold {@link 2515 * java.lang.RuntimePermission}{@code ("modifyThread")} 2516 */ 2517 List!(Runnable) shutdownNow() { 2518 // checkPermission(); 2519 tryTerminate(true, true); 2520 return Collections.emptyList!(Runnable)(); 2521 } 2522 2523 /** 2524 * Returns {@code true} if all tasks have completed following shut down. 2525 * 2526 * @return {@code true} if all tasks have completed following shut down 2527 */ 2528 bool isTerminated() { 2529 return (mode & TERMINATED) != 0; 2530 } 2531 2532 /** 2533 * Returns {@code true} if the process of termination has 2534 * commenced but not yet completed. This method may be useful for 2535 * debugging. A return of {@code true} reported a sufficient 2536 * period after shutdown may indicate that submitted tasks have 2537 * ignored or suppressed interruption, or are waiting for I/O, 2538 * causing this executor not to properly terminate. (See the 2539 * advisory notes for class {@link ForkJoinTask} stating that 2540 * tasks should not normally entail blocking operations. But if 2541 * they do, they must abort them on interrupt.) 2542 * 2543 * @return {@code true} if terminating but not yet terminated 2544 */ 2545 bool isTerminating() { 2546 int md = mode; 2547 return (md & STOP) != 0 && (md & TERMINATED) == 0; 2548 } 2549 2550 /** 2551 * Returns {@code true} if this pool has been shut down. 2552 * 2553 * @return {@code true} if this pool has been shut down 2554 */ 2555 bool isShutdown() { 2556 return (mode & SHUTDOWN) != 0; 2557 } 2558 2559 /** 2560 * Blocks until all tasks have completed execution after a 2561 * shutdown request, or the timeout occurs, or the current thread 2562 * is interrupted, whichever happens first. Because the {@link 2563 * #commonPool()} never terminates until program shutdown, when 2564 * applied to the common pool, this method is equivalent to {@link 2565 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 2566 * 2567 * @param timeout the maximum time to wait 2568 * @param unit the time unit of the timeout argument 2569 * @return {@code true} if this executor terminated and 2570 * {@code false} if the timeout elapsed before termination 2571 * @throws InterruptedException if interrupted while waiting 2572 */ 2573 bool awaitTermination(Duration timeout) 2574 { 2575 if (ThreadEx.interrupted()) 2576 throw new InterruptedException(); 2577 if (this == common) { 2578 awaitQuiescence(timeout); 2579 return false; 2580 } 2581 long nanos = timeout.total!(TimeUnit.HectoNanosecond); 2582 if (isTerminated()) 2583 return true; 2584 if (nanos <= 0L) 2585 return false; 2586 long deadline = Clock.currStdTime + nanos; 2587 synchronized (this) { 2588 for (;;) { 2589 if (isTerminated()) 2590 return true; 2591 if (nanos <= 0L) 2592 return false; 2593 // long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 2594 // wait(millis > 0L ? millis : 1L); 2595 // ThreadEx.currentThread().par 2596 ThreadEx.sleep(dur!(TimeUnit.HectoNanosecond)(nanos)); 2597 nanos = deadline - Clock.currStdTime; 2598 } 2599 } 2600 } 2601 2602 /** 2603 * If called by a ForkJoinTask operating in this pool, equivalent 2604 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 2605 * waits and/or attempts to assist performing tasks until this 2606 * pool {@link #isQuiescent} or the indicated timeout elapses. 2607 * 2608 * @param timeout the maximum time to wait 2609 * @param unit the time unit of the timeout argument 2610 * @return {@code true} if quiescent; {@code false} if the 2611 * timeout elapsed. 2612 */ 2613 bool awaitQuiescence(Duration timeout) { 2614 long nanos = timeout.total!(TimeUnit.HectoNanosecond)(); 2615 Thread thread = Thread.getThis(); 2616 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 2617 if (wt !is null && wt.pool is this) { 2618 helpQuiescePool(wt.workQueue); 2619 return true; 2620 } 2621 else { 2622 for (long startTime = Clock.currStdTime;;) { 2623 IForkJoinTask t; 2624 if ((t = pollScan(false)) !is null) 2625 t.doExec(); 2626 else if (isQuiescent()) 2627 return true; 2628 else if ((Clock.currStdTime - startTime) > nanos) 2629 return false; 2630 else 2631 Thread.yield(); // cannot block 2632 } 2633 } 2634 } 2635 2636 /** 2637 * Waits and/or attempts to assist performing tasks indefinitely 2638 * until the {@link #commonPool()} {@link #isQuiescent}. 2639 */ 2640 static void quiesceCommonPool() { 2641 common.awaitQuiescence(Duration.max); 2642 } 2643 2644 /** 2645 * Runs the given possibly blocking task. When {@linkplain 2646 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 2647 * method possibly arranges for a spare thread to be activated if 2648 * necessary to ensure sufficient parallelism while the current 2649 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 2650 * 2651 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 2652 * {@code blocker.block()} until either method returns {@code true}. 2653 * Every call to {@code blocker.block()} is preceded by a call to 2654 * {@code blocker.isReleasable()} that returned {@code false}. 2655 * 2656 * <p>If not running in a ForkJoinPool, this method is 2657 * behaviorally equivalent to 2658 * <pre> {@code 2659 * while (!blocker.isReleasable()) 2660 * if (blocker.block()) 2661 * break;}</pre> 2662 * 2663 * If running in a ForkJoinPool, the pool may first be expanded to 2664 * ensure sufficient parallelism available during the call to 2665 * {@code blocker.block()}. 2666 * 2667 * @param blocker the blocker task 2668 * @throws InterruptedException if {@code blocker.block()} did so 2669 */ 2670 static void managedBlock(ManagedBlocker blocker) { 2671 if (blocker is null) throw new NullPointerException(); 2672 ForkJoinPool p; 2673 WorkQueue w; 2674 Thread t = Thread.getThis(); 2675 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 2676 if (wt !is null && (p = wt.pool) !is null && 2677 (w = wt.workQueue) !is null) { 2678 int block; 2679 while (!blocker.isReleasable()) { 2680 if ((block = p.tryCompensate(w)) != 0) { 2681 try { 2682 do {} while (!blocker.isReleasable() && 2683 !blocker.block()); 2684 } finally { 2685 AtomicHelper.getAndAdd(p.ctl, (block > 0) ? RC_UNIT : 0L); 2686 } 2687 break; 2688 } 2689 } 2690 } 2691 else { 2692 do {} while (!blocker.isReleasable() && 2693 !blocker.block()); 2694 } 2695 } 2696 2697 /** 2698 * If the given executor is a ForkJoinPool, poll and execute 2699 * AsynchronousCompletionTasks from worker's queue until none are 2700 * available or blocker is released. 2701 */ 2702 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 2703 ForkJoinPool p = cast(ForkJoinPool)e; 2704 if (p !is null) { 2705 WorkQueue w; WorkQueue[] ws; int r, n; 2706 2707 Thread thread = Thread.getThis(); 2708 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 2709 if (wt !is null && wt.pool is p) 2710 w = wt.workQueue; 2711 else if ((r = ThreadLocalRandom.getProbe()) != 0 && 2712 (ws = p.workQueues) !is null && (n = cast(int)ws.length) > 0) 2713 w = ws[(n - 1) & r & SQMASK]; 2714 else 2715 w = null; 2716 if (w !is null) 2717 w.helpAsyncBlocker(blocker); 2718 } 2719 } 2720 2721 // AbstractExecutorService overrides. These rely on undocumented 2722 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 2723 // implement RunnableFuture. 2724 2725 protected RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) { 2726 return new AdaptedRunnable!(T)(runnable, value); 2727 } 2728 2729 protected RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) { 2730 return new AdaptedCallable!(T)(callable); 2731 } 2732 2733 shared static this() { 2734 int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; 2735 try { 2736 ConfigBuilder config = Environment.getProperties(); 2737 string p = config.getProperty 2738 ("hunt.concurrency.ForkJoinPool.common.maximumSpares"); 2739 if (!p.empty()) 2740 commonMaxSpares = p.to!int(); 2741 } catch (Exception ignore) { 2742 version(HUNT_DEBUG) warning(ignore.toString()); 2743 } 2744 COMMON_MAX_SPARES = commonMaxSpares; 2745 2746 defaultForkJoinWorkerThreadFactory = 2747 new DefaultForkJoinWorkerThreadFactory(); 2748 common = new ForkJoinPool(cast(byte)0); 2749 COMMON_PARALLELISM = max(common.mode & SMASK, 1); 2750 } 2751 } 2752 2753 2754 /** 2755 * Factory for innocuous worker threads. 2756 */ 2757 private final class InnocuousForkJoinWorkerThreadFactory 2758 : ForkJoinWorkerThreadFactory { 2759 2760 /** 2761 * An ACC to restrict permissions for the factory itself. 2762 * The constructed workers have no permissions set. 2763 */ 2764 // private static final AccessControlContext ACC = contextWithPermissions( 2765 // modifyThreadPermission, 2766 // new RuntimePermission("enableContextClassLoaderOverride"), 2767 // new RuntimePermission("modifyThreadGroup"), 2768 // new RuntimePermission("getClassLoader"), 2769 // new RuntimePermission("setContextClassLoader")); 2770 2771 final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 2772 return new InnocuousForkJoinWorkerThread(pool); 2773 // return AccessController.doPrivileged( 2774 // new PrivilegedAction<>() { 2775 // ForkJoinWorkerThread run() { 2776 // return new ForkJoinWorkerThread. 2777 // InnocuousForkJoinWorkerThread(pool); }}, 2778 // ACC); 2779 } 2780 } 2781 2782 2783 /** 2784 * Factory for creating new {@link ForkJoinWorkerThread}s. 2785 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 2786 * for {@code ForkJoinWorkerThread} subclasses that extend base 2787 * functionality or initialize threads with different contexts. 2788 */ 2789 interface ForkJoinWorkerThreadFactory { 2790 /** 2791 * Returns a new worker thread operating in the given pool. 2792 * Returning null or throwing an exception may result in tasks 2793 * never being executed. If this method throws an exception, 2794 * it is relayed to the caller of the method (for example 2795 * {@code execute}) causing attempted thread creation. If this 2796 * method returns null or throws an exception, it is not 2797 * retried until the next attempted creation (for example 2798 * another call to {@code execute}). 2799 * 2800 * @param pool the pool this thread works in 2801 * @return the new worker thread, or {@code null} if the request 2802 * to create a thread is rejected 2803 * @throws NullPointerException if the pool is null 2804 */ 2805 ForkJoinWorkerThread newThread(ForkJoinPool pool); 2806 } 2807 2808 // Nested classes 2809 2810 // static AccessControlContext contextWithPermissions(Permission ... perms) { 2811 // Permissions permissions = new Permissions(); 2812 // for (Permission perm : perms) 2813 // permissions.add(perm); 2814 // return new AccessControlContext( 2815 // new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); 2816 // } 2817 2818 /** 2819 * Default ForkJoinWorkerThreadFactory implementation; creates a 2820 * new ForkJoinWorkerThread using the system class loader as the 2821 * thread context class loader. 2822 */ 2823 private final class DefaultForkJoinWorkerThreadFactory : ForkJoinWorkerThreadFactory { 2824 // private static final AccessControlContext ACC = contextWithPermissions( 2825 // new RuntimePermission("getClassLoader"), 2826 // new RuntimePermission("setContextClassLoader")); 2827 2828 final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 2829 return new ForkJoinWorkerThread(pool); 2830 // return AccessController.doPrivileged( 2831 // new PrivilegedAction<>() { 2832 // ForkJoinWorkerThread run() { 2833 // return new ForkJoinWorkerThread( 2834 // pool, ClassLoader.getSystemClassLoader()); }}, 2835 // ACC); 2836 } 2837 } 2838 2839 2840 /** 2841 * Interface for extending managed parallelism for tasks running 2842 * in {@link ForkJoinPool}s. 2843 * 2844 * <p>A {@code ManagedBlocker} provides two methods. Method 2845 * {@link #isReleasable} must return {@code true} if blocking is 2846 * not necessary. Method {@link #block} blocks the current thread 2847 * if necessary (perhaps internally invoking {@code isReleasable} 2848 * before actually blocking). These actions are performed by any 2849 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 2850 * The unusual methods in this API accommodate synchronizers that 2851 * may, but don't usually, block for long periods. Similarly, they 2852 * allow more efficient internal handling of cases in which 2853 * additional workers may be, but usually are not, needed to 2854 * ensure sufficient parallelism. Toward this end, 2855 * implementations of method {@code isReleasable} must be amenable 2856 * to repeated invocation. 2857 * 2858 * <p>For example, here is a ManagedBlocker based on a 2859 * ReentrantLock: 2860 * <pre> {@code 2861 * class ManagedLocker implements ManagedBlocker { 2862 * final ReentrantLock lock; 2863 * bool hasLock = false; 2864 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 2865 * bool block() { 2866 * if (!hasLock) 2867 * lock.lock(); 2868 * return true; 2869 * } 2870 * bool isReleasable() { 2871 * return hasLock || (hasLock = lock.tryLock()); 2872 * } 2873 * }}</pre> 2874 * 2875 * <p>Here is a class that possibly blocks waiting for an 2876 * item on a given queue: 2877 * <pre> {@code 2878 * class QueueTaker!(E) : ManagedBlocker { 2879 * final BlockingQueue!(E) queue; 2880 * E item = null; 2881 * QueueTaker(BlockingQueue!(E) q) { this.queue = q; } 2882 * bool block() { 2883 * if (item is null) 2884 * item = queue.take(); 2885 * return true; 2886 * } 2887 * bool isReleasable() { 2888 * return item !is null || (item = queue.poll()) !is null; 2889 * } 2890 * E getItem() { // call after pool.managedBlock completes 2891 * return item; 2892 * } 2893 * }}</pre> 2894 */ 2895 static interface ManagedBlocker { 2896 /** 2897 * Possibly blocks the current thread, for example waiting for 2898 * a lock or condition. 2899 * 2900 * @return {@code true} if no additional blocking is necessary 2901 * (i.e., if isReleasable would return true) 2902 * @throws InterruptedException if interrupted while waiting 2903 * (the method is not required to do so, but is allowed to) 2904 */ 2905 bool block(); 2906 2907 /** 2908 * Returns {@code true} if blocking is unnecessary. 2909 * @return {@code true} if blocking is unnecessary 2910 */ 2911 bool isReleasable(); 2912 } 2913 2914 2915 2916 /** 2917 * A thread managed by a {@link ForkJoinPool}, which executes 2918 * {@link ForkJoinTask}s. 2919 * This class is subclassable solely for the sake of adding 2920 * functionality -- there are no overridable methods dealing with 2921 * scheduling or execution. However, you can override initialization 2922 * and termination methods surrounding the main task processing loop. 2923 * If you do create such a subclass, you will also need to supply a 2924 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to 2925 * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory, 2926 * UncaughtExceptionHandler, bool, int, int, int, Predicate, long, TimeUnit) 2927 * use it} in a {@code ForkJoinPool}. 2928 * 2929 * @author Doug Lea 2930 */ 2931 class ForkJoinWorkerThread : ThreadEx { 2932 /* 2933 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform 2934 * ForkJoinTasks. For explanation, see the internal documentation 2935 * of class ForkJoinPool. 2936 * 2937 * This class just maintains links to its pool and WorkQueue. The 2938 * pool field is set immediately upon construction, but the 2939 * workQueue field is not set until a call to registerWorker 2940 * completes. This leads to a visibility race, that is tolerated 2941 * by requiring that the workQueue field is only accessed by the 2942 * owning thread. 2943 * 2944 * Support for (non-public) subclass InnocuousForkJoinWorkerThread 2945 * requires that we break quite a lot of encapsulation (via helper 2946 * methods in ThreadLocalRandom) both here and in the subclass to 2947 * access and set Thread fields. 2948 */ 2949 2950 ForkJoinPool pool; // the pool this thread works in 2951 WorkQueue workQueue; // work-stealing mechanics 2952 2953 /** 2954 * Creates a ForkJoinWorkerThread operating in the given pool. 2955 * 2956 * @param pool the pool this thread works in 2957 * @throws NullPointerException if pool is null 2958 */ 2959 protected this(ForkJoinPool pool) { 2960 // Use a placeholder until a useful name can be set in registerWorker 2961 super("aForkJoinWorkerThread"); 2962 this.pool = pool; 2963 this.workQueue = pool.registerWorker(this); 2964 } 2965 2966 /** 2967 * Version for use by the default pool. Supports setting the 2968 * context class loader. This is a separate constructor to avoid 2969 * affecting the protected constructor. 2970 */ 2971 // this(ForkJoinPool pool, ClassLoader ccl) { 2972 // super("aForkJoinWorkerThread"); 2973 // super.setContextClassLoader(ccl); 2974 // this.pool = pool; 2975 // this.workQueue = pool.registerWorker(this); 2976 // } 2977 2978 /** 2979 * Version for InnocuousForkJoinWorkerThread. 2980 */ 2981 this(ForkJoinPool pool, 2982 // ClassLoader ccl, 2983 ThreadGroupEx threadGroup, 2984 ) { // AccessControlContext acc 2985 super(threadGroup, null, "aForkJoinWorkerThread"); 2986 // super.setContextClassLoader(ccl); 2987 // ThreadLocalRandom.setInheritedAccessControlContext(this, acc); 2988 // ThreadLocalRandom.eraseThreadLocals(this); // clear before registering 2989 this.pool = pool; 2990 this.workQueue = pool.registerWorker(this); 2991 } 2992 2993 /** 2994 * Returns the pool hosting this thread. 2995 * 2996 * @return the pool 2997 */ 2998 ForkJoinPool getPool() { 2999 return pool; 3000 } 3001 3002 /** 3003 * Returns the unique index number of this thread in its pool. 3004 * The returned value ranges from zero to the maximum number of 3005 * threads (minus one) that may exist in the pool, and does not 3006 * change during the lifetime of the thread. This method may be 3007 * useful for applications that track status or collect results 3008 * per-worker-thread rather than per-task. 3009 * 3010 * @return the index number 3011 */ 3012 int getPoolIndex() { 3013 return workQueue.getPoolIndex(); 3014 } 3015 3016 /** 3017 * Initializes internal state after construction but before 3018 * processing any tasks. If you override this method, you must 3019 * invoke {@code super.onStart()} at the beginning of the method. 3020 * Initialization requires care: Most fields must have legal 3021 * default values, to ensure that attempted accesses from other 3022 * threads work correctly even before this thread starts 3023 * processing tasks. 3024 */ 3025 protected void onStart() { 3026 } 3027 3028 /** 3029 * Performs cleanup associated with termination of this worker 3030 * thread. If you override this method, you must invoke 3031 * {@code super.onTermination} at the end of the overridden method. 3032 * 3033 * @param exception the exception causing this thread to abort due 3034 * to an unrecoverable error, or {@code null} if completed normally 3035 */ 3036 protected void onTermination(Throwable exception) { 3037 } 3038 3039 /** 3040 * This method is required to be public, but should never be 3041 * called explicitly. It performs the main run loop to execute 3042 * {@link ForkJoinTask}s. 3043 */ 3044 override void run() { 3045 if (workQueue.array !is null) 3046 return; 3047 // only run once 3048 Throwable exception = null; 3049 3050 try { 3051 onStart(); 3052 pool.runWorker(workQueue); 3053 } catch (Throwable ex) { 3054 version(HUNT_DEBUG) { 3055 warning(ex); 3056 } else { 3057 warning(ex.msg); 3058 } 3059 exception = ex; 3060 } 3061 3062 try { 3063 onTermination(exception); 3064 } catch(Throwable ex) { 3065 version(HUNT_DEBUG) { 3066 warning(ex); 3067 } else { 3068 warning(ex.msg); 3069 } 3070 if (exception is null) 3071 exception = ex; 3072 } 3073 pool.deregisterWorker(this, exception); 3074 } 3075 3076 /** 3077 * Non-hook method for InnocuousForkJoinWorkerThread. 3078 */ 3079 void afterTopLevelExec() { 3080 } 3081 3082 3083 int awaitJoin(IForkJoinTask task) { 3084 int s = task.getStatus(); 3085 WorkQueue w = workQueue; 3086 if(w.tryUnpush(task) && (s = task.doExec()) < 0 ) 3087 return s; 3088 else 3089 return pool.awaitJoin(w, task, MonoTime.zero()); 3090 } 3091 3092 /** 3093 * If the current thread is operating in a ForkJoinPool, 3094 * unschedules and returns, without executing, a task externally 3095 * submitted to the pool, if one is available. Availability may be 3096 * transient, so a {@code null} result does not necessarily imply 3097 * quiescence of the pool. This method is designed primarily to 3098 * support extensions, and is unlikely to be useful otherwise. 3099 * 3100 * @return a task, or {@code null} if none are available 3101 */ 3102 protected static IForkJoinTask pollSubmission() { 3103 ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis(); 3104 return t !is null ? t.pool.pollSubmission() : null; 3105 } 3106 } 3107 3108 3109 /** 3110 * A worker thread that has no permissions, is not a member of any 3111 * user-defined ThreadGroupEx, uses the system class loader as 3112 * thread context class loader, and erases all ThreadLocals after 3113 * running each top-level task. 3114 */ 3115 final class InnocuousForkJoinWorkerThread : ForkJoinWorkerThread { 3116 /** The ThreadGroupEx for all InnocuousForkJoinWorkerThreads */ 3117 private __gshared ThreadGroupEx innocuousThreadGroup; 3118 // AccessController.doPrivileged(new PrivilegedAction<>() { 3119 // ThreadGroupEx run() { 3120 // ThreadGroupEx group = Thread.getThis().getThreadGroup(); 3121 // for (ThreadGroupEx p; (p = group.getParent()) !is null; ) 3122 // group = p; 3123 // return new ThreadGroupEx( 3124 // group, "InnocuousForkJoinWorkerThreadGroup"); 3125 // }}); 3126 3127 /** An AccessControlContext supporting no privileges */ 3128 // private static final AccessControlContext INNOCUOUS_ACC = 3129 // new AccessControlContext( 3130 // new ProtectionDomain[] { new ProtectionDomain(null, null) }); 3131 3132 shared static this() { 3133 // ThreadGroupEx group = Thread.getThis().getThreadGroup(); 3134 // for (ThreadGroupEx p; (p = group.getParent()) !is null; ) 3135 // group = p; 3136 innocuousThreadGroup = new ThreadGroupEx( 3137 null, "InnocuousForkJoinWorkerThreadGroup"); 3138 } 3139 3140 this(ForkJoinPool pool) { 3141 super(pool, 3142 // ClassLoader.getSystemClassLoader(), 3143 innocuousThreadGroup, 3144 // INNOCUOUS_ACC 3145 ); 3146 } 3147 3148 override // to erase ThreadLocals 3149 void afterTopLevelExec() { 3150 // ThreadLocalRandom.eraseThreadLocals(this); 3151 } 3152 3153 override // to silently fail 3154 void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { } 3155 3156 // override // paranoically 3157 // void setContextClassLoader(ClassLoader cl) { 3158 // throw new SecurityException("setContextClassLoader"); 3159 // } 3160 } 3161 3162 3163 /** 3164 * Queues supporting work-stealing as well as external task 3165 * submission. See above for descriptions and algorithms. 3166 */ 3167 final class WorkQueue { 3168 int source; // source queue id, or sentinel 3169 int id; // pool index, mode, tag 3170 shared int base; // index of next slot for poll 3171 shared int top; // index of next slot for push 3172 shared int phase; // versioned, negative: queued, 1: locked 3173 int stackPred; // pool stack (ctl) predecessor link 3174 int nsteals; // number of steals 3175 IForkJoinTask[] array; // the queued tasks; power of 2 size 3176 ForkJoinPool pool; // the containing pool (may be null) 3177 ForkJoinWorkerThread owner; // owning thread or null if shared 3178 3179 this(ForkJoinPool pool, ForkJoinWorkerThread owner) { 3180 this.pool = pool; 3181 this.owner = owner; 3182 // Place indices in the center of array (that is not yet allocated) 3183 base = top = INITIAL_QUEUE_CAPACITY >>> 1; 3184 } 3185 3186 /** 3187 * Tries to lock shared queue by CASing phase field. 3188 */ 3189 final bool tryLockPhase() { 3190 return cas(&this.phase, 0, 1); 3191 // return PHASE.compareAndSet(this, 0, 1); 3192 } 3193 3194 final void releasePhaseLock() { 3195 // PHASE.setRelease(this, 0); 3196 atomicStore(this.phase, 0); 3197 } 3198 3199 /** 3200 * Returns an exportable index (used by ForkJoinWorkerThread). 3201 */ 3202 final int getPoolIndex() { 3203 return (id & 0xffff) >>> 1; // ignore odd/even tag bit 3204 } 3205 3206 /** 3207 * Returns the approximate number of tasks in the queue. 3208 */ 3209 final int queueSize() { 3210 // int n = cast(int)BASE.getAcquire(this) - top; 3211 int n = atomicLoad(this.base) - top; 3212 return (n >= 0) ? 0 : -n; // ignore negative 3213 } 3214 3215 /** 3216 * Provides a more accurate estimate of whether this queue has 3217 * any tasks than does queueSize, by checking whether a 3218 * near-empty queue has at least one unclaimed task. 3219 */ 3220 final bool isEmpty() { 3221 IForkJoinTask[] a; int n, cap, b; 3222 // VarHandle.acquireFence(); // needed by external callers 3223 return ((n = (b = base) - top) >= 0 || // possibly one task 3224 (n == -1 && ((a = array) is null || 3225 (cap = cast(int)a.length) == 0 || 3226 a[(cap - 1) & b] is null))); 3227 } 3228 3229 /** 3230 * Pushes a task. Call only by owner in unshared queues. 3231 * 3232 * @param task the task. Caller must ensure non-null. 3233 * @throws RejectedExecutionException if array cannot be resized 3234 */ 3235 final void push(IForkJoinTask task) { 3236 IForkJoinTask[] a; 3237 int s = top, d, cap, m; 3238 ForkJoinPool p = pool; 3239 if ((a = array) !is null && (cap = cast(int)a.length) > 0) { 3240 m = cap - 1; 3241 // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:40:55 3242 // 3243 // AtomicHelper.store(a[m & s], task); 3244 a[m & s] = task; 3245 // QA.setRelease(a, (m = cap - 1) & s, task); 3246 top = s + 1; 3247 if (((d = s - atomicLoad(this.base)) & ~1) == 0 && 3248 p !is null) { // size 0 or 1 3249 // VarHandle.fullFence(); 3250 p.signalWork(); 3251 } 3252 else if (d == m) 3253 growArray(false); 3254 } 3255 } 3256 3257 /** 3258 * Version of push for shared queues. Call only with phase lock held. 3259 * @return true if should signal work 3260 */ 3261 final bool lockedPush(IForkJoinTask task) { 3262 IForkJoinTask[] a; 3263 bool signal = false; 3264 int s = top, b = base, cap, d; 3265 if ((a = array) !is null && (cap = cast(int)a.length) > 0) { 3266 a[(cap - 1) & s] = task; 3267 top = s + 1; 3268 if (b - s + cap - 1 == 0) 3269 growArray(true); 3270 else { 3271 phase = 0; // full unlock 3272 if (((s - base) & ~1) == 0) // size 0 or 1 3273 signal = true; 3274 } 3275 } 3276 return signal; 3277 } 3278 3279 /** 3280 * Doubles the capacity of array. Call either by owner or with 3281 * lock held -- it is OK for base, but not top, to move while 3282 * resizings are in progress. 3283 */ 3284 final void growArray(bool locked) { 3285 IForkJoinTask[] newA = null; 3286 try { 3287 IForkJoinTask[] oldA; int oldSize, newSize; 3288 if ((oldA = array) !is null && (oldSize = cast(int)oldA.length) > 0 && 3289 (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY && 3290 newSize > 0) { 3291 try { 3292 newA = new IForkJoinTask[newSize]; 3293 } catch (OutOfMemoryError ex) { 3294 } 3295 if (newA !is null) { // poll from old array, push to new 3296 int oldMask = oldSize - 1, newMask = newSize - 1; 3297 for (int s = top - 1, k = oldMask; k >= 0; --k) { 3298 // IForkJoinTask x = AtomicHelper.getAndSet(oldA[s & oldMask], null); 3299 // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 下午8:57:26 3300 // 3301 IForkJoinTask x = oldA[s & oldMask]; 3302 oldA[s & oldMask] = null; 3303 3304 if (x !is null) 3305 newA[s-- & newMask] = x; 3306 else 3307 break; 3308 } 3309 array = newA; 3310 // VarHandle.releaseFence(); 3311 } 3312 } 3313 } finally { 3314 if (locked) 3315 phase = 0; 3316 } 3317 if (newA is null) 3318 throw new RejectedExecutionException("Queue capacity exceeded"); 3319 } 3320 3321 /** 3322 * Takes next task, if one exists, in FIFO order. 3323 */ 3324 final IForkJoinTask poll() { 3325 int b, k, cap; 3326 IForkJoinTask[] a; 3327 while ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3328 top - (b = base) > 0) { 3329 k = (cap - 1) & b; 3330 // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:42:05 3331 // 3332 3333 // IForkJoinTask t = AtomicHelper.load(a[k]); 3334 IForkJoinTask t = a[k]; 3335 if (base == b++) { 3336 if (t is null) 3337 Thread.yield(); // await index advance 3338 else if (AtomicHelper.compareAndSet(a[k], t, null)) { 3339 // else if (QA.compareAndSet(a, k, t, null)) { 3340 AtomicHelper.store(this.base, b); 3341 return t; 3342 } 3343 } 3344 } 3345 return null; 3346 } 3347 3348 /** 3349 * Takes next task, if one exists, in order specified by mode. 3350 */ 3351 final IForkJoinTask nextLocalTask() { 3352 IForkJoinTask t = null; 3353 int md = id, b, s, d, cap; IForkJoinTask[] a; 3354 if ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3355 (d = (s = top) - (b = base)) > 0) { 3356 if ((md & FIFO) == 0 || d == 1) { 3357 auto index = (cap - 1) & --s; 3358 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null); 3359 if(t !is null) { 3360 AtomicHelper.store(this.top, s); 3361 } 3362 } else { 3363 auto index = (cap - 1) & b++; 3364 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null); 3365 if (t !is null) { 3366 AtomicHelper.store(this.base, b); 3367 } 3368 else // on contention in FIFO mode, use regular poll 3369 t = poll(); 3370 } 3371 } 3372 return t; 3373 } 3374 3375 /** 3376 * Returns next task, if one exists, in order specified by mode. 3377 */ 3378 final IForkJoinTask peek() { 3379 int cap; IForkJoinTask[] a; 3380 return ((a = array) !is null && (cap = cast(int)a.length) > 0) ? 3381 a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null; 3382 } 3383 3384 /** 3385 * Pops the given task only if it is at the current top. 3386 */ 3387 final bool tryUnpush(IForkJoinTask task) { 3388 bool popped = false; 3389 int s, cap; IForkJoinTask[] a; 3390 if ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3391 (s = top) != base) { 3392 popped = AtomicHelper.compareAndSet(a[(cap - 1) & --s], task, null); 3393 if(popped) { 3394 AtomicHelper.store(this.top, s); 3395 } 3396 } 3397 return popped; 3398 } 3399 3400 /** 3401 * Shared version of tryUnpush. 3402 */ 3403 final bool tryLockedUnpush(IForkJoinTask task) { 3404 bool popped = false; 3405 int s = top - 1, k, cap; IForkJoinTask[] a; 3406 if ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3407 a[k = (cap - 1) & s] == task && tryLockPhase()) { 3408 if (top == s + 1 && array == a) { 3409 popped = AtomicHelper.compareAndSet(a[k], task, null); 3410 if(popped) top = s; 3411 } 3412 releasePhaseLock(); 3413 } 3414 return popped; 3415 } 3416 3417 /** 3418 * Removes and cancels all known tasks, ignoring any exceptions. 3419 */ 3420 final void cancelAll() { 3421 for (IForkJoinTask t; (t = poll()) !is null; ) 3422 IForkJoinTask.cancelIgnoringExceptions(t); 3423 } 3424 3425 // Specialized execution methods 3426 3427 /** 3428 * Runs the given (stolen) task if nonnull, as well as 3429 * remaining local tasks and others available from the given 3430 * queue, up to bound n (to avoid infinite unfairness). 3431 */ 3432 final void topLevelExec(IForkJoinTask t, WorkQueue q, int n) { 3433 if (t !is null && q !is null) { // hoist checks 3434 int nstolen = 1; 3435 for (;;) { 3436 t.doExec(); 3437 if (n-- < 0) 3438 break; 3439 else if ((t = nextLocalTask()) is null) { 3440 if ((t = q.poll()) is null) 3441 break; 3442 else 3443 ++nstolen; 3444 } 3445 } 3446 ForkJoinWorkerThread thread = owner; 3447 nsteals += nstolen; 3448 source = 0; 3449 if (thread !is null) 3450 thread.afterTopLevelExec(); 3451 } 3452 } 3453 3454 /** 3455 * If present, removes task from queue and executes it. 3456 */ 3457 final void tryRemoveAndExec(IForkJoinTask task) { 3458 IForkJoinTask[] a; int s, cap; 3459 if ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3460 (s = top) - base > 0) { // traverse from top 3461 for (int m = cap - 1, ns = s - 1, i = ns; ; --i) { 3462 int index = i & m; 3463 IForkJoinTask t = a[index]; //QA.get(a, index); 3464 if (t is null) 3465 break; 3466 else if (t == task) { 3467 // if (AtomicHelper.compareAndSet(a[index], t, null)) 3468 if(a[index] == t) { 3469 a[index] = null; 3470 top = ns; // safely shift down 3471 for (int j = i; j != ns; ++j) { 3472 IForkJoinTask f; 3473 int pindex = (j + 1) & m; 3474 f = a[pindex];// QA.get(a, pindex); 3475 a[pindex] = null; 3476 // AtomicHelper.store(a[pindex], null); 3477 // QA.setVolatile(a, pindex, null); 3478 int jindex = j & m; 3479 a[jindex] = f; 3480 // AtomicHelper.store(a[jindex], f); 3481 // QA.setRelease(a, jindex, f); 3482 } 3483 // VarHandle.releaseFence(); 3484 t.doExec(); 3485 } 3486 break; 3487 } 3488 } 3489 } 3490 } 3491 3492 /** 3493 * Tries to pop and run tasks within the target's computation 3494 * until done, not found, or limit exceeded. 3495 * 3496 * @param task root of CountedCompleter computation 3497 * @param limit max runs, or zero for no limit 3498 * @param shared true if must lock to extract task 3499 * @return task status on exit 3500 */ 3501 final int helpCC(ICountedCompleter task, int limit, bool isShared) { 3502 int status = 0; 3503 if (task !is null && (status = task.getStatus()) >= 0) { 3504 int s, k, cap; IForkJoinTask[] a; 3505 while ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3506 (s = top) - base > 0) { 3507 ICountedCompleter v = null; 3508 IForkJoinTask o = a[k = (cap - 1) & (s - 1)]; 3509 ICountedCompleter t = cast(ICountedCompleter)o; 3510 if (t !is null) { 3511 for (ICountedCompleter f = t;;) { 3512 if (f != task) { 3513 if ((f = f.getCompleter()) is null) 3514 break; 3515 } 3516 else if (isShared) { 3517 if (tryLockPhase()) { 3518 if (top == s && array == a && 3519 AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) { 3520 top = s - 1; 3521 v = t; 3522 } 3523 releasePhaseLock(); 3524 } 3525 break; 3526 } 3527 else { 3528 if (AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) { 3529 top = s - 1; 3530 v = t; 3531 } 3532 break; 3533 } 3534 } 3535 } 3536 if (v !is null) 3537 v.doExec(); 3538 if ((status = task.getStatus()) < 0 || v is null || 3539 (limit != 0 && --limit == 0)) 3540 break; 3541 } 3542 } 3543 return status; 3544 } 3545 3546 /** 3547 * Tries to poll and run AsynchronousCompletionTasks until 3548 * none found or blocker is released 3549 * 3550 * @param blocker the blocker 3551 */ 3552 final void helpAsyncBlocker(ManagedBlocker blocker) { 3553 if (blocker !is null) { 3554 int b, k, cap; IForkJoinTask[] a; IForkJoinTask t; 3555 while ((a = array) !is null && (cap = cast(int)a.length) > 0 && 3556 top - (b = base) > 0) { 3557 k = (cap - 1) & b; 3558 // t = AtomicHelper.load(a[k]); 3559 t = a[k]; 3560 if (blocker.isReleasable()) 3561 break; 3562 else if (base == b++ && t !is null) { 3563 AsynchronousCompletionTask at = cast(AsynchronousCompletionTask)t; 3564 if (at is null) 3565 break; 3566 else if (AtomicHelper.compareAndSet(a[k], t, null)) { 3567 AtomicHelper.store(this.base, b); 3568 t.doExec(); 3569 } 3570 } 3571 } 3572 } 3573 } 3574 3575 /** 3576 * Returns true if owned and not known to be blocked. 3577 */ 3578 final bool isApparentlyUnblocked() { 3579 ThreadEx wt; ThreadState s; 3580 return ((wt = owner) !is null && 3581 (s = wt.getState()) != ThreadState.BLOCKED && 3582 s != ThreadState.WAITING && 3583 s != ThreadState.TIMED_WAITING); 3584 } 3585 3586 // VarHandle mechanics. 3587 // static final VarHandle PHASE; 3588 // static final VarHandle BASE; 3589 // static final VarHandle TOP; 3590 // static { 3591 // try { 3592 // MethodHandles.Lookup l = MethodHandles.lookup(); 3593 // PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class); 3594 // BASE = l.findVarHandle(WorkQueue.class, "base", int.class); 3595 // TOP = l.findVarHandle(WorkQueue.class, "top", int.class); 3596 // } catch (ReflectiveOperationException e) { 3597 // throw new ExceptionInInitializerError(e); 3598 // } 3599 // } 3600 } 3601 3602 3603 3604 /** 3605 * A marker interface identifying asynchronous tasks produced by 3606 * {@code async} methods. This may be useful for monitoring, 3607 * debugging, and tracking asynchronous activities. 3608 * 3609 */ 3610 interface AsynchronousCompletionTask { 3611 }