1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 module hunt.concurrency.ForkJoinPool;
37 
38 import hunt.concurrency.AbstractExecutorService;
39 import hunt.concurrency.atomic.AtomicHelper;
40 import hunt.concurrency.Exceptions;
41 import hunt.concurrency.ForkJoinTask;
42 import hunt.concurrency.ForkJoinTaskHelper;
43 import hunt.concurrency.ThreadLocalRandom;
44 import hunt.concurrency.thread;
45 
46 import hunt.collection.List;
47 import hunt.collection.Collection;
48 import hunt.collection.Collections;
49 import hunt.logging.ConsoleLogger;
50 import hunt.Exceptions;
51 import hunt.Functions;
52 import hunt.system.Memory;
53 import hunt.util.Common;
54 import hunt.util.DateTime;
55 
56 import core.atomic;
57 import core.sync.mutex;
58 import core.thread;
59 import core.time;
60 import std.algorithm;
61 import std.array;
62 import std.conv;
63 import std.datetime;
64 
65 alias ReentrantLock = Mutex; // Java-style name for Mutex (presumably core.sync.mutex.Mutex, imported above)
66 
67 // import java.lang.Thread.UncaughtExceptionHandler;
68 // import java.lang.invoke.MethodHandles;
69 // import java.lang.invoke.VarHandle;
70 // import java.security.AccessController;
71 // import java.security.AccessControlContext;
72 // import java.security.Permission;
73 // import java.security.Permissions;
74 // import java.security.PrivilegedAction;
75 // import java.security.ProtectionDomain;
76 // import hunt.collection.ArrayList;
77 // import hunt.collection.Collection;
78 // import java.util.Collections;
79 // import java.util.List;
80 // import hunt.util.functional.Predicate;
81 // import hunt.concurrency.locks.LockSupport;
82 
83 
84 private {
85 
86 // Module-private constants shared across ForkJoinPool and WorkQueue
87 
88 // Bounds
89 enum int SWIDTH       = 16;            // width of short
90 enum int SMASK        = 0xffff;        // short bits == max index
91 enum int MAX_CAP      = 0x7fff;        // max #workers - 1
92 enum int SQMASK       = 0x007e;        // max 64 (even) slots
93 
94 // Masks and units for WorkQueue.phase and ctl sp subfield
95 enum int UNSIGNALLED  = 1 << 31;       // must be negative (int sign bit)
96 enum int SS_SEQ       = 1 << 16;       // version count
97 enum int QLOCK        = 1;             // must be 1
98 
99 // Mode bits and sentinels, some also used in WorkQueue id and source fields
100 enum int OWNED        = 1;             // queue has owner thread
101 enum int FIFO         = 1 << 16;       // fifo queue or access mode
102 enum int SHUTDOWN     = 1 << 18;       // lifetime status bit
103 enum int TERMINATED   = 1 << 19;       // lifetime status bit
104 enum int STOP         = 1 << 31;       // must be negative (same bit as UNSIGNALLED)
105 enum int QUIET        = 1 << 30;       // not scanning or working
106 enum int DORMANT      = QUIET | UNSIGNALLED; // both QUIET and UNSIGNALLED bits set
107 
108 /**
109  * The maximum number of top-level polls per worker before
110  * checking other queues, expressed as a bit shift to, in effect,
111  * multiply by pool size, and then use as random value mask, so
112  * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
113  * the Scanning notes in the ForkJoinPool overview for rationale.
114  */
115 enum int TOP_BOUND_SHIFT = 10;
116 
117 /**
118  * Initial capacity of work-stealing queue array.
119  * Must be a power of two, at least 2.
120  */
121 enum int INITIAL_QUEUE_CAPACITY = 1 << 13;
122 
123 /**
124  * Maximum capacity for queue arrays. Must be a power of two less
125  * than or equal to 1 << (31 - width of array entry) to ensure
126  * lack of wraparound of index calculations, but defined to a
127  * value a bit less than this to help users trap runaway programs
128  * before saturating systems.
129  */
130 enum int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
131 
132 }
133 
134 /**
135  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
136  * A {@code ForkJoinPool} provides the entry point for submissions
137  * from non-{@code ForkJoinTask} clients, as well as management and
138  * monitoring operations.
139  *
140  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
141  * ExecutorService} mainly by virtue of employing
142  * <em>work-stealing</em>: all threads in the pool attempt to find and
143  * execute tasks submitted to the pool and/or created by other active
144  * tasks (eventually blocking waiting for work if none exist). This
145  * enables efficient processing when most tasks spawn other subtasks
146  * (as do most {@code ForkJoinTask}s), as well as when many small
147  * tasks are submitted to the pool from external clients.  Especially
148  * when setting <em>asyncMode</em> to true in constructors, {@code
149  * ForkJoinPool}s may also be appropriate for use with event-style
150  * tasks that are never joined. All worker threads are initialized
151  * with {@link Thread#isDaemon} set {@code true}.
152  *
153  * <p>A static {@link #commonPool()} is available and appropriate for
154  * most applications. The common pool is used by any ForkJoinTask that
155  * is not explicitly submitted to a specified pool. Using the common
156  * pool normally reduces resource usage (its threads are slowly
157  * reclaimed during periods of non-use, and reinstated upon subsequent
158  * use).
159  *
160  * <p>For applications that require separate or custom pools, a {@code
161  * ForkJoinPool} may be constructed with a given target parallelism
162  * level; by default, equal to the number of available processors.
163  * The pool attempts to maintain enough active (or available) threads
164  * by dynamically adding, suspending, or resuming internal worker
165  * threads, even if some tasks are stalled waiting to join others.
166  * However, no such adjustments are guaranteed in the face of blocked
167  * I/O or other unmanaged synchronization. The nested {@link
168  * ManagedBlocker} interface enables extension of the kinds of
169  * synchronization accommodated. The default policies may be
170  * overridden using a constructor with parameters corresponding to
171  * those documented in class {@link ThreadPoolExecutor}.
172  *
173  * <p>In addition to execution and lifecycle control methods, this
174  * class provides status check methods (for example
175  * {@link #getStealCount}) that are intended to aid in developing,
176  * tuning, and monitoring fork/join applications. Also, method
177  * {@link #toString} returns indications of pool state in a
178  * convenient form for informal monitoring.
179  *
180  * <p>As is the case with other ExecutorServices, there are three
181  * main task execution methods summarized in the following table.
182  * These are designed to be used primarily by clients not already
183  * engaged in fork/join computations in the current pool.  The main
184  * forms of these methods accept instances of {@code ForkJoinTask},
185  * but overloaded forms also allow mixed execution of plain {@code
186  * Runnable}- or {@code Callable}- based activities as well.  However,
187  * tasks that are already executing in a pool should normally instead
188  * use the within-computation forms listed in the table unless using
189  * async event-style tasks that are not usually joined, in which case
190  * there is little difference among choice of methods.
191  *
192  * <table class="plain">
193  * <caption>Summary of task execution methods</caption>
194  *  <tr>
195  *    <td></td>
196  *    <th scope="col"> Call from non-fork/join clients</th>
197  *    <th scope="col"> Call from within fork/join computations</th>
198  *  </tr>
199  *  <tr>
200  *    <th scope="row" style="text-align:left"> Arrange async execution</th>
201  *    <td> {@link #execute(ForkJoinTask)}</td>
202  *    <td> {@link ForkJoinTask#fork}</td>
203  *  </tr>
204  *  <tr>
205  *    <th scope="row" style="text-align:left"> Await and obtain result</th>
206  *    <td> {@link #invoke(ForkJoinTask)}</td>
207  *    <td> {@link ForkJoinTask#invoke}</td>
208  *  </tr>
209  *  <tr>
210  *    <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
211  *    <td> {@link #submit(ForkJoinTask)}</td>
212  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
213  *  </tr>
214  * </table>
215  *
216  * <p>The parameters used to construct the common pool may be controlled by
217  * setting the following {@linkplain System#getProperty system properties}:
218  * <ul>
219  * <li>{@code hunt.concurrency.ForkJoinPool.common.parallelism}
220  * - the parallelism level, a non-negative integer
221  * <li>{@code hunt.concurrency.ForkJoinPool.common.threadFactory}
222  * - the class name of a {@link ForkJoinWorkerThreadFactory}.
223  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
224  * is used to load this class.
225  * <li>{@code hunt.concurrency.ForkJoinPool.common.exceptionHandler}
226  * - the class name of a {@link UncaughtExceptionHandler}.
227  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
228  * is used to load this class.
229  * <li>{@code hunt.concurrency.ForkJoinPool.common.maximumSpares}
230  * - the maximum number of allowed extra threads to maintain target
231  * parallelism (default 256).
232  * </ul>
233  * If no thread factory is supplied via a system property, then the
234  * common pool uses a factory that uses the system class loader as the
235  * {@linkplain Thread#getContextClassLoader() thread context class loader}.
236  * In addition, if a {@link SecurityManager} is present, then
237  * the common pool uses a factory supplying threads that have no
238  * {@link Permissions} enabled.
239  *
240  * Upon any error in establishing these settings, default parameters
241  * are used. It is possible to disable or limit the use of threads in
242  * the common pool by setting the parallelism property to zero, and/or
243  * using a factory that may return {@code null}. However doing so may
244  * cause unjoined tasks to never be executed.
245  *
246  * <p><b>Implementation notes</b>: This implementation restricts the
247  * maximum number of running threads to 32767. Attempts to create
248  * pools with greater than the maximum number result in
249  * {@code IllegalArgumentException}.
250  *
251  * <p>This implementation rejects submitted tasks (that is, by throwing
252  * {@link RejectedExecutionException}) only when the pool is shut down
253  * or internal resources have been exhausted.
254  *
255  * @since 1.7
256  * @author Doug Lea
257  */
258 class ForkJoinPool : AbstractExecutorService { 
259 
260     /*
261      * Implementation Overview
262      *
263      * This class and its nested classes provide the main
264      * functionality and control for a set of worker threads:
265      * Submissions from non-FJ threads enter into submission queues.
266      * Workers take these tasks and typically split them into subtasks
267      * that may be stolen by other workers. Work-stealing based on
268      * randomized scans generally leads to better throughput than
269      * "work dealing" in which producers assign tasks to idle threads,
270      * in part because threads that have finished other tasks before
271      * the signalled thread wakes up (which can be a long time) can
272      * take the task instead.  Preference rules give first priority to
273      * processing tasks from their own queues (LIFO or FIFO, depending
274      * on mode), then to randomized FIFO steals of tasks in other
275      * queues.  This framework began as a vehicle for supporting
276      * tree-structured parallelism using work-stealing.  Over time,
277      * its scalability advantages led to extensions and changes to
278      * better support more diverse usage contexts.  Because most
279      * internal methods and nested classes are interrelated, their
280      * main rationale and descriptions are presented here; individual
281      * methods and nested classes contain only brief comments about
282      * details.
283      *
284      * WorkQueues
285      * ==========
286      *
287      * Most operations occur within work-stealing queues (in nested
288      * class WorkQueue).  These are special forms of Deques that
289      * support only three of the four possible end-operations -- push,
290      * pop, and poll (aka steal), under the further constraints that
291      * push and pop are called only from the owning thread (or, as
292      * extended here, under a lock), while poll may be called from
293      * other threads.  (If you are unfamiliar with them, you probably
294      * want to read Herlihy and Shavit's book "The Art of
295      * Multiprocessor programming", chapter 16 describing these in
296      * more detail before proceeding.)  The main work-stealing queue
297      * design is roughly similar to those in the papers "Dynamic
298      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
299      * (http://research.sun.com/scalable/pubs/index.html) and
300      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
301      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
302      * The main differences ultimately stem from GC requirements that
303      * we null out taken slots as soon as we can, to maintain as small
304      * a footprint as possible even in programs generating huge
305      * numbers of tasks. To accomplish this, we shift the CAS
306      * arbitrating pop vs poll (steal) from being on the indices
307      * ("base" and "top") to the slots themselves.
308      *
309      * Adding tasks then takes the form of a classic array push(task)
310      * in a circular buffer:
311      *    q.array[q.top++ % length] = task;
312      *
313      * (The actual code needs to null-check and size-check the array,
314      * uses masking, not mod, for indexing a power-of-two-sized array,
315      * adds a release fence for publication, and possibly signals
316      * waiting workers to start scanning -- see below.)  Both a
317      * successful pop and poll mainly entail a CAS of a slot from
318      * non-null to null.
319      *
320      * The pop operation (always performed by owner) is:
321      *   if ((the task at top slot is not null) and
322      *        (CAS slot to null))
323      *           decrement top and return task;
324      *
325      * And the poll operation (usually by a stealer) is
326      *    if ((the task at base slot is not null) and
327      *        (CAS slot to null))
328      *           increment base and return task;
329      *
330      * There are several variants of each of these. Most uses occur
331      * within operations that also interleave contention or emptiness
332      * tracking or inspection of elements before extracting them, so
333      * must interleave these with the above code. When performed by
334      * owner, getAndSet is used instead of CAS (see for example method
335      * nextLocalTask) which is usually more efficient, and possible
336      * because the top index cannot independently change during the
337      * operation.
338      *
339      * Memory ordering.  See "Correct and Efficient Work-Stealing for
340      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
341      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
342      * analysis of memory ordering requirements in work-stealing
343      * algorithms similar to (but different than) the one used here.
344      * Extracting tasks in array slots via (fully fenced) CAS provides
345      * primary synchronization. The base and top indices imprecisely
346      * guide where to extract from. We do not usually require strict
347      * orderings of array and index updates. Many index accesses use
348      * plain mode, with ordering constrained by surrounding context
349      * (usually with respect to element CASes or the two WorkQueue
350      * fields source and phase). When not otherwise already
351      * constrained, reads of "base" by queue owners use acquire-mode,
352      * and some externally callable methods preface accesses with
353      * acquire fences.  Additionally, to ensure that index update
354      * writes are not coalesced or postponed in loops etc, "opaque"
355      * mode is used in a few cases where timely writes are not
356      * otherwise ensured. The "locked" versions of push- and pop-
357      * based methods for shared queues differ from owned versions
358      * because locking already forces some of the ordering.
359      *
360      * Because indices and slot contents cannot always be consistent,
361      * a check that base == top indicates (momentary) emptiness, but
362      * otherwise may err on the side of possibly making the queue
363      * appear nonempty when a push, pop, or poll have not fully
364      * committed, or making it appear empty when an update of top has
365      * not yet been visibly written.  (Method isEmpty() checks the
366      * case of a partially completed removal of the last element.)
367      * Because of this, the poll operation, considered individually,
368      * is not wait-free. One thief cannot successfully continue until
369      * another in-progress one (or, if previously empty, a push)
370      * visibly completes.  This can stall threads when required to
371      * consume from a given queue (see method poll()).  However, in
372      * the aggregate, we ensure at least probabilistic
373      * non-blockingness.  If an attempted steal fails, a scanning
374      * thief chooses a different random victim target to try next. So,
375      * in order for one thief to progress, it suffices for any
376      * in-progress poll or new push on any empty queue to complete.
377      *
378      * This approach also enables support of a user mode in which
379      * local task processing is in FIFO, not LIFO order, simply by
380      * using poll rather than pop.  This can be useful in
381      * message-passing frameworks in which tasks are never joined.
382      *
383      * WorkQueues are also used in a similar way for tasks submitted
384      * to the pool. We cannot mix these tasks in the same queues used
385      * by workers. Instead, we randomly associate submission queues
386      * with submitting threads, using a form of hashing.  The
387      * ThreadLocalRandom probe value serves as a hash code for
388      * choosing existing queues, and may be randomly repositioned upon
389      * contention with other submitters.  In essence, submitters act
390      * like workers except that they are restricted to executing local
391      * tasks that they submitted.  Insertion of tasks in shared mode
392      * requires a lock but we use only a simple spinlock (using field
393      * phase), because submitters encountering a busy queue move to a
394      * different position to use or create other queues -- they block
395      * only when creating and registering new queues. Because it is
396      * used only as a spinlock, unlocking requires only a "releasing"
397      * store (using setRelease) unless otherwise signalling.
398      *
399      * Management
400      * ==========
401      *
402      * The main throughput advantages of work-stealing stem from
403      * decentralized control -- workers mostly take tasks from
404      * themselves or each other, at rates that can exceed a billion
405      * per second.  The pool itself creates, activates (enables
406      * scanning for and running tasks), deactivates, blocks, and
407      * terminates threads, all with minimal central information.
408      * There are only a few properties that we can globally track or
409      * maintain, so we pack them into a small number of variables,
410      * often maintaining atomicity without blocking or locking.
411      * Nearly all essentially atomic control state is held in a few
412      * variables that are by far most often read (not
413      * written) as status and consistency checks. We pack as much
414      * information into them as we can.
415      *
416      * Field "ctl" contains 64 bits holding information needed to
417      * atomically decide to add, enqueue (on an event queue), and
418      * dequeue and release workers.  To enable this packing, we
419      * restrict maximum parallelism to (1<<15)-1 (which is far in
420      * excess of normal operating range) to allow ids, counts, and
421      * their negations (used for thresholding) to fit into 16bit
422      * subfields.
423      *
424      * Field "mode" holds configuration parameters as well as lifetime
425      * status, atomically and monotonically setting SHUTDOWN, STOP,
426      * and finally TERMINATED bits.
427      *
428      * Field "workQueues" holds references to WorkQueues.  It is
429      * updated (only during worker creation and termination) under
430      * lock (using field workerNamePrefix as lock), but is otherwise
431      * concurrently readable, and accessed directly. We also ensure
432      * that uses of the array reference itself never become too stale
433      * in case of resizing, by arranging that (re-)reads are separated
434      * by at least one acquiring read access.  To simplify index-based
435      * operations, the array size is always a power of two, and all
436      * readers must tolerate null slots. Worker queues are at odd
437      * indices. Shared (submission) queues are at even indices, up to
438      * a maximum of 64 slots, to limit growth even if the array needs
439      * to expand to add more workers. Grouping them together in this
440      * way simplifies and speeds up task scanning.
441      *
442      * All worker thread creation is on-demand, triggered by task
443      * submissions, replacement of terminated workers, and/or
444      * compensation for blocked workers. However, all other support
445      * code is set up to work with other policies.  To ensure that we
446      * do not hold on to worker references that would prevent GC, all
447      * accesses to workQueues are via indices into the workQueues
448      * array (which is one source of some of the messy code
449      * constructions here). In essence, the workQueues array serves as
450      * a weak reference mechanism. Thus for example the stack top
451      * subfield of ctl stores indices, not references.
452      *
453      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
454      * cannot let workers spin indefinitely scanning for tasks when
455      * none can be found immediately, and we cannot start/resume
456      * workers unless there appear to be tasks available.  On the
457      * other hand, we must quickly prod them into action when new
458      * tasks are submitted or generated. In many usages, ramp-up time
459      * is the main limiting factor in overall performance, which is
460      * compounded at program start-up by JIT compilation and
461      * allocation. So we streamline this as much as possible.
462      *
463      * The "ctl" field atomically maintains total worker and
464      * "released" worker counts, plus the head of the available worker
465      * queue (actually stack, represented by the lower 32bit subfield
466      * of ctl).  Released workers are those known to be scanning for
467      * and/or running tasks. Unreleased ("available") workers are
468      * recorded in the ctl stack. These workers are made available for
469      * signalling by enqueuing in ctl (see method runWorker).  The
470      * "queue" is a form of Treiber stack. This is ideal for
471      * activating threads in most-recently used order, and improves
472      * performance and locality, outweighing the disadvantages of
473      * being prone to contention and inability to release a worker
474      * unless it is topmost on stack.  To avoid missed signal problems
475      * inherent in any wait/signal design, available workers rescan
476      * for (and if found run) tasks after enqueuing.  Normally their
477      * release status will be updated while doing so, but the released
478      * worker ctl count may underestimate the number of active
479      * threads. (However, it is still possible to determine quiescence
480      * via a validation traversal -- see isQuiescent).  After an
481      * unsuccessful rescan, available workers are blocked until
482      * signalled (see signalWork).  The top stack state holds the
483      * value of the "phase" field of the worker: its index and status,
484      * plus a version counter that, in addition to the count subfields
485      * (also serving as version stamps) provide protection against
486      * Treiber stack ABA effects.
487      *
488      * Creating workers. To create a worker, we pre-increment counts
489      * (serving as a reservation), and attempt to construct a
490      * ForkJoinWorkerThread via its factory. Upon construction, the
491      * new thread invokes registerWorker, where it constructs a
492      * WorkQueue and is assigned an index in the workQueues array
493      * (expanding the array if necessary). The thread is then started.
494      * Upon any exception across these steps, or null return from
495      * factory, deregisterWorker adjusts counts and records
496      * accordingly.  If a null return, the pool continues running with
497      * fewer than the target number of workers. If exceptional, the
498      * exception is propagated, generally to some external caller.
499      * Worker index assignment avoids the bias in scanning that would
500      * occur if entries were sequentially packed starting at the front
501      * of the workQueues array. We treat the array as a simple
502      * power-of-two hash table, expanding as needed. The seedIndex
503      * increment ensures no collisions until a resize is needed or a
504      * worker is deregistered and replaced, and thereafter keeps
505      * probability of collision low. We cannot use
506      * ThreadLocalRandom.getProbe() for similar purposes here because
507      * the thread has not started yet, but do so for creating
508      * submission queues for existing external threads (see
509      * externalPush).
510      *
511      * WorkQueue field "phase" is used by both workers and the pool to
512      * manage and track whether a worker is UNSIGNALLED (possibly
513      * blocked waiting for a signal).  When a worker is enqueued its
514      * phase field is set. Note that phase field updates lag queue CAS
515      * releases so usage requires care -- seeing a negative phase does
516      * not guarantee that the worker is available. When queued, the
517      * lower 16 bits of phase must hold its pool index. So we
518      * place the index there upon initialization and otherwise keep it
519      * there or restore it when necessary.
520      *
521      * The ctl field also serves as the basis for memory
522      * synchronization surrounding activation. This uses a more
523      * efficient version of a Dekker-like rule that task producers and
524      * consumers sync with each other by both writing/CASing ctl (even
525      * if to its current value).  This would be extremely costly. So
526      * we relax it in several ways: (1) Producers only signal when
527      * their queue is possibly empty at some point during a push
528      * operation (which requires conservatively checking size zero or
529      * one to cover races). (2) Other workers propagate this signal
530      * when they find tasks in a queue with size greater than one. (3)
531      * Workers only enqueue after scanning (see below) and not finding
532      * any tasks.  (4) Rather than CASing ctl to its current value in
533      * the common case where no action is required, we reduce write
534      * contention by equivalently prefacing signalWork when called by
535      * an external task producer using a memory access with
536      * full-semantics or a "fullFence".
537      *
538      * Almost always, too many signals are issued, in part because a
539      * task producer cannot tell if some existing worker is in the
540      * midst of finishing one task (or already scanning) and ready to
541      * take another without being signalled. So the producer might
542      * instead activate a different worker that does not find any
543      * work, and then inactivates. This scarcely matters in
544      * steady-state computations involving all workers, but can create
545      * contention and bookkeeping bottlenecks during ramp-up,
546      * ramp-down, and small computations involving only a few workers.
547      *
548      * Scanning. Method scan (from runWorker) performs top-level
549      * scanning for tasks. (Similar scans appear in helpQuiesce and
550      * pollScan.)  Each scan traverses and tries to poll from each
551      * queue starting at a random index. Scans are not performed in
552      * ideal random permutation order, to reduce cacheline
553      * contention. The pseudorandom generator need not have
554      * high-quality statistical properties in the long term, but just
555      * within computations; we use Marsaglia XorShifts (often via
556      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
557      * suffice. Scanning also includes contention reduction: When
558      * scanning workers fail to extract an apparently existing task,
559      * they soon restart at a different pseudorandom index.  This form
560      * of backoff improves throughput when many threads are trying to
561      * take tasks from few queues, which can be common in some usages.
562      * Scans do not otherwise explicitly take into account core
563      * affinities, loads, cache localities, etc. However, they do
564      * exploit temporal locality (which usually approximates these) by
565      * preferring to re-poll from the same queue after a successful
566      * poll before trying others (see method topLevelExec). However
567      * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
568      * against infinitely unfair looping under unbounded user task
569      * recursion, and also to reduce long-term contention when many
570      * threads poll few queues holding many small tasks. The bound is
571      * high enough to avoid much impact on locality and scheduling
572      * overhead.
573      *
574      * Trimming workers. To release resources after periods of lack of
575      * use, a worker starting to wait when the pool is quiescent will
576      * time out and terminate (see method runWorker) if the pool has
577      * remained quiescent for period given by field keepAlive.
578      *
579      * Shutdown and Termination. A call to shutdownNow invokes
580      * tryTerminate to atomically set a runState bit. The calling
581      * thread, as well as every other worker thereafter terminating,
582      * helps terminate others by cancelling their unprocessed tasks,
583      * and waking them up, doing so repeatedly until stable. Calls to
584      * non-abrupt shutdown() preface this by checking whether
585      * termination should commence by sweeping through queues (until
586      * stable) to ensure lack of in-flight submissions and workers
587      * about to process them before triggering the "STOP" phase of
588      * termination.
589      *
590      * Joining Tasks
591      * =============
592      *
593      * Any of several actions may be taken when one worker is waiting
594      * to join a task stolen (or always held) by another.  Because we
595      * are multiplexing many tasks on to a pool of workers, we can't
596      * always just let them block (as in Thread.join).  We also cannot
597      * just reassign the joiner's run-time stack with another and
598      * replace it later, which would be a form of "continuation", that
599      * even if possible is not necessarily a good idea since we may
600      * need both an unblocked task and its continuation to progress.
601      * Instead we combine two tactics:
602      *
603      *   Helping: Arranging for the joiner to execute some task that it
604      *      would be running if the steal had not occurred.
605      *
606      *   Compensating: Unless there are already enough live threads,
607      *      method tryCompensate() may create or re-activate a spare
608      *      thread to compensate for blocked joiners until they unblock.
609      *
610      * A third form (implemented in tryRemoveAndExec) amounts to
611      * helping a hypothetical compensator: If we can readily tell that
612      * a possible action of a compensator is to steal and execute the
613      * task being joined, the joining thread can do so directly,
614      * without the need for a compensation thread.
615      *
616      * The ManagedBlocker extension API can't use helping so relies
617      * only on compensation in method awaitBlocker.
618      *
619      * The algorithm in awaitJoin entails a form of "linear helping".
620      * Each worker records (in field source) the id of the queue from
621      * which it last stole a task.  The scan in method awaitJoin uses
622      * these markers to try to find a worker to help (i.e., steal back
623      * a task from and execute it) that could hasten completion of the
624      * actively joined task.  Thus, the joiner executes a task that
625      * would be on its own local deque if the to-be-joined task had
626      * not been stolen. This is a conservative variant of the approach
627      * described in Wagner & Calder "Leapfrogging: a portable
628      * technique for implementing efficient futures" SIGPLAN Notices,
629      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
630      * mainly in that we only record queue ids, not full dependency
631      * links.  This requires a linear scan of the workQueues array to
632      * locate stealers, but isolates cost to when it is needed, rather
633      * than adding to per-task overhead. Searches can fail to locate
     * stealers when GC stalls and the like delay recording sources.
635      * Further, even when accurately identified, stealers might not
636      * ever produce a task that the joiner can in turn help with. So,
637      * compensation is tried upon failure to find tasks to run.
638      *
639      * Compensation does not by default aim to keep exactly the target
640      * parallelism number of unblocked threads running at any given
641      * time. Some previous versions of this class employed immediate
642      * compensations for any blocked join. However, in practice, the
643      * vast majority of blockages are byproducts of GC and
644      * other JVM or OS activities that are made worse by replacement
645      * when they cause longer-term oversubscription.  Rather than
646      * impose arbitrary policies, we allow users to override the
647      * default of only adding threads upon apparent starvation.  The
648      * compensation mechanism may also be bounded.  Bounds for the
649      * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
650      * with programming errors and abuse before running out of
651      * resources to do so.
652      *
653      * Common Pool
654      * ===========
655      *
656      * The static common pool always exists after static
657      * initialization.  Since it (or any other created pool) need
658      * never be used, we minimize initial construction overhead and
659      * footprint to the setup of about a dozen fields.
660      *
661      * When external threads submit to the common pool, they can
662      * perform subtask processing (see externalHelpComplete and
663      * related methods) upon joins.  This caller-helps policy makes it
664      * sensible to set common pool parallelism level to one (or more)
665      * less than the total number of available cores, or even zero for
666      * pure caller-runs.  We do not need to record whether external
667      * submissions are to the common pool -- if not, external help
668      * methods return quickly. These submitters would otherwise be
669      * blocked waiting for completion, so the extra effort (with
670      * liberally sprinkled task status checks) in inapplicable cases
671      * amounts to an odd form of limited spin-wait before blocking in
672      * ForkJoinTask.join.
673      *
674      * As a more appropriate default in managed environments, unless
675      * overridden by system properties, we use workers of subclass
676      * InnocuousForkJoinWorkerThread when there is a SecurityManager
677      * present. These workers have no permissions set, do not belong
678      * to any user-defined ThreadGroupEx, and erase all ThreadLocals
679      * after executing any top-level task (see
680      * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
681      * in ForkJoinWorkerThread) may be JVM-dependent and must access
682      * particular Thread class fields to achieve this effect.
683      *
684      * Memory placement
685      * ================
686      *
687      * Performance can be very sensitive to placement of instances of
688      * ForkJoinPool and WorkQueues and their queue arrays. To reduce
689      * false-sharing impact, the @Contended annotation isolates
690      * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
691      * field. WorkQueue arrays are allocated (by their threads) with
692      * larger initial sizes than most ever need, mostly to reduce
693      * false sharing with current garbage collectors that use cardmark
694      * tables.
695      *
696      * Style notes
697      * ===========
698      *
699      * Memory ordering relies mainly on VarHandles.  This can be
700      * awkward and ugly, but also reflects the need to control
701      * outcomes across the unusual cases that arise in very racy code
702      * with very few invariants. All fields are read into locals
703      * before use, and null-checked if they are references.  Array
704      * accesses using masked indices include checks (that are always
705      * true) that the array length is non-zero to avoid compilers
706      * inserting more expensive traps.  This is usually done in a
707      * "C"-like style of listing declarations at the heads of methods
708      * or blocks, and using inline assignments on first encounter.
709      * Nearly all explicit checks lead to bypass/return, not exception
710      * throws, because they may legitimately arise due to
711      * cancellation/revocation during shutdown.
712      *
713      * There is a lot of representation-level coupling among classes
714      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
715      * fields of WorkQueue maintain data structures managed by
716      * ForkJoinPool, so are directly accessed.  There is little point
717      * trying to reduce this, since any associated future changes in
718      * representations will need to be accompanied by algorithmic
719      * changes anyway. Several methods intrinsically sprawl because
720      * they must accumulate sets of consistent reads of fields held in
721      * local variables. Some others are artificially broken up to
722      * reduce producer/consumer imbalances due to dynamic compilation.
723      * There are also other coding oddities (including several
724      * unnecessary-looking hoisted null checks) that help some methods
725      * perform reasonably even when interpreted (not compiled).
726      *
727      * The order of declarations in this file is (with a few exceptions):
728      * (1) Static utility functions
729      * (2) Nested (static) classes
730      * (3) Static fields
731      * (4) Fields, along with constants used when unpacking some of them
732      * (5) Internal control methods
733      * (6) Callbacks and other support for ForkJoinTask methods
734      * (7) Exported methods
735      * (8) Static block initializing statics in minimally dependent order
736      */
737 
738     // Static utilities
739 
740     /**
741      * If there is a security manager, makes sure caller has
742      * permission to modify threads.
743      */
744     // private static void checkPermission() {
745     //     SecurityManager security = System.getSecurityManager();
746     //     if (security !is null)
747     //         security.checkPermission(modifyThreadPermission);
748     // }
749 
750 
751     // static fields (initialized in static initializer below)
752 
753     /**
754      * Creates a new ForkJoinWorkerThread. This factory is used unless
755      * overridden in ForkJoinPool constructors.
756      */
    __gshared ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

    /*
     * (Disabled in this port.) Permission required for callers of
     * methods that may start or kill threads.  Kept as a plain comment
     * so that it is not merged into the documentation of the next
     * declaration.
     */
    // __gshared RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for use unless a static
     * construction exception, but internal usages null-check on use
     * to paranoically avoid potential initialization circularities
     * as well as to simplify generated code.
     */
    __gshared ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    __gshared int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     * NOTE(review): presumably initialized from DEFAULT_COMMON_MAX_SPARES
     * unless overridden; the initializer is not visible here.
     */
    private __gshared int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.  Declared shared
     * and advanced only through nextPoolId().
     */
    private shared static int poolNumberSequence;
790 
791     /**
792      * Returns the next sequence number. We don't expect this to
793      * ever contend, so use simple builtin sync.
794      */
    private static int nextPoolId() {
        // Advances the shared poolNumberSequence counter through AtomicHelper.
        // NOTE(review): presumably yields the post-increment value (the first
        // pool would then get id 1) -- confirm against AtomicHelper.increment.
        return AtomicHelper.increment(poolNumberSequence);
    }
798 
799     // static configuration constants
800 
801     /**
802      * Default idle timeout value (in milliseconds) for the thread
803      * triggering quiescence to park waiting for new work
804      */
    private enum long DEFAULT_KEEPALIVE = 60_000L; // 60 seconds

    /**
     * Undershoot tolerance for idle timeouts, in milliseconds.
     * runWorker treats a timed park as expired once the remaining time
     * to its deadline is no more than this value.
     */
    private enum long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "hunt.concurrency.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private enum int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.  registerWorker adds this to indexSeed each time a
     * worker is registered.
     */
    private enum int SEED_INCREMENT = 0x9e3779b9;
827 
828     /*
829      * Bits and masks for field ctl, packed with 4 16 bit subfields:
830      * RC: Number of released (unqueued) workers minus target parallelism
831      * TC: Number of total workers minus target parallelism
832      * SS: version count and status of top waiting thread
833      * ID: poolIndex of top of Treiber stack of waiters
834      *
835      * When convenient, we can extract the lower 32 stack top bits
836      * (including version bits) as sp=(int)ctl.  The offsets of counts
837      * by the target parallelism and the positionings of fields makes
838      * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative, there are not enough unqueued
840      * workers, when tc is negative, there are not enough total
841      * workers.  When sp is non-zero, there are waiting workers.  To
842      * deal with possibly negative fields, we use casts in and out of
843      * "short" and/or signed shifts to maintain signedness.
844      *
845      * Because it occupies uppermost bits, we can add one release count
846      * using getAndAddLong of RC_UNIT, rather than CAS, when returning
847      * from a blocked join.  Other updates entail multiple subfields
848      * and masking, requiring CAS.
849      *
850      * The limits packed in field "bounds" are also offset by the
851      * parallelism level to make them comparable to the ctl rc and tc
852      * fields.
853      */
854 
855     // Lower and upper word masks
    private enum long SP_MASK    = 0xffffffffL;  // low 32 bits of ctl (SS and ID subfields)
    private enum long UC_MASK    = ~SP_MASK;     // high 32 bits of ctl (RC and TC subfields)

    // Release counts (RC): uppermost 16 bits of ctl
    private enum int  RC_SHIFT   = 48;
    private enum long RC_UNIT    = 0x0001L << RC_SHIFT; // one released worker
    private enum long RC_MASK    = 0xffffL << RC_SHIFT; // selects the RC subfield

    // Total counts (TC): bits 32..47 of ctl
    private enum int  TC_SHIFT   = 32;
    private enum long TC_UNIT    = 0x0001L << TC_SHIFT; // one worker in total
    private enum long TC_MASK    = 0xffffL << TC_SHIFT; // selects the TC subfield
    // Sign bit of the TC subfield: set while TC is negative, i.e. while
    // there are fewer total workers than the target parallelism.
    private enum long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
869 
870     // Instance fields
871 
    long stealCount;            // sum of nsteals of deregistered workers (see deregisterWorker)
    long keepAlive;                // milliseconds a worker parks in a quiescent pool before being dropped
    int indexSeed;                       // advanced by SEED_INCREMENT to choose the next worker index
    int bounds;                    // min, max threads packed as shorts; compared against ctl counts in tryCompensate
    int mode;                   // parallelism (SMASK bits), runstate (negative when terminating, SHUTDOWN), queue mode (FIFO)
    WorkQueue[] workQueues;              // main registry; worker queues occupy odd-numbered indices
    string workerNamePrefix;       // prefix for worker thread names; registerWorker skips registration when null
    // NOTE(review): registerWorker/deregisterWorker synchronize on `this`;
    // workerNameLocker is not referenced by them -- confirm whether it is still needed.
    Object workerNameLocker;
    ForkJoinWorkerThreadFactory factory;  // creates worker threads (see createWorker)
    UncaughtExceptionHandler ueh;  // per-worker UEH, installed by registerWorker when non-null
    Predicate!(ForkJoinPool) saturate;    // consulted by tryCompensate once the thread limit is reached

    // @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    shared long ctl;                   // main pool control; updated via AtomicHelper.compareAndSet
886 
887     // Creating, registering and deregistering workers
888 
889     /**
890      * Tries to construct and start one worker. Assumes that total
891      * count has already been incremented as a reservation.  Invokes
892      * deregisterWorker on any failure.
893      *
894      * @return true if successful
895      */
896     private bool createWorker() {
897         ForkJoinWorkerThreadFactory fac = factory;
898         Throwable ex = null;
899         ForkJoinWorkerThread wt = null;
900         try {
901             if (fac !is null && (wt = fac.newThread(this)) !is null) {
902                 wt.start();
903                 return true;
904             }
905         } catch (Throwable rex) {
906             ex = rex;
907         }
908         deregisterWorker(wt, ex);
909         return false;
910     }
911 
912     /**
913      * Tries to add one worker, incrementing ctl counts before doing
914      * so, relying on createWorker to back out on failure.
915      *
916      * @param c incoming ctl value, with total count negative and no
917      * idle workers.  On CAS failure, c is refreshed and retried if
918      * this holds (otherwise, a new worker is not needed).
919      */
920     private void tryAddWorker(long c) {
921         do {
922             long nc = ((RC_MASK & (c + RC_UNIT)) |
923                        (TC_MASK & (c + TC_UNIT)));
924             if (ctl == c && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
925                 createWorker();
926                 break;
927             }
928         } while (((c = ctl) & ADD_WORKER) != 0L && cast(int)c == 0);
929     }
930 
931     /**
932      * Callback from ForkJoinWorkerThread constructor to establish and
933      * record its WorkQueue.
934      *
935      * @param wt the worker thread
936      * @return the worker's queue
937      */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.isDaemon(true);                             // configure thread
        if ((handler = ueh) !is null)
            wt.setUncaughtExceptionHandler(handler);
        int tid = 0;                                    // for thread name
        int idbits = mode & FIFO;                       // carry the pool's FIFO bit into the id
        string prefix = workerNamePrefix;
        WorkQueue w = new WorkQueue(this, wt);
        // With a null prefix the queue is created but never placed in
        // workQueues, and the thread keeps its default name.
        if (prefix !is null) {
            synchronized (this) {                       // serializes all registry updates
                WorkQueue[] ws = workQueues;
                int n;
                int s = indexSeed += SEED_INCREMENT;
                // Upper seed bits (outside SMASK, FIFO and DORMANT) become part of the id.
                idbits |= (s & ~(SMASK | FIFO | DORMANT));
                if (ws !is null && (n = cast(int)ws.length) > 1) {
                    int m = n - 1;                      // index mask; assumes n is a power of two -- TODO confirm
                    tid = m & ((s << 1) | 1);           // odd-numbered indices
                    // Probe at most n/2 slots, stepping by 2 so only odd
                    // indices are visited.
                    for (int probes = n >>> 1;;) {      // find empty slot
                        WorkQueue q;
                        // A slot is usable when empty or held by a QUIET queue.
                        if ((q = ws[tid]) is null || q.phase == QUIET)
                            break;
                        else if (--probes == 0) {
                            tid = n | 1;                // resize below
                            break;
                        }
                        else
                            tid = (tid + 2) & m;
                    }
                    w.phase = w.id = tid | idbits;      // now publishable

                    if (tid < n)
                        ws[tid] = w;
                    else {                              // expand array
                        int an = n << 1;                // double the capacity
                        WorkQueue[] as = new WorkQueue[an];
                        as[tid] = w;
                        int am = an - 1;
                        // j advances twice per iteration: the even entry is
                        // re-indexed from its id, the odd entry keeps its index.
                        for (int j = 0; j < n; ++j) {
                            WorkQueue v;                // copy external queue
                            if ((v = ws[j]) !is null)    // position may change
                                as[v.id & am & SQMASK] = v;
                            if (++j >= n)
                                break;
                            as[j] = ws[j];              // copy worker
                        }
                        workQueues = as;
                    }
                }
            }
            // Named outside the lock; tid stays 0 if no slot was assigned above.
            wt.name(prefix ~ tid.to!string());
        }
        return w;
    }
