/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

module hunt.concurrency.ForkJoinPool;

import hunt.concurrency.AbstractExecutorService;
import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.Exceptions;
import hunt.concurrency.ForkJoinTask;
import hunt.concurrency.ForkJoinTaskHelper;
import hunt.concurrency.ThreadLocalRandom;
import hunt.concurrency.thread;

import hunt.collection.List;
import hunt.collection.Collection;
import hunt.collection.Collections;
import hunt.logging.ConsoleLogger;
import hunt.Exceptions;
import hunt.Functions;
import hunt.system.Memory;
import hunt.util.Common;
import hunt.util.DateTime;

import core.atomic;
import core.sync.mutex;
import core.thread;
import core.time;
import std.algorithm;
import std.array;
import std.conv;
import std.datetime;

alias ReentrantLock = Mutex;

// import java.lang.Thread.UncaughtExceptionHandler;
// import java.lang.invoke.MethodHandles;
// import java.lang.invoke.VarHandle;
// import java.security.AccessController;
// import java.security.AccessControlContext;
// import java.security.Permission;
// import java.security.Permissions;
// import java.security.PrivilegedAction;
// import java.security.ProtectionDomain;
// import hunt.collection.ArrayList;
// import hunt.collection.Collection;
// import java.util.Collections;
// import java.util.List;
// import hunt.util.functional.Predicate;
// import hunt.concurrency.locks.LockSupport;


private {

    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    enum int SWIDTH       = 16;            // width of short
    enum int SMASK        = 0xffff;        // short bits == max index
    enum int MAX_CAP      = 0x7fff;        // max #workers - 1
    enum int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    enum int UNSIGNALLED  = 1 << 31;       // must be negative
    enum int SS_SEQ       = 1 << 16;       // version count
    enum int QLOCK        = 1;             // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and .source fields
    enum int OWNED        = 1;             // queue has owner thread
    enum int FIFO         = 1 << 16;       // fifo queue or access mode
    enum int SHUTDOWN     = 1 << 18;
    enum int TERMINATED   = 1 << 19;
    enum int STOP         = 1 << 31;       // must be negative
    enum int QUIET        = 1 << 30;       // not scanning or working
    enum int DORMANT      = QUIET | UNSIGNALLED;

    /**
     * The maximum number of top-level polls per worker before
     * checking other queues, expressed as a bit shift to, in effect,
     * multiply by pool size, and then use as random value mask, so
     * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
     * above for rationale.
     */
    enum int TOP_BOUND_SHIFT = 10;

    /**
     * Initial capacity of work-stealing queue array.
     * Must be a power of two, at least 2.
     */
    enum int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum capacity for queue arrays.  Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway programs
     * before saturating systems.
     */
    enum int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

}
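/*
 * Added illustrative check (not part of the original sources): how the
 * packed-bit constants above interact. DORMANT combines QUIET with
 * UNSIGNALLED, so a dormant phase is always negative, and SQMASK has its
 * low bit clear, so external (shared) queues always land on even slots.
 */
unittest {
    assert(DORMANT < 0);                  // UNSIGNALLED forces the sign bit
    assert((DORMANT & QUIET) != 0);       // QUIET survives the combination
    static assert((SQMASK & 1) == 0);     // even slots only
    int r = 0x12345;                      // e.g. a submitter's probe value
    assert((r & SQMASK & 1) == 0);        // masked index is always even
}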
/**
 * An {@link ExecutorService} for running {@link ForkJoinTask}s.
 * A {@code ForkJoinPool} provides the entry point for submissions
 * from non-{@code ForkJoinTask} clients, as well as management and
 * monitoring operations.
 *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
 * ExecutorService} mainly by virtue of employing
 * <em>work-stealing</em>: all threads in the pool attempt to find and
 * execute tasks submitted to the pool and/or created by other active
 * tasks (eventually blocking waiting for work if none exist). This
 * enables efficient processing when most tasks spawn other subtasks
 * (as do most {@code ForkJoinTask}s), as well as when many small
 * tasks are submitted to the pool from external clients.  Especially
 * when setting <em>asyncMode</em> to true in constructors, {@code
 * ForkJoinPool}s may also be appropriate for use with event-style
 * tasks that are never joined. All worker threads are initialized
 * with {@link Thread#isDaemon} set {@code true}.
 *
 * <p>A static {@link #commonPool()} is available and appropriate for
 * most applications. The common pool is used by any ForkJoinTask that
 * is not explicitly submitted to a specified pool. Using the common
 * pool normally reduces resource usage (its threads are slowly
 * reclaimed during periods of non-use, and reinstated upon subsequent
 * use).
 *
 * <p>For applications that require separate or custom pools, a {@code
 * ForkJoinPool} may be constructed with a given target parallelism
 * level; by default, equal to the number of available processors.
 * The pool attempts to maintain enough active (or available) threads
 * by dynamically adding, suspending, or resuming internal worker
 * threads, even if some tasks are stalled waiting to join others.
 * However, no such adjustments are guaranteed in the face of blocked
 * I/O or other unmanaged synchronization. The nested {@link
 * ManagedBlocker} interface enables extension of the kinds of
 * synchronization accommodated. The default policies may be
 * overridden using a constructor with parameters corresponding to
 * those documented in class {@link ThreadPoolExecutor}.
 *
 * <p>In addition to execution and lifecycle control methods, this
 * class provides status check methods (for example
 * {@link #getStealCount}) that are intended to aid in developing,
 * tuning, and monitoring fork/join applications. Also, method
 * {@link #toString} returns indications of pool state in a
 * convenient form for informal monitoring.
 *
 * <p>As is the case with other ExecutorServices, there are three
 * main task execution methods summarized in the following table.
 * These are designed to be used primarily by clients not already
 * engaged in fork/join computations in the current pool.  The main
 * forms of these methods accept instances of {@code ForkJoinTask},
 * but overloaded forms also allow mixed execution of plain {@code
 * Runnable}- or {@code Callable}- based activities as well. However,
 * tasks that are already executing in a pool should normally instead
 * use the within-computation forms listed in the table unless using
 * async event-style tasks that are not usually joined, in which case
 * there is little difference among choice of methods.
 *
 * <table class="plain">
 * <caption>Summary of task execution methods</caption>
 * <tr>
 *   <td></td>
 *   <th scope="col"> Call from non-fork/join clients</th>
 *   <th scope="col"> Call from within fork/join computations</th>
 * </tr>
 * <tr>
 *   <th scope="row" style="text-align:left"> Arrange async execution</th>
 *   <td> {@link #execute(ForkJoinTask)}</td>
 *   <td> {@link ForkJoinTask#fork}</td>
 * </tr>
 * <tr>
 *   <th scope="row" style="text-align:left"> Await and obtain result</th>
 *   <td> {@link #invoke(ForkJoinTask)}</td>
 *   <td> {@link ForkJoinTask#invoke}</td>
 * </tr>
 * <tr>
 *   <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
 *   <td> {@link #submit(ForkJoinTask)}</td>
 *   <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
 * </tr>
 * </table>
 *
 * <p>The parameters used to construct the common pool may be controlled by
 * setting the following {@linkplain System#getProperty system properties}:
 * <ul>
 * <li>{@code hunt.concurrency.ForkJoinPool.common.parallelism}
 * - the parallelism level, a non-negative integer
 * <li>{@code hunt.concurrency.ForkJoinPool.common.threadFactory}
 * - the class name of a {@link ForkJoinWorkerThreadFactory}.
 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 * is used to load this class.
 * <li>{@code hunt.concurrency.ForkJoinPool.common.exceptionHandler}
 * - the class name of a {@link UncaughtExceptionHandler}.
 * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
 * is used to load this class.
 * <li>{@code hunt.concurrency.ForkJoinPool.common.maximumSpares}
 * - the maximum number of allowed extra threads to maintain target
 * parallelism (default 256).
 * </ul>
 * If no thread factory is supplied via a system property, then the
 * common pool uses a factory that uses the system class loader as the
 * {@linkplain Thread#getContextClassLoader() thread context class loader}.
 * In addition, if a {@link SecurityManager} is present, then
 * the common pool uses a factory supplying threads that have no
 * {@link Permissions} enabled.
 *
 * Upon any error in establishing these settings, default parameters
 * are used. It is possible to disable or limit the use of threads in
 * the common pool by setting the parallelism property to zero, and/or
 * using a factory that may return {@code null}. However doing so may
 * cause unjoined tasks to never be executed.
 *
 * <p><b>Implementation notes</b>: This implementation restricts the
 * maximum number of running threads to 32767. Attempts to create
 * pools with greater than the maximum number result in
 * {@code IllegalArgumentException}.
 *
 * <p>This implementation rejects submitted tasks (that is, by throwing
 * {@link RejectedExecutionException}) only when the pool is shut down
 * or internal resources have been exhausted.
 *
 * @since 1.7
 * @author Doug Lea
 */
class ForkJoinPool : AbstractExecutorService {

    /*
     * Implementation Overview
     *
     * This class and its nested classes provide the main
     * functionality and control for a set of worker threads:
     * Submissions from non-FJ threads enter into submission queues.
     * Workers take these tasks and typically split them into subtasks
     * that may be stolen by other workers. Work-stealing based on
     * randomized scans generally leads to better throughput than
     * "work dealing" in which producers assign tasks to idle threads,
     * in part because threads that have finished other tasks before
     * the signalled thread wakes up (which can be a long time) can
     * take the task instead.  Preference rules give first priority to
     * processing tasks from their own queues (LIFO or FIFO, depending
     * on mode), then to randomized FIFO steals of tasks in other
     * queues. This framework began as a vehicle for supporting
     * tree-structured parallelism using work-stealing.  Over time,
     * its scalability advantages led to extensions and changes to
     * better support more diverse usage contexts.  Because most
     * internal methods and nested classes are interrelated, their
     * main rationale and descriptions are presented here; individual
     * methods and nested classes contain only brief comments about
     * details.
     *
     * WorkQueues
     * ==========
     *
     * Most operations occur within work-stealing queues (in nested
     * class WorkQueue).  These are special forms of Deques that
     * support only three of the four possible end-operations -- push,
     * pop, and poll (aka steal), under the further constraints that
     * push and pop are called only from the owning thread (or, as
     * extended here, under a lock), while poll may be called from
     * other threads.  (If you are unfamiliar with them, you probably
     * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor programming", chapter 16 describing these in
     * more detail before proceeding.)  The main work-stealing queue
     * design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs poll (steal) from being on the indices
     * ("base" and "top") to the slots themselves.
     *
     * Adding tasks then takes the form of a classic array push(task)
     * in a circular buffer:
     *    q.array[q.top++ % length] = task;
     *
     * (The actual code needs to null-check and size-check the array,
     * uses masking, not mod, for indexing a power-of-two-sized array,
     * adds a release fence for publication, and possibly signals
     * waiting workers to start scanning -- see below.)  Both a
     * successful pop and poll mainly entail a CAS of a slot from
     * non-null to null.
     *
     * The pop operation (always performed by owner) is:
     *   if ((the task at top slot is not null) and
     *        (CAS slot to null))
     *           decrement top and return task;
     *
     * And the poll operation (usually by a stealer) is
     *    if ((the task at base slot is not null) and
     *        (CAS slot to null))
     *           increment base and return task;
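     *
     * An added sketch (illustration only, using the field names of the
     * WorkQueue class in this module) of the masked form of the push
     * above, with the caveats from the parenthetical noted inline:
     *
     *   IForkJoinTask[] a = q.array;       // non-null, power-of-two length
     *   int cap = cast(int)a.length;
     *   a[(cap - 1) & q.top] = task;       // mask, not mod
     *   q.top += 1;                        // then publish with a release
     *                                      // fence, and maybe signalWork()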
     *
     * There are several variants of each of these. Most uses occur
     * within operations that also interleave contention or emptiness
     * tracking or inspection of elements before extracting them, so
     * must interleave these with the above code. When performed by
     * owner, getAndSet is used instead of CAS (see for example method
     * nextLocalTask) which is usually more efficient, and possible
     * because the top index cannot independently change during the
     * operation.
     *
     * Memory ordering.  See "Correct and Efficient Work-Stealing for
     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
     * analysis of memory ordering requirements in work-stealing
     * algorithms similar to (but different than) the one used here.
     * Extracting tasks in array slots via (fully fenced) CAS provides
     * primary synchronization.  The base and top indices imprecisely
     * guide where to extract from. We do not usually require strict
     * orderings of array and index updates. Many index accesses use
     * plain mode, with ordering constrained by surrounding context
     * (usually with respect to element CASes or the two WorkQueue
     * fields source and phase). When not otherwise already
     * constrained, reads of "base" by queue owners use acquire-mode,
     * and some externally callable methods preface accesses with
     * acquire fences.  Additionally, to ensure that index update
     * writes are not coalesced or postponed in loops etc, "opaque"
     * mode is used in a few cases where timely writes are not
     * otherwise ensured. The "locked" versions of push- and pop-
     * based methods for shared queues differ from owned versions
     * because locking already forces some of the ordering.
     *
     * Because indices and slot contents cannot always be consistent,
     * a check that base == top indicates (momentary) emptiness, but
     * otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or poll have not fully
     * committed, or making it appear empty when an update of top has
     * not yet been visibly written.  (Method isEmpty() checks the
     * case of a partially completed removal of the last element.)
     * Because of this, the poll operation, considered individually,
     * is not wait-free. One thief cannot successfully continue until
     * another in-progress one (or, if previously empty, a push)
     * visibly completes.  This can stall threads when required to
     * consume from a given queue (see method poll()).  However, in
     * the aggregate, we ensure at least probabilistic
     * non-blockingness.  If an attempted steal fails, a scanning
     * thief chooses a different random victim target to try next. So,
     * in order for one thief to progress, it suffices for any
     * in-progress poll or new push on any empty queue to complete.
     *
     * This approach also enables support of a user mode in which
     * local task processing is in FIFO, not LIFO order, simply by
     * using poll rather than pop.  This can be useful in
     * message-passing frameworks in which tasks are never joined.
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * by workers. Instead, we randomly associate submission queues
     * with submitting threads, using a form of hashing.  The
     * ThreadLocalRandom probe value serves as a hash code for
     * choosing existing queues, and may be randomly repositioned upon
     * contention with other submitters.  In essence, submitters act
     * like workers except that they are restricted to executing local
     * tasks that they submitted.  Insertion of tasks in shared mode
     * requires a lock but we use only a simple spinlock (using field
     * phase), because submitters encountering a busy queue move to a
     * different position to use or create other queues -- they block
     * only when creating and registering new queues. Because it is
     * used only as a spinlock, unlocking requires only a "releasing"
     * store (using setRelease) unless otherwise signalling.
     *
     * Management
     * ==========
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other, at rates that can exceed a billion
     * per second.  The pool itself creates, activates (enables
     * scanning for and running tasks), deactivates, blocks, and
     * terminates threads, all with minimal central information.
     * There are only a few properties that we can globally track or
     * maintain, so we pack them into a small number of variables,
     * often maintaining atomicity without blocking or locking.
     * Nearly all essentially atomic control state is held in a few
     * variables that are by far most often read (not written) as
     * status and consistency checks. We pack as much information
     * into them as we can.
     *
     * Field "ctl" contains 64 bits holding information needed to
     * atomically decide to add, enqueue (on an event queue), and
     * dequeue and release workers.  To enable this packing, we
     * restrict maximum parallelism to (1<<15)-1 (which is far in
     * excess of normal operating range) to allow ids, counts, and
     * their negations (used for thresholding) to fit into 16bit
     * subfields.
     *
     * Field "mode" holds configuration parameters as well as lifetime
     * status, atomically and monotonically setting SHUTDOWN, STOP,
     * and finally TERMINATED bits.
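     *
     * An added sketch (illustrative, matching the RC_SHIFT/TC_SHIFT
     * constants defined later in this class) of unpacking the ctl word,
     * where the signedness of each subfield carries the meaning:
     *
     *   int sp = cast(int)c;                     // stack top (version + id)
     *   int rc = cast(short)(c >>> RC_SHIFT);    // released workers - target
     *   int tc = cast(short)(c >>> TC_SHIFT);    // total workers - target
     *   // rc < 0: too few unqueued workers; tc < 0: too few total workers;
     *   // sp != 0: some worker is waiting.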
     *
     * Field "workQueues" holds references to WorkQueues.  It is
     * updated (only during worker creation and termination) under
     * lock (using field workerNamePrefix as lock), but is otherwise
     * concurrently readable, and accessed directly. We also ensure
     * that uses of the array reference itself never become too stale
     * in case of resizing, by arranging that (re-)reads are separated
     * by at least one acquiring read access.  To simplify index-based
     * operations, the array size is always a power of two, and all
     * readers must tolerate null slots.  Worker queues are at odd
     * indices. Shared (submission) queues are at even indices, up to
     * a maximum of 64 slots, to limit growth even if the array needs
     * to expand to add more workers.  Grouping them together in this
     * way simplifies and speeds up task scanning.
     *
     * All worker thread creation is on-demand, triggered by task
     * submissions, replacement of terminated workers, and/or
     * compensation for blocked workers. However, all other support
     * code is set up to work with other policies.  To ensure that we
     * do not hold on to worker references that would prevent GC, all
     * accesses to workQueues are via indices into the workQueues
     * array (which is one source of some of the messy code
     * constructions here). In essence, the workQueues array serves as
     * a weak reference mechanism. Thus for example the stack top
     * subfield of ctl stores indices, not references.
     *
     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
     * cannot let workers spin indefinitely scanning for tasks when
     * none can be found immediately, and we cannot start/resume
     * workers unless there appear to be tasks available.  On the
     * other hand, we must quickly prod them into action when new
     * tasks are submitted or generated. In many usages, ramp-up time
     * is the main limiting factor in overall performance, which is
     * compounded at program start-up by JIT compilation and
     * allocation. So we streamline this as much as possible.
     *
     * The "ctl" field atomically maintains total worker and
     * "released" worker counts, plus the head of the available worker
     * queue (actually stack, represented by the lower 32bit subfield
     * of ctl).  Released workers are those known to be scanning for
     * and/or running tasks. Unreleased ("available") workers are
     * recorded in the ctl stack. These workers are made available for
     * signalling by enqueuing in ctl (see method runWorker).  The
     * "queue" is a form of Treiber stack. This is ideal for
     * activating threads in most-recently used order, and improves
     * performance and locality, outweighing the disadvantages of
     * being prone to contention and inability to release a worker
     * unless it is topmost on stack.  To avoid missed signal problems
     * inherent in any wait/signal design, available workers rescan
     * for (and if found run) tasks after enqueuing.  Normally their
     * release status will be updated while doing so, but the released
     * worker ctl count may underestimate the number of active
     * threads. (However, it is still possible to determine quiescence
     * via a validation traversal -- see isQuiescent). After an
     * unsuccessful rescan, available workers are blocked until
     * signalled (see signalWork).
     * The top stack state holds the
     * value of the "phase" field of the worker: its index and status,
     * plus a version counter that, in addition to the count subfields
     * (also serving as version stamps), provides protection against
     * Treiber stack ABA effects.
     *
     * Creating workers. To create a worker, we pre-increment counts
     * (serving as a reservation), and attempt to construct a
     * ForkJoinWorkerThread via its factory. Upon construction, the
     * new thread invokes registerWorker, where it constructs a
     * WorkQueue and is assigned an index in the workQueues array
     * (expanding the array if necessary).  The thread is then started.
     * Upon any exception across these steps, or null return from
     * factory, deregisterWorker adjusts counts and records
     * accordingly.  If a null return, the pool continues running with
     * fewer than the target number of workers. If exceptional, the
     * exception is propagated, generally to some external caller.
     * Worker index assignment avoids the bias in scanning that would
     * occur if entries were sequentially packed starting at the front
     * of the workQueues array. We treat the array as a simple
     * power-of-two hash table, expanding as needed. The seedIndex
     * increment ensures no collisions until a resize is needed or a
     * worker is deregistered and replaced, and thereafter keeps
     * probability of collision low. We cannot use
     * ThreadLocalRandom.getProbe() for similar purposes here because
     * the thread has not started yet, but do so for creating
     * submission queues for existing external threads (see
     * externalPush).
     *
     * WorkQueue field "phase" is used by both workers and the pool to
     * manage and track whether a worker is UNSIGNALLED (possibly
     * blocked waiting for a signal).  When a worker is enqueued its
     * phase field is set. Note that phase field updates lag queue CAS
     * releases so usage requires care -- seeing a negative phase does
     * not guarantee that the worker is available. When queued, the
     * lower 16 bits of the phase field must hold its pool index. So we
     * place the index there upon initialization and otherwise keep it
     * there or restore it when necessary.
     *
     * The ctl field also serves as the basis for memory
     * synchronization surrounding activation. This uses a more
     * efficient version of a Dekker-like rule that task producers and
     * consumers sync with each other by both writing/CASing ctl (even
     * if to its current value).  This would be extremely costly. So
     * we relax it in several ways: (1) Producers only signal when
     * their queue is possibly empty at some point during a push
     * operation (which requires conservatively checking size zero or
     * one to cover races). (2) Other workers propagate this signal
     * when they find tasks in a queue with size greater than one. (3)
     * Workers only enqueue after scanning (see below) and not finding
     * any tasks.  (4) Rather than CASing ctl to its current value in
     * the common case where no action is required, we reduce write
     * contention by equivalently prefacing signalWork when called by
     * an external task producer using a memory access with
     * full-semantics or a "fullFence".
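     *
     * An added sketch (illustration only, a simplification of the check
     * actually performed in WorkQueue.push) of the producer-side test in
     * relaxation (1): after pushing, a producer signals only if the
     * queue might have looked empty to scanners during the push:
     *
     *   if (q.top - q.base <= 1)       // was (possibly) empty: size 0 or 1
     *       signalWork();              // wake or create a worker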
     *
     * Almost always, too many signals are issued, in part because a
     * task producer cannot tell if some existing worker is in the
     * midst of finishing one task (or already scanning) and ready to
     * take another without being signalled. So the producer might
     * instead activate a different worker that does not find any
     * work, and then inactivates. This scarcely matters in
     * steady-state computations involving all workers, but can create
     * contention and bookkeeping bottlenecks during ramp-up,
     * ramp-down, and small computations involving only a few workers.
     *
     * Scanning. Method scan (from runWorker) performs top-level
     * scanning for tasks. (Similar scans appear in helpQuiesce and
     * pollScan.)  Each scan traverses and tries to poll from each
     * queue starting at a random index.  Scans are not performed in
     * ideal random permutation order, to reduce cacheline
     * contention.  The pseudorandom generator need not have
     * high-quality statistical properties in the long term, but just
     * within computations; we use Marsaglia XorShifts (often via
     * ThreadLocalRandom.nextSecondarySeed), which are cheap and
     * suffice. Scanning also includes contention reduction: When
     * scanning workers fail to extract an apparently existing task,
     * they soon restart at a different pseudorandom index.  This form
     * of backoff improves throughput when many threads are trying to
     * take tasks from few queues, which can be common in some usages.
     * Scans do not otherwise explicitly take into account core
     * affinities, loads, cache localities, etc. However, they do
     * exploit temporal locality (which usually approximates these) by
     * preferring to re-poll from the same queue after a successful
     * poll before trying others (see method topLevelExec). However
     * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
     * against infinitely unfair looping under unbounded user task
     * recursion, and also to reduce long-term contention when many
     * threads poll few queues holding many small tasks. The bound is
     * high enough to avoid much impact on locality and scheduling
     * overhead.
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate (see method runWorker) if the pool has
     * remained quiescent for the period given by field keepAlive.
     *
     * Shutdown and Termination. A call to shutdownNow invokes
     * tryTerminate to atomically set a runState bit. The calling
     * thread, as well as every other worker thereafter terminating,
     * helps terminate others by cancelling their unprocessed tasks,
     * and waking them up, doing so repeatedly until stable. Calls to
     * non-abrupt shutdown() preface this by checking whether
     * termination should commence by sweeping through queues (until
     * stable) to ensure lack of in-flight submissions and workers
     * about to process them before triggering the "STOP" phase of
     * termination.
     *
     * Joining Tasks
     * =============
     *
     * Any of several actions may be taken when one worker is waiting
     * to join a task stolen (or always held) by another.  Because we
     * are multiplexing many tasks on to a pool of workers, we can't
     * always just let them block (as in Thread.join).  We also cannot
     * just reassign the joiner's run-time stack with another and
     * replace it later, which would be a form of "continuation", that
     * even if possible is not necessarily a good idea since we may
     * need both an unblocked task and its continuation to progress.
     * Instead we combine two tactics:
     *
     *   Helping: Arranging for the joiner to execute some task that it
     *      would be running if the steal had not occurred.
     *
     *   Compensating: Unless there are already enough live threads,
     *      method tryCompensate() may create or re-activate a spare
     *      thread to compensate for blocked joiners until they unblock.
     *
     * A third form (implemented in tryRemoveAndExec) amounts to
     * helping a hypothetical compensator: If we can readily tell that
     * a possible action of a compensator is to steal and execute the
     * task being joined, the joining thread can do so directly,
     * without the need for a compensation thread.
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker.
     *
     * The algorithm in awaitJoin entails a form of "linear helping".
     * Each worker records (in field source) the id of the queue from
     * which it last stole a task.  The scan in method awaitJoin uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task.  Thus, the joiner executes a task that
     * would be on its own local deque if the to-be-joined task had
     * not been stolen. This is a conservative variant of the approach
     * described in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * mainly in that we only record queue ids, not full dependency
     * links.  This requires a linear scan of the workQueues array to
     * locate stealers, but isolates cost to when it is needed, rather
     * than adding to per-task overhead. Searches can fail to locate
     * stealers when GC stalls and the like delay recording sources.
     * Further, even when accurately identified, stealers might not
     * ever produce a task that the joiner can in turn help with. So,
     * compensation is tried upon failure to find tasks to run.
     *
     * Compensation does not by default aim to keep exactly the target
     * parallelism number of unblocked threads running at any given
     * time. Some previous versions of this class employed immediate
     * compensations for any blocked join. However, in practice, the
     * vast majority of blockages are byproducts of GC and
     * other JVM or OS activities that are made worse by replacement
     * when they cause longer-term oversubscription. Rather than
     * impose arbitrary policies, we allow users to override the
     * default of only adding threads upon apparent starvation.  The
     * compensation mechanism may also be bounded.  Bounds for the
     * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
     * with programming errors and abuse before running out of
     * resources to do so.
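     *
     * An added sketch (hypothetical usage, assuming the ManagedBlocker
     * interface referenced above exposes block() and isReleasable()): a
     * blocker that lets the pool compensate while a worker waits on an
     * external flag:
     *
     *   class FlagBlocker : ManagedBlocker {
     *       shared bool done;
     *       bool isReleasable() { return atomicLoad(done); }
     *       bool block() {
     *           while (!isReleasable()) Thread.yield();
     *           return true;                 // now releasable
     *       }
     *   }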
     *
     * Common Pool
     * ===========
     *
     * The static common pool always exists after static
     * initialization.  Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields.
     *
     * When external threads submit to the common pool, they can
     * perform subtask processing (see externalHelpComplete and
     * related methods) upon joins.  This caller-helps policy makes it
     * sensible to set common pool parallelism level to one (or more)
     * less than the total number of available cores, or even zero for
     * pure caller-runs.  We do not need to record whether external
     * submissions are to the common pool -- if not, external help
     * methods return quickly. These submitters would otherwise be
     * blocked waiting for completion, so the extra effort (with
     * liberally sprinkled task status checks) in inapplicable cases
     * amounts to an odd form of limited spin-wait before blocking in
     * ForkJoinTask.join.
     *
     * As a more appropriate default in managed environments, unless
     * overridden by system properties, we use workers of subclass
     * InnocuousForkJoinWorkerThread when there is a SecurityManager
     * present. These workers have no permissions set, do not belong
     * to any user-defined ThreadGroupEx, and erase all ThreadLocals
     * after executing any top-level task (see
     * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
     * in ForkJoinWorkerThread) may be JVM-dependent and must access
     * particular Thread class fields to achieve this effect.
     *
     * Memory placement
     * ================
     *
     * Performance can be very sensitive to placement of instances of
     * ForkJoinPool and WorkQueues and their queue arrays. To reduce
     * false-sharing impact, the @Contended annotation isolates
     * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
     * field. WorkQueue arrays are allocated (by their threads) with
     * larger initial sizes than most ever need, mostly to reduce
     * false sharing with current garbage collectors that use cardmark
     * tables.
     *
     * Style notes
     * ===========
     *
     * Memory ordering relies mainly on VarHandles.  This can be
     * awkward and ugly, but also reflects the need to control
     * outcomes across the unusual cases that arise in very racy code
     * with very few invariants. All fields are read into locals
     * before use, and null-checked if they are references.  Array
     * accesses using masked indices include checks (that are always
     * true) that the array length is non-zero to avoid compilers
     * inserting more expensive traps.  This is usually done in a
     * "C"-like style of listing declarations at the heads of methods
     * or blocks, and using inline assignments on first encounter.
     * Nearly all explicit checks lead to bypass/return, not exception
     * throws, because they may legitimately arise due to
     * cancellation/revocation during shutdown.
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed.  There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. Several methods intrinsically sprawl because
     * they must accumulate sets of consistent reads of fields held in
     * local variables. Some others are artificially broken up to
     * reduce producer/consumer imbalances due to dynamic compilation.
     * There are also other coding oddities (including several
     * unnecessary-looking hoisted null checks) that help some methods
     * perform reasonably even when interpreted (not compiled).
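     *
     * An added sketch of the local-snapshot idiom described above, as it
     * recurs throughout this class (illustration only):
     *
     *   WorkQueue[] ws; int n;                  // snapshot, then validate
     *   if ((ws = workQueues) !is null &&
     *       (n = cast(int)ws.length) > 0) {
     *       // use only ws and n from here on; never re-read workQueues,
     *       // so a concurrent resize cannot invalidate the indices
     *   }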
     *
     * The order of declarations in this file is (with a few exceptions):
     * (1) Static utility functions
     * (2) Nested (static) classes
     * (3) Static fields
     * (4) Fields, along with constants used when unpacking some of them
     * (5) Internal control methods
     * (6) Callbacks and other support for ForkJoinTask methods
     * (7) Exported methods
     * (8) Static block initializing statics in minimally dependent order
     */

    // Static utilities

    /**
     * If there is a security manager, makes sure caller has
     * permission to modify threads.
     */
    // private static void checkPermission() {
    //     SecurityManager security = System.getSecurityManager();
    //     if (security !is null)
    //         security.checkPermission(modifyThreadPermission);
    // }


    // static fields (initialized in static initializer below)

    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    __gshared ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    // __gshared RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    __gshared ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    __gshared int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     */
    private __gshared int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.
     */
    private shared static int poolNumberSequence;

    /**
     * Returns the next sequence number. We don't expect this to
     * ever contend, so use simple builtin sync.
     */
    private static int nextPoolId() {
        return AtomicHelper.increment(poolNumberSequence);
    }

    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work
     */
    private enum long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts
     */
    private enum long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "hunt.concurrency.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private enum int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.
     */
    private enum int SEED_INCREMENT = 0x9e3779b9;
    /*
     * Bits and masks for field ctl, packed with four 16-bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     * TC: Number of total workers minus target parallelism
     * SS: version count and status of top waiting thread
     * ID: poolIndex of top of Treiber stack of waiters
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=(int)ctl.  The offsets of counts
     * by the target parallelism and the positionings of fields make
     * it possible to perform the most common checks via sign tests of
     * fields: When ac is negative, there are not enough unqueued
     * workers, when tc is negative, there are not enough total
     * workers.  When sp is non-zero, there are waiting workers.  To
     * deal with possibly negative fields, we use casts in and out of
     * "short" and/or signed shifts to maintain signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join.  Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks
    private enum long SP_MASK    = 0xffffffffL;
    private enum long UC_MASK    = ~SP_MASK;

    // Release counts
    private enum int  RC_SHIFT   = 48;
    private enum long RC_UNIT    = 0x0001L << RC_SHIFT;
    private enum long RC_MASK    = 0xffffL << RC_SHIFT;

    // Total counts
    private enum int  TC_SHIFT   = 32;
    private enum long TC_UNIT    = 0x0001L << TC_SHIFT;
    private enum long TC_MASK    = 0xffffL << TC_SHIFT;
    private enum long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

    // Instance fields

    long stealCount;            // collects worker nsteals
    long keepAlive;             // milliseconds before dropping if idle
    int indexSeed;              // next worker index
    int bounds;                 // min, max threads packed as shorts
    int mode;                   // parallelism, runstate, queue mode
    WorkQueue[] workQueues;     // main registry
    string workerNamePrefix;    // for worker thread string; sync lock
    Object workerNameLocker;
    ForkJoinWorkerThreadFactory factory;
    UncaughtExceptionHandler ueh;  // per-worker UEH
    Predicate!(ForkJoinPool) saturate;

    // @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    shared long ctl;            // main pool control

    // Creating, registering and deregistering workers

    /**
     * Tries to construct and start one worker. Assumes that total
     * count has already been incremented as a reservation.  Invokes
     * deregisterWorker on any failure.
     *
     * @return true if successful
     */
    private bool createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac !is null && (wt = fac.newThread(this)) !is null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        deregisterWorker(wt, ex);
        return false;
    }
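    /*
     * Added consistency check (not part of the original sources): the
     * packed ctl subfields defined above are disjoint, and adding
     * RC_UNIT touches only the release count.
     */
    unittest {
        static assert((RC_MASK & TC_MASK) == 0L);
        static assert((RC_MASK & SP_MASK) == 0L && (TC_MASK & SP_MASK) == 0L);
        long c = 0L;
        long nc = c + RC_UNIT;
        assert(cast(int)nc == cast(int)c);          // sp subfield unchanged
        assert(cast(short)(nc >>> RC_SHIFT) == 1);  // rc incremented by one
    }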
    /**
     * Tries to add one worker, incrementing ctl counts before doing
     * so, relying on createWorker to back out on failure.
     *
     * @param c incoming ctl value, with total count negative and no
     * idle workers.  On CAS failure, c is refreshed and retried if
     * this holds (otherwise, a new worker is not needed).
     */
    private void tryAddWorker(long c) {
        do {
            long nc = ((RC_MASK & (c + RC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
                createWorker();
                break;
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && cast(int)c == 0);
    }

    /**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.isDaemon(true);                          // configure thread
        if ((handler = ueh) !is null)
            wt.setUncaughtExceptionHandler(handler);
        int tid = 0;                                // for thread name
        int idbits = mode & FIFO;
        string prefix = workerNamePrefix;
        WorkQueue w = new WorkQueue(this, wt);
        if (prefix !is null) {
            synchronized (this) {
                WorkQueue[] ws = workQueues;
                int n;
                int s = indexSeed += SEED_INCREMENT;
                idbits |= (s & ~(SMASK | FIFO | DORMANT));
                if (ws !is null && (n = cast(int)ws.length) > 1) {
                    int m = n - 1;
                    tid = m & ((s << 1) | 1);       // odd-numbered indices
                    for (int probes = n >>> 1;;) {  // find empty slot
                        WorkQueue q;
                        if ((q = ws[tid]) is null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            tid = n | 1;            // resize below
                            break;
                        }
                        else
                            tid = (tid + 2) & m;
                    }
                    w.phase = w.id = tid | idbits;  // now publishable

                    if (tid < n)
                        ws[tid] = w;
                    else {                          // expand array
                        int an = n << 1;
                        WorkQueue[] as = new WorkQueue[an];
                        as[tid] = w;
                        int am = an - 1;
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;                // copy external queue
                            if ((v = ws[j]) !is null)   // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];              // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            wt.name(prefix ~ tid.to!string());
        }
        return w;
    }
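    /*
     * Added illustrative check (not part of the original sources):
     * registerWorker's index computation always produces an odd index,
     * since m = n - 1 is odd for the power-of-two array sizes used here,
     * keeping workers off the even (submission) slots.
     */
    unittest {
        int n = 8;                          // a power-of-two array size
        int m = n - 1;
        foreach (s; [0, 1, 12345, int.max, int.min]) {
            int tid = m & ((s << 1) | 1);
            assert((tid & 1) == 1);         // always an odd slot
        }
    }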
    /**
     * Final callback from terminating worker, as well as upon failure
     * to construct or start a worker.  Removes record of worker from
     * array, and adjusts counts. If pool is shutting down, tries to
     * complete termination.
     *
     * @param wt the worker thread, or null if construction failed
     * @param ex the exception causing failure, or null if none
     */
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        int phase = 0;
        if (wt !is null && (w = wt.workQueue) !is null) {
            int wid = w.id;
            long ns = cast(long)w.nsteals & 0xffffffffL;
            if (!workerNamePrefix.empty()) {
                synchronized (this) {
                    WorkQueue[] ws; size_t n, i;    // remove index from array
                    if ((ws = workQueues) !is null && (n = ws.length) > 0 &&
                        ws[i = wid & (n - 1)] == w)
                        ws[i] = null;
                    stealCount += ns;
                }
            }
            phase = w.phase;
        }
        if (phase != QUIET) {                       // else pre-adjusted
            long c;                                 // decrement counts
            do {} while (!AtomicHelper.compareAndSet(this.ctl, c = ctl,
                                                     ((RC_MASK & (c - RC_UNIT)) |
                                                      (TC_MASK & (c - TC_UNIT)) |
                                                      (SP_MASK & c))));
        }
        if (w !is null)
            w.cancelAll();                          // cancel remaining tasks

        if (!tryTerminate(false, false) &&          // possibly replace worker
            w !is null && w.array !is null)         // avoid repeated failures
            signalWork();

        if (ex is null)                             // help clean on way out
            ForkJoinTaskHelper.helpExpungeStaleExceptions();
        else                                        // rethrow
            ForkJoinTaskHelper.rethrow(ex);
    }

    /**
     * Tries to create or release a worker if too few are running.
     */
    final void signalWork() {
        for (;;) {
            long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
            if ((c = ctl) >= 0L)                    // enough workers
                break;
            else if ((sp = cast(int)c) == 0) {      // no idle workers
                if ((c & ADD_WORKER) != 0L)         // too few workers
                    tryAddWorker(c);
                break;
            }
            else if ((ws = workQueues) is null)
                break;                              // unstarted/terminated
            else if (ws.length <= (i = sp & SMASK))
                break;                              // terminated
            else if ((v = ws[i]) is null)
                break;                              // terminating
            else {
                int np = sp & ~UNSIGNALLED;
                int vp = v.phase;
                long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
                Thread vt = v.owner;
                if (sp == vp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
                    v.phase = np;
                    if (vt !is null && v.source < 0)
                        LockSupport.unpark(vt);
                    break;
                }
            }
        }
    }
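    /*
     * Added illustrative check (not part of the original sources):
     * signalWork releases a waiter by clearing UNSIGNALLED from its
     * phase, so the released phase differs from the queued one only in
     * the sign bit.
     */
    unittest {
        int sp = 5 | UNSIGNALLED;       // a queued phase (negative)
        int np = sp & ~UNSIGNALLED;     // the released phase
        assert(sp < 0 && np == 5);
    }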
    /**
     * Tries to decrement counts (sometimes implicitly) and possibly
     * arrange for a compensating worker in preparation for blocking:
     * If not all core workers yet exist, creates one, else if any are
     * unreleased (possibly including caller) releases one, else if
     * fewer than the minimum allowed number of workers running,
     * checks to see that they are all active, and if so creates an
     * extra worker unless over maximum limit and policy is to
     * saturate.  Most of these steps can fail due to interference, in
     * which case 0 is returned so caller will retry. A negative
     * return value indicates that the caller doesn't need to
     * re-adjust counts when later unblocked.
     *
     * @return 1: block then adjust, -1: block without adjust, 0: retry
     */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        if ((t = cast(short)(c >>> TC_SHIFT)) >= 0) {
            if (ws is null || (n = cast(int)ws.length) <= 0 || w is null)
                return 0;                           // disabled
            else if ((sp = cast(int)c) != 0) {      // replace or release
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v !is null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = (cast(long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
                        v.phase = np;
                        if (vt !is null && v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;
            }
            else if (cast(int)(c >> RC_SHIFT) -     // reduce parallelism
                     cast(short)(bounds & SMASK) > 0) {
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return AtomicHelper.compareAndSet(this.ctl, c, nc) ? 1 : 0;
            }
            else {                                  // validate
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                bool unstable = false;
                for (int i = 1; i < n; i += 2) {
                    WorkQueue q; ThreadEx wt; ThreadState ts;
                    if ((q = ws[i]) !is null) {
                        if (q.source == 0) {
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;
                            if ((wt = q.owner) !is null &&
                                ((ts = wt.getState()) == ThreadState.BLOCKED ||
                                 ts == ThreadState.WAITING))
                                ++bc;               // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                       // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    Predicate!(ForkJoinPool) sat;
                    if ((sat = saturate) !is null && sat(this))
                        return -1;
                    else if (bc < pc) {             // lagging
                        Thread.yield();             // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return AtomicHelper.compareAndSet(this.ctl, c, nc) && createWorker() ? 1 : 0;
    }
    /**
     * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
     * See above for explanation.
     */
    final void runWorker(WorkQueue w) {
        int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
        w.array = new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; // initialize
        for (;;) {
            int phase;
            if (scan(w, r)) {                       // scan until apparently empty
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
            }
            else if ((phase = w.phase) >= 0) {      // enqueue, then rescan
                long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
                long c, nc;
                do {
                    w.stackPred = cast(int)(c = ctl);
                    nc = ((c - RC_UNIT) & UC_MASK) | np;
                } while (!AtomicHelper.compareAndSet(this.ctl, c, nc));
            }
            else {                                  // already queued
                int pred = w.stackPred;
                ThreadEx.interrupted();             // clear before park
                w.source = DORMANT;                 // enable signal
                long c = ctl;
                int md = mode, rc = (md & SMASK) + cast(int)(c >> RC_SHIFT);
                if (md < 0)                         // terminating
                    break;
                else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
                         tryTerminate(false, false))
                    break;                          // quiescent shutdown
                else if (rc <= 0 && pred != 0 && phase == cast(int)c) {
                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
                    long d = keepAlive + DateTimeHelper.currentTimeMillis();
                    LockSupport.parkUntil(this, d);
                    if (ctl == c &&                 // drop on timeout if all idle
                        d - DateTimeHelper.currentTimeMillis() <= TIMEOUT_SLOP &&
                        AtomicHelper.compareAndSet(this.ctl, c, nc)) {
                        w.phase = QUIET;
                        break;
                    }
                }
                else if (w.phase < 0)
                    LockSupport.park(this);         // OK if spuriously woken
                w.source = 0;                       // disable signal
            }
        }
    }

    /**
     * Scans for and if found executes one or more top-level tasks from a queue.
     *
     * @return true if found an apparently non-empty queue, and
     * possibly ran task(s).
     */
    private bool scan(WorkQueue w, int r) {
        WorkQueue[] ws; int n;
        if ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && w !is null) {
            for (int m = n - 1, j = r & m;;) {
                WorkQueue q; int b;
                if ((q = ws[j]) !is null && q.top != (b = q.base)) {
                    int qid = q.id;
                    IForkJoinTask[] a; size_t cap, k; IForkJoinTask t;
                    if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
                        k = (cap - 1) & b;
                        // import core.atomic;
                        // auto ss = core.atomic.atomicLoad((cast(shared)a[k]));
                        // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:12:19 PM
                        //
                        // t = AtomicHelper.load(a[k]);
                        t = a[k];
                        if (q.base == b++ && t !is null &&
                            AtomicHelper.compareAndSet(a[k], t, null)) {
                            q.base = b;
                            w.source = qid;
                            if (q.top - b > 0)
                                signalWork();
                            w.topLevelExec(t, q,    // random fairness bound
                                           r & ((n << TOP_BOUND_SHIFT) - 1));
                        }
                    }
                    return true;
                }
                else if (--n > 0)
                    j = (j + 1) & m;
                else
                    break;
            }
        }
        return false;
    }
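    /*
     * Added illustrative check (not part of the original sources): the
     * Marsaglia xorshift step used in runWorker. The (13, 17, 5) shift
     * triple gives a full period over nonzero 32-bit states, so a
     * nonzero seed never maps to zero or to itself.
     */
    unittest {
        int r = 0x9e3779b9;             // any nonzero seed
        int prev = r;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
        assert(r != 0 && r != prev);
    }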
    /**
     * Helps and/or blocks until the given task is done or timeout.
     * First tries locally helping, then scans other queues for a task
     * produced by one of w's stealers; compensating and blocking if
     * none are found (rescanning if tryCompensate fails).
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, IForkJoinTask task, MonoTime deadline) {
        if (w is null || task is null) {
            return 0;
        }
        int s = 0;
        int seed = ThreadLocalRandom.nextSecondarySeed();
        ICountedCompleter cc = cast(ICountedCompleter)task;
        if (cc !is null) {
            s = w.helpCC(cc, 0, false);
            if (s < 0)
                return 0;
        }

        w.tryRemoveAndExec(task);
        int src = w.source, id = w.id;
        int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
        s = task.getStatus();
        while (s >= 0) {
            WorkQueue[] ws;
            int n = (ws = workQueues) is null ? 0 : cast(int)ws.length, m = n - 1;
            while (n > 0) {
                WorkQueue q; int b;
                if ((q = ws[r & m]) !is null && q.source == id &&
                    q.top != (b = q.base)) {
                    IForkJoinTask[] a; int cap, k;
                    int qid = q.id;
                    if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
                        k = (cap - 1) & b;
                        // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:13:08 PM
                        //
                        // IForkJoinTask t = AtomicHelper.load(a[k]);
                        IForkJoinTask t = a[k];
                        if (q.source == id && q.base == b++ &&
                            t !is null && AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
                            q.base = b;
                            w.source = qid;
                            t.doExec();
                            w.source = src;
                        }
                    }
                    break;
                }
                else {
                    r += step;
                    --n;
                }
            }

            if ((s = task.getStatus()) < 0)
                break;
            else if (n == 0) {                      // empty scan
                long ms; int block;
                Duration ns;
                if (deadline == MonoTime.zero())
                    ms = 0L;                        // untimed
                else if ((ns = deadline - MonoTime.currTime) <= Duration.zero())
                    break;                          // timeout
                else if ((ms = ns.total!(TimeUnit.Millisecond)()) <= 0L)
                    ms = 1L;                        // avoid 0 for timed wait
                if ((block = tryCompensate(w)) != 0) {
                    task.internalWait(ms);
                    AtomicHelper.getAndAdd(this.ctl, (block > 0) ? RC_UNIT : 0L);
                }
                s = task.getStatus();
            }
        }
        return s;
    }
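    /*
     * Added illustrative check (not part of the original sources): the
     * timed-wait bookkeeping above. A zero MonoTime means "untimed";
     * otherwise the remaining budget is converted to milliseconds and
     * clamped to at least 1, since 0 would mean "wait forever".
     */
    unittest {
        MonoTime deadline = MonoTime.currTime + 50.msecs;
        Duration ns = deadline - MonoTime.currTime;
        long ms = ns.total!"msecs";
        if (ms <= 0)
            ms = 1;
        assert(ms >= 1 && ms <= 50);
    }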
0 : cast(int)ws.length; 1347 for (int m = n - 1; n > 0; r += step, --n) { 1348 WorkQueue q; int b; 1349 if ((q = ws[r & m]) !is null) { 1350 int qs = q.source; 1351 if (q.top != (b = q.base)) { 1352 quiet = empty = false; 1353 IForkJoinTask[] a; int cap, k; 1354 int qid = q.id; 1355 if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) { 1356 if (released == 0) { // increment 1357 released = 1; 1358 AtomicHelper.getAndAdd(this.ctl, RC_UNIT); 1359 } 1360 k = (cap - 1) & b; 1361 // IForkJoinTask t = AtomicHelper.load(a[k]); 1362 // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 9:32:07 PM 1363 // 1364 IForkJoinTask t = a[k]; 1365 if (q.base == b++ && t !is null) { 1366 if(AtomicHelper.compareAndSet(a[k], t, null)) { 1367 q.base = b; 1368 w.source = qid; 1369 t.doExec(); 1370 w.source = source = prevSrc; 1371 } 1372 } 1373 } 1374 break; 1375 } 1376 else if ((qs & QUIET) == 0) 1377 quiet = false; 1378 } 1379 } 1380 if (quiet) { 1381 if (released == 0) { 1382 AtomicHelper.getAndAdd(this.ctl, RC_UNIT); 1383 } 1384 w.source = prevSrc; 1385 break; 1386 } 1387 else if (empty) { 1388 if (source != QUIET) 1389 w.source = source = QUIET; 1390 if (released == 1) { // decrement 1391 released = 0; 1392 AtomicHelper.getAndAdd(this.ctl, RC_MASK & -RC_UNIT); 1393 } 1394 } 1395 } 1396 } 1397 1398 /** 1399 * Scans for and returns a polled task, if available. 1400 * Used only for untracked polls. 1401 * 1402 * @param submissionsOnly if true, only scan submission queues 1403 */ 1404 private IForkJoinTask pollScan(bool submissionsOnly) { 1405 WorkQueue[] ws; int n; 1406 rescan: while ((mode & STOP) == 0 && (ws = workQueues) !is null && 1407 (n = cast(int)ws.length) > 0) { 1408 int m = n - 1; 1409 int r = ThreadLocalRandom.nextSecondarySeed(); 1410 int h = r >>> 16; 1411 int origin, step; 1412 if (submissionsOnly) { 1413 origin = (r & ~1) & m; // even indices and steps 1414 step = (h & ~1) | 2; 1415 } 1416 else { 1417 origin = r & m; 1418 step = h | 1; 1419 } 1420 bool nonempty = false; 1421 for (int i = origin, oldSum = 0, checkSum = 0;;) { 1422 WorkQueue q; 1423 if ((q = ws[i]) !is null) { 1424 int b; IForkJoinTask t; 1425 if (q.top - (b = q.base) > 0) { 1426 nonempty = true; 1427 if ((t = q.poll()) !is null) 1428 return t; 1429 } 1430 else 1431 checkSum += b + q.id; 1432 } 1433 if ((i = (i + step) & m) == origin) { 1434 if (!nonempty && oldSum == (oldSum = checkSum)) 1435 break rescan; 1436 checkSum = 0; 1437 nonempty = false; 1438 } 1439 } 1440 } 1441 return null; 1442 } 1443 1444 /** 1445 * Gets and removes a local or stolen task for the given worker. 1446 * 1447 * @return a task, if available 1448 */ 1449 final IForkJoinTask nextTaskFor(WorkQueue w) { 1450 IForkJoinTask t; 1451 if (w is null || (t = w.nextLocalTask()) is null) 1452 t = pollScan(false); 1453 return t; 1454 } 1455 1456 // External operations 1457 1458 /** 1459 * Adds the given task to a submission queue at submitter's 1460 * current queue, creating one if null or contended. 1461 * 1462 * @param task the task. Caller must ensure non-null. 
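 * <p>Illustrative only ({@code someTask} stands for any {@code
 * Runnable}): a thread that is not a {@code ForkJoinWorkerThread}
 * has no local queue, so a submission such as
 * <pre> {@code
 * ForkJoinPool.commonPool().submitTask(someTask);
 * }</pre>
 * falls through externalSubmit() to this method, which installs the
 * task in an even-indexed shared submission slot of workQueues.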
1463 */ 1464 final void externalPush(IForkJoinTask task) { 1465 int r; // initialize caller's probe 1466 if ((r = ThreadLocalRandom.getProbe()) == 0) { 1467 ThreadLocalRandom.localInit(); 1468 r = ThreadLocalRandom.getProbe(); 1469 } 1470 for (;;) { 1471 WorkQueue q; 1472 int md = mode, n; 1473 WorkQueue[] ws = workQueues; 1474 if ((md & SHUTDOWN) != 0 || ws is null || (n = cast(int)ws.length) <= 0) 1475 throw new RejectedExecutionException(); 1476 else if ((q = ws[(n - 1) & r & SQMASK]) is null) { // add queue 1477 int qid = (r | QUIET) & ~(FIFO | OWNED); 1478 Object lock = workerNameLocker; 1479 IForkJoinTask[] qa = 1480 new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; 1481 q = new WorkQueue(this, null); 1482 q.array = qa; 1483 q.id = qid; 1484 q.source = QUIET; 1485 if (lock !is null) { // unless disabled, lock pool to install 1486 synchronized (lock) { 1487 WorkQueue[] vs; int i, vn; 1488 if ((vs = workQueues) !is null && (vn = cast(int)vs.length) > 0 && 1489 vs[i = qid & (vn - 1) & SQMASK] is null) 1490 vs[i] = q; // else another thread already installed 1491 } 1492 } 1493 } 1494 else if (!q.tryLockPhase()) // move if busy 1495 r = ThreadLocalRandom.advanceProbe(r); 1496 else { 1497 if (q.lockedPush(task)) 1498 signalWork(); 1499 return; 1500 } 1501 } 1502 } 1503 1504 /** 1505 * Pushes a possibly-external submission. 1506 */ 1507 private ForkJoinTask!(T) externalSubmit(T)(ForkJoinTask!(T) task) { 1508 if (task is null) 1509 throw new NullPointerException(); 1510 ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 1511 WorkQueue q; 1512 if ( w !is null && w.pool is this && 1513 (q = w.workQueue) !is null) 1514 q.push(task); 1515 else 1516 externalPush(task); 1517 return task; 1518 } 1519 1520 /** 1521 * Returns common pool queue for an external thread. 1522 */ 1523 static WorkQueue commonSubmitterQueue() { 1524 ForkJoinPool p = common; 1525 int r = ThreadLocalRandom.getProbe(); 1526 WorkQueue[] ws; int n; 1527 return (p !is null && (ws = p.workQueues) !is null && 1528 (n = cast(int)ws.length) > 0) ? 1529 ws[(n - 1) & r & SQMASK] : null; 1530 } 1531 1532 /** 1533 * Performs tryUnpush for an external submitter. 1534 */ 1535 final bool tryExternalUnpush(IForkJoinTask task) { 1536 int r = ThreadLocalRandom.getProbe(); 1537 WorkQueue[] ws; WorkQueue w; int n; 1538 return ((ws = workQueues) !is null && 1539 (n = cast(int)ws.length) > 0 && 1540 (w = ws[(n - 1) & r & SQMASK]) !is null && 1541 w.tryLockedUnpush(task)); 1542 } 1543 1544 /** 1545 * Performs helpComplete for an external submitter. 1546 */ 1547 final int externalHelpComplete(ICountedCompleter task, int maxTasks) { 1548 int r = ThreadLocalRandom.getProbe(); 1549 WorkQueue[] ws; WorkQueue w; int n; 1550 return ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && 1551 (w = ws[(n - 1) & r & SQMASK]) !is null) ? 1552 w.helpCC(task, maxTasks, true) : 0; 1553 } 1554 1555 /** 1556 * Tries to steal and run tasks within the target's computation. 1557 * The maxTasks argument supports external usages; internal calls 1558 * use zero, allowing unbounded steps (external calls trap 1559 * non-positive values). 1560 * 1561 * @param w caller 1562 * @param maxTasks if non-zero, the maximum number of other tasks to run 1563 * @return task status on exit 1564 */ 1565 final int helpComplete(WorkQueue w, ICountedCompleter task, 1566 int maxTasks) { 1567 return (w is null) ? 
0 : w.helpCC(task, maxTasks, false); 1568 } 1569 1570 /** 1571 * Returns a cheap heuristic guide for task partitioning when 1572 * programmers, frameworks, tools, or languages have little or no 1573 * idea about task granularity. In essence, by offering this 1574 * method, we ask users only about tradeoffs in overhead vs 1575 * expected throughput and its variance, rather than how finely to 1576 * partition tasks. 1577 * 1578 * In a steady state strict (tree-structured) computation, each 1579 * thread makes available for stealing enough tasks for other 1580 * threads to remain active. Inductively, if all threads play by 1581 * the same rules, each thread should make available only a 1582 * constant number of tasks. 1583 * 1584 * The minimum useful constant is just 1. But using a value of 1 1585 * would require immediate replenishment upon each steal to 1586 * maintain enough tasks, which is infeasible. Further, 1587 * partitionings/granularities of offered tasks should minimize 1588 * steal rates, which in general means that threads nearer the top 1589 * of computation tree should generate more than those nearer the 1590 * bottom. In perfect steady state, each thread is at 1591 * approximately the same level of computation tree. However, 1592 * producing extra tasks amortizes the uncertainty of progress and 1593 * diffusion assumptions. 1594 * 1595 * So, users will want to use values larger (but not much larger) 1596 * than 1 to both smooth over shortages and hedge 1597 * against uneven progress; as traded off against the cost of 1598 * extra task overhead. We leave the user to pick a threshold 1599 * value to compare with the results of this call to guide 1600 * decisions, but recommend values such as 3. 1601 * 1602 * When all threads are active, it is on average OK to estimate 1603 * surplus strictly locally. In steady-state, if one thread is 1604 * maintaining say 2 surplus tasks, then so are others. So we can 1605 * just use estimated queue length. However, this strategy alone 1606 * leads to serious mis-estimates in some non-steady-state 1607 * conditions (ramp-up, ramp-down, other stalls). We can detect 1608 * many of these by further considering the number of "idle" 1609 * threads, that are known to have zero queued tasks, so 1610 * compensate by a factor of (#idle/#active) threads. 1611 */ 1612 static int getSurplusQueuedTaskCount() { 1613 Thread t = Thread.getThis(); 1614 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 1615 ForkJoinPool pool; WorkQueue q; 1616 1617 if (wt !is null && (pool = wt.pool) !is null && 1618 (q = wt.workQueue) !is null) { 1619 int p = pool.mode & SMASK; 1620 int a = p + cast(int)(pool.ctl >> RC_SHIFT); 1621 int n = q.top - q.base; 1622 return n - (a > (p >>>= 1) ? 0 : 1623 a > (p >>>= 1) ? 1 : 1624 a > (p >>>= 1) ? 2 : 1625 a > (p >>>= 1) ? 4 : 1626 8); 1627 } 1628 return 0; 1629 } 1630 1631 // Termination 1632 1633 /** 1634 * Possibly initiates and/or completes termination. 
1635 * 1636 * @param now if true, unconditionally terminate, else only 1637 * if no work and no active workers 1638 * @param enable if true, terminate when next possible 1639 * @return true if terminating or terminated 1640 */ 1641 private bool tryTerminate(bool now, bool enable) { 1642 int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED 1643 1644 while (((md = mode) & SHUTDOWN) == 0) { 1645 if (!enable || this == common) // cannot shutdown 1646 return false; 1647 else { 1648 AtomicHelper.compareAndSet(this.mode, md, md | SHUTDOWN); 1649 } 1650 1651 } 1652 1653 while (((md = mode) & STOP) == 0) { // try to initiate termination 1654 if (!now) { // check if quiescent & empty 1655 for (long oldSum = 0L;;) { // repeat until stable 1656 bool running = false; 1657 long checkSum = ctl; 1658 WorkQueue[] ws = workQueues; 1659 if ((md & SMASK) + cast(int)(checkSum >> RC_SHIFT) > 0) 1660 running = true; 1661 else if (ws !is null) { 1662 WorkQueue w; 1663 for (int i = 0; i < ws.length; ++i) { 1664 if ((w = ws[i]) !is null) { 1665 int s = w.source, p = w.phase; 1666 int d = w.id, b = w.base; 1667 if (b != w.top || 1668 ((d & 1) == 1 && (s >= 0 || p >= 0))) { 1669 running = true; 1670 break; // working, scanning, or have work 1671 } 1672 checkSum += ((cast(long)s << 48) + (cast(long)p << 32) + 1673 (cast(long)b << 16) + cast(long)d); 1674 } 1675 } 1676 } 1677 if (((md = mode) & STOP) != 0) 1678 break; // already triggered 1679 else if (running) 1680 return false; 1681 else if (workQueues == ws && oldSum == (oldSum = checkSum)) 1682 break; 1683 } 1684 } 1685 if ((md & STOP) == 0) 1686 AtomicHelper.compareAndSet(this.mode, md, md | STOP); 1687 } 1688 1689 while (((md = mode) & TERMINATED) == 0) { // help terminate others 1690 for (long oldSum = 0L;;) { // repeat until stable 1691 WorkQueue[] ws; WorkQueue w; 1692 long checkSum = ctl; 1693 if ((ws = workQueues) !is null) { 1694 for (int i = 0; i < ws.length; ++i) { 1695 if ((w = ws[i]) !is null) { 1696 ForkJoinWorkerThread wt = w.owner; 1697 w.cancelAll(); // clear queues 1698 if (wt !is null) { 1699 try { // unblock join or park 1700 wt.interrupt(); 1701 } catch (Throwable ignore) { 1702 } 1703 } 1704 checkSum += (cast(long)w.phase << 32) + w.base; 1705 } 1706 } 1707 } 1708 if (((md = mode) & TERMINATED) != 0 || 1709 (workQueues == ws && oldSum == (oldSum = checkSum))) 1710 break; 1711 } 1712 if ((md & TERMINATED) != 0) 1713 break; 1714 else if ((md & SMASK) + cast(short)(ctl >>> TC_SHIFT) > 0) 1715 break; 1716 else if (AtomicHelper.compareAndSet(this.mode, md, md | TERMINATED)) { 1717 synchronized (this) { 1718 // notifyAll(); // for awaitTermination 1719 // TODO: Tasks pending completion -@zxp at 2/4/2019, 11:03:21 AM 1720 // 1721 } 1722 break; 1723 } 1724 } 1725 return true; 1726 } 1727 1728 // Exported methods 1729 1730 // Constructors 1731 1732 /** 1733 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 1734 * java.lang.Runtime#availableProcessors}, using defaults for all 1735 * other parameters (see {@link #ForkJoinPool(int, 1736 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool, 1737 * int, int, int, Predicate, long, TimeUnit)}). 
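 * <p>For example:
 * <pre> {@code
 * // one worker per available processor, default factory,
 * // LIFO (non-async) scheduling for forked tasks
 * auto pool = new ForkJoinPool();
 * }</pre>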
1738 * 1739 * @throws SecurityException if a security manager exists and 1740 * the caller is not permitted to modify threads 1741 * because it does not hold {@link 1742 * java.lang.RuntimePermission}{@code ("modifyThread")} 1743 */ 1744 this() { 1745 this(min(MAX_CAP, totalCPUs), 1746 defaultForkJoinWorkerThreadFactory, null, false, 1747 0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE)); 1748 } 1749 1750 /** 1751 * Creates a {@code ForkJoinPool} with the indicated parallelism 1752 * level, using defaults for all other parameters (see {@link 1753 * #ForkJoinPool(int, ForkJoinWorkerThreadFactory, 1754 * UncaughtExceptionHandler, bool, int, int, int, Predicate, 1755 * long, TimeUnit)}). 1756 * 1757 * @param parallelism the parallelism level 1758 * @throws IllegalArgumentException if parallelism less than or 1759 * equal to zero, or greater than implementation limit 1760 * @throws SecurityException if a security manager exists and 1761 * the caller is not permitted to modify threads 1762 * because it does not hold {@link 1763 * java.lang.RuntimePermission}{@code ("modifyThread")} 1764 */ 1765 this(int parallelism) { 1766 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 1767 0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE)); 1768 } 1769 1770 /** 1771 * Creates a {@code ForkJoinPool} with the given parameters (using 1772 * defaults for others -- see {@link #ForkJoinPool(int, 1773 * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool, 1774 * int, int, int, Predicate, long, TimeUnit)}). 1775 * 1776 * @param parallelism the parallelism level. For default value, 1777 * use {@link java.lang.Runtime#availableProcessors}. 1778 * @param factory the factory for creating new threads. For default value, 1779 * use {@link #defaultForkJoinWorkerThreadFactory}. 1780 * @param handler the handler for internal worker threads that 1781 * terminate due to unrecoverable errors encountered while executing 1782 * tasks. For default value, use {@code null}. 1783 * @param asyncMode if true, 1784 * establishes local first-in-first-out scheduling mode for forked 1785 * tasks that are never joined. This mode may be more appropriate 1786 * than default locally stack-based mode in applications in which 1787 * worker threads only process event-style asynchronous tasks. 1788 * For default value, use {@code false}. 1789 * @throws IllegalArgumentException if parallelism less than or 1790 * equal to zero, or greater than implementation limit 1791 * @throws NullPointerException if the factory is null 1792 * @throws SecurityException if a security manager exists and 1793 * the caller is not permitted to modify threads 1794 * because it does not hold {@link 1795 * java.lang.RuntimePermission}{@code ("modifyThread")} 1796 */ 1797 this(int parallelism, 1798 ForkJoinWorkerThreadFactory factory, 1799 UncaughtExceptionHandler handler, 1800 bool asyncMode) { 1801 this(parallelism, factory, handler, asyncMode, 1802 0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE)); 1803 } 1804 1805 /** 1806 * Creates a {@code ForkJoinPool} with the given parameters. 1807 * 1808 * @param parallelism the parallelism level. For default value, 1809 * use {@link java.lang.Runtime#availableProcessors}. 1810 * 1811 * @param factory the factory for creating new threads. For 1812 * default value, use {@link #defaultForkJoinWorkerThreadFactory}. 
1813 *
1814 * @param handler the handler for internal worker threads that
1815 * terminate due to unrecoverable errors encountered while
1816 * executing tasks. For default value, use {@code null}.
1817 *
1818 * @param asyncMode if true, establishes local first-in-first-out
1819 * scheduling mode for forked tasks that are never joined. This
1820 * mode may be more appropriate than default locally stack-based
1821 * mode in applications in which worker threads only process
1822 * event-style asynchronous tasks. For default value, use {@code
1823 * false}.
1824 *
1825 * @param corePoolSize the number of threads to keep in the pool
1826 * (unless timed out after an elapsed keep-alive). Normally (and
1827 * by default) this is the same value as the parallelism level,
1828 * but may be set to a larger value to reduce dynamic overhead if
1829 * tasks regularly block. Using a smaller value (for example
1830 * {@code 0}) has the same effect as the default.
1831 *
1832 * @param maximumPoolSize the maximum number of threads allowed.
1833 * When the maximum is reached, attempts to replace blocked
1834 * threads fail. (However, because creation and termination of
1835 * different threads may overlap, and may be managed by the given
1836 * thread factory, this value may be transiently exceeded.) To
1837 * arrange the same value as is used by default for the common
1838 * pool, use {@code 256} plus the {@code parallelism} level. (By
1839 * default, the common pool allows a maximum of 256 spare
1840 * threads.) Using a value (for example {@code
1841 * Integer.MAX_VALUE}) larger than the implementation's total
1842 * thread limit has the same effect as using this limit (which is
1843 * the default).
1844 *
1845 * @param minimumRunnable the minimum allowed number of core
1846 * threads not blocked by a join or {@link ManagedBlocker}. To
1847 * ensure progress, when too few unblocked threads exist and
1848 * unexecuted tasks may exist, new threads are constructed, up to
1849 * the given maximumPoolSize. For the default value, use {@code
1850 * 1}, which ensures liveness. A larger value might improve
1851 * throughput in the presence of blocked activities, but might
1852 * not, due to increased overhead. A value of zero may be
1853 * acceptable when submitted tasks cannot have dependencies
1854 * requiring additional threads.
1855 *
1856 * @param saturate if non-null, a predicate invoked upon attempts
1857 * to create more than the maximum total allowed threads. By
1858 * default, when a thread is about to block on a join or {@link
1859 * ManagedBlocker}, but cannot be replaced because the
1860 * maximumPoolSize would be exceeded, a {@link
1861 * RejectedExecutionException} is thrown. But if this predicate
1862 * returns {@code true}, then no exception is thrown, so the pool
1863 * continues to operate with fewer than the target number of
1864 * runnable threads, which might not ensure progress.
1865 *
1866 * @param keepAliveTime the elapsed time since last use before
1867 * a thread is terminated (and then later replaced if needed).
1868 * For the default value, use {@code dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE)}.
1869 *
1872 * @throws IllegalArgumentException if parallelism is less than or
1873 * equal to zero, or is greater than implementation limit,
1874 * or if maximumPoolSize is less than parallelism,
1875 * or if the keepAliveTime is less than or equal to zero.
1876 * @throws NullPointerException if the factory is null
1877 * @throws SecurityException if a security manager exists and
1878 * the caller is not permitted to modify threads
1879 * because it does not hold {@link
1880 * java.lang.RuntimePermission}{@code ("modifyThread")}
1881 * @since 9
1882 */
1883 this(int parallelism,
1884 ForkJoinWorkerThreadFactory factory,
1885 UncaughtExceptionHandler handler,
1886 bool asyncMode,
1887 int corePoolSize,
1888 int maximumPoolSize,
1889 int minimumRunnable,
1890 Predicate!(ForkJoinPool) saturate,
1891 Duration keepAliveTime) {
1892 // check, encode, pack parameters
1893 if (parallelism <= 0 || parallelism > MAX_CAP ||
1894 maximumPoolSize < parallelism || keepAliveTime <= Duration.zero)
1895 throw new IllegalArgumentException();
1896 if (factory is null)
1897 throw new NullPointerException();
1898 long ms = max(keepAliveTime.total!(TimeUnit.Millisecond), TIMEOUT_SLOP);
1900
1901 int corep = min(max(corePoolSize, parallelism), MAX_CAP);
1902 long c = (((cast(long)(-corep) << TC_SHIFT) & TC_MASK) |
1903 ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK));
1904 int m = parallelism | (asyncMode ? FIFO : 0);
1905 int maxSpares = min(maximumPoolSize, MAX_CAP) - parallelism;
1906 int minAvail = min(max(minimumRunnable, 0), MAX_CAP);
1907 int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
1908 int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
1909 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1910 n = (n + 1) << 1; // power of two, including space for submission queues
1911
1912 this.workerNamePrefix = "ForkJoinPool-" ~ nextPoolId().to!string() ~ "-worker-";
1913 this.workQueues = new WorkQueue[n];
1914 this.factory = factory;
1915 this.ueh = handler;
1916 this.saturate = saturate;
1917 this.keepAlive = ms;
1918 this.bounds = b;
1919 this.mode = m;
1920 this.ctl = c;
1921 // checkPermission();
1922 workerNameLocker = new Object();
1924 }
1925
1926 // private static Object newInstanceFromSystemProperty(string property) {
1927 // string className = System.getProperty(property);
1928 // return (className is null)
1929 // ?
null 1930 // : ClassLoader.getSystemClassLoader().loadClass(className) 1931 // .getConstructor().newInstance(); 1932 // } 1933 1934 /** 1935 * Constructor for common pool using parameters possibly 1936 * overridden by system properties 1937 */ 1938 // private this(byte forCommonPoolOnly) { 1939 // int parallelism = -1; 1940 // ForkJoinWorkerThreadFactory fac = null; 1941 // UncaughtExceptionHandler handler = null; 1942 // try { // ignore exceptions in accessing/parsing properties 1943 // string pp = System.getProperty 1944 // ("hunt.concurrency.ForkJoinPool.common.parallelism"); 1945 // if (pp !is null) 1946 // parallelism = Integer.parseInt(pp); 1947 // fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty( 1948 // "hunt.concurrency.ForkJoinPool.common.threadFactory"); 1949 // handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty( 1950 // "hunt.concurrency.ForkJoinPool.common.exceptionHandler"); 1951 // } catch (Exception ignore) { 1952 // } 1953 1954 // if (fac is null) { 1955 // if (System.getSecurityManager() is null) 1956 // fac = defaultForkJoinWorkerThreadFactory; 1957 // else // use security-managed default 1958 // fac = new InnocuousForkJoinWorkerThreadFactory(); 1959 // } 1960 // if (parallelism < 0 && // default 1 less than #cores 1961 // (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) 1962 // parallelism = 1; 1963 // if (parallelism > MAX_CAP) 1964 // parallelism = MAX_CAP; 1965 1966 // long c = (((cast(long)(-parallelism) << TC_SHIFT) & TC_MASK) | 1967 // ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK)); 1968 // int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH); 1969 // int n = (parallelism > 1) ? parallelism - 1 : 1; 1970 // n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; 1971 // n = (n + 1) << 1; 1972 1973 // this.workerNamePrefix = "ForkJoinPool.commonPool-worker-"; 1974 // this.workQueues = new WorkQueue[n]; 1975 // this.factory = fac; 1976 // this.ueh = handler; 1977 // this.saturate = null; 1978 // this.keepAlive = DEFAULT_KEEPALIVE; 1979 // this.bounds = b; 1980 // this.mode = parallelism; 1981 // this.ctl = c; 1982 // } 1983 1984 /** 1985 * Returns the common pool instance. This pool is statically 1986 * constructed; its run state is unaffected by attempts to {@link 1987 * #shutdown} or {@link #shutdownNow}. However this pool and any 1988 * ongoing processing are automatically terminated upon program 1989 * {@link System#exit}. Any program that relies on asynchronous 1990 * task processing to complete before program termination should 1991 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 1992 * before exit. 1993 * 1994 * @return the common pool instance 1995 * @since 1.8 1996 */ 1997 static ForkJoinPool commonPool() { 1998 // assert common !is null : "static init error"; 1999 return common; 2000 } 2001 2002 // Execution methods 2003 2004 /** 2005 * Performs the given task, returning its result upon completion. 2006 * If the computation encounters an unchecked Exception or Error, 2007 * it is rethrown as the outcome of this invocation. Rethrown 2008 * exceptions behave in the same way as regular exceptions, but, 2009 * when possible, contain stack traces (as displayed for example 2010 * using {@code ex.printStackTrace()}) of both the current thread 2011 * as well as the thread actually encountering the exception; 2012 * minimally only the latter. 
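 * <p>Illustrative sketch ({@code FibTask} is a hypothetical {@code
 * ForkJoinTask!(long)} subclass, not part of this module):
 * <pre> {@code
 * auto pool = new ForkJoinPool();
 * long fib = pool.invoke!(long)(new FibTask(30));
 * // the caller submits the task and then joins it, helping to run
 * // subtasks while the result is computed
 * }</pre>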
2013 * 2014 * @param task the task 2015 * @param (T) the type of the task's result 2016 * @return the task's result 2017 * @throws NullPointerException if the task is null 2018 * @throws RejectedExecutionException if the task cannot be 2019 * scheduled for execution 2020 */ 2021 T invoke(T)(ForkJoinTask!(T) task) { 2022 if (task is null) 2023 throw new NullPointerException(); 2024 externalSubmit!(T)(task); 2025 return task.join(); 2026 } 2027 2028 /** 2029 * Arranges for (asynchronous) execution of the given task. 2030 * 2031 * @param task the task 2032 * @throws NullPointerException if the task is null 2033 * @throws RejectedExecutionException if the task cannot be 2034 * scheduled for execution 2035 */ 2036 void execute(IForkJoinTask task) { 2037 implementationMissing(false); 2038 // externalSubmit(task); 2039 } 2040 2041 // AbstractExecutorService methods 2042 2043 /** 2044 * @throws NullPointerException if the task is null 2045 * @throws RejectedExecutionException if the task cannot be 2046 * scheduled for execution 2047 */ 2048 void execute(Runnable task) { 2049 if (task is null) 2050 throw new NullPointerException(); 2051 IForkJoinTask job = cast(IForkJoinTask) task; 2052 if (job is null) // avoid re-wrap 2053 job = new RunnableExecuteAction(task); 2054 // externalSubmit(job); 2055 implementationMissing(false); 2056 } 2057 2058 /** 2059 * Submits a ForkJoinTask for execution. 2060 * 2061 * @param task the task to submit 2062 * @param (T) the type of the task's result 2063 * @return the task 2064 * @throws NullPointerException if the task is null 2065 * @throws RejectedExecutionException if the task cannot be 2066 * scheduled for execution 2067 */ 2068 ForkJoinTask!(T) submitTask(T)(ForkJoinTask!(T) task) { 2069 return externalSubmit(task); 2070 } 2071 2072 /** 2073 * @throws NullPointerException if the task is null 2074 * @throws RejectedExecutionException if the task cannot be 2075 * scheduled for execution 2076 */ 2077 ForkJoinTask!(T) submitTask(T)(Callable!(T) task) { 2078 return externalSubmit(new AdaptedCallable!(T)(task)); 2079 } 2080 2081 /** 2082 * @throws NullPointerException if the task is null 2083 * @throws RejectedExecutionException if the task cannot be 2084 * scheduled for execution 2085 */ 2086 ForkJoinTask!(T) submitTask(T)(Runnable task, T result) { 2087 return externalSubmit(new AdaptedRunnable!(T)(task, result)); 2088 } 2089 2090 /** 2091 * @throws NullPointerException if the task is null 2092 * @throws RejectedExecutionException if the task cannot be 2093 * scheduled for execution 2094 */ 2095 2096 IForkJoinTask submitTask(Runnable task) { 2097 if (task is null) 2098 throw new NullPointerException(); 2099 IForkJoinTask t = cast(IForkJoinTask)task; 2100 return externalSubmit(t !is null 2101 ? cast(ForkJoinTask!(void)) task // avoid re-wrap 2102 : new AdaptedRunnableAction(task)); 2103 } 2104 2105 /** 2106 * @throws NullPointerException {@inheritDoc} 2107 * @throws RejectedExecutionException {@inheritDoc} 2108 */ 2109 List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) { 2110 // In previous versions of this class, this method constructed 2111 // a task to run ForkJoinTask.invokeAll, but now external 2112 // invocation of multiple tasks is at least as efficient. 
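// Flow below (descriptive note): adapt each Callable into a
// ForkJoinTask, push it via externalSubmit(), then quietly join each
// future in submission order; on any failure, cancel the collected
// futures (a no-op for those already done) and rethrow.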
2113 ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size()); 2114 2115 try { 2116 foreach (Callable!(T) t ; tasks) { 2117 ForkJoinTask!(T) f = new AdaptedCallable!(T)(t); 2118 futures.add(f); 2119 externalSubmit(f); 2120 } 2121 for (int i = 0, size = futures.size(); i < size; i++) 2122 (cast(IForkJoinTask)(futures.get(i))).quietlyJoin(); 2123 return futures; 2124 } catch (Throwable t) { 2125 for (int i = 0, size = futures.size(); i < size; i++) 2126 futures.get(i).cancel(false); 2127 throw t; 2128 } 2129 } 2130 2131 /** 2132 * Returns the factory used for constructing new workers. 2133 * 2134 * @return the factory used for constructing new workers 2135 */ 2136 ForkJoinWorkerThreadFactory getFactory() { 2137 return factory; 2138 } 2139 2140 /** 2141 * Returns the handler for internal worker threads that terminate 2142 * due to unrecoverable errors encountered while executing tasks. 2143 * 2144 * @return the handler, or {@code null} if none 2145 */ 2146 UncaughtExceptionHandler getUncaughtExceptionHandler() { 2147 return ueh; 2148 } 2149 2150 /** 2151 * Returns the targeted parallelism level of this pool. 2152 * 2153 * @return the targeted parallelism level of this pool 2154 */ 2155 int getParallelism() { 2156 int par = mode & SMASK; 2157 return (par > 0) ? par : 1; 2158 } 2159 2160 /** 2161 * Returns the targeted parallelism level of the common pool. 2162 * 2163 * @return the targeted parallelism level of the common pool 2164 * @since 1.8 2165 */ 2166 static int getCommonPoolParallelism() { 2167 return COMMON_PARALLELISM; 2168 } 2169 2170 /** 2171 * Returns the number of worker threads that have started but not 2172 * yet terminated. The result returned by this method may differ 2173 * from {@link #getParallelism} when threads are created to 2174 * maintain parallelism when others are cooperatively blocked. 2175 * 2176 * @return the number of worker threads 2177 */ 2178 int getPoolSize() { 2179 return ((mode & SMASK) + cast(short)(ctl >>> TC_SHIFT)); 2180 } 2181 2182 /** 2183 * Returns {@code true} if this pool uses local first-in-first-out 2184 * scheduling mode for forked tasks that are never joined. 2185 * 2186 * @return {@code true} if this pool uses async mode 2187 */ 2188 bool getAsyncMode() { 2189 return (mode & FIFO) != 0; 2190 } 2191 2192 /** 2193 * Returns an estimate of the number of worker threads that are 2194 * not blocked waiting to join tasks or for other managed 2195 * synchronization. This method may overestimate the 2196 * number of running threads. 2197 * 2198 * @return the number of worker threads 2199 */ 2200 int getRunningThreadCount() { 2201 WorkQueue[] ws; WorkQueue w; 2202 // VarHandle.acquireFence(); 2203 int rc = 0; 2204 if ((ws = workQueues) !is null) { 2205 for (int i = 1; i < cast(int)ws.length; i += 2) { 2206 if ((w = ws[i]) !is null && w.isApparentlyUnblocked()) 2207 ++rc; 2208 } 2209 } 2210 return rc; 2211 } 2212 2213 /** 2214 * Returns an estimate of the number of threads that are currently 2215 * stealing or executing tasks. This method may overestimate the 2216 * number of active threads. 2217 * 2218 * @return the number of active threads 2219 */ 2220 int getActiveThreadCount() { 2221 int r = (mode & SMASK) + cast(int)(ctl >> RC_SHIFT); 2222 return (r <= 0) ? 0 : r; // suppress momentarily negative values 2223 } 2224 2225 /** 2226 * Returns {@code true} if all worker threads are currently idle. 
2227 * An idle worker is one that cannot obtain a task to execute 2228 * because none are available to steal from other threads, and 2229 * there are no pending submissions to the pool. This method is 2230 * conservative; it might not return {@code true} immediately upon 2231 * idleness of all threads, but will eventually become true if 2232 * threads remain inactive. 2233 * 2234 * @return {@code true} if all threads are currently idle 2235 */ 2236 bool isQuiescent() { 2237 for (;;) { 2238 long c = ctl; 2239 int md = mode, pc = md & SMASK; 2240 int tc = pc + cast(short)(c >>> TC_SHIFT); 2241 int rc = pc + cast(int)(c >> RC_SHIFT); 2242 if ((md & (STOP | TERMINATED)) != 0) 2243 return true; 2244 else if (rc > 0) 2245 return false; 2246 else { 2247 WorkQueue[] ws; WorkQueue v; 2248 if ((ws = workQueues) !is null) { 2249 for (int i = 1; i < ws.length; i += 2) { 2250 if ((v = ws[i]) !is null) { 2251 if (v.source > 0) 2252 return false; 2253 --tc; 2254 } 2255 } 2256 } 2257 if (tc == 0 && ctl == c) 2258 return true; 2259 } 2260 } 2261 } 2262 2263 /** 2264 * Returns an estimate of the total number of tasks stolen from 2265 * one thread's work queue by another. The reported value 2266 * underestimates the actual total number of steals when the pool 2267 * is not quiescent. This value may be useful for monitoring and 2268 * tuning fork/join programs: in general, steal counts should be 2269 * high enough to keep threads busy, but low enough to avoid 2270 * overhead and contention across threads. 2271 * 2272 * @return the number of steals 2273 */ 2274 long getStealCount() { 2275 long count = stealCount; 2276 WorkQueue[] ws; WorkQueue w; 2277 if ((ws = workQueues) !is null) { 2278 for (int i = 1; i < ws.length; i += 2) { 2279 if ((w = ws[i]) !is null) 2280 count += cast(long)w.nsteals & 0xffffffffL; 2281 } 2282 } 2283 return count; 2284 } 2285 2286 /** 2287 * Returns an estimate of the total number of tasks currently held 2288 * in queues by worker threads (but not including tasks submitted 2289 * to the pool that have not begun executing). This value is only 2290 * an approximation, obtained by iterating across all threads in 2291 * the pool. This method may be useful for tuning task 2292 * granularities. 2293 * 2294 * @return the number of queued tasks 2295 */ 2296 long getQueuedTaskCount() { 2297 WorkQueue[] ws; WorkQueue w; 2298 // VarHandle.acquireFence(); 2299 int count = 0; 2300 if ((ws = workQueues) !is null) { 2301 for (int i = 1; i < cast(int)ws.length; i += 2) { 2302 if ((w = ws[i]) !is null) 2303 count += w.queueSize(); 2304 } 2305 } 2306 return count; 2307 } 2308 2309 /** 2310 * Returns an estimate of the number of tasks submitted to this 2311 * pool that have not yet begun executing. This method may take 2312 * time proportional to the number of submissions. 2313 * 2314 * @return the number of queued submissions 2315 */ 2316 int getQueuedSubmissionCount() { 2317 WorkQueue[] ws; WorkQueue w; 2318 // VarHandle.acquireFence(); 2319 int count = 0; 2320 if ((ws = workQueues) !is null) { 2321 for (int i = 0; i < cast(int)ws.length; i += 2) { 2322 if ((w = ws[i]) !is null) 2323 count += w.queueSize(); 2324 } 2325 } 2326 return count; 2327 } 2328 2329 /** 2330 * Returns {@code true} if there are any tasks submitted to this 2331 * pool that have not yet begun executing. 
2332 * 2333 * @return {@code true} if there are any queued submissions 2334 */ 2335 bool hasQueuedSubmissions() { 2336 WorkQueue[] ws; WorkQueue w; 2337 // VarHandle.acquireFence(); 2338 if ((ws = workQueues) !is null) { 2339 for (int i = 0; i < cast(int)ws.length; i += 2) { 2340 if ((w = ws[i]) !is null && !w.isEmpty()) 2341 return true; 2342 } 2343 } 2344 return false; 2345 } 2346 2347 /** 2348 * Removes and returns the next unexecuted submission if one is 2349 * available. This method may be useful in extensions to this 2350 * class that re-assign work in systems with multiple pools. 2351 * 2352 * @return the next submission, or {@code null} if none 2353 */ 2354 protected IForkJoinTask pollSubmission() { 2355 return pollScan(true); 2356 } 2357 2358 /** 2359 * Removes all available unexecuted submitted and forked tasks 2360 * from scheduling queues and adds them to the given collection, 2361 * without altering their execution status. These may include 2362 * artificially generated or wrapped tasks. This method is 2363 * designed to be invoked only when the pool is known to be 2364 * quiescent. Invocations at other times may not remove all 2365 * tasks. A failure encountered while attempting to add elements 2366 * to collection {@code c} may result in elements being in 2367 * neither, either or both collections when the associated 2368 * exception is thrown. The behavior of this operation is 2369 * undefined if the specified collection is modified while the 2370 * operation is in progress. 2371 * 2372 * @param c the collection to transfer elements into 2373 * @return the number of elements transferred 2374 */ 2375 protected int drainTasksTo(Collection!IForkJoinTask c) { 2376 WorkQueue[] ws; WorkQueue w; IForkJoinTask t; 2377 // VarHandle.acquireFence(); 2378 int count = 0; 2379 if ((ws = workQueues) !is null) { 2380 for (int i = 0; i < ws.length; ++i) { 2381 if ((w = ws[i]) !is null) { 2382 while ((t = w.poll()) !is null) { 2383 c.add(t); 2384 ++count; 2385 } 2386 } 2387 } 2388 } 2389 return count; 2390 } 2391 2392 /** 2393 * Returns a string identifying this pool, as well as its state, 2394 * including indications of run state, parallelism level, and 2395 * worker and task counts. 2396 * 2397 * @return a string identifying this pool, as well as its state 2398 */ 2399 override string toString() { 2400 // Use a single pass through workQueues to collect counts 2401 int md = mode; // read fields first 2402 long c = ctl; 2403 long st = stealCount; 2404 long qt = 0L, qs = 0L; int rc = 0; 2405 WorkQueue[] ws; WorkQueue w; 2406 if ((ws = workQueues) !is null) { 2407 for (int i = 0; i < ws.length; ++i) { 2408 if ((w = ws[i]) !is null) { 2409 int size = w.queueSize(); 2410 if ((i & 1) == 0) 2411 qs += size; 2412 else { 2413 qt += size; 2414 st += cast(long)w.nsteals & 0xffffffffL; 2415 if (w.isApparentlyUnblocked()) 2416 ++rc; 2417 } 2418 } 2419 } 2420 } 2421 2422 int pc = (md & SMASK); 2423 int tc = pc + cast(short)(c >>> TC_SHIFT); 2424 int ac = pc + cast(int)(c >> RC_SHIFT); 2425 if (ac < 0) // ignore negative 2426 ac = 0; 2427 string level = ((md & TERMINATED) != 0 ? "Terminated" : 2428 (md & STOP) != 0 ? "Terminating" : 2429 (md & SHUTDOWN) != 0 ? 
"Shutting down" : 2430 "Running"); 2431 return super.toString() ~ 2432 "[" ~ level ~ 2433 ", parallelism = " ~ pc.to!string() ~ 2434 ", size = " ~ tc.to!string() ~ 2435 ", active = " ~ ac.to!string() ~ 2436 ", running = " ~ rc.to!string() ~ 2437 ", steals = " ~ st.to!string() ~ 2438 ", tasks = " ~ qt.to!string() ~ 2439 ", submissions = " ~ qs.to!string() ~ 2440 "]"; 2441 } 2442 2443 /** 2444 * Possibly initiates an orderly shutdown in which previously 2445 * submitted tasks are executed, but no new tasks will be 2446 * accepted. Invocation has no effect on execution state if this 2447 * is the {@link #commonPool()}, and no additional effect if 2448 * already shut down. Tasks that are in the process of being 2449 * submitted concurrently during the course of this method may or 2450 * may not be rejected. 2451 * 2452 * @throws SecurityException if a security manager exists and 2453 * the caller is not permitted to modify threads 2454 * because it does not hold {@link 2455 * java.lang.RuntimePermission}{@code ("modifyThread")} 2456 */ 2457 void shutdown() { 2458 // checkPermission(); 2459 tryTerminate(false, true); 2460 } 2461 2462 /** 2463 * Possibly attempts to cancel and/or stop all tasks, and reject 2464 * all subsequently submitted tasks. Invocation has no effect on 2465 * execution state if this is the {@link #commonPool()}, and no 2466 * additional effect if already shut down. Otherwise, tasks that 2467 * are in the process of being submitted or executed concurrently 2468 * during the course of this method may or may not be 2469 * rejected. This method cancels both existing and unexecuted 2470 * tasks, in order to permit termination in the presence of task 2471 * dependencies. So the method always returns an empty list 2472 * (unlike the case for some other Executors). 2473 * 2474 * @return an empty list 2475 * @throws SecurityException if a security manager exists and 2476 * the caller is not permitted to modify threads 2477 * because it does not hold {@link 2478 * java.lang.RuntimePermission}{@code ("modifyThread")} 2479 */ 2480 List!(Runnable) shutdownNow() { 2481 // checkPermission(); 2482 tryTerminate(true, true); 2483 return Collections.emptyList!(Runnable)(); 2484 } 2485 2486 /** 2487 * Returns {@code true} if all tasks have completed following shut down. 2488 * 2489 * @return {@code true} if all tasks have completed following shut down 2490 */ 2491 bool isTerminated() { 2492 return (mode & TERMINATED) != 0; 2493 } 2494 2495 /** 2496 * Returns {@code true} if the process of termination has 2497 * commenced but not yet completed. This method may be useful for 2498 * debugging. A return of {@code true} reported a sufficient 2499 * period after shutdown may indicate that submitted tasks have 2500 * ignored or suppressed interruption, or are waiting for I/O, 2501 * causing this executor not to properly terminate. (See the 2502 * advisory notes for class {@link ForkJoinTask} stating that 2503 * tasks should not normally entail blocking operations. But if 2504 * they do, they must abort them on interrupt.) 2505 * 2506 * @return {@code true} if terminating but not yet terminated 2507 */ 2508 bool isTerminating() { 2509 int md = mode; 2510 return (md & STOP) != 0 && (md & TERMINATED) == 0; 2511 } 2512 2513 /** 2514 * Returns {@code true} if this pool has been shut down. 
2515 * 2516 * @return {@code true} if this pool has been shut down 2517 */ 2518 bool isShutdown() { 2519 return (mode & SHUTDOWN) != 0; 2520 } 2521 2522 /** 2523 * Blocks until all tasks have completed execution after a 2524 * shutdown request, or the timeout occurs, or the current thread 2525 * is interrupted, whichever happens first. Because the {@link 2526 * #commonPool()} never terminates until program shutdown, when 2527 * applied to the common pool, this method is equivalent to {@link 2528 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 2529 * 2530 * @param timeout the maximum time to wait 2531 * @param unit the time unit of the timeout argument 2532 * @return {@code true} if this executor terminated and 2533 * {@code false} if the timeout elapsed before termination 2534 * @throws InterruptedException if interrupted while waiting 2535 */ 2536 bool awaitTermination(Duration timeout) 2537 { 2538 if (ThreadEx.interrupted()) 2539 throw new InterruptedException(); 2540 if (this == common) { 2541 awaitQuiescence(timeout); 2542 return false; 2543 } 2544 long nanos = timeout.total!(TimeUnit.HectoNanosecond); 2545 if (isTerminated()) 2546 return true; 2547 if (nanos <= 0L) 2548 return false; 2549 long deadline = Clock.currStdTime + nanos; 2550 synchronized (this) { 2551 for (;;) { 2552 if (isTerminated()) 2553 return true; 2554 if (nanos <= 0L) 2555 return false; 2556 // long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 2557 // wait(millis > 0L ? millis : 1L); 2558 // ThreadEx.currentThread().par 2559 ThreadEx.sleep(dur!(TimeUnit.HectoNanosecond)(nanos)); 2560 nanos = deadline - Clock.currStdTime; 2561 } 2562 } 2563 } 2564 2565 /** 2566 * If called by a ForkJoinTask operating in this pool, equivalent 2567 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 2568 * waits and/or attempts to assist performing tasks until this 2569 * pool {@link #isQuiescent} or the indicated timeout elapses. 2570 * 2571 * @param timeout the maximum time to wait 2572 * @param unit the time unit of the timeout argument 2573 * @return {@code true} if quiescent; {@code false} if the 2574 * timeout elapsed. 2575 */ 2576 bool awaitQuiescence(Duration timeout) { 2577 long nanos = timeout.total!(TimeUnit.HectoNanosecond)(); 2578 Thread thread = Thread.getThis(); 2579 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 2580 if (wt !is null && wt.pool is this) { 2581 helpQuiescePool(wt.workQueue); 2582 return true; 2583 } 2584 else { 2585 for (long startTime = Clock.currStdTime;;) { 2586 IForkJoinTask t; 2587 if ((t = pollScan(false)) !is null) 2588 t.doExec(); 2589 else if (isQuiescent()) 2590 return true; 2591 else if ((Clock.currStdTime - startTime) > nanos) 2592 return false; 2593 else 2594 Thread.yield(); // cannot block 2595 } 2596 } 2597 } 2598 2599 /** 2600 * Waits and/or attempts to assist performing tasks indefinitely 2601 * until the {@link #commonPool()} {@link #isQuiescent}. 2602 */ 2603 static void quiesceCommonPool() { 2604 common.awaitQuiescence(Duration.max); 2605 } 2606 2607 /** 2608 * Runs the given possibly blocking task. When {@linkplain 2609 * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this 2610 * method possibly arranges for a spare thread to be activated if 2611 * necessary to ensure sufficient parallelism while the current 2612 * thread is blocked in {@link ManagedBlocker#block blocker.block()}. 
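 * <p>For example, a sketch using the {@code ManagedLocker} class shown
 * in the {@link ManagedBlocker} documentation below ({@code lock} is
 * assumed to be a ReentrantLock):
 * <pre> {@code
 * auto locker = new ManagedLocker(lock);
 * ForkJoinPool.managedBlock(locker);   // may activate a spare worker
 * // on return, the lock has been acquired
 * }</pre>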
2613 * 2614 * <p>This method repeatedly calls {@code blocker.isReleasable()} and 2615 * {@code blocker.block()} until either method returns {@code true}. 2616 * Every call to {@code blocker.block()} is preceded by a call to 2617 * {@code blocker.isReleasable()} that returned {@code false}. 2618 * 2619 * <p>If not running in a ForkJoinPool, this method is 2620 * behaviorally equivalent to 2621 * <pre> {@code 2622 * while (!blocker.isReleasable()) 2623 * if (blocker.block()) 2624 * break;}</pre> 2625 * 2626 * If running in a ForkJoinPool, the pool may first be expanded to 2627 * ensure sufficient parallelism available during the call to 2628 * {@code blocker.block()}. 2629 * 2630 * @param blocker the blocker task 2631 * @throws InterruptedException if {@code blocker.block()} did so 2632 */ 2633 static void managedBlock(ManagedBlocker blocker) { 2634 if (blocker is null) throw new NullPointerException(); 2635 ForkJoinPool p; 2636 WorkQueue w; 2637 Thread t = Thread.getThis(); 2638 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 2639 if (wt !is null && (p = wt.pool) !is null && 2640 (w = wt.workQueue) !is null) { 2641 int block; 2642 while (!blocker.isReleasable()) { 2643 if ((block = p.tryCompensate(w)) != 0) { 2644 try { 2645 do {} while (!blocker.isReleasable() && 2646 !blocker.block()); 2647 } finally { 2648 AtomicHelper.getAndAdd(p.ctl, (block > 0) ? RC_UNIT : 0L); 2649 } 2650 break; 2651 } 2652 } 2653 } 2654 else { 2655 do {} while (!blocker.isReleasable() && 2656 !blocker.block()); 2657 } 2658 } 2659 2660 /** 2661 * If the given executor is a ForkJoinPool, poll and execute 2662 * AsynchronousCompletionTasks from worker's queue until none are 2663 * available or blocker is released. 2664 */ 2665 static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { 2666 ForkJoinPool p = cast(ForkJoinPool)e; 2667 if (p !is null) { 2668 WorkQueue w; WorkQueue[] ws; int r, n; 2669 2670 Thread thread = Thread.getThis(); 2671 ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 2672 if (wt !is null && wt.pool is p) 2673 w = wt.workQueue; 2674 else if ((r = ThreadLocalRandom.getProbe()) != 0 && 2675 (ws = p.workQueues) !is null && (n = cast(int)ws.length) > 0) 2676 w = ws[(n - 1) & r & SQMASK]; 2677 else 2678 w = null; 2679 if (w !is null) 2680 w.helpAsyncBlocker(blocker); 2681 } 2682 } 2683 2684 // AbstractExecutorService overrides. These rely on undocumented 2685 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 2686 // implement RunnableFuture. 
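    // Descriptive note (an assumption about the inherited base class):
    // these hooks let AbstractExecutorService-style submit() methods
    // wrap plain Runnable/Callable values in ForkJoinTask adapters, so
    // every external submission can be scheduled by the work-stealing
    // machinery.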
2687 2688 protected RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) { 2689 return new AdaptedRunnable!(T)(runnable, value); 2690 } 2691 2692 protected RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) { 2693 return new AdaptedCallable!(T)(callable); 2694 } 2695 2696 // VarHandle mechanics 2697 // private static final VarHandle CTL; 2698 // private static final VarHandle MODE; 2699 // static final VarHandle QA; 2700 2701 shared static this() { 2702 // try { 2703 // MethodHandles.Lookup l = MethodHandles.lookup(); 2704 // CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class); 2705 // MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class); 2706 // QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class); 2707 // } catch (ReflectiveOperationException e) { 2708 // throw new ExceptionInInitializerError(e); 2709 // } 2710 2711 // Reduce the risk of rare disastrous classloading in first call to 2712 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 2713 // Class<?> ensureLoaded = LockSupport.class; 2714 2715 int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; 2716 // try { 2717 // string p = System.getProperty 2718 // ("hunt.concurrency.ForkJoinPool.common.maximumSpares"); 2719 // if (p !is null) 2720 // commonMaxSpares = Integer.parseInt(p); 2721 // } catch (Exception ignore) {} 2722 COMMON_MAX_SPARES = commonMaxSpares; 2723 2724 defaultForkJoinWorkerThreadFactory = 2725 new DefaultForkJoinWorkerThreadFactory(); 2726 // modifyThreadPermission = new RuntimePermission("modifyThread"); 2727 2728 // common = AccessController.doPrivileged(new PrivilegedAction<>() { 2729 // ForkJoinPool run() { 2730 // return new ForkJoinPool((byte)0); }}); 2731 common = new ForkJoinPool(cast(byte)1); 2732 2733 COMMON_PARALLELISM = max(common.mode & SMASK, 1); 2734 } 2735 2736 } 2737 2738 2739 /** 2740 * Factory for innocuous worker threads. 2741 */ 2742 private final class InnocuousForkJoinWorkerThreadFactory 2743 : ForkJoinWorkerThreadFactory { 2744 2745 /** 2746 * An ACC to restrict permissions for the factory itself. 2747 * The constructed workers have no permissions set. 2748 */ 2749 // private static final AccessControlContext ACC = contextWithPermissions( 2750 // modifyThreadPermission, 2751 // new RuntimePermission("enableContextClassLoaderOverride"), 2752 // new RuntimePermission("modifyThreadGroup"), 2753 // new RuntimePermission("getClassLoader"), 2754 // new RuntimePermission("setContextClassLoader")); 2755 2756 final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 2757 return new InnocuousForkJoinWorkerThread(pool); 2758 // return AccessController.doPrivileged( 2759 // new PrivilegedAction<>() { 2760 // ForkJoinWorkerThread run() { 2761 // return new ForkJoinWorkerThread. 2762 // InnocuousForkJoinWorkerThread(pool); }}, 2763 // ACC); 2764 } 2765 } 2766 2767 2768 /** 2769 * Factory for creating new {@link ForkJoinWorkerThread}s. 2770 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 2771 * for {@code ForkJoinWorkerThread} subclasses that extend base 2772 * functionality or initialize threads with different contexts. 2773 */ 2774 interface ForkJoinWorkerThreadFactory { 2775 /** 2776 * Returns a new worker thread operating in the given pool. 2777 * Returning null or throwing an exception may result in tasks 2778 * never being executed. If this method throws an exception, 2779 * it is relayed to the caller of the method (for example 2780 * {@code execute}) causing attempted thread creation. 
If this 2781 * method returns null or throws an exception, it is not 2782 * retried until the next attempted creation (for example 2783 * another call to {@code execute}). 2784 * 2785 * @param pool the pool this thread works in 2786 * @return the new worker thread, or {@code null} if the request 2787 * to create a thread is rejected 2788 * @throws NullPointerException if the pool is null 2789 */ 2790 ForkJoinWorkerThread newThread(ForkJoinPool pool); 2791 } 2792 2793 // Nested classes 2794 2795 // static AccessControlContext contextWithPermissions(Permission ... perms) { 2796 // Permissions permissions = new Permissions(); 2797 // for (Permission perm : perms) 2798 // permissions.add(perm); 2799 // return new AccessControlContext( 2800 // new ProtectionDomain[] { new ProtectionDomain(null, permissions) }); 2801 // } 2802 2803 /** 2804 * Default ForkJoinWorkerThreadFactory implementation; creates a 2805 * new ForkJoinWorkerThread using the system class loader as the 2806 * thread context class loader. 2807 */ 2808 private final class DefaultForkJoinWorkerThreadFactory : ForkJoinWorkerThreadFactory { 2809 // private static final AccessControlContext ACC = contextWithPermissions( 2810 // new RuntimePermission("getClassLoader"), 2811 // new RuntimePermission("setContextClassLoader")); 2812 2813 final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 2814 return new ForkJoinWorkerThread(pool); 2815 // return AccessController.doPrivileged( 2816 // new PrivilegedAction<>() { 2817 // ForkJoinWorkerThread run() { 2818 // return new ForkJoinWorkerThread( 2819 // pool, ClassLoader.getSystemClassLoader()); }}, 2820 // ACC); 2821 } 2822 } 2823 2824 2825 /** 2826 * Interface for extending managed parallelism for tasks running 2827 * in {@link ForkJoinPool}s. 2828 * 2829 * <p>A {@code ManagedBlocker} provides two methods. Method 2830 * {@link #isReleasable} must return {@code true} if blocking is 2831 * not necessary. Method {@link #block} blocks the current thread 2832 * if necessary (perhaps internally invoking {@code isReleasable} 2833 * before actually blocking). These actions are performed by any 2834 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 2835 * The unusual methods in this API accommodate synchronizers that 2836 * may, but don't usually, block for long periods. Similarly, they 2837 * allow more efficient internal handling of cases in which 2838 * additional workers may be, but usually are not, needed to 2839 * ensure sufficient parallelism. Toward this end, 2840 * implementations of method {@code isReleasable} must be amenable 2841 * to repeated invocation. 
2842 *
2843 * <p>For example, here is a ManagedBlocker based on a
2844 * ReentrantLock:
2845 * <pre> {@code
2846 * class ManagedLocker : ManagedBlocker {
2847 * ReentrantLock lock;
2848 * bool hasLock = false;
2849 * this(ReentrantLock lock) { this.lock = lock; }
2850 * bool block() {
2851 * if (!hasLock)
2852 * lock.lock();
2853 * return true;
2854 * }
2855 * bool isReleasable() {
2856 * return hasLock || (hasLock = lock.tryLock());
2857 * }
2858 * }}</pre>
2859 *
2860 * <p>Here is a class that possibly blocks waiting for an
2861 * item on a given queue:
2862 * <pre> {@code
2863 * class QueueTaker(E) : ManagedBlocker {
2864 * BlockingQueue!(E) queue;
2865 * E item = null;
2866 * this(BlockingQueue!(E) q) { this.queue = q; }
2867 * bool block() {
2868 * if (item is null)
2869 * item = queue.take();
2870 * return true;
2871 * }
2872 * bool isReleasable() {
2873 * return item !is null || (item = queue.poll()) !is null;
2874 * }
2875 * E getItem() { // call after pool.managedBlock completes
2876 * return item;
2877 * }
2878 * }}</pre>
2879 */
2880 static interface ManagedBlocker {
2881 /**
2882 * Possibly blocks the current thread, for example waiting for
2883 * a lock or condition.
2884 *
2885 * @return {@code true} if no additional blocking is necessary
2886 * (i.e., if isReleasable would return true)
2887 * @throws InterruptedException if interrupted while waiting
2888 * (the method is not required to do so, but is allowed to)
2889 */
2890 bool block();
2891
2892 /**
2893 * Returns {@code true} if blocking is unnecessary.
2894 * @return {@code true} if blocking is unnecessary
2895 */
2896 bool isReleasable();
2897 }
2898
2899
2900
2901 /**
2902 * A thread managed by a {@link ForkJoinPool}, which executes
2903 * {@link ForkJoinTask}s.
2904 * This class is subclassable solely for the sake of adding
2905 * functionality -- there are no overridable methods dealing with
2906 * scheduling or execution. However, you can override initialization
2907 * and termination methods surrounding the main task processing loop.
2908 * If you do create such a subclass, you will also need to supply a
2909 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
2910 * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2911 * UncaughtExceptionHandler, bool, int, int, int, Predicate, long, TimeUnit)
2912 * use it} in a {@code ForkJoinPool}.
2913 *
2914 * @since 1.7
2915 * @author Doug Lea
2916 */
2917 class ForkJoinWorkerThread : ThreadEx {
2918 /*
2919 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
2920 * ForkJoinTasks. For explanation, see the internal documentation
2921 * of class ForkJoinPool.
2922 *
2923 * This class just maintains links to its pool and WorkQueue. The
2924 * pool field is set immediately upon construction, but the
2925 * workQueue field is not set until a call to registerWorker
2926 * completes. This leads to a visibility race that is tolerated
2927 * by requiring that the workQueue field is only accessed by the
2928 * owning thread.
2929 *
2930 * Support for (non-public) subclass InnocuousForkJoinWorkerThread
2931 * requires that we break quite a lot of encapsulation (via helper
2932 * methods in ThreadLocalRandom) both here and in the subclass to
2933 * access and set Thread fields.
2934 */
2935
2936 ForkJoinPool pool; // the pool this thread works in
2937 WorkQueue workQueue; // work-stealing mechanics
2938
2939 /**
2940 * Creates a ForkJoinWorkerThread operating in the given pool.
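 * <p>Sketch of the subclass-plus-factory pattern described in the
 * class documentation above (all names here are hypothetical):
 * <pre> {@code
 * class MyWorker : ForkJoinWorkerThread {
 *     this(ForkJoinPool pool) { super(pool); }
 *     override protected void onStart() { super.onStart(); }
 * }
 * class MyFactory : ForkJoinWorkerThreadFactory {
 *     ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *         return new MyWorker(pool);
 *     }
 * }
 * auto pool = new ForkJoinPool(4, new MyFactory(), null, false);
 * }</pre>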
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected this(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }

    /**
     * Version for use by the default pool. Supports setting the
     * context class loader. This is a separate constructor to avoid
     * affecting the protected constructor.
     */
    // this(ForkJoinPool pool, ClassLoader ccl) {
    //     super("aForkJoinWorkerThread");
    //     super.setContextClassLoader(ccl);
    //     this.pool = pool;
    //     this.workQueue = pool.registerWorker(this);
    // }

    /**
     * Version for InnocuousForkJoinWorkerThread.
     */
    this(ForkJoinPool pool, // ClassLoader ccl,
            ThreadGroupEx threadGroup) { // AccessControlContext acc
        super(threadGroup, null, "aForkJoinWorkerThread");
        // super.setContextClassLoader(ccl);
        // ThreadLocalRandom.setInheritedAccessControlContext(this, acc);
        // ThreadLocalRandom.eraseThreadLocals(this); // clear before registering
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the unique index number of this thread in its pool.
     * The returned value ranges from zero to the maximum number of
     * threads (minus one) that may exist in the pool, and does not
     * change during the lifetime of the thread. This method may be
     * useful for applications that track status or collect results
     * per-worker-thread rather than per-task.
     *
     * @return the index number
     */
    int getPoolIndex() {
        return workQueue.getPoolIndex();
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * {@link ForkJoinTask}s.
     */
    override void run() {
        if (workQueue.array is null) { // only run once
            Throwable exception = null;
            scope(exit) {
                pool.deregisterWorker(this, exception);
            }

            try {
                onStart();
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                onTermination(exception);
            }
        }
    }
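    // A hypothetical usage sketch for getPoolIndex() (not part of this
    // class): per-worker accumulators indexed by the stable pool index,
    // so workers never contend on a single shared counter. All names
    // below are illustrative only.
    //
    //     shared long[] taskCounts; // sized to the pool's maximum worker count
    //
    //     void countTask() {
    //         auto w = cast(ForkJoinWorkerThread) Thread.getThis();
    //         if (w !is null)
    //             atomicOp!"+="(taskCounts[w.getPoolIndex()], 1);
    //     }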
    /**
     * Non-hook method for InnocuousForkJoinWorkerThread.
     */
    void afterTopLevelExec() {
    }

    // Non-public helper: tries to pop and run the given task directly if
    // it is at the top of this worker's queue; otherwise defers to
    // pool.awaitJoin to help or block until the task completes.
    int awaitJoin(IForkJoinTask task) {
        int s = task.getStatus();
        WorkQueue w = workQueue;
        if (w.tryUnpush(task) && (s = task.doExec()) < 0)
            return s;
        else
            return pool.awaitJoin(w, task, MonoTime.zero());
    }

    /**
     * If the current thread is operating in a ForkJoinPool,
     * unschedules and returns, without executing, a task externally
     * submitted to the pool, if one is available. Availability may be
     * transient, so a {@code null} result does not necessarily imply
     * quiescence of the pool. This method is designed primarily to
     * support extensions, and is unlikely to be useful otherwise.
     *
     * @return a task, or {@code null} if none are available
     * @since 9
     */
    protected static IForkJoinTask pollSubmission() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread) Thread.getThis();
        return t !is null ? t.pool.pollSubmission() : null;
    }
}


/**
 * A worker thread that has no permissions, is not a member of any
 * user-defined ThreadGroupEx, uses the system class loader as
 * thread context class loader, and erases all ThreadLocals after
 * running each top-level task.
 */
final class InnocuousForkJoinWorkerThread : ForkJoinWorkerThread {
    /** The ThreadGroupEx for all InnocuousForkJoinWorkerThreads */
    private __gshared ThreadGroupEx innocuousThreadGroup;
    // AccessController.doPrivileged(new PrivilegedAction<>() {
    //     ThreadGroupEx run() {
    //         ThreadGroupEx group = Thread.getThis().getThreadGroup();
    //         for (ThreadGroupEx p; (p = group.getParent()) !is null; )
    //             group = p;
    //         return new ThreadGroupEx(
    //             group, "InnocuousForkJoinWorkerThreadGroup");
    //     }});

    /** An AccessControlContext supporting no privileges */
    // private static final AccessControlContext INNOCUOUS_ACC =
    //     new AccessControlContext(
    //         new ProtectionDomain[] { new ProtectionDomain(null, null) });

    shared static this() {
        // ThreadGroupEx group = Thread.getThis().getThreadGroup();
        // for (ThreadGroupEx p; (p = group.getParent()) !is null; )
        //     group = p;
        innocuousThreadGroup = new ThreadGroupEx(
            null, "InnocuousForkJoinWorkerThreadGroup");
    }

    this(ForkJoinPool pool) {
        super(pool,
            // ClassLoader.getSystemClassLoader(),
            innocuousThreadGroup
            // INNOCUOUS_ACC
        );
    }

    override // to erase ThreadLocals
    void afterTopLevelExec() {
        // ThreadLocalRandom.eraseThreadLocals(this);
    }

    override // to silently fail
    void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }

    // override // paranoically
    // void setContextClassLoader(ClassLoader cl) {
    //     throw new SecurityException("setContextClassLoader");
    // }
}
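// A minimal sketch (illustrative only, not part of the library) of the
// subclass-plus-factory pairing that the ForkJoinWorkerThread
// documentation above describes: the subclass hooks onStart/onTermination,
// and the factory makes the pool create instances of it.
//
//     final class LoggingWorkerThread : ForkJoinWorkerThread {
//         protected this(ForkJoinPool pool) { super(pool); }
//
//         override protected void onStart() {
//             super.onStart(); // required first, per the documentation
//             trace("worker started: index ", getPoolIndex());
//         }
//
//         override protected void onTermination(Throwable ex) {
//             if (ex !is null) warning("worker aborted: ", ex.msg);
//             super.onTermination(ex); // required last, per the documentation
//         }
//     }
//
//     final class LoggingWorkerThreadFactory : ForkJoinWorkerThreadFactory {
//         ForkJoinWorkerThread newThread(ForkJoinPool pool) {
//             return new LoggingWorkerThread(pool);
//         }
//     }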
/**
 * Queues supporting work-stealing as well as external task
 * submission. See above for descriptions and algorithms.
 */
final class WorkQueue {
    int source;                 // source queue id, or sentinel
    int id;                     // pool index, mode, tag
    shared int base;            // index of next slot for poll
    shared int top;             // index of next slot for push
    shared int phase;           // versioned, negative: queued, 1: locked
    int stackPred;              // pool stack (ctl) predecessor link
    int nsteals;                // number of steals
    IForkJoinTask[] array;      // the queued tasks; power of 2 size
    ForkJoinPool pool;          // the containing pool (may be null)
    ForkJoinWorkerThread owner; // owning thread or null if shared

    this(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Place indices in the center of array (that is not yet allocated)
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }

    /**
     * Tries to lock shared queue by CASing phase field.
     */
    final bool tryLockPhase() {
        return cas(&this.phase, 0, 1);
        // return PHASE.compareAndSet(this, 0, 1);
    }

    final void releasePhaseLock() {
        // PHASE.setRelease(this, 0);
        atomicStore(this.phase, 0);
    }

    /**
     * Returns an exportable index (used by ForkJoinWorkerThread).
     */
    final int getPoolIndex() {
        return (id & 0xffff) >>> 1; // ignore odd/even tag bit
    }

    /**
     * Returns the approximate number of tasks in the queue.
     */
    final int queueSize() {
        // int n = cast(int)BASE.getAcquire(this) - top;
        int n = atomicLoad(this.base) - top;
        return (n >= 0) ? 0 : -n; // ignore negative
    }

    /**
     * Provides a more accurate estimate of whether this queue has
     * any tasks than does queueSize, by checking whether a
     * near-empty queue has at least one unclaimed task.
     */
    final bool isEmpty() {
        IForkJoinTask[] a; int n, cap, b;
        // VarHandle.acquireFence(); // needed by external callers
        return ((n = (b = base) - top) >= 0 || // possibly one task
                (n == -1 && ((a = array) is null ||
                             (cap = cast(int)a.length) == 0 ||
                             a[(cap - 1) & b] is null)));
    }

    /**
     * Pushes a task. Call only by owner in unshared queues.
     *
     * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if array cannot be resized
     */
    final void push(IForkJoinTask task) {
        IForkJoinTask[] a;
        int s = top, d, cap, m;
        ForkJoinPool p = pool;
        if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
            m = cap - 1;
            // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:40:55
            //
            // AtomicHelper.store(a[m & s], task);
            a[m & s] = task;
            // QA.setRelease(a, (m = cap - 1) & s, task);
            top = s + 1;
            if (((d = s - atomicLoad(this.base)) & ~1) == 0 &&
                p !is null) { // size 0 or 1
                // VarHandle.fullFence();
                p.signalWork();
            }
            else if (d == m)
                growArray(false);
        }
    }
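    // How the ring-buffer indexing above works: base and top are
    // free-running counters mapped into the power-of-two array by
    // "(cap - 1) & index". For example, with cap = 8 (mask 7), pushes at
    // top = 6, 7, 8, 9 land in slots 6, 7, 0, 1: indices wrap without any
    // modulo or reset, and "top - base" is always the number of queued
    // tasks.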
    /**
     * Version of push for shared queues. Call only with phase lock held.
     *
     * @return true if should signal work
     */
    final bool lockedPush(IForkJoinTask task) {
        IForkJoinTask[] a;
        bool signal = false;
        int s = top, b = base, cap, d;
        if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
            a[(cap - 1) & s] = task;
            top = s + 1;
            if (b - s + cap - 1 == 0)
                growArray(true);
            else {
                phase = 0; // full unlock
                if (((s - base) & ~1) == 0) // size 0 or 1
                    signal = true;
            }
        }
        return signal;
    }

    /**
     * Doubles the capacity of array. Call either by owner or with
     * lock held -- it is OK for base, but not top, to move while
     * resizings are in progress.
     */
    final void growArray(bool locked) {
        IForkJoinTask[] newA = null;
        try {
            IForkJoinTask[] oldA; int oldSize, newSize;
            if ((oldA = array) !is null && (oldSize = cast(int)oldA.length) > 0 &&
                (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
                newSize > 0) {
                try {
                    newA = new IForkJoinTask[newSize];
                } catch (OutOfMemoryError ex) {
                }
                if (newA !is null) { // poll from old array, push to new
                    int oldMask = oldSize - 1, newMask = newSize - 1;
                    for (int s = top - 1, k = oldMask; k >= 0; --k) {
                        // IForkJoinTask x = AtomicHelper.getAndSet(oldA[s & oldMask], null);
                        // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:57:26 PM
                        //
                        IForkJoinTask x = oldA[s & oldMask];
                        oldA[s & oldMask] = null;

                        if (x !is null)
                            newA[s-- & newMask] = x;
                        else
                            break;
                    }
                    array = newA;
                    // VarHandle.releaseFence();
                }
            }
        } finally {
            if (locked)
                phase = 0;
        }
        if (newA is null)
            throw new RejectedExecutionException("Queue capacity exceeded");
    }

    /**
     * Takes next task, if one exists, in FIFO order.
     */
    final IForkJoinTask poll() {
        int b, k, cap;
        IForkJoinTask[] a;
        while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
               top - (b = base) > 0) {
            k = (cap - 1) & b;
            // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:42:05
            //
            // IForkJoinTask t = AtomicHelper.load(a[k]);
            IForkJoinTask t = a[k];
            if (base == b++) {
                if (t is null)
                    Thread.yield(); // await index advance
                else if (AtomicHelper.compareAndSet(a[k], t, null)) {
                    // else if (QA.compareAndSet(a, k, t, null)) {
                    AtomicHelper.store(this.base, b);
                    return t;
                }
            }
        }
        return null;
    }
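    // Mode selection in the methods below: an owned queue normally takes
    // from top (LIFO, which suits recursively forked subtasks), while a
    // queue whose id has the FIFO bit set takes from base instead, the
    // behavior selected by constructing the pool in async mode.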
    /**
     * Takes next task, if one exists, in order specified by mode.
     */
    final IForkJoinTask nextLocalTask() {
        IForkJoinTask t = null;
        int md = id, b, s, d, cap; IForkJoinTask[] a;
        if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
            (d = (s = top) - (b = base)) > 0) {
            if ((md & FIFO) == 0 || d == 1) {
                auto index = (cap - 1) & --s;
                t = cast(IForkJoinTask)a[index];
                a[index] = null;
                if (t !is null) {
                    AtomicHelper.store(this.top, s);
                }
                // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 9:10:45
                //
                // if ((t = cast(IForkJoinTask)
                //      AtomicHelper.getAndSet(a[(cap - 1) & --s], null)) !is null)
                //     AtomicHelper.store(this.top, s);
            } else {
                // t = cast(IForkJoinTask)
                //     AtomicHelper.getAndSet(a[(cap - 1) & b++], null)
                auto index = (cap - 1) & b++;
                t = cast(IForkJoinTask)a[index];
                a[index] = null;
                if (t !is null) {
                    AtomicHelper.store(this.base, b);
                }
                else // on contention in FIFO mode, use regular poll
                    t = poll();
            }
        }
        return t;
    }

    /**
     * Returns next task, if one exists, in order specified by mode.
     */
    final IForkJoinTask peek() {
        int cap; IForkJoinTask[] a;
        return ((a = array) !is null && (cap = cast(int)a.length) > 0) ?
            a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
    }

    /**
     * Pops the given task only if it is at the current top.
     */
    final bool tryUnpush(IForkJoinTask task) {
        bool popped = false;
        int s, cap; IForkJoinTask[] a;
        if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
            (s = top) != base) {
            popped = AtomicHelper.compareAndSet(a[(cap - 1) & --s], task, null);
            if (popped) {
                AtomicHelper.store(this.top, s);
            }
        }
        return popped;
    }

    /**
     * Shared version of tryUnpush.
     */
    final bool tryLockedUnpush(IForkJoinTask task) {
        bool popped = false;
        int s = top - 1, k, cap; IForkJoinTask[] a;
        if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
            a[k = (cap - 1) & s] is task && tryLockPhase()) {
            if (top == s + 1 && array is a) {
                popped = AtomicHelper.compareAndSet(a[k], task, null);
                if (popped) top = s;
            }
            releasePhaseLock();
        }
        return popped;
    }

    /**
     * Removes and cancels all known tasks, ignoring any exceptions.
     */
    final void cancelAll() {
        for (IForkJoinTask t; (t = poll()) !is null; )
            IForkJoinTask.cancelIgnoringExceptions(t);
    }

    // Specialized execution methods

    /**
     * Runs the given (stolen) task if nonnull, as well as
     * remaining local tasks and others available from the given
     * queue, up to bound n (to avoid infinite unfairness).
     */
    final void topLevelExec(IForkJoinTask t, WorkQueue q, int n) {
        if (t !is null && q !is null) { // hoist checks
            int nstolen = 1;
            for (;;) {
                t.doExec();
                if (n-- < 0)
                    break;
                else if ((t = nextLocalTask()) is null) {
                    if ((t = q.poll()) is null)
                        break;
                    else
                        ++nstolen;
                }
            }
            ForkJoinWorkerThread thread = owner;
            nsteals += nstolen;
            source = 0;
            if (thread !is null)
                thread.afterTopLevelExec();
        }
    }

    /**
     * If present, removes task from queue and executes it.
     */
    final void tryRemoveAndExec(IForkJoinTask task) {
        IForkJoinTask[] a; int s, cap;
        if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
            (s = top) - base > 0) { // traverse from top
            for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
                int index = i & m;
                IForkJoinTask t = a[index]; // QA.get(a, index);
                if (t is null)
                    break;
                else if (t is task) {
                    // if (AtomicHelper.compareAndSet(a[index], t, null))
                    if (a[index] is t) {
                        a[index] = null;
                        top = ns; // safely shift down
                        for (int j = i; j != ns; ++j) {
                            IForkJoinTask f;
                            int pindex = (j + 1) & m;
                            f = a[pindex]; // QA.get(a, pindex);
                            a[pindex] = null;
                            // AtomicHelper.store(a[pindex], null);
                            // QA.setVolatile(a, pindex, null);
                            int jindex = j & m;
                            a[jindex] = f;
                            // AtomicHelper.store(a[jindex], f);
                            // QA.setRelease(a, jindex, f);
                        }
                        // VarHandle.releaseFence();
                        t.doExec();
                    }
                    break;
                }
            }
        }
    }

    /**
     * Tries to pop and run tasks within the target's computation
     * until done, not found, or limit exceeded.
     *
     * @param task root of CountedCompleter computation
     * @param limit max runs, or zero for no limit
     * @param isShared true if must lock to extract task
     * @return task status on exit
     */
    final int helpCC(ICountedCompleter task, int limit, bool isShared) {
        int status = 0;
        if (task !is null && (status = task.getStatus()) >= 0) {
            int s, k, cap; IForkJoinTask[] a;
            while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
                   (s = top) - base > 0) {
                ICountedCompleter v = null;
                IForkJoinTask o = a[k = (cap - 1) & (s - 1)];
                ICountedCompleter t = cast(ICountedCompleter)o;
                if (t !is null) {
                    for (ICountedCompleter f = t;;) {
                        if (f !is task) {
                            if ((f = f.getCompleter()) is null)
                                break;
                        }
                        else if (isShared) {
                            if (tryLockPhase()) {
                                if (top == s && array is a &&
                                    AtomicHelper.compareAndSet(a[k], t, null)) {
                                    top = s - 1;
                                    v = t;
                                }
                                releasePhaseLock();
                            }
                            break;
                        }
                        else {
                            if (AtomicHelper.compareAndSet(a[k], t, null)) {
                                top = s - 1;
                                v = t;
                            }
                            break;
                        }
                    }
                }
                if (v !is null)
                    v.doExec();
                if ((status = task.getStatus()) < 0 || v is null ||
                    (limit != 0 && --limit == 0))
                    break;
            }
        }
        return status;
    }

    /**
     * Tries to poll and run AsynchronousCompletionTasks until
     * none are found or the blocker is released.
     *
     * @param blocker the blocker
     */
    final void helpAsyncBlocker(ManagedBlocker blocker) {
        if (blocker !is null) {
            int b, k, cap; IForkJoinTask[] a; IForkJoinTask t;
            while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
                   top - (b = base) > 0) {
                k = (cap - 1) & b;
                // t = AtomicHelper.load(a[k]);
                t = a[k];
                if (blocker.isReleasable())
                    break;
                else if (base == b++ && t !is null) {
                    AsynchronousCompletionTask at = cast(AsynchronousCompletionTask)t;
                    if (at is null)
                        break;
                    else if (AtomicHelper.compareAndSet(a[k], t, null)) {
                        AtomicHelper.store(this.base, b);
                        t.doExec();
                    }
                }
            }
        }
    }

    /**
     * Returns true if owned and not known to be blocked.
     */
    final bool isApparentlyUnblocked() {
        ThreadEx wt; ThreadState s;
        return ((wt = owner) !is null &&
                (s = wt.getState()) != ThreadState.BLOCKED &&
                s != ThreadState.WAITING &&
                s != ThreadState.TIMED_WAITING);
    }

    // VarHandle mechanics.
    // static final VarHandle PHASE;
    // static final VarHandle BASE;
    // static final VarHandle TOP;
    // static {
    //     try {
    //         MethodHandles.Lookup l = MethodHandles.lookup();
    //         PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
    //         BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
    //         TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
    //     } catch (ReflectiveOperationException e) {
    //         throw new ExceptionInInitializerError(e);
    //     }
    // }
}


/**
 * A marker interface identifying asynchronous tasks produced by
 * {@code async} methods. This may be useful for monitoring,
 * debugging, and tracking asynchronous activities.
 *
 * @since 1.8
 */
interface AsynchronousCompletionTask {
}
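// A minimal sketch (illustrative only) of how the marker is consumed:
// WorkQueue.helpAsyncBlocker above detects async tasks with a runtime
// cast, so any IForkJoinTask implementation that also lists
// AsynchronousCompletionTask as a base interface will be recognized.
//
//     IForkJoinTask t = ...;
//     if (cast(AsynchronousCompletionTask) t !is null) {
//         // t was produced by an async method
//     }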