992 
993     /**
994      * Final callback from terminating worker, as well as upon failure
995      * to construct or start a worker.  Removes record of worker from
996      * array, and adjusts counts. If pool is shutting down, tries to
997      * complete termination.
998      *
999      * @param wt the worker thread, or null if construction failed
1000      * @param ex the exception causing failure, or null if none
1001      */
1002     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1003         WorkQueue w = null;
1004         int phase = 0;
1005         if (wt !is null && (w = wt.workQueue) !is null) {
1006             int wid = w.id;
1007             long ns = cast(long)w.nsteals & 0xffffffffL;
1008             if (!workerNamePrefix.empty()) {
1009                 synchronized (this) {
1010                     WorkQueue[] ws; size_t n, i;         // remove index from array
1011                     if ((ws = workQueues) !is null && (n = ws.length) > 0 &&
1012                         ws[i = wid & (n - 1)] == w)
1013                         ws[i] = null;
1014                     stealCount += ns;
1015                 }
1016             }
1017             phase = w.phase;
1018         }
1019         if (phase != QUIET) {                         // else pre-adjusted
1020             long c;                                   // decrement counts
1021             do {} while (!AtomicHelper.compareAndSet(this.ctl, c = ctl, 
1022                 ((RC_MASK & (c - RC_UNIT)) |
1023                 (TC_MASK & (c - TC_UNIT)) |
1024                 (SP_MASK & c))));
1025         }
1026         if (w !is null)
1027             w.cancelAll();                            // cancel remaining tasks
1028 
1029         if (!tryTerminate(false, false) &&            // possibly replace worker
1030             w !is null && w.array !is null)             // avoid repeated failures
1031             signalWork();
1032 
1033         if (ex is null)                               // help clean on way out
1034             ForkJoinTaskHelper.helpExpungeStaleExceptions();
1035         else                                          // rethrow
1036             ForkJoinTaskHelper.rethrow(ex);
1037     }
1038 
1039     /**
1040      * Tries to create or release a worker if too few are running.
1041      */
1042     final void signalWork() {
1043         for (;;) {
1044             long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1045             if ((c = ctl) >= 0L)                      // enough workers
1046                 break;
1047             else if ((sp = cast(int)c) == 0) {            // no idle workers
1048                 if ((c & ADD_WORKER) != 0L)           // too few workers
1049                     tryAddWorker(c);
1050                 break;
1051             }
1052             else if ((ws = workQueues) is null)
1053                 break;                                // unstarted/terminated
1054             else if (ws.length <= (i = sp & SMASK))
1055                 break;                                // terminated
1056             else if ((v = ws[i]) is null)
1057                 break;                                // terminating
1058             else {
1059                 int np = sp & ~UNSIGNALLED;
1060                 int vp = v.phase;
1061                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1062                 Thread vt = v.owner;
1063                 if (sp == vp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
1064                     v.phase = np;
1065                     if (vt !is null && v.source < 0)
1066                         LockSupport.unpark(vt);
1067                     break;
1068                 }
1069             }
1070         }
1071     }
1072 
1073     /**
1074      * Tries to decrement counts (sometimes implicitly) and possibly
1075      * arrange for a compensating worker in preparation for blocking:
1076      * If not all core workers yet exist, creates one, else if any are
1077      * unreleased (possibly including caller) releases one, else if
1078      * fewer than the minimum allowed number of workers running,
1079      * checks to see that they are all active, and if so creates an
1080      * extra worker unless over maximum limit and policy is to
1081      * saturate.  Most of these steps can fail due to interference, in
1082      * which case 0 is returned so caller will retry. A negative
1083      * return value indicates that the caller doesn't need to
1084      * re-adjust counts when later unblocked.
1085      *
1086      * @return 1: block then adjust, -1: block without adjust, 0 : retry
1087      */
    private int tryCompensate(WorkQueue w) {
        int t, n, sp;
        long c = ctl;
        WorkQueue[] ws = workQueues;
        // t is the TC subfield read as a signed 16-bit value.  A negative t
        // skips straight to the "expand pool" step at the bottom.
        if ((t = cast(short)(c >>> TC_SHIFT)) >= 0) {
            if (ws is null || (n = cast(int)ws.length) <= 0 || w is null)
                return 0;                        // disabled
            else if ((sp = cast(int)c) != 0) {       // replace or release
                // An idle worker is queued: try to release it.
                WorkQueue v = ws[sp & (n - 1)];
                int wp = w.phase;
                // If the caller itself is unsignalled (wp < 0) the release
                // count is incremented as part of the same CAS.
                long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
                int np = sp & ~UNSIGNALLED;
                if (v !is null) {
                    int vp = v.phase;
                    Thread vt = v.owner;
                    long nc = (cast(long)v.stackPred & SP_MASK) | uc;
                    if (vp == sp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
                        v.phase = np;
                        if (vt !is null && v.source < 0)
                            LockSupport.unpark(vt);
                        return (wp < 0) ? -1 : 1;
                    }
                }
                return 0;                        // stale view or lost CAS: caller retries
            }
            else if (cast(int)(c >> RC_SHIFT) -      // reduce parallelism
                     cast(short)(bounds & SMASK) > 0) {
                // RC exceeds the bound held in the low part of `bounds`:
                // just decrement RC, leaving every other ctl bit untouched.
                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
                return AtomicHelper.compareAndSet(this.ctl, c, nc) ? 1 : 0;
            }
            else {                               // validate
                // pc: parallelism from mode; tc: expected number of workers,
                // counted down per queue found; bc: workers seen blocked/waiting.
                int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
                bool unstable = false;
                for (int i = 1; i < n; i += 2) { // worker queues sit at odd indices
                    WorkQueue q; ThreadEx wt; ThreadState ts;
                    if ((q = ws[i]) !is null) {
                        if (q.source == 0) {     // source cleared: treat the snapshot as unreliable
                            unstable = true;
                            break;
                        }
                        else {
                            --tc;
                            if ((wt = q.owner) !is null &&
                                ((ts = wt.getState()) == ThreadState.BLOCKED ||
                                 ts == ThreadState.WAITING))
                                ++bc;            // worker is blocking
                        }
                    }
                }
                if (unstable || tc != 0 || ctl != c)
                    return 0;                    // inconsistent
                else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
                    // Thread limit reached: consult the saturate predicate,
                    // else back off while workers lag, else reject.
                    Predicate!(ForkJoinPool) sat;
                    if ((sat = saturate) !is null && sat(this))
                        return -1;
                    else if (bc < pc) {          // lagging
                        Thread.yield();          // for retry spins
                        return 0;
                    }
                    else
                        throw new RejectedExecutionException(
                            "Thread limit exceeded replacing blocked worker");
                }
            }
        }

        // Reserve one more total worker in ctl, then actually create it.
        long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
        return AtomicHelper.compareAndSet(this.ctl, c, nc) && createWorker() ? 1 : 0;
    }
1157 
1158     /**
1159      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1160      * See above for explanation.
1161      */
    final void runWorker(WorkQueue w) {
        // NOTE(review): OR-ing in FIFO presumably keeps the xorshift seed
        // nonzero -- confirm.
        int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
        w.array = new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; // initialize
        for (;;) {
            int phase;
            if (scan(w, r)) {                     // scan until apparently empty
                r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
            }
            else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
                // Push this worker onto the idle stack held in the low word
                // of ctl: advance its phase and mark it UNSIGNALLED, remember
                // the previous stack top in stackPred, and decrement RC.
                long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
                long c, nc;
                do {
                    w.stackPred = cast(int)(c = ctl);
                    nc = ((c - RC_UNIT) & UC_MASK) | np;
                } while (!AtomicHelper.compareAndSet(this.ctl, c, nc));
            }
            else {                                // already queued
                // Here `phase` holds the negative value read in the test above.
                int pred = w.stackPred;
                ThreadEx.interrupted();             // clear before park
                w.source = DORMANT;               // enable signal
                long c = ctl;
                // rc: parallelism from mode plus the RC subfield of ctl.
                int md = mode, rc = (md & SMASK) + cast(int)(c >> RC_SHIFT);
                if (md < 0)                       // terminating
                    break;
                else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
                         tryTerminate(false, false))
                    break;                        // quiescent shutdown
                else if (rc <= 0 && pred != 0 && phase == cast(int)c) {
                    // This worker is the top of the idle stack and has a
                    // predecessor: park with a keepAlive deadline.  nc is the
                    // ctl value that drops it (TC decremented, pred becomes
                    // the stack top).
                    long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
                    long d = keepAlive + DateTimeHelper.currentTimeMillis();
                    LockSupport.parkUntil(this, d);
                    if (ctl == c &&               // drop on timeout if all idle
                        d - DateTimeHelper.currentTimeMillis() <= TIMEOUT_SLOP &&
                        AtomicHelper.compareAndSet(this.ctl, c, nc)) {
                        w.phase = QUIET;          // tells deregisterWorker counts are already adjusted
                        break;
                    }
                }
                else if (w.phase < 0)             // still unsignalled: wait for a signal
                    LockSupport.park(this);       // OK if spuriously woken
                w.source = 0;                     // disable signal
            }
        }
    }
1206 
1207     /**
1208      * Scans for and if found executes one or more top-level tasks from a queue.
1209      *
1210      * @return true if found an apparently non-empty queue, and
1211      * possibly ran task(s).
1212      */
    private bool scan(WorkQueue w, int r) {
        WorkQueue[] ws; int n;
        if ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && w !is null) {
            // Start at index r & m and walk forward with wrap-around.  n
            // doubles as the countdown of remaining probes (see --n below).
            for (int m = n - 1, j = r & m;;) {
                WorkQueue q; int b;
                if ((q = ws[j]) !is null && q.top != (b = q.base)) {
                    // Queue looks non-empty: try to take the task at its base.
                    int qid = q.id;
                    IForkJoinTask[] a; size_t cap, k; IForkJoinTask t;
                    if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
                        k = (cap - 1) & b;        // slot of base within the array
                        // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:12:19 PM
                        // The slot is read with a plain load; the atomic
                        // variants below were left disabled:
                        //   import core.atomic;
                        //   auto ss = core.atomic.atomicLoad((cast(shared)a[k]));
                        //   t = AtomicHelper.load(a[k]);
                        t = a[k];
                        // Claim the task only if base has not moved since it
                        // was read and the CAS nulls the slot; b is advanced
                        // by the comparison itself.
                        if (q.base == b++ && t !is null &&
                            AtomicHelper.compareAndSet(a[k], t, null)) {
                            q.base = b;
                            w.source = qid;       // record which queue was stolen from
                            if (q.top - b > 0)    // more tasks remain: wake another worker
                                signalWork();
                            // Note: n here has been reduced by any earlier
                            // unsuccessful probes in this call.
                            w.topLevelExec(t, q,  // random fairness bound
                                           r & ((n << TOP_BOUND_SHIFT) - 1));
                        }
                    }
                    // True even when the claim failed: the queue appeared non-empty.
                    return true;
                }
                else if (--n > 0)
                    j = (j + 1) & m;
                else
                    break;
            }
        }
        return false;
    }
1249 
1250     /**
1251      * Helps and/or blocks until the given task is done or timeout.
1252      * First tries locally helping, then scans other queues for a task
1253      * produced by one of w's stealers; compensating and blocking if
1254      * none are found (rescanning if tryCompensate fails).
1255      *
1256      * @param w caller
1257      * @param task the task
1258      * @param deadline for timed waits, if nonzero
1259      * @return task status on exit
1260      */
1261     final int awaitJoin(WorkQueue w, IForkJoinTask task, MonoTime deadline) {
1262         if(w is null || task is null) {
1263             return 0;
1264         }
1265         int s = 0;
1266         int seed = ThreadLocalRandom.nextSecondarySeed();
1267         ICountedCompleter cc = cast(ICountedCompleter)task;
1268         if(cc !is null) {
1269             s = w.helpCC(cc, 0, false);
1270             if(s<0)
1271                 return 0;
1272         }
1273 
1274         w.tryRemoveAndExec(task);
1275         int src = w.source, id = w.id;
1276         int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1277         s = task.getStatus();
1278         while (s >= 0) {
1279             WorkQueue[] ws;
1280             int n = (ws = workQueues) is null ? 0 : cast(int)ws.length, m = n - 1;
1281             while (n > 0) {
1282                 WorkQueue q; int b;
1283                 if ((q = ws[r & m]) !is null && q.source == id &&
1284                     q.top != (b = q.base)) {
1285                     IForkJoinTask[] a; int cap, k;
1286                     int qid = q.id;
1287                     if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
1288                         k = (cap - 1) & b;
1289                         // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:13:08 PM
1290                         // 
1291                         // IForkJoinTask t = AtomicHelper.load(a[k]);
1292                         IForkJoinTask t = a[k];
1293                         if (q.source == id && q.base == b++ &&
1294                             t !is null && AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
1295                             q.base = b;
1296                             w.source = qid;
1297                             t.doExec();
1298                             w.source = src;
1299                         }
1300                     }
1301                     break;
1302                 }
1303                 else {
1304                     r += step;
1305                     --n;
1306                 }
1307             }
1308 
1309             if ((s = task.getStatus()) < 0)
1310                 break;
1311             else if (n == 0) { // empty scan
1312                 long ms; int block;
1313                 Duration ns;
1314                 if (deadline == MonoTime.zero())
1315                     ms = 0L;                       // untimed
1316                 else if ((ns = deadline - MonoTime.currTime) <= Duration.zero())
1317                     break;                         // timeout
1318                 else if ((ms = ns.total!(TimeUnit.Millisecond)()) <= 0L)
1319                     ms = 1L;                       // avoid 0 for timed wait
1320                 if ((block = tryCompensate(w)) != 0) {
1321                     task.internalWait(ms);
1322                     AtomicHelper.getAndAdd(this.ctl, (block > 0) ? RC_UNIT : 0L);
1323                 }
1324                 s = task.getStatus();
1325             }
1326         }
1327         return s;
1328     }
1329 
1330     /**
1331      * Runs tasks until {@code isQuiescent()}. Rather than blocking
1332      * when tasks cannot be found, rescans until all others cannot
1333      * find tasks either.
1334      */
    final void helpQuiescePool(WorkQueue w) {
        int prevSrc = w.source;                  // restored on exit and after each stolen task
        int seed = ThreadLocalRandom.nextSecondarySeed();
        int r = seed >>> 16, step = r | 1;       // odd step for the cyclic probe
        // released tracks this worker's contribution to ctl's RC field:
        // -1 = not yet known, 1 = counted, 0 = subtracted by this method.
        for (int source = prevSrc, released = -1;;) { // -1 until known
            IForkJoinTask localTask; WorkQueue[] ws;
            // Drain the caller's own queue first.
            while ((localTask = w.nextLocalTask()) !is null)
                localTask.doExec();
            if (w.phase >= 0 && released == -1)
                released = 1;
            // quiet: no queue had tasks and every queue's source has the QUIET bit set.
            // empty: no queue had tasks during this pass.
            bool quiet = true, empty = true;
            int n = (ws = workQueues) is null ? 0 : cast(int)ws.length;
            for (int m = n - 1; n > 0; r += step, --n) {
                WorkQueue q; int b;
                if ((q = ws[r & m]) !is null) {
                    int qs = q.source;
                    if (q.top != (b = q.base)) {
                        quiet = empty = false;
                        IForkJoinTask[] a; int cap, k;
                        int qid = q.id;
                        if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
                            if (released == 0) {    // increment
                                released = 1;
                                AtomicHelper.getAndAdd(this.ctl, RC_UNIT);
                            }
                            k = (cap - 1) & b;
                            // FIXME (-@zxp, 2/6/2019): plain read of the slot; an atomic
                            // load (AtomicHelper.load(a[k])) was intended here.
                            IForkJoinTask t = a[k];
                            // Claim the base slot only if base is unchanged and the CAS wins.
                            if (q.base == b++ && t !is null) {
                                if(AtomicHelper.compareAndSet(a[k], t, null)) {
                                    q.base = b;
                                    w.source = qid;
                                    t.doExec();
                                    w.source = source = prevSrc;
                                }
                            }
                        }
                        // Restart from the local queue after any apparent hit.
                        break;
                    }
                    else if ((qs & QUIET) == 0)
                        quiet = false;           // that queue's owner is not marked quiet
                }
            }
            if (quiet) {
                // Undo an earlier decrement before leaving.
                if (released == 0) {
                    AtomicHelper.getAndAdd(this.ctl, RC_UNIT);
                }
                w.source = prevSrc;
                break;
            }
            else if (empty) {
                // Nothing to run but others are not quiet: mark this worker quiet.
                if (source != QUIET)
                    w.source = source = QUIET;
                if (released == 1) {                 // decrement
                    released = 0;
                    AtomicHelper.getAndAdd(this.ctl, RC_MASK & -RC_UNIT);
                }
            }
        }
    }
1397 
1398     /**
1399      * Scans for and returns a polled task, if available.
1400      * Used only for untracked polls.
1401      *
1402      * @param submissionsOnly if true, only scan submission queues
1403      */
1404     private IForkJoinTask pollScan(bool submissionsOnly) {
1405         WorkQueue[] ws; int n;
1406         rescan: while ((mode & STOP) == 0 && (ws = workQueues) !is null &&
1407                       (n = cast(int)ws.length) > 0) {
1408             int m = n - 1;
1409             int r = ThreadLocalRandom.nextSecondarySeed();
1410             int h = r >>> 16;
1411             int origin, step;
1412             if (submissionsOnly) {
1413                 origin = (r & ~1) & m;         // even indices and steps
1414                 step = (h & ~1) | 2;
1415             }
1416             else {
1417                 origin = r & m;
1418                 step = h | 1;
1419             }
1420             bool nonempty = false;
1421             for (int i = origin, oldSum = 0, checkSum = 0;;) {
1422                 WorkQueue q;
1423                 if ((q = ws[i]) !is null) {
1424                     int b; IForkJoinTask t;
1425                     if (q.top - (b = q.base) > 0) {
1426                         nonempty = true;
1427                         if ((t = q.poll()) !is null)
1428                             return t;
1429                     }
1430                     else
1431                         checkSum += b + q.id;
1432                 }
1433                 if ((i = (i + step) & m) == origin) {
1434                     if (!nonempty && oldSum == (oldSum = checkSum))
1435                         break rescan;
1436                     checkSum = 0;
1437                     nonempty = false;
1438                 }
1439             }
1440         }
1441         return null;
1442     }
1443 
1444     /**
1445      * Gets and removes a local or stolen task for the given worker.
1446      *
1447      * @return a task, if available
1448      */
1449     final IForkJoinTask nextTaskFor(WorkQueue w) {
1450         IForkJoinTask t;
1451         if (w is null || (t = w.nextLocalTask()) is null)
1452             t = pollScan(false);
1453         return t;
1454     }
1455 
1456     // External operations
1457 
1458     /**
1459      * Adds the given task to a submission queue at submitter's
1460      * current queue, creating one if null or contended.
1461      *
1462      * @param task the task. Caller must ensure non-null.
1463      */
1464     final void externalPush(IForkJoinTask task) {
1465         int r;                                // initialize caller's probe
1466         if ((r = ThreadLocalRandom.getProbe()) == 0) {
1467             ThreadLocalRandom.localInit();
1468             r = ThreadLocalRandom.getProbe();
1469         }
1470         for (;;) {
1471             WorkQueue q;
1472             int md = mode, n;
1473             WorkQueue[] ws = workQueues;
1474             if ((md & SHUTDOWN) != 0 || ws is null || (n = cast(int)ws.length) <= 0)
1475                 throw new RejectedExecutionException();
1476             else if ((q = ws[(n - 1) & r & SQMASK]) is null) { // add queue
1477                 int qid = (r | QUIET) & ~(FIFO | OWNED);
1478                 Object lock = workerNameLocker;
1479                 IForkJoinTask[] qa =
1480                     new IForkJoinTask[INITIAL_QUEUE_CAPACITY];
1481                 q = new WorkQueue(this, null);
1482                 q.array = qa;
1483                 q.id = qid;
1484                 q.source = QUIET;
1485                 if (lock !is null) {     // unless disabled, lock pool to install
1486                     synchronized (lock) {
1487                         WorkQueue[] vs; int i, vn;
1488                         if ((vs = workQueues) !is null && (vn = cast(int)vs.length) > 0 &&
1489                             vs[i = qid & (vn - 1) & SQMASK] is null)
1490                             vs[i] = q;  // else another thread already installed
1491                     }
1492                 }
1493             }
1494             else if (!q.tryLockPhase()) // move if busy
1495                 r = ThreadLocalRandom.advanceProbe(r);
1496             else {
1497                 if (q.lockedPush(task))
1498                     signalWork();
1499                 return;
1500             }
1501         }
1502     }
1503 
1504     /**
1505      * Pushes a possibly-external submission.
1506      */
1507     private ForkJoinTask!(T) externalSubmit(T)(ForkJoinTask!(T) task) {
1508         if (task is null)
1509             throw new NullPointerException();
1510         ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 
1511         WorkQueue q;
1512         if ( w !is null && w.pool is this &&
1513             (q = w.workQueue) !is null)
1514             q.push(task);
1515         else
1516             externalPush(task);
1517         return task;
1518     }
1519 
1520     /**
1521      * Returns common pool queue for an external thread.
1522      */
1523     static WorkQueue commonSubmitterQueue() {
1524         ForkJoinPool p = common;
1525         int r = ThreadLocalRandom.getProbe();
1526         WorkQueue[] ws; int n;
1527         return (p !is null && (ws = p.workQueues) !is null &&
1528                 (n = cast(int)ws.length) > 0) ?
1529             ws[(n - 1) & r & SQMASK] : null;
1530     }
1531 
1532     /**
1533      * Performs tryUnpush for an external submitter.
1534      */
1535     final bool tryExternalUnpush(IForkJoinTask task) {
1536         int r = ThreadLocalRandom.getProbe();
1537         WorkQueue[] ws; WorkQueue w; int n;
1538         return ((ws = workQueues) !is null &&
1539                 (n = cast(int)ws.length) > 0 &&
1540                 (w = ws[(n - 1) & r & SQMASK]) !is null &&
1541                 w.tryLockedUnpush(task));
1542     }
1543 
1544     /**
1545      * Performs helpComplete for an external submitter.
1546      */
1547     final int externalHelpComplete(ICountedCompleter task, int maxTasks) {
1548         int r = ThreadLocalRandom.getProbe();
1549         WorkQueue[] ws; WorkQueue w; int n;
1550         return ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 &&
1551                 (w = ws[(n - 1) & r & SQMASK]) !is null) ?
1552             w.helpCC(task, maxTasks, true) : 0;
1553     }
1554 
1555     /**
1556      * Tries to steal and run tasks within the target's computation.
1557      * The maxTasks argument supports external usages; internal calls
1558      * use zero, allowing unbounded steps (external calls trap
1559      * non-positive values).
1560      *
1561      * @param w caller
1562      * @param maxTasks if non-zero, the maximum number of other tasks to run
1563      * @return task status on exit
1564      */
1565     final int helpComplete(WorkQueue w, ICountedCompleter task,
1566                            int maxTasks) {
1567         return (w is null) ? 0 : w.helpCC(task, maxTasks, false);
1568     }
1569 
1570     /**
1571      * Returns a cheap heuristic guide for task partitioning when
1572      * programmers, frameworks, tools, or languages have little or no
1573      * idea about task granularity.  In essence, by offering this
1574      * method, we ask users only about tradeoffs in overhead vs
1575      * expected throughput and its variance, rather than how finely to
1576      * partition tasks.
1577      *
1578      * In a steady state strict (tree-structured) computation, each
1579      * thread makes available for stealing enough tasks for other
1580      * threads to remain active. Inductively, if all threads play by
1581      * the same rules, each thread should make available only a
1582      * constant number of tasks.
1583      *
1584      * The minimum useful constant is just 1. But using a value of 1
1585      * would require immediate replenishment upon each steal to
1586      * maintain enough tasks, which is infeasible.  Further,
1587      * partitionings/granularities of offered tasks should minimize
1588      * steal rates, which in general means that threads nearer the top
1589      * of computation tree should generate more than those nearer the
1590      * bottom. In perfect steady state, each thread is at
1591      * approximately the same level of computation tree. However,
1592      * producing extra tasks amortizes the uncertainty of progress and
1593      * diffusion assumptions.
1594      *
1595      * So, users will want to use values larger (but not much larger)
1596      * than 1 to both smooth over shortages and hedge
1597      * against uneven progress; as traded off against the cost of
1598      * extra task overhead. We leave the user to pick a threshold
1599      * value to compare with the results of this call to guide
1600      * decisions, but recommend values such as 3.
1601      *
1602      * When all threads are active, it is on average OK to estimate
1603      * surplus strictly locally. In steady-state, if one thread is
1604      * maintaining say 2 surplus tasks, then so are others. So we can
1605      * just use estimated queue length.  However, this strategy alone
1606      * leads to serious mis-estimates in some non-steady-state
1607      * conditions (ramp-up, ramp-down, other stalls). We can detect
1608      * many of these by further considering the number of "idle"
1609      * threads, that are known to have zero queued tasks, so
1610      * compensate by a factor of (#idle/#active) threads.
1611      */
1612     static int getSurplusQueuedTaskCount() {
1613         Thread t = Thread.getThis(); 
1614         ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 
1615         ForkJoinPool pool; WorkQueue q;
1616 
1617         if (wt !is null && (pool = wt.pool) !is null &&
1618             (q = wt.workQueue) !is null) {
1619             int p = pool.mode & SMASK;
1620             int a = p + cast(int)(pool.ctl >> RC_SHIFT);
1621             int n = q.top - q.base;
1622             return n - (a > (p >>>= 1) ? 0 :
1623                         a > (p >>>= 1) ? 1 :
1624                         a > (p >>>= 1) ? 2 :
1625                         a > (p >>>= 1) ? 4 :
1626                         8);
1627         }
1628         return 0;
1629     }
1630 
1631     // Termination
1632 
1633     /**
1634      * Possibly initiates and/or completes termination.
1635      *
1636      * @param now if true, unconditionally terminate, else only
1637      * if no work and no active workers
1638      * @param enable if true, terminate when next possible
1639      * @return true if terminating or terminated
1640      */
1641     private bool tryTerminate(bool now, bool enable) {
1642         int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
1643 
1644         while (((md = mode) & SHUTDOWN) == 0) {
1645             if (!enable || this == common)        // cannot shutdown
1646                 return false;
1647             else {
1648                 AtomicHelper.compareAndSet(this.mode, md, md | SHUTDOWN);
1649             }
1650                 
1651         }
1652 
1653         while (((md = mode) & STOP) == 0) {       // try to initiate termination
1654             if (!now) {                           // check if quiescent & empty
1655                 for (long oldSum = 0L;;) {        // repeat until stable
1656                     bool running = false;
1657                     long checkSum = ctl;
1658                     WorkQueue[] ws = workQueues;
1659                     if ((md & SMASK) + cast(int)(checkSum >> RC_SHIFT) > 0)
1660                         running = true;
1661                     else if (ws !is null) {
1662                         WorkQueue w;
1663                         for (int i = 0; i < ws.length; ++i) {
1664                             if ((w = ws[i]) !is null) {
1665                                 int s = w.source, p = w.phase;
1666                                 int d = w.id, b = w.base;
1667                                 if (b != w.top ||
1668                                     ((d & 1) == 1 && (s >= 0 || p >= 0))) {
1669                                     running = true;
1670                                     break;     // working, scanning, or have work
1671                                 }
1672                                 checkSum += ((cast(long)s << 48) + (cast(long)p << 32) +
1673                                              (cast(long)b << 16) + cast(long)d);
1674                             }
1675                         }
1676                     }
1677                     if (((md = mode) & STOP) != 0)
1678                         break;                 // already triggered
1679                     else if (running)
1680                         return false;
1681                     else if (workQueues == ws && oldSum == (oldSum = checkSum))
1682                         break;
1683                 }
1684             }
1685             if ((md & STOP) == 0)
1686                 AtomicHelper.compareAndSet(this.mode, md, md | STOP);
1687         }
1688 
1689         while (((md = mode) & TERMINATED) == 0) { // help terminate others
1690             for (long oldSum = 0L;;) {            // repeat until stable
1691                 WorkQueue[] ws; WorkQueue w;
1692                 long checkSum = ctl;
1693                 if ((ws = workQueues) !is null) {
1694                     for (int i = 0; i < ws.length; ++i) {
1695                         if ((w = ws[i]) !is null) {
1696                             ForkJoinWorkerThread wt = w.owner;
1697                             w.cancelAll();        // clear queues
1698                             if (wt !is null) {
1699                                 try {             // unblock join or park
1700                                     wt.interrupt();
1701                                 } catch (Throwable ignore) {
1702                                 }
1703                             }
1704                             checkSum += (cast(long)w.phase << 32) + w.base;
1705                         }
1706                     }
1707                 }
1708                 if (((md = mode) & TERMINATED) != 0 ||
1709                     (workQueues == ws && oldSum == (oldSum = checkSum)))
1710                     break;
1711             }
1712             if ((md & TERMINATED) != 0)
1713                 break;
1714             else if ((md & SMASK) + cast(short)(ctl >>> TC_SHIFT) > 0)
1715                 break;
1716             else if (AtomicHelper.compareAndSet(this.mode, md, md | TERMINATED)) {
1717                 synchronized (this) {
1718                     // notifyAll();                  // for awaitTermination
1719                     // TODO: Tasks pending completion -@zxp at 2/4/2019, 11:03:21 AM
1720                     // 
1721                 }
1722                 break;
1723             }
1724         }
1725         return true;
1726     }
1727 
1728     // Exported methods
1729 
1730     // Constructors
1731 
1732     /**
1733      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1734      * java.lang.Runtime#availableProcessors}, using defaults for all
1735      * other parameters (see {@link #ForkJoinPool(int,
1736      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool,
1737      * int, int, int, Predicate, long, TimeUnit)}).
1738      *
1739      * @throws SecurityException if a security manager exists and
1740      *         the caller is not permitted to modify threads
1741      *         because it does not hold {@link
1742      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1743      */
1744     this() {
1745         this(min(MAX_CAP, totalCPUs),
1746              defaultForkJoinWorkerThreadFactory, null, false,
1747              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1748     }
1749 
1750     /**
1751      * Creates a {@code ForkJoinPool} with the indicated parallelism
1752      * level, using defaults for all other parameters (see {@link
1753      * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
1754      * UncaughtExceptionHandler, bool, int, int, int, Predicate,
1755      * long, TimeUnit)}).
1756      *
1757      * @param parallelism the parallelism level
1758      * @throws IllegalArgumentException if parallelism less than or
1759      *         equal to zero, or greater than implementation limit
1760      * @throws SecurityException if a security manager exists and
1761      *         the caller is not permitted to modify threads
1762      *         because it does not hold {@link
1763      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1764      */
1765     this(int parallelism) {
1766         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
1767              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1768     }
1769 
1770     /**
1771      * Creates a {@code ForkJoinPool} with the given parameters (using
1772      * defaults for others -- see {@link #ForkJoinPool(int,
1773      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool,
1774      * int, int, int, Predicate, long, TimeUnit)}).
1775      *
1776      * @param parallelism the parallelism level. For default value,
1777      * use {@link java.lang.Runtime#availableProcessors}.
1778      * @param factory the factory for creating new threads. For default value,
1779      * use {@link #defaultForkJoinWorkerThreadFactory}.
1780      * @param handler the handler for internal worker threads that
1781      * terminate due to unrecoverable errors encountered while executing
1782      * tasks. For default value, use {@code null}.
1783      * @param asyncMode if true,
1784      * establishes local first-in-first-out scheduling mode for forked
1785      * tasks that are never joined. This mode may be more appropriate
1786      * than default locally stack-based mode in applications in which
1787      * worker threads only process event-style asynchronous tasks.
1788      * For default value, use {@code false}.
1789      * @throws IllegalArgumentException if parallelism less than or
1790      *         equal to zero, or greater than implementation limit
1791      * @throws NullPointerException if the factory is null
1792      * @throws SecurityException if a security manager exists and
1793      *         the caller is not permitted to modify threads
1794      *         because it does not hold {@link
1795      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1796      */
1797     this(int parallelism,
1798                         ForkJoinWorkerThreadFactory factory,
1799                         UncaughtExceptionHandler handler,
1800                         bool asyncMode) {
1801         this(parallelism, factory, handler, asyncMode,
1802              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1803     }
1804 
1805     /**
1806      * Creates a {@code ForkJoinPool} with the given parameters.
1807      *
1808      * @param parallelism the parallelism level. For default value,
1809      * use {@link java.lang.Runtime#availableProcessors}.
1810      *
1811      * @param factory the factory for creating new threads. For
1812      * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
1813      *
1814      * @param handler the handler for internal worker threads that
1815      * terminate due to unrecoverable errors encountered while
1816      * executing tasks. For default value, use {@code null}.
1817      *
1818      * @param asyncMode if true, establishes local first-in-first-out
1819      * scheduling mode for forked tasks that are never joined. This
1820      * mode may be more appropriate than default locally stack-based
1821      * mode in applications in which worker threads only process
1822      * event-style asynchronous tasks.  For default value, use {@code
1823      * false}.
1824      *
1825      * @param corePoolSize the number of threads to keep in the pool
1826      * (unless timed out after an elapsed keep-alive). Normally (and
1827      * by default) this is the same value as the parallelism level,
1828      * but may be set to a larger value to reduce dynamic overhead if
1829      * tasks regularly block. Using a smaller value (for example
1830      * {@code 0}) has the same effect as the default.
1831      *
1832      * @param maximumPoolSize the maximum number of threads allowed.
1833      * When the maximum is reached, attempts to replace blocked
1834      * threads fail.  (However, because creation and termination of
1835      * different threads may overlap, and may be managed by the given
1836      * thread factory, this value may be transiently exceeded.)  To
1837      * arrange the same value as is used by default for the common
1838      * pool, use {@code 256} plus the {@code parallelism} level. (By
1839      * default, the common pool allows a maximum of 256 spare
1840      * threads.)  Using a value (for example {@code
1841      * Integer.MAX_VALUE}) larger than the implementation's total
1842      * thread limit has the same effect as using this limit (which is
1843      * the default).
1844      *
1845      * @param minimumRunnable the minimum allowed number of core
1846      * threads not blocked by a join or {@link ManagedBlocker}.  To
1847      * ensure progress, when too few unblocked threads exist and
1848      * unexecuted tasks may exist, new threads are constructed, up to
1849      * the given maximumPoolSize.  For the default value, use {@code
1850      * 1}, that ensures liveness.  A larger value might improve
1851      * throughput in the presence of blocked activities, but might
1852      * not, due to increased overhead.  A value of zero may be
1853      * acceptable when submitted tasks cannot have dependencies
1854      * requiring additional threads.
1855      *
1856      * @param saturate if non-null, a predicate invoked upon attempts
1857      * to create more than the maximum total allowed threads.  By
1858      * default, when a thread is about to block on a join or {@link
1859      * ManagedBlocker}, but cannot be replaced because the
1860      * maximumPoolSize would be exceeded, a {@link
1861      * RejectedExecutionException} is thrown.  But if this predicate
1862      * returns {@code true}, then no exception is thrown, so the pool
1863      * continues to operate with fewer than the target number of
1864      * runnable threads, which might not ensure progress.
1865      *
     * @param keepAliveTime the elapsed time since last use before
     * a thread is terminated (and then later replaced if needed),
     * given as a {@code Duration}. For the default value, use a
     * duration of 60 seconds.
1871      *
1872      * @throws IllegalArgumentException if parallelism is less than or
1873      *         equal to zero, or is greater than implementation limit,
1874      *         or if maximumPoolSize is less than parallelism,
     *         or if the keepAliveTime is less than or equal to zero.
1876      * @throws NullPointerException if the factory is null
1877      * @throws SecurityException if a security manager exists and
1878      *         the caller is not permitted to modify threads
1879      *         because it does not hold {@link
1880      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1881      * @since 9
1882      */
1883     this(int parallelism,
1884                         ForkJoinWorkerThreadFactory factory,
1885                         UncaughtExceptionHandler handler,
1886                         bool asyncMode,
1887                         int corePoolSize,
1888                         int maximumPoolSize,
1889                         int minimumRunnable,
1890                         Predicate!(ForkJoinPool) saturate,
1891                         Duration keepAliveTime) {
1892         // check, encode, pack parameters
1893         if (parallelism <= 0 || parallelism > MAX_CAP ||
1894             maximumPoolSize < parallelism || keepAliveTime <= Duration.zero)
1895             throw new IllegalArgumentException();
1896         if (factory is null)
1897             throw new NullPointerException();
1898         long ms = max(keepAliveTime.total!(TimeUnit.Millisecond), TIMEOUT_SLOP);
1899         trace("111111");
1900 
1901         int corep = min(max(corePoolSize, parallelism), MAX_CAP);
1902         long c = (((cast(long)(-corep)       << TC_SHIFT) & TC_MASK) |
1903                   ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK));
1904         int m = parallelism | (asyncMode ? FIFO : 0);
1905         int maxSpares = min(maximumPoolSize, MAX_CAP) - parallelism;
1906         int minAvail = min(max(minimumRunnable, 0), MAX_CAP);
1907         int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
1908         int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
1909         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1910         n = (n + 1) << 1; // power of two, including space for submission queues
1911 
1912         this.workerNamePrefix = "ForkJoinPool-" ~ nextPoolId().to!string() ~ "-worker-";
1913         this.workQueues = new WorkQueue[n];
1914         this.factory = factory;
1915         this.ueh = handler;
1916         this.saturate = saturate;
1917         this.keepAlive = ms;
1918         this.bounds = b;
1919         this.mode = m;
1920         this.ctl = c;
1921         // checkPermission();
1922         workerNameLocker = new Object();     
1923         trace("111111");   
1924     }
1925 
1926     // private static Object newInstanceFromSystemProperty(string property) {
1927     //     string className = System.getProperty(property);
1928     //     return (className is null)
1929     //         ? null
1930     //         : ClassLoader.getSystemClassLoader().loadClass(className)
1931     //         .getConstructor().newInstance();
1932     // }
1933 
1934     /**
1935      * Constructor for common pool using parameters possibly
1936      * overridden by system properties
1937      */
1938     // private this(byte forCommonPoolOnly) {
1939     //     int parallelism = -1;
1940     //     ForkJoinWorkerThreadFactory fac = null;
1941     //     UncaughtExceptionHandler handler = null;
1942     //     try {  // ignore exceptions in accessing/parsing properties
1943     //         string pp = System.getProperty
1944     //             ("hunt.concurrency.ForkJoinPool.common.parallelism");
1945     //         if (pp !is null)
1946     //             parallelism = Integer.parseInt(pp);
1947     //         fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
1948     //             "hunt.concurrency.ForkJoinPool.common.threadFactory");
1949     //         handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
1950     //             "hunt.concurrency.ForkJoinPool.common.exceptionHandler");
1951     //     } catch (Exception ignore) {
1952     //     }
1953 
1954     //     if (fac is null) {
1955     //         if (System.getSecurityManager() is null)
1956     //             fac = defaultForkJoinWorkerThreadFactory;
1957     //         else // use security-managed default
1958     //             fac = new InnocuousForkJoinWorkerThreadFactory();
1959     //     }
1960     //     if (parallelism < 0 && // default 1 less than #cores
1961     //         (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
1962     //         parallelism = 1;
1963     //     if (parallelism > MAX_CAP)
1964     //         parallelism = MAX_CAP;
1965 
1966     //     long c = (((cast(long)(-parallelism) << TC_SHIFT) & TC_MASK) |
1967     //               ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK));
1968     //     int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
1969     //     int n = (parallelism > 1) ? parallelism - 1 : 1;
1970     //     n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1971     //     n = (n + 1) << 1;
1972 
1973     //     this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
1974     //     this.workQueues = new WorkQueue[n];
1975     //     this.factory = fac;
1976     //     this.ueh = handler;
1977     //     this.saturate = null;
1978     //     this.keepAlive = DEFAULT_KEEPALIVE;
1979     //     this.bounds = b;
1980     //     this.mode = parallelism;
1981     //     this.ctl = c;
1982     // }
1983 
1984     /**
1985      * Returns the common pool instance. This pool is statically
1986      * constructed; its run state is unaffected by attempts to {@link
1987      * #shutdown} or {@link #shutdownNow}. However this pool and any
1988      * ongoing processing are automatically terminated upon program
1989      * {@link System#exit}.  Any program that relies on asynchronous
1990      * task processing to complete before program termination should
1991      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
1992      * before exit.
1993      *
1994      * @return the common pool instance
1995      * @since 1.8
1996      */
    static ForkJoinPool commonPool() {
        // Hands out `common` as-is; the null check inherited from the Java
        // original is disabled, so no validation happens here.
        // assert common !is null : "static init error";
        return common;
    }
2001 
2002     // Execution methods
2003 
2004     /**
2005      * Performs the given task, returning its result upon completion.
2006      * If the computation encounters an unchecked Exception or Error,
2007      * it is rethrown as the outcome of this invocation.  Rethrown
2008      * exceptions behave in the same way as regular exceptions, but,
2009      * when possible, contain stack traces (as displayed for example
2010      * using {@code ex.printStackTrace()}) of both the current thread
2011      * as well as the thread actually encountering the exception;
2012      * minimally only the latter.
2013      *
2014      * @param task the task
2015      * @param (T) the type of the task's result
2016      * @return the task's result
2017      * @throws NullPointerException if the task is null
2018      * @throws RejectedExecutionException if the task cannot be
2019      *         scheduled for execution
2020      */
2021     T invoke(T)(ForkJoinTask!(T) task) {
2022         if (task is null)
2023             throw new NullPointerException();
2024         externalSubmit!(T)(task);
2025         return task.join();
2026     }
2027 
2028     /**
2029      * Arranges for (asynchronous) execution of the given task.
2030      *
2031      * @param task the task
2032      * @throws NullPointerException if the task is null
2033      * @throws RejectedExecutionException if the task cannot be
2034      *         scheduled for execution
2035      */
    void execute(IForkJoinTask task) {
        // Not implemented in this port: the only effect is the
        // implementationMissing(false) call; the submission itself is
        // still disabled below.
        // NOTE(review): unlike the doc comment's contract, no null check is
        // performed here.
        implementationMissing(false);
        // externalSubmit(task);
    }
2040 
2041     // AbstractExecutorService methods
2042 
2043     /**
2044      * @throws NullPointerException if the task is null
2045      * @throws RejectedExecutionException if the task cannot be
2046      *         scheduled for execution
2047      */
    void execute(Runnable task) {
        if (task is null)
            throw new NullPointerException();
        // Reuse the argument when it already is a fork/join task, otherwise
        // wrap it in a RunnableExecuteAction.
        IForkJoinTask job = cast(IForkJoinTask) task;
        if (job is null) // avoid re-wrap
            job = new RunnableExecuteAction(task);
        // NOTE(review): `job` is built but never submitted; the submission
        // call is disabled and implementationMissing(false) is invoked instead.
        // externalSubmit(job);
        implementationMissing(false);
    }
2057 
2058     /**
2059      * Submits a ForkJoinTask for execution.
2060      *
2061      * @param task the task to submit
2062      * @param (T) the type of the task's result
2063      * @return the task
2064      * @throws NullPointerException if the task is null
2065      * @throws RejectedExecutionException if the task cannot be
2066      *         scheduled for execution
2067      */
    ForkJoinTask!(T) submitTask(T)(ForkJoinTask!(T) task) {
        // Pure delegation: the task is passed to externalSubmit and whatever
        // it returns is handed back to the caller.
        return externalSubmit(task);
    }
2071 
2072     /**
2073      * @throws NullPointerException if the task is null
2074      * @throws RejectedExecutionException if the task cannot be
2075      *         scheduled for execution
2076      */
    ForkJoinTask!(T) submitTask(T)(Callable!(T) task) {
        // Wraps the callable in an AdaptedCallable and submits the wrapper.
        // NOTE(review): no explicit null check here; a null `task` is presumably
        // rejected by AdaptedCallable's constructor — confirm.
        return externalSubmit(new AdaptedCallable!(T)(task));
    }
2080 
2081     /**
2082      * @throws NullPointerException if the task is null
2083      * @throws RejectedExecutionException if the task cannot be
2084      *         scheduled for execution
2085      */
    ForkJoinTask!(T) submitTask(T)(Runnable task, T result) {
        // Wraps the runnable together with `result` in an AdaptedRunnable and
        // submits the wrapper.
        // NOTE(review): no explicit null check here; presumably handled by
        // AdaptedRunnable's constructor — confirm.
        return externalSubmit(new AdaptedRunnable!(T)(task, result));
    }
2089 
2090     /**
2091      * @throws NullPointerException if the task is null
2092      * @throws RejectedExecutionException if the task cannot be
2093      *         scheduled for execution
2094      */
2095 
2096     IForkJoinTask submitTask(Runnable task) {
2097         if (task is null)
2098             throw new NullPointerException();
2099         IForkJoinTask t = cast(IForkJoinTask)task;
2100         return externalSubmit(t !is null
2101             ? cast(ForkJoinTask!(void)) task // avoid re-wrap
2102             : new AdaptedRunnableAction(task));
2103     }
2104 
2105     /**
2106      * @throws NullPointerException       {@inheritDoc}
2107      * @throws RejectedExecutionException {@inheritDoc}
2108      */
2109     List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
2110         // In previous versions of this class, this method constructed
2111         // a task to run ForkJoinTask.invokeAll, but now external
2112         // invocation of multiple tasks is at least as efficient.
2113         ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
2114 
2115         try {
2116             foreach (Callable!(T) t ; tasks) {
2117                 ForkJoinTask!(T) f = new AdaptedCallable!(T)(t);
2118                 futures.add(f);
2119                 externalSubmit(f);
2120             }
2121             for (int i = 0, size = futures.size(); i < size; i++)
2122                 (cast(IForkJoinTask)(futures.get(i))).quietlyJoin();
2123             return futures;
2124         } catch (Throwable t) {
2125             for (int i = 0, size = futures.size(); i < size; i++)
2126                 futures.get(i).cancel(false);
2127             throw t;
2128         }
2129     }
2130 
2131     /**
2132      * Returns the factory used for constructing new workers.
2133      *
2134      * @return the factory used for constructing new workers
2135      */
    ForkJoinWorkerThreadFactory getFactory() {
        // Plain accessor for the factory stored by the constructor.
        return factory;
    }
2139 
2140     /**
2141      * Returns the handler for internal worker threads that terminate
2142      * due to unrecoverable errors encountered while executing tasks.
2143      *
2144      * @return the handler, or {@code null} if none
2145      */
    UncaughtExceptionHandler getUncaughtExceptionHandler() {
        // Plain accessor for the handler stored by the constructor; may be null.
        return ueh;
    }
2149 
2150     /**
2151      * Returns the targeted parallelism level of this pool.
2152      *
2153      * @return the targeted parallelism level of this pool
2154      */
2155     int getParallelism() {
2156         int par = mode & SMASK;
2157         return (par > 0) ? par : 1;
2158     }
2159 
2160     /**
2161      * Returns the targeted parallelism level of the common pool.
2162      *
2163      * @return the targeted parallelism level of the common pool
2164      * @since 1.8
2165      */
    static int getCommonPoolParallelism() {
        // Returns COMMON_PARALLELISM unchanged.
        return COMMON_PARALLELISM;
    }
2169 
2170     /**
2171      * Returns the number of worker threads that have started but not
2172      * yet terminated.  The result returned by this method may differ
2173      * from {@link #getParallelism} when threads are created to
2174      * maintain parallelism when others are cooperatively blocked.
2175      *
2176      * @return the number of worker threads
2177      */
2178     int getPoolSize() {
2179         return ((mode & SMASK) + cast(short)(ctl >>> TC_SHIFT));
2180     }
2181 
2182     /**
2183      * Returns {@code true} if this pool uses local first-in-first-out
2184      * scheduling mode for forked tasks that are never joined.
2185      *
2186      * @return {@code true} if this pool uses async mode
2187      */
2188     bool getAsyncMode() {
2189         return (mode & FIFO) != 0;
2190     }
2191 
2192     /**
2193      * Returns an estimate of the number of worker threads that are
2194      * not blocked waiting to join tasks or for other managed
2195      * synchronization. This method may overestimate the
2196      * number of running threads.
2197      *
2198      * @return the number of worker threads
2199      */
2200     int getRunningThreadCount() {
2201         WorkQueue[] ws; WorkQueue w;
2202         // VarHandle.acquireFence();
2203         int rc = 0;
2204         if ((ws = workQueues) !is null) {
2205             for (int i = 1; i < cast(int)ws.length; i += 2) {
2206                 if ((w = ws[i]) !is null && w.isApparentlyUnblocked())
2207                     ++rc;
2208             }
2209         }
2210         return rc;
2211     }
2212 
2213     /**
2214      * Returns an estimate of the number of threads that are currently
2215      * stealing or executing tasks. This method may overestimate the
2216      * number of active threads.
2217      *
2218      * @return the number of active threads
2219      */
2220     int getActiveThreadCount() {
2221         int r = (mode & SMASK) + cast(int)(ctl >> RC_SHIFT);
2222         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2223     }
2224 
2225     /**
2226      * Returns {@code true} if all worker threads are currently idle.
2227      * An idle worker is one that cannot obtain a task to execute
2228      * because none are available to steal from other threads, and
2229      * there are no pending submissions to the pool. This method is
2230      * conservative; it might not return {@code true} immediately upon
2231      * idleness of all threads, but will eventually become true if
2232      * threads remain inactive.
2233      *
2234      * @return {@code true} if all threads are currently idle
2235      */
    bool isQuiescent() {
        // Retry loop: each pass works on a snapshot of ctl/mode and only
        // reports quiescence if ctl is unchanged at the end of the pass.
        for (;;) {
            long c = ctl;
            int md = mode, pc = md & SMASK;
            // tc: parallelism plus the signed 16-bit TC field of the snapshot.
            int tc = pc + cast(short)(c >>> TC_SHIFT);
            // rc: parallelism plus the signed RC field of the snapshot.
            int rc = pc + cast(int)(c >> RC_SHIFT);
            // A pool that is stopping or terminated is reported as quiescent.
            if ((md & (STOP | TERMINATED)) != 0)
                return true;
            else if (rc > 0)
                return false;
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) !is null) {
                    // Walk the odd slots: any queue with a positive `source`
                    // means not quiescent; every populated slot decrements tc.
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) !is null) {
                            if (v.source > 0)
                                return false;
                            --tc;
                        }
                    }
                }
                // Quiescent only if every counted thread was seen and ctl
                // did not change during the scan; otherwise retry.
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2262 
2263     /**
2264      * Returns an estimate of the total number of tasks stolen from
2265      * one thread's work queue by another. The reported value
2266      * underestimates the actual total number of steals when the pool
2267      * is not quiescent. This value may be useful for monitoring and
2268      * tuning fork/join programs: in general, steal counts should be
2269      * high enough to keep threads busy, but low enough to avoid
2270      * overhead and contention across threads.
2271      *
2272      * @return the number of steals
2273      */
2274     long getStealCount() {
2275         long count = stealCount;
2276         WorkQueue[] ws; WorkQueue w;
2277         if ((ws = workQueues) !is null) {
2278             for (int i = 1; i < ws.length; i += 2) {
2279                 if ((w = ws[i]) !is null)
2280                     count += cast(long)w.nsteals & 0xffffffffL;
2281             }
2282         }
2283         return count;
2284     }
2285 
2286     /**
2287      * Returns an estimate of the total number of tasks currently held
2288      * in queues by worker threads (but not including tasks submitted
2289      * to the pool that have not begun executing). This value is only
2290      * an approximation, obtained by iterating across all threads in
2291      * the pool. This method may be useful for tuning task
2292      * granularities.
2293      *
2294      * @return the number of queued tasks
2295      */
2296     long getQueuedTaskCount() {
2297         WorkQueue[] ws; WorkQueue w;
2298         // VarHandle.acquireFence();
2299         int count = 0;
2300         if ((ws = workQueues) !is null) {
2301             for (int i = 1; i < cast(int)ws.length; i += 2) {
2302                 if ((w = ws[i]) !is null)
2303                     count += w.queueSize();
2304             }
2305         }
2306         return count;
2307     }
2308 
2309     /**
2310      * Returns an estimate of the number of tasks submitted to this
2311      * pool that have not yet begun executing.  This method may take
2312      * time proportional to the number of submissions.
2313      *
2314      * @return the number of queued submissions
2315      */
2316     int getQueuedSubmissionCount() {
2317         WorkQueue[] ws; WorkQueue w;
2318         // VarHandle.acquireFence();
2319         int count = 0;
2320         if ((ws = workQueues) !is null) {
2321             for (int i = 0; i < cast(int)ws.length; i += 2) {
2322                 if ((w = ws[i]) !is null)
2323                     count += w.queueSize();
2324             }
2325         }
2326         return count;
2327     }
2328 
2329     /**
2330      * Returns {@code true} if there are any tasks submitted to this
2331      * pool that have not yet begun executing.
2332      *
2333      * @return {@code true} if there are any queued submissions
2334      */
2335     bool hasQueuedSubmissions() {
2336         WorkQueue[] ws; WorkQueue w;
2337         // VarHandle.acquireFence();
2338         if ((ws = workQueues) !is null) {
2339             for (int i = 0; i < cast(int)ws.length; i += 2) {
2340                 if ((w = ws[i]) !is null && !w.isEmpty())
2341                     return true;
2342             }
2343         }
2344         return false;
2345     }
2346 
2347     /**
2348      * Removes and returns the next unexecuted submission if one is
2349      * available.  This method may be useful in extensions to this
2350      * class that re-assign work in systems with multiple pools.
2351      *
2352      * @return the next submission, or {@code null} if none
2353      */
    protected IForkJoinTask pollSubmission() {
        // Delegates to pollScan with its flag set to true.
        // NOTE(review): presumably the flag restricts the scan to submission
        // queues — confirm against pollScan.
        return pollScan(true);
    }
2357 
2358     /**
2359      * Removes all available unexecuted submitted and forked tasks
2360      * from scheduling queues and adds them to the given collection,
2361      * without altering their execution status. These may include
2362      * artificially generated or wrapped tasks. This method is
2363      * designed to be invoked only when the pool is known to be
2364      * quiescent. Invocations at other times may not remove all
2365      * tasks. A failure encountered while attempting to add elements
2366      * to collection {@code c} may result in elements being in
2367      * neither, either or both collections when the associated
2368      * exception is thrown.  The behavior of this operation is
2369      * undefined if the specified collection is modified while the
2370      * operation is in progress.
2371      *
2372      * @param c the collection to transfer elements into
2373      * @return the number of elements transferred
2374      */
2375     protected int drainTasksTo(Collection!IForkJoinTask c) {
2376         WorkQueue[] ws; WorkQueue w; IForkJoinTask t;
2377         // VarHandle.acquireFence();
2378         int count = 0;
2379         if ((ws = workQueues) !is null) {
2380             for (int i = 0; i < ws.length; ++i) {
2381                 if ((w = ws[i]) !is null) {
2382                     while ((t = w.poll()) !is null) {
2383                         c.add(t);
2384                         ++count;
2385                     }
2386                 }
2387             }
2388         }
2389         return count;
2390     }
2391 
2392     /**
2393      * Returns a string identifying this pool, as well as its state,
2394      * including indications of run state, parallelism level, and
2395      * worker and task counts.
2396      *
2397      * @return a string identifying this pool, as well as its state
2398      */
2399     override string toString() {
2400         // Use a single pass through workQueues to collect counts
2401         int md = mode; // read fields first
2402         long c = ctl;
2403         long st = stealCount;
2404         long qt = 0L, qs = 0L; int rc = 0;
2405         WorkQueue[] ws; WorkQueue w;
2406         if ((ws = workQueues) !is null) {
2407             for (int i = 0; i < ws.length; ++i) {
2408                 if ((w = ws[i]) !is null) {
2409                     int size = w.queueSize();
2410                     if ((i & 1) == 0)
2411                         qs += size;
2412                     else {
2413                         qt += size;
2414                         st += cast(long)w.nsteals & 0xffffffffL;
2415                         if (w.isApparentlyUnblocked())
2416                             ++rc;
2417                     }
2418                 }
2419             }
2420         }
2421 
2422         int pc = (md & SMASK);
2423         int tc = pc + cast(short)(c >>> TC_SHIFT);
2424         int ac = pc + cast(int)(c >> RC_SHIFT);
2425         if (ac < 0) // ignore negative
2426             ac = 0;
2427         string level = ((md & TERMINATED) != 0 ? "Terminated" :
2428                         (md & STOP)       != 0 ? "Terminating" :
2429                         (md & SHUTDOWN)   != 0 ? "Shutting down" :
2430                         "Running");
2431         return super.toString() ~
2432             "[" ~ level ~
2433             ", parallelism = " ~ pc.to!string() ~
2434             ", size = " ~ tc.to!string() ~
2435             ", active = " ~ ac.to!string() ~
2436             ", running = " ~ rc.to!string() ~
2437             ", steals = " ~ st.to!string() ~
2438             ", tasks = " ~ qt.to!string() ~
2439             ", submissions = " ~ qs.to!string() ~
2440             "]";
2441     }
2442 
2443     /**
2444      * Possibly initiates an orderly shutdown in which previously
2445      * submitted tasks are executed, but no new tasks will be
2446      * accepted. Invocation has no effect on execution state if this
2447      * is the {@link #commonPool()}, and no additional effect if
2448      * already shut down.  Tasks that are in the process of being
2449      * submitted concurrently during the course of this method may or
2450      * may not be rejected.
2451      *
2452      * @throws SecurityException if a security manager exists and
2453      *         the caller is not permitted to modify threads
2454      *         because it does not hold {@link
2455      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2456      */
    void shutdown() {
        // The permission check from the Java original is disabled in this port.
        // checkPermission();
        // Requests termination with now=false, enable=true.
        tryTerminate(false, true);
    }
2461 
2462     /**
2463      * Possibly attempts to cancel and/or stop all tasks, and reject
2464      * all subsequently submitted tasks.  Invocation has no effect on
2465      * execution state if this is the {@link #commonPool()}, and no
2466      * additional effect if already shut down. Otherwise, tasks that
2467      * are in the process of being submitted or executed concurrently
2468      * during the course of this method may or may not be
2469      * rejected. This method cancels both existing and unexecuted
2470      * tasks, in order to permit termination in the presence of task
2471      * dependencies. So the method always returns an empty list
2472      * (unlike the case for some other Executors).
2473      *
2474      * @return an empty list
2475      * @throws SecurityException if a security manager exists and
2476      *         the caller is not permitted to modify threads
2477      *         because it does not hold {@link
2478      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2479      */
    List!(Runnable) shutdownNow() {
        // The permission check from the Java original is disabled in this port.
        // checkPermission();
        // Requests termination with now=true, enable=true.
        tryTerminate(true, true);
        // Always an empty list; no unexecuted tasks are handed back.
        return Collections.emptyList!(Runnable)();
    }
2485 
2486     /**
2487      * Returns {@code true} if all tasks have completed following shut down.
2488      *
2489      * @return {@code true} if all tasks have completed following shut down
2490      */
2491     bool isTerminated() {
2492         return (mode & TERMINATED) != 0;
2493     }
2494 
2495     /**
2496      * Returns {@code true} if the process of termination has
2497      * commenced but not yet completed.  This method may be useful for
2498      * debugging. A return of {@code true} reported a sufficient
2499      * period after shutdown may indicate that submitted tasks have
2500      * ignored or suppressed interruption, or are waiting for I/O,
2501      * causing this executor not to properly terminate. (See the
2502      * advisory notes for class {@link ForkJoinTask} stating that
2503      * tasks should not normally entail blocking operations.  But if
2504      * they do, they must abort them on interrupt.)
2505      *
2506      * @return {@code true} if terminating but not yet terminated
2507      */
2508     bool isTerminating() {
2509         int md = mode;
2510         return (md & STOP) != 0 && (md & TERMINATED) == 0;
2511     }
2512 
2513     /**
2514      * Returns {@code true} if this pool has been shut down.
2515      *
2516      * @return {@code true} if this pool has been shut down
2517      */
2518     bool isShutdown() {
2519         return (mode & SHUTDOWN) != 0;
2520     }
2521 
2522     /**
2523      * Blocks until all tasks have completed execution after a
2524      * shutdown request, or the timeout occurs, or the current thread
2525      * is interrupted, whichever happens first. Because the {@link
2526      * #commonPool()} never terminates until program shutdown, when
2527      * applied to the common pool, this method is equivalent to {@link
2528      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2529      *
     * @param timeout the maximum time to wait, expressed as a
     *        {@code Duration} (there is no separate time-unit argument)
2532      * @return {@code true} if this executor terminated and
2533      *         {@code false} if the timeout elapsed before termination
2534      * @throws InterruptedException if interrupted while waiting
2535      */
2536     bool awaitTermination(Duration timeout)
2537         {
2538         if (ThreadEx.interrupted())
2539             throw new InterruptedException();
2540         if (this == common) {
2541             awaitQuiescence(timeout);
2542             return false;
2543         }
2544         long nanos = timeout.total!(TimeUnit.HectoNanosecond);
2545         if (isTerminated())
2546             return true;
2547         if (nanos <= 0L)
2548             return false;
2549         long deadline = Clock.currStdTime + nanos;
2550         synchronized (this) {
2551             for (;;) {
2552                 if (isTerminated())
2553                     return true;
2554                 if (nanos <= 0L)
2555                     return false;
2556                 // long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
2557                 // wait(millis > 0L ? millis : 1L);
2558                 // ThreadEx.currentThread().par
2559                 ThreadEx.sleep(dur!(TimeUnit.HectoNanosecond)(nanos));
2560                 nanos = deadline - Clock.currStdTime;
2561             }
2562         }
2563     }
2564 
2565     /**
2566      * If called by a ForkJoinTask operating in this pool, equivalent
2567      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2568      * waits and/or attempts to assist performing tasks until this
2569      * pool {@link #isQuiescent} or the indicated timeout elapses.
2570      *
     * @param timeout the maximum time to wait, expressed as a
     *        {@code Duration} (there is no separate time-unit argument)
2573      * @return {@code true} if quiescent; {@code false} if the
2574      * timeout elapsed.
2575      */
2576     bool awaitQuiescence(Duration timeout) {
2577         long nanos = timeout.total!(TimeUnit.HectoNanosecond)();
2578         Thread thread = Thread.getThis();
2579         ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread;
2580         if (wt !is null && wt.pool is this) {
2581             helpQuiescePool(wt.workQueue);
2582             return true;
2583         }
2584         else {
2585             for (long startTime = Clock.currStdTime;;) {
2586                 IForkJoinTask t;
2587                 if ((t = pollScan(false)) !is null)
2588                     t.doExec();
2589                 else if (isQuiescent())
2590                     return true;
2591                 else if ((Clock.currStdTime - startTime) > nanos)
2592                     return false;
2593                 else
2594                     Thread.yield(); // cannot block
2595             }
2596         }
2597     }
2598 
2599     /**
2600      * Waits and/or attempts to assist performing tasks indefinitely
2601      * until the {@link #commonPool()} {@link #isQuiescent}.
2602      */
    static void quiesceCommonPool() {
        // Duration.max serves as the "wait indefinitely" timeout; the result
        // of awaitQuiescence is discarded.
        common.awaitQuiescence(Duration.max);
    }
2606 
2607     /**
2608      * Runs the given possibly blocking task.  When {@linkplain
2609      * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
2610      * method possibly arranges for a spare thread to be activated if
2611      * necessary to ensure sufficient parallelism while the current
2612      * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
2613      *
2614      * <p>This method repeatedly calls {@code blocker.isReleasable()} and
2615      * {@code blocker.block()} until either method returns {@code true}.
2616      * Every call to {@code blocker.block()} is preceded by a call to
2617      * {@code blocker.isReleasable()} that returned {@code false}.
2618      *
2619      * <p>If not running in a ForkJoinPool, this method is
2620      * behaviorally equivalent to
2621      * <pre> {@code
2622      * while (!blocker.isReleasable())
2623      *   if (blocker.block())
2624      *     break;}</pre>
2625      *
2626      * If running in a ForkJoinPool, the pool may first be expanded to
2627      * ensure sufficient parallelism available during the call to
2628      * {@code blocker.block()}.
2629      *
2630      * @param blocker the blocker task
2631      * @throws InterruptedException if {@code blocker.block()} did so
2632      */
    static void managedBlock(ManagedBlocker blocker) {
        if (blocker is null) throw new NullPointerException();
        ForkJoinPool p;
        WorkQueue w;
        Thread t = Thread.getThis();
        ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t;
        // Pool path: taken only when the caller is a worker thread that has
        // both a pool and a work queue.
        if (wt !is null && (p = wt.pool) !is null &&
            (w = wt.workQueue) !is null) {
            int block;
            while (!blocker.isReleasable()) {
                // Retry tryCompensate until it returns non-zero, re-checking
                // releasability before each attempt.
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        // Alternate isReleasable()/block() until either returns true.
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // A positive result adds RC_UNIT back to ctl; otherwise
                        // zero is added. Runs even if block() throws.
                        // NOTE(review): presumably undoes a release-count
                        // decrement made by tryCompensate — confirm.
                        AtomicHelper.getAndAdd(p.ctl, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Non-pool path: the same isReleasable()/block() alternation with
            // no compensation step.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
2659 
2660     /**
2661      * If the given executor is a ForkJoinPool, poll and execute
2662      * AsynchronousCompletionTasks from worker's queue until none are
2663      * available or blocker is released.
2664      */
2665     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2666         ForkJoinPool p = cast(ForkJoinPool)e;
2667         if (p !is null) {
2668             WorkQueue w; WorkQueue[] ws; int r, n;
2669             
2670             Thread thread = Thread.getThis();
2671             ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 
2672             if (wt !is null && wt.pool is p)
2673                 w = wt.workQueue;
2674             else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
2675                      (ws = p.workQueues) !is null && (n = cast(int)ws.length) > 0)
2676                 w = ws[(n - 1) & r & SQMASK];
2677             else
2678                 w = null;
2679             if (w !is null)
2680                 w.helpAsyncBlocker(blocker);
2681         }
2682     }
2683 
2684     // AbstractExecutorService overrides.  These rely on undocumented
2685     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
2686     // implement RunnableFuture.
2687 
2688     protected RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) {
2689         return new AdaptedRunnable!(T)(runnable, value);
2690     }
2691 
2692     protected RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
2693         return new AdaptedCallable!(T)(callable);
2694     }
2695 
2696     // VarHandle mechanics
2697     // private static final VarHandle CTL;
2698     // private static final VarHandle MODE;
2699     // static final VarHandle QA;
2700 
    /**
     * Module-level initialisation of the pool's static state: the spare-thread
     * limit, the default worker-thread factory, the common pool and its
     * parallelism. The commented-out fragments are the corresponding parts of
     * the Java original that have not been ported.
     */
    shared static this() {
        // Java original (VarHandle lookup), not ported:
        // try {
        //     MethodHandles.Lookup l = MethodHandles.lookup();
        //     CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
        //     MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
        //     QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
        // } catch (ReflectiveOperationException e) {
        //     throw new ExceptionInInitializerError(e);
        // }

        // Java original: reduce the risk of rare disastrous classloading in
        // first call to LockSupport.park:
        // https://bugs.openjdk.java.net/browse/JDK-8074773
        // Class<?> ensureLoaded = LockSupport.class;

        // The spare limit is currently always the compiled-in default; the
        // property override from the Java original is disabled below.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        // try {
        //     string p = System.getProperty
        //         ("hunt.concurrency.ForkJoinPool.common.maximumSpares");
        //     if (p !is null)
        //         commonMaxSpares = Integer.parseInt(p);
        // } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        // Created before the common pool below.
        // NOTE(review): presumably the pool constructor relies on this
        // factory already being set - confirm before reordering.
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        // modifyThreadPermission = new RuntimePermission("modifyThread");

        // Java original created the common pool under doPrivileged:
        // common = AccessController.doPrivileged(new PrivilegedAction<>() {
        //     ForkJoinPool run() {
        //         return new ForkJoinPool((byte)0); }});
        // NOTE(review): the byte argument looks like a constructor-selection
        // dummy (Java passes 0, this port passes 1) - confirm it is unused.
        common = new ForkJoinPool(cast(byte)1);

        // Reported parallelism is the SMASK bits of the common pool's mode,
        // but never less than 1.
        COMMON_PARALLELISM = max(common.mode & SMASK, 1);
    }
2735 
2736 }
2737 
2738 
2739 /**
2740  * Factory for innocuous worker threads.
2741  */
private final class InnocuousForkJoinWorkerThreadFactory
    : ForkJoinWorkerThreadFactory {

    // The Java original restricts the factory's own permissions with an
    // AccessControlContext (modifyThread, enableContextClassLoaderOverride,
    // modifyThreadGroup, getClassLoader, setContextClassLoader) and creates
    // the worker inside AccessController.doPrivileged. None of that is
    // ported; the worker is constructed directly.

    /**
     * Creates an innocuous worker thread for the given pool.
     */
    final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread worker = new InnocuousForkJoinWorkerThread(pool);
        return worker;
    }
}
2766 
2767 
2768 /**
2769  * Factory for creating new {@link ForkJoinWorkerThread}s.
2770  * A {@code ForkJoinWorkerThreadFactory} must be defined and used
2771  * for {@code ForkJoinWorkerThread} subclasses that extend base
2772  * functionality or initialize threads with different contexts.
2773  */
interface ForkJoinWorkerThreadFactory {
    /**
     * Returns a new worker thread operating in the given pool.
     * Returning null or throwing an exception may result in tasks
     * never being executed.  If this method throws an exception,
     * it is relayed to the caller of the method (for example
     * {@code execute}) causing attempted thread creation. If this
     * method returns null or throws an exception, it is not
     * retried until the next attempted creation (for example
     * another call to {@code execute}).
     *
     * NOTE(review): the relay/retry behaviour above is the contract
     * inherited from the Java original; it is implemented by the pool,
     * not by this interface - confirm against the pool's thread-creation
     * code.
     *
     * @param pool the pool this thread works in
     * @return the new worker thread, or {@code null} if the request
     *         to create a thread is rejected
     * @throws NullPointerException if the pool is null
     */
    ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
2792 
2793 // Nested classes
2794 
2795 // static AccessControlContext contextWithPermissions(Permission ... perms) {
2796 //     Permissions permissions = new Permissions();
2797 //     for (Permission perm : perms)
2798 //         permissions.add(perm);
2799 //     return new AccessControlContext(
2800 //         new ProtectionDomain[] { new ProtectionDomain(null, permissions) });
2801 // }
2802 
2803 /**
2804  * Default ForkJoinWorkerThreadFactory implementation; creates a
2805  * new ForkJoinWorkerThread using the system class loader as the
2806  * thread context class loader.
2807  */
private final class DefaultForkJoinWorkerThreadFactory : ForkJoinWorkerThreadFactory {

    // The Java original builds the worker inside AccessController.doPrivileged
    // with an ACC limited to getClassLoader/setContextClassLoader and passes
    // the system class loader. None of that is ported; the worker is
    // constructed directly.

    /**
     * Creates a plain ForkJoinWorkerThread for the given pool.
     */
    final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        ForkJoinWorkerThread worker = new ForkJoinWorkerThread(pool);
        return worker;
    }
}
2823 
2824 
2825 /**
2826  * Interface for extending managed parallelism for tasks running
2827  * in {@link ForkJoinPool}s.
2828  *
2829  * <p>A {@code ManagedBlocker} provides two methods.  Method
2830  * {@link #isReleasable} must return {@code true} if blocking is
2831  * not necessary. Method {@link #block} blocks the current thread
2832  * if necessary (perhaps internally invoking {@code isReleasable}
2833  * before actually blocking). These actions are performed by any
2834  * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
2835  * The unusual methods in this API accommodate synchronizers that
2836  * may, but don't usually, block for long periods. Similarly, they
2837  * allow more efficient internal handling of cases in which
2838  * additional workers may be, but usually are not, needed to
2839  * ensure sufficient parallelism.  Toward this end,
2840  * implementations of method {@code isReleasable} must be amenable
2841  * to repeated invocation.
2842  *
2843  * <p>For example, here is a ManagedBlocker based on a
2844  * ReentrantLock:
2845  * <pre> {@code
 * class ManagedLocker : ManagedBlocker {
2847  *   final ReentrantLock lock;
2848  *   bool hasLock = false;
2849  *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
2850  *   bool block() {
2851  *     if (!hasLock)
2852  *       lock.lock();
2853  *     return true;
2854  *   }
2855  *   bool isReleasable() {
2856  *     return hasLock || (hasLock = lock.tryLock());
2857  *   }
2858  * }}</pre>
2859  *
2860  * <p>Here is a class that possibly blocks waiting for an
2861  * item on a given queue:
2862  * <pre> {@code
2863  * class QueueTaker!(E) : ManagedBlocker {
2864  *   final BlockingQueue!(E) queue;
2865  *   E item = null;
2866  *   QueueTaker(BlockingQueue!(E) q) { this.queue = q; }
2867  *   bool block() {
2868  *     if (item is null)
2869  *       item = queue.take();
2870  *     return true;
2871  *   }
2872  *   bool isReleasable() {
2873  *     return item !is null || (item = queue.poll()) !is null;
2874  *   }
2875  *   E getItem() { // call after pool.managedBlock completes
2876  *     return item;
2877  *   }
2878  * }}</pre>
2879  */
static interface ManagedBlocker {
    /**
     * Possibly blocks the current thread, for example waiting for
     * a lock or condition.
     *
     * Callers in this file ({@code ForkJoinPool.managedBlock}) invoke this
     * only after {@code isReleasable()} has returned {@code false}, and
     * stop looping as soon as this method returns {@code true}.
     *
     * @return {@code true} if no additional blocking is necessary
     * (i.e., if isReleasable would return true)
     * @throws InterruptedException if interrupted while waiting
     * (the method is not required to do so, but is allowed to)
     */
    bool block();

    /**
     * Returns {@code true} if blocking is unnecessary.
     * May be called repeatedly, so implementations must tolerate
     * repeated invocation.
     *
     * @return {@code true} if blocking is unnecessary
     */
    bool isReleasable();
}
2898 
2899 
2900 
2901 /**
2902  * A thread managed by a {@link ForkJoinPool}, which executes
2903  * {@link ForkJoinTask}s.
2904  * This class is subclassable solely for the sake of adding
2905  * functionality -- there are no overridable methods dealing with
2906  * scheduling or execution.  However, you can override initialization
2907  * and termination methods surrounding the main task processing loop.
2908  * If you do create such a subclass, you will also need to supply a
2909  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
2910  * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2911  * UncaughtExceptionHandler, bool, int, int, int, Predicate, long, TimeUnit)
2912  * use it} in a {@code ForkJoinPool}.
2913  *
2914  * @since 1.7
2915  * @author Doug Lea
2916  */
class ForkJoinWorkerThread : ThreadEx {
    /*
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. For explanation, see the internal documentation
     * of class ForkJoinPool.
     *
     * This class just maintains links to its pool and WorkQueue.  Both
     * fields are assigned in the constructors: the pool directly, the
     * workQueue from the result of pool.registerWorker(this).
     *
     * Support for (non-public) subclass InnocuousForkJoinWorkerThread
     * requires that we break quite a lot of encapsulation (via helper
     * methods in ThreadLocalRandom) both here and in the subclass to
     * access and set Thread fields.
     */

    ForkJoinPool pool;                // the pool this thread works in
    WorkQueue workQueue;              // work-stealing mechanics

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected this(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        // Fail with the documented exception instead of dereferencing null.
        if (pool is null)
            throw new NullPointerException();
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }

    /*
     * Java original (not ported): version for use by the default pool.
     * Supports setting the context class loader.  This is a separate
     * constructor to avoid affecting the protected constructor.
     */
    // this(ForkJoinPool pool, ClassLoader ccl) {
    //     super("aForkJoinWorkerThread");
    //     super.setContextClassLoader(ccl);
    //     this.pool = pool;
    //     this.workQueue = pool.registerWorker(this);
    // }

    /**
     * Version for InnocuousForkJoinWorkerThread. The class-loader and
     * access-control parameters of the Java original are not ported.
     *
     * @param pool the pool this thread works in
     * @param threadGroup the thread group passed to the ThreadEx constructor
     * @throws NullPointerException if pool is null
     */
    this(ForkJoinPool pool, ThreadGroupEx threadGroup) {
        super(threadGroup, null, "aForkJoinWorkerThread");
        // super.setContextClassLoader(ccl);
        // ThreadLocalRandom.setInheritedAccessControlContext(this, acc);
        // ThreadLocalRandom.eraseThreadLocals(this); // clear before registering
        if (pool is null)
            throw new NullPointerException();
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the unique index number of this thread in its pool.
     * The returned value ranges from zero to the maximum number of
     * threads (minus one) that may exist in the pool, and does not
     * change during the lifetime of the thread.  This method may be
     * useful for applications that track status or collect results
     * per-worker-thread rather than per-task.
     *
     * @return the index number
     */
    int getPoolIndex() {
        return workQueue.getPoolIndex();
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread.  If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * {@link ForkJoinTask}s.
     *
     * The worker is always deregistered from its pool on exit. The
     * Throwable handed to {@code deregisterWorker} is the first one
     * raised by {@code onStart}/{@code runWorker}, or else the one
     * raised by {@code onTermination}, or {@code null}.
     */
    override void run() {
        if (workQueue.array is null) { // only run once
            Throwable exception = null;
            // Runs last, after both try blocks below, and sees the final
            // value of `exception`.
            scope(exit) {
                pool.deregisterWorker(this, exception);
            }

            try {
                onStart();
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            }

            // A failing termination hook must neither escape run() nor be
            // lost: record it unless an earlier failure is already recorded.
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception is null)
                    exception = ex;
            }
        }
    }

    /**
     * Non-hook method for InnocuousForkJoinWorkerThread.
     */
    void afterTopLevelExec() {
    }

    /**
     * Joins the given task from this worker thread. If the task can be
     * popped from the top of this worker's queue it is executed directly;
     * when that leaves it with a negative status, that status is returned.
     * Otherwise the join is delegated to {@code pool.awaitJoin} with a
     * zero MonoTime.
     *
     * @param task the task to join
     * @return the status from the direct execution or from pool.awaitJoin
     */
    int awaitJoin(IForkJoinTask task) {
        int s = task.getStatus();
        WorkQueue w = workQueue;
        if (w.tryUnpush(task) && (s = task.doExec()) < 0)
            return s;
        else
            return pool.awaitJoin(w, task, MonoTime.zero());
    }

    /**
     * If the current thread is operating in a ForkJoinPool,
     * unschedules and returns, without executing, a task externally
     * submitted to the pool, if one is available. Availability may be
     * transient, so a {@code null} result does not necessarily imply
     * quiescence of the pool.  This method is designed primarily to
     * support extensions, and is unlikely to be useful otherwise.
     *
     * @return a task, or {@code null} if none are available
     * @since 9
     */
    protected static IForkJoinTask pollSubmission() {
        ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
        return t !is null ? t.pool.pollSubmission() : null;
    }
}
3080 
3081 
3082 /**
3083  * A worker thread that has no permissions, is not a member of any
3084  * user-defined ThreadGroupEx, uses the system class loader as
3085  * thread context class loader, and erases all ThreadLocals after
3086  * running each top-level task.
3087  */
final class InnocuousForkJoinWorkerThread : ForkJoinWorkerThread {
    /** The ThreadGroupEx shared by all InnocuousForkJoinWorkerThreads. */
    private __gshared ThreadGroupEx innocuousThreadGroup;

    // The Java original also keeps an AccessControlContext supporting no
    // privileges (INNOCUOUS_ACC); it is not ported.

    /**
     * Creates the shared thread group. The Java original parents it under
     * the root of the current thread's group hierarchy; here it is created
     * with a null parent.
     */
    shared static this() {
        ThreadGroupEx group = new ThreadGroupEx(
                    null, "InnocuousForkJoinWorkerThreadGroup");
        innocuousThreadGroup = group;
    }

    /**
     * Creates an innocuous worker for the given pool, placed in the
     * shared innocuous thread group.
     */
    this(ForkJoinPool pool) {
        super(pool, innocuousThreadGroup);
    }

    /**
     * Post-task hook. The Java original erases ThreadLocals here
     * (ThreadLocalRandom.eraseThreadLocals); that call is not ported,
     * so this is currently a no-op.
     */
    override void afterTopLevelExec() {
    }

    /** Deliberately ignores the request (silently fails). */
    override void setUncaughtExceptionHandler(UncaughtExceptionHandler x) {
    }

    // Java original, not ported:
    // override // paranoically
    // void setContextClassLoader(ClassLoader cl) {
    //     throw new SecurityException("setContextClassLoader");
    // }
}
3134 
3135 
3136 /**
3137  * Queues supporting work-stealing as well as external task
3138  * submission. See above for descriptions and algorithms.
3139  */
3140 final class WorkQueue {
    int source;       // source queue id, or sentinel; reset to 0 by topLevelExec
    int id;                    // pool index, mode, tag (low 16 bits >>> 1 = pool index)
    shared int base;                  // index of next slot for poll
    shared int top;                   // index of next slot for push
    shared int phase;        // versioned, negative: queued, 1: locked, 0: unlocked
    int stackPred;             // pool stack (ctl) predecessor link
    int nsteals;               // number of steals
    IForkJoinTask[] array;   // the queued tasks; power of 2 size (indexed with length - 1 mask)
    ForkJoinPool pool;   // the containing pool (may be null)
    ForkJoinWorkerThread owner; // owning thread or null if shared
3151 
3152     this(ForkJoinPool pool, ForkJoinWorkerThread owner) {
3153         this.pool = pool;
3154         this.owner = owner;
3155         // Place indices in the center of array (that is not yet allocated)
3156         base = top = INITIAL_QUEUE_CAPACITY >>> 1;
3157     }
3158 
3159     /**
3160      * Tries to lock shared queue by CASing phase field.
3161      */
3162     final bool tryLockPhase() {
3163         return cas(&this.phase, 0, 1);
3164         // return PHASE.compareAndSet(this, 0, 1);
3165     }
3166 
3167     final void releasePhaseLock() {
3168         // PHASE.setRelease(this, 0);
3169         atomicStore(this.phase, 0);
3170     }
3171 
3172     /**
3173      * Returns an exportable index (used by ForkJoinWorkerThread).
3174      */
3175     final int getPoolIndex() {
3176         return (id & 0xffff) >>> 1; // ignore odd/even tag bit
3177     }
3178 
3179     /**
3180      * Returns the approximate number of tasks in the queue.
3181      */
3182     final int queueSize() {
3183         // int n = cast(int)BASE.getAcquire(this) - top;
3184         int n = atomicLoad(this.base) - top;
3185         return (n >= 0) ? 0 : -n; // ignore negative
3186     }
3187 
3188     /**
3189      * Provides a more accurate estimate of whether this queue has
3190      * any tasks than does queueSize, by checking whether a
3191      * near-empty queue has at least one unclaimed task.
3192      */
3193     final bool isEmpty() {
3194         IForkJoinTask[] a; int n, cap, b;
3195         // VarHandle.acquireFence(); // needed by external callers
3196         return ((n = (b = base) - top) >= 0 || // possibly one task
3197                 (n == -1 && ((a = array) is null ||
3198                              (cap = cast(int)a.length) == 0 ||
3199                              a[(cap - 1) & b] is null)));
3200     }
3201 
3202     /**
3203      * Pushes a task. Call only by owner in unshared queues.
3204      *
3205      * @param task the task. Caller must ensure non-null.
3206      * @throws RejectedExecutionException if array cannot be resized
3207      */
    final void push(IForkJoinTask task) {
        IForkJoinTask[] a;
        int s = top, d, cap, m;
        ForkJoinPool p = pool;
        // Does nothing if the task array has not been allocated.
        if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
            m = cap - 1;                      // index mask (capacity is a power of 2)
            // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:40:55
            // The slot is written with a plain store; the Java original
            // uses a release store:
            // AtomicHelper.store(a[m & s], task);
            a[m & s] = task;
            // QA.setRelease(a, (m = cap - 1) & s, task);
            top = s + 1;                      // make the new slot part of the queue
            // d is the number of tasks that were queued before this push.
            if (((d = s - atomicLoad(this.base)) & ~1) == 0 &&
                p !is null) {                 // size 0 or 1
                // VarHandle.fullFence();
                p.signalWork();
            }
            else if (d == m)                  // array is now full: double it
                growArray(false);
        }
    }
3229 
3230     /**
3231      * Version of push for shared queues. Call only with phase lock held.
3232      * @return true if should signal work
3233      */
    final bool lockedPush(IForkJoinTask task) {
        IForkJoinTask[] a;
        bool signal = false;
        int s = top, b = base, cap, d;
        // NOTE(review): if the array is unallocated nothing is stored and the
        // phase lock is NOT released here - confirm callers handle that case.
        if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
            a[(cap - 1) & s] = task;
            top = s + 1;
            // Array became full (relative to the base read on entry):
            // growArray(true) doubles it and also releases the phase lock.
            if (b - s + cap - 1 == 0)
                growArray(true);
            else {
                phase = 0; // full unlock
                // Re-read base after unlocking to decide whether to signal.
                if (((s - base) & ~1) == 0) // size 0 or 1
                    signal = true;
            }
        }
        return signal;
    }
3251 
3252     /**
3253      * Doubles the capacity of array. Call either by owner or with
3254      * lock held -- it is OK for base, but not top, to move while
3255      * resizings are in progress.
3256      */
3257     final void growArray(bool locked) {
3258         IForkJoinTask[] newA = null;
3259         try {
3260             IForkJoinTask[] oldA; int oldSize, newSize;
3261             if ((oldA = array) !is null && (oldSize = cast(int)oldA.length) > 0 &&
3262                 (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
3263                 newSize > 0) {
3264                 try {
3265                     newA = new IForkJoinTask[newSize];
3266                 } catch (OutOfMemoryError ex) {
3267                 }
3268                 if (newA !is null) { // poll from old array, push to new
3269                     int oldMask = oldSize - 1, newMask = newSize - 1;
3270                     for (int s = top - 1, k = oldMask; k >= 0; --k) {
3271                         // IForkJoinTask x = AtomicHelper.getAndSet(oldA[s & oldMask], null);
3272                         // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 下午8:57:26
3273                         // 
3274                         IForkJoinTask x = oldA[s & oldMask];
3275                         oldA[s & oldMask] = null;
3276 
3277                         if (x !is null)
3278                             newA[s-- & newMask] = x;
3279                         else
3280                             break;
3281                     }
3282                     array = newA;
3283                     // VarHandle.releaseFence();
3284                 }
3285             }
3286         } finally {
3287             if (locked)
3288                 phase = 0;
3289         }
3290         if (newA is null)
3291             throw new RejectedExecutionException("Queue capacity exceeded");
3292     }
3293 
3294     /**
3295      * Takes next task, if one exists, in FIFO order.
3296      */
    final IForkJoinTask poll() {
        int b, k, cap; 
        IForkJoinTask[] a;
        // Loop while the array exists and at least one slot lies between
        // base and top; array and base are re-read on every iteration.
        while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
               top - (b = base) > 0) {
            k = (cap - 1) & b;                // slot of the oldest task
            // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:42:05
            // The slot is read with a plain load; the Java original uses
            // an acquire load:
            
            // IForkJoinTask t = AtomicHelper.load(a[k]);
            IForkJoinTask t = a[k];
            // Only act if base did not move since it was read above.
            if (base == b++) {
                if (t is null)
                    Thread.yield(); // await index advance
                // Claim the task by CASing the slot from t to null.
                else if (AtomicHelper.compareAndSet(a[k], t, null)) {
                // else if (QA.compareAndSet(a, k, t, null)) {
                    AtomicHelper.store(this.base, b);
                    return t;
                }
            }
        }
        return null;
    }
3320 
3321     /**
3322      * Takes next task, if one exists, in order specified by mode.
3323      */
3324     final IForkJoinTask nextLocalTask() {
3325         IForkJoinTask t = null;
3326         int md = id, b, s, d, cap; IForkJoinTask[] a;
3327         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3328             (d = (s = top) - (b = base)) > 0) {
3329             if ((md & FIFO) == 0 || d == 1) {
3330                 auto index = (cap - 1) & --s;
3331                 t = cast(IForkJoinTask)a[index];
3332                 a[index] = null;
3333                 if(t !is null) {
3334                     AtomicHelper.store(this.top, s);
3335                 }
3336                 // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 9:10:45
3337                 // 
3338                 // if ((t = cast(IForkJoinTask)
3339                 //      AtomicHelper.getAndSet(a[(cap - 1) & --s], null)) !is null)
3340                     // AtomicHelper.store(this.top, s);
3341             } else {
3342                 // t = cast(IForkJoinTask)
3343                 //       AtomicHelper.getAndSet(a[(cap - 1) & b++], null)
3344                 auto index = (cap - 1) & b++;
3345                 t = cast(IForkJoinTask)a[index];
3346                 a[index] = null;
3347                 if (t !is null) {
3348                     AtomicHelper.store(this.base, b);
3349                 }
3350                 else // on contention in FIFO mode, use regular poll
3351                     t = poll();
3352             } 
3353         }
3354         return t;
3355     }
3356 
3357     /**
3358      * Returns next task, if one exists, in order specified by mode.
3359      */
3360     final IForkJoinTask peek() {
3361         int cap; IForkJoinTask[] a;
3362         return ((a = array) !is null && (cap = cast(int)a.length) > 0) ?
3363             a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
3364     }
3365 
3366     /**
3367      * Pops the given task only if it is at the current top.
3368      */
3369     final bool tryUnpush(IForkJoinTask task) {
3370         bool popped = false;
3371         int s, cap; IForkJoinTask[] a;
3372         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3373             (s = top) != base) {
3374             popped = AtomicHelper.compareAndSet(a[(cap - 1) & --s], task, null);
3375             if(popped) {
3376                 AtomicHelper.store(this.top, s);
3377             }
3378         }
3379         return popped;
3380     }
3381 
3382     /**
3383      * Shared version of tryUnpush.
3384      */
3385     final bool tryLockedUnpush(IForkJoinTask task) {
3386         bool popped = false;
3387         int s = top - 1, k, cap; IForkJoinTask[] a;
3388         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3389             a[k = (cap - 1) & s] == task && tryLockPhase()) {
3390             if (top == s + 1 && array == a) {
3391                 popped = AtomicHelper.compareAndSet(a[k], task, null);
3392                 if(popped) top = s;
3393             }
3394             releasePhaseLock();
3395         }
3396         return popped;
3397     }
3398 
3399     /**
3400      * Removes and cancels all known tasks, ignoring any exceptions.
3401      */
3402     final void cancelAll() {
3403         for (IForkJoinTask t; (t = poll()) !is null; )
3404             IForkJoinTask.cancelIgnoringExceptions(t);
3405     }
3406 
3407     // Specialized execution methods
3408 
3409     /**
3410      * Runs the given (stolen) task if nonnull, as well as
3411      * remaining local tasks and others available from the given
3412      * queue, up to bound n (to avoid infinite unfairness).
3413      */
3414     final void topLevelExec(IForkJoinTask t, WorkQueue q, int n) {
3415         if (t !is null && q !is null) { // hoist checks
3416             int nstolen = 1;
3417             for (;;) {
3418                 t.doExec();
3419                 if (n-- < 0)
3420                     break;
3421                 else if ((t = nextLocalTask()) is null) {
3422                     if ((t = q.poll()) is null)
3423                         break;
3424                     else
3425                         ++nstolen;
3426                 }
3427             }
3428             ForkJoinWorkerThread thread = owner;
3429             nsteals += nstolen;
3430             source = 0;
3431             if (thread !is null)
3432                 thread.afterTopLevelExec();
3433         }
3434     }
3435 
3436     /**
3437      * If present, removes task from queue and executes it.
3438      */
3439     final void tryRemoveAndExec(IForkJoinTask task) {
3440         IForkJoinTask[] a; int s, cap;
3441         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3442             (s = top) - base > 0) { // traverse from top
3443             for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
3444                 int index = i & m;
3445                 IForkJoinTask t = a[index]; //QA.get(a, index);
3446                 if (t is null)
3447                     break;
3448                 else if (t == task) {
3449                     // if (AtomicHelper.compareAndSet(a[index], t, null)) 
3450                     if(a[index] == t) {
3451                         a[index] = null;
3452                         top = ns;   // safely shift down
3453                         for (int j = i; j != ns; ++j) {
3454                             IForkJoinTask f;
3455                             int pindex = (j + 1) & m;
3456                             f = a[pindex];// QA.get(a, pindex);
3457                             a[pindex] = null;
3458                             // AtomicHelper.store(a[pindex], null);
3459                             // QA.setVolatile(a, pindex, null);
3460                             int jindex = j & m;
3461                             a[jindex] = f;
3462                             // AtomicHelper.store(a[jindex], f);
3463                             // QA.setRelease(a, jindex, f);
3464                         }
3465                         // VarHandle.releaseFence();
3466                         t.doExec();
3467                     }
3468                     break;
3469                 }
3470             }
3471         }
3472     }
3473 
    /**
     * Tries to pop and run tasks within the target's computation
     * until done, not found, or limit exceeded.
     *
     * @param task root of CountedCompleter computation
     * @param limit max runs, or zero for no limit
     * @param isShared true if must lock to extract task
     * @return task status on exit (0 if task is null)
     */
    final int helpCC(ICountedCompleter task, int limit, bool isShared) {
        int status = 0;
        // Only proceed for a non-null task whose status is still non-negative.
        if (task !is null && (status = task.getStatus()) >= 0) {
            int s, k, cap; IForkJoinTask[] a;
            // array, top and base are re-read on every pass; the loop ends
            // as soon as the queue appears empty.
            while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
                   (s = top) - base > 0) {
                // v: the task claimed for execution in this pass, if any
                ICountedCompleter v = null;
                // inspect the slot just below top
                IForkJoinTask o = a[k = (cap - 1) & (s - 1)];
                // the cast yields null when the slot is empty or does not
                // hold an ICountedCompleter
                ICountedCompleter t = cast(ICountedCompleter)o;
                if (t !is null) {
                    // Walk t's completer chain looking for the root task.
                    for (ICountedCompleter f = t;;) {
                        if (f != task) {
                            // chain exhausted without reaching task: no match
                            if ((f = f.getCompleter()) is null)
                                break;
                        }
                        else if (isShared) {
                            // shared mode: take the phase lock, then re-validate
                            // top and array before claiming the slot
                            if (tryLockPhase()) {
                                if (top == s && array == a &&
                                    AtomicHelper.compareAndSet(a[k], t, null)) {
                                    top = s - 1;
                                    v = t;
                                }
                                releasePhaseLock();
                            }
                            break;
                        }
                        else {
                            // non-shared mode: claim the slot with a CAS only
                            if (AtomicHelper.compareAndSet(a[k], t, null)) {
                                top = s - 1;
                                v = t;
                            }
                            break;
                        }
                    }
                }
                if (v !is null)
                    v.doExec();
                // Stop when the root's status went negative, when nothing was
                // claimed this pass, or when a non-zero limit has been used up.
                if ((status = task.getStatus()) < 0 || v is null ||
                    (limit != 0 && --limit == 0))
                    break;
            }
        }
        return status;
    }
3527 
    /**
     * Tries to poll and run AsynchronousCompletionTasks until
     * none found or blocker is released
     *
     * @param blocker the blocker; nothing is done when it is null
     */
    final void helpAsyncBlocker(ManagedBlocker blocker) {
        if (blocker !is null) {
            int b, k, cap; IForkJoinTask[] a; IForkJoinTask t;
            // Loop while the queue appears non-empty; b snapshots base
            // at the start of each pass.
            while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
                   top - (b = base) > 0) {
                // slot index corresponding to the snapshotted base
                k = (cap - 1) & b;
                // t = AtomicHelper.load(a[k]);
                t = a[k];
                if (blocker.isReleasable())
                    break;
                // Continue only if base has not moved since the snapshot;
                // after this test b holds the next base value.
                else if (base == b++ && t !is null) {
                    AsynchronousCompletionTask at = cast(AsynchronousCompletionTask)t;
                    // the task at base is not an AsynchronousCompletionTask: give up
                    if (at is null)
                        break;
                    // claim the slot, then advance base and run the task
                    else if (AtomicHelper.compareAndSet(a[k], t, null)) {
                        AtomicHelper.store(this.base, b);
                        t.doExec();
                    }
                }
            }
        }
    }
3556 
3557     /**
3558      * Returns true if owned and not known to be blocked.
3559      */
3560     final bool isApparentlyUnblocked() {
3561         ThreadEx wt; ThreadState s;
3562         return ((wt = owner) !is null &&
3563                 (s = wt.getState()) != ThreadState.BLOCKED &&
3564                 s != ThreadState.WAITING &&
3565                 s != ThreadState.TIMED_WAITING);
3566     }
3567 
3568     // VarHandle mechanics.
3569     // static final VarHandle PHASE;
3570     // static final VarHandle BASE;
3571     // static final VarHandle TOP;
3572     // static {
3573     //     try {
3574     //         MethodHandles.Lookup l = MethodHandles.lookup();
3575     //         PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
3576     //         BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
3577     //         TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
3578     //     } catch (ReflectiveOperationException e) {
3579     //         throw new ExceptionInInitializerError(e);
3580     //     }
3581     // }
3582 }
3583 
3584 
3585 
/**
 * Marker interface, without members, that tags asynchronous tasks
 * created by {@code async} methods. Useful for monitoring, debugging
 * and tracking asynchronous activities; code in this module also
 * tests for it by casting a task to this interface.
 *
 * @since 1.8
 */
interface AsynchronousCompletionTask {}