1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 module hunt.concurrency.ForkJoinPool;
37 
38 import hunt.concurrency.AbstractExecutorService;
39 import hunt.concurrency.atomic.AtomicHelper;
40 import hunt.concurrency.Exceptions;
41 import hunt.concurrency.ForkJoinTask;
42 import hunt.concurrency.ForkJoinTaskHelper;
43 import hunt.concurrency.ThreadLocalRandom;
44 import hunt.concurrency.thread;
45 
46 import hunt.collection.List;
47 import hunt.collection.Collection;
48 import hunt.collection.Collections;
49 import hunt.logging.ConsoleLogger;
50 import hunt.Exceptions;
51 import hunt.Functions;
52 import hunt.system.Environment;
53 import hunt.system.Memory;
54 import hunt.util.Common;
55 import hunt.util.Configuration;
56 import hunt.util.DateTime;
57 
58 import core.atomic;
59 import core.sync.mutex;
60 import core.thread;
61 import core.time;
62 
63 import std.algorithm;
64 import std.array;
65 import std.conv;
66 import std.datetime;
67 
/**
 * Lock type name kept from the Java original (java.util.concurrent's
 * ReentrantLock). It is simply druntime's Mutex, which druntime documents as
 * a recursive mutex, so the same thread may re-acquire it.
 */
alias ReentrantLock = core.sync.mutex.Mutex;
69 
70 // import java.lang.Thread.UncaughtExceptionHandler;
71 // import java.lang.invoke.MethodHandles;
72 // import java.lang.invoke.VarHandle;
73 // import java.security.AccessController;
74 // import java.security.AccessControlContext;
75 // import java.security.Permission;
76 // import java.security.Permissions;
77 // import java.security.PrivilegedAction;
78 // import java.security.ProtectionDomain;
79 // import hunt.collection.ArrayList;
80 // import hunt.collection.Collection;
81 // import java.util.Collections;
82 // import java.util.List;
83 // import hunt.util.functional.Predicate;
84 // import hunt.concurrency.locks.LockSupport;
85 
86 
// Constants shared by ForkJoinPool and WorkQueue. Every declaration below is
// private to this module.

// Bounds.
private enum : int
{
    SWIDTH  = 16,       // width of a short, in bits
    SMASK   = 0xffff,   // short bits == max index
    MAX_CAP = 0x7fff,   // max #workers - 1
    SQMASK  = 0x007e,   // even indices only: at most 64 shared-queue slots
}

// Masks and units for WorkQueue.phase and the ctl "sp" subfield.
private enum : int
{
    UNSIGNALLED = 1 << 31,  // must be negative (sign bit)
    SS_SEQ      = 1 << 16,  // version count unit
    QLOCK       = 1,        // must be 1
}

// Mode bits and sentinels; some are also used in the WorkQueue id and
// source fields.
private enum : int
{
    OWNED      = 1,         // queue has an owner thread
    FIFO       = 1 << 16,   // FIFO queue or access mode
    SHUTDOWN   = 1 << 18,
    TERMINATED = 1 << 19,
    STOP       = 1 << 31,   // must be negative (sign bit)
    QUIET      = 1 << 30,   // not scanning or working
    DORMANT    = QUIET | UNSIGNALLED,
}

/**
 * Bound on the number of top-level polls a worker makes before checking
 * other queues. It is stored as a bit shift: in effect it is multiplied by
 * the pool size and the result used as a random value mask, so the average
 * bound is about poolSize * (1 << TOP_BOUND_SHIFT). The rationale is given
 * in the "Scanning" part of ForkJoinPool's implementation overview.
 */
private enum int TOP_BOUND_SHIFT = 10;

/**
 * Initial capacity of a work-stealing queue array.
 * Must be a power of two, at least 2.
 */
private enum int INITIAL_QUEUE_CAPACITY = 1 << 13;

/**
 * Maximum capacity for queue arrays. It must be a power of two no greater
 * than 1 << (31 - width of an array entry), so that index calculations never
 * wrap around. It is deliberately set somewhat below that limit so runaway
 * programs are trapped before they saturate the system.
 */
private enum int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
136 
137 /**
138  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
139  * A {@code ForkJoinPool} provides the entry point for submissions
140  * from non-{@code ForkJoinTask} clients, as well as management and
141  * monitoring operations.
142  *
143  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
144  * ExecutorService} mainly by virtue of employing
145  * <em>work-stealing</em>: all threads in the pool attempt to find and
146  * execute tasks submitted to the pool and/or created by other active
147  * tasks (eventually blocking waiting for work if none exist). This
148  * enables efficient processing when most tasks spawn other subtasks
149  * (as do most {@code ForkJoinTask}s), as well as when many small
150  * tasks are submitted to the pool from external clients.  Especially
151  * when setting <em>asyncMode</em> to true in constructors, {@code
152  * ForkJoinPool}s may also be appropriate for use with event-style
153  * tasks that are never joined. All worker threads are initialized
154  * with {@link Thread#isDaemon} set {@code true}.
155  *
156  * <p>A static {@link #commonPool()} is available and appropriate for
157  * most applications. The common pool is used by any ForkJoinTask that
158  * is not explicitly submitted to a specified pool. Using the common
159  * pool normally reduces resource usage (its threads are slowly
160  * reclaimed during periods of non-use, and reinstated upon subsequent
161  * use).
162  *
163  * <p>For applications that require separate or custom pools, a {@code
164  * ForkJoinPool} may be constructed with a given target parallelism
165  * level; by default, equal to the number of available processors.
166  * The pool attempts to maintain enough active (or available) threads
167  * by dynamically adding, suspending, or resuming internal worker
168  * threads, even if some tasks are stalled waiting to join others.
169  * However, no such adjustments are guaranteed in the face of blocked
170  * I/O or other unmanaged synchronization. The nested {@link
171  * ManagedBlocker} interface enables extension of the kinds of
172  * synchronization accommodated. The default policies may be
173  * overridden using a constructor with parameters corresponding to
174  * those documented in class {@link ThreadPoolExecutor}.
175  *
176  * <p>In addition to execution and lifecycle control methods, this
177  * class provides status check methods (for example
178  * {@link #getStealCount}) that are intended to aid in developing,
179  * tuning, and monitoring fork/join applications. Also, method
180  * {@link #toString} returns indications of pool state in a
181  * convenient form for informal monitoring.
182  *
183  * <p>As is the case with other ExecutorServices, there are three
184  * main task execution methods summarized in the following table.
185  * These are designed to be used primarily by clients not already
186  * engaged in fork/join computations in the current pool.  The main
187  * forms of these methods accept instances of {@code ForkJoinTask},
188  * but overloaded forms also allow mixed execution of plain {@code
189  * Runnable}- or {@code Callable}- based activities as well.  However,
190  * tasks that are already executing in a pool should normally instead
191  * use the within-computation forms listed in the table unless using
192  * async event-style tasks that are not usually joined, in which case
193  * there is little difference among choice of methods.
194  *
195  * <table class="plain">
196  * <caption>Summary of task execution methods</caption>
197  *  <tr>
198  *    <td></td>
199  *    <th scope="col"> Call from non-fork/join clients</th>
200  *    <th scope="col"> Call from within fork/join computations</th>
201  *  </tr>
202  *  <tr>
203  *    <th scope="row" style="text-align:left"> Arrange async execution</th>
204  *    <td> {@link #execute(ForkJoinTask)}</td>
205  *    <td> {@link ForkJoinTask#fork}</td>
206  *  </tr>
207  *  <tr>
208  *    <th scope="row" style="text-align:left"> Await and obtain result</th>
209  *    <td> {@link #invoke(ForkJoinTask)}</td>
210  *    <td> {@link ForkJoinTask#invoke}</td>
211  *  </tr>
212  *  <tr>
213  *    <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
214  *    <td> {@link #submit(ForkJoinTask)}</td>
215  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
216  *  </tr>
217  * </table>
218  *
219  * <p>The parameters used to construct the common pool may be controlled by
220  * setting the following {@linkplain System#getProperty system properties}:
221  * <ul>
222  * <li>{@code hunt.concurrency.ForkJoinPool.common.parallelism}
223  * - the parallelism level, a non-negative integer
224  * <li>{@code hunt.concurrency.ForkJoinPool.common.threadFactory}
225  * - the class name of a {@link ForkJoinWorkerThreadFactory}.
226  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
227  * is used to load this class.
228  * <li>{@code hunt.concurrency.ForkJoinPool.common.exceptionHandler}
229  * - the class name of a {@link UncaughtExceptionHandler}.
230  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
231  * is used to load this class.
232  * <li>{@code hunt.concurrency.ForkJoinPool.common.maximumSpares}
233  * - the maximum number of allowed extra threads to maintain target
234  * parallelism (default 256).
235  * </ul>
236  * If no thread factory is supplied via a system property, then the
237  * common pool uses a factory that uses the system class loader as the
238  * {@linkplain Thread#getContextClassLoader() thread context class loader}.
239  * In addition, if a {@link SecurityManager} is present, then
240  * the common pool uses a factory supplying threads that have no
241  * {@link Permissions} enabled.
242  *
243  * Upon any error in establishing these settings, default parameters
244  * are used. It is possible to disable or limit the use of threads in
245  * the common pool by setting the parallelism property to zero, and/or
246  * using a factory that may return {@code null}. However, doing so may
247  * cause unjoined tasks to never be executed.
248  *
249  * <p><b>Implementation notes</b>: This implementation restricts the
250  * maximum number of running threads to 32767. Attempts to create
251  * pools with greater than the maximum number result in
252  * {@code IllegalArgumentException}.
253  *
254  * <p>This implementation rejects submitted tasks (that is, by throwing
255  * {@link RejectedExecutionException}) only when the pool is shut down
256  * or internal resources have been exhausted.
257  *
258  * @author Doug Lea
259  */
260 class ForkJoinPool : AbstractExecutorService { 
261 
262     /*
263      * Implementation Overview
264      *
265      * This class and its nested classes provide the main
266      * functionality and control for a set of worker threads:
267      * Submissions from non-FJ threads enter into submission queues.
268      * Workers take these tasks and typically split them into subtasks
269      * that may be stolen by other workers. Work-stealing based on
270      * randomized scans generally leads to better throughput than
271      * "work dealing" in which producers assign tasks to idle threads,
272      * in part because threads that have finished other tasks before
273      * the signalled thread wakes up (which can be a long time) can
274      * take the task instead.  Preference rules give first priority to
275      * processing tasks from their own queues (LIFO or FIFO, depending
276      * on mode), then to randomized FIFO steals of tasks in other
277      * queues.  This framework began as a vehicle for supporting
278      * tree-structured parallelism using work-stealing.  Over time,
279      * its scalability advantages led to extensions and changes to
280      * better support more diverse usage contexts.  Because most
281      * internal methods and nested classes are interrelated, their
282      * main rationale and descriptions are presented here; individual
283      * methods and nested classes contain only brief comments about
284      * details.
285      *
286      * WorkQueues
287      * ==========
288      *
289      * Most operations occur within work-stealing queues (in nested
290      * class WorkQueue).  These are special forms of Deques that
291      * support only three of the four possible end-operations -- push,
292      * pop, and poll (aka steal), under the further constraints that
293      * push and pop are called only from the owning thread (or, as
294      * extended here, under a lock), while poll may be called from
295      * other threads.  (If you are unfamiliar with them, you probably
296      * want to read Herlihy and Shavit's book "The Art of
297      * Multiprocessor Programming", chapter 16 describing these in
298      * more detail before proceeding.)  The main work-stealing queue
299      * design is roughly similar to those in the papers "Dynamic
300      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
301      * (http://research.sun.com/scalable/pubs/index.html) and
302      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
303      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
304      * The main differences ultimately stem from GC requirements that
305      * we null out taken slots as soon as we can, to maintain as small
306      * a footprint as possible even in programs generating huge
307      * numbers of tasks. To accomplish this, we shift the CAS
308      * arbitrating pop vs poll (steal) from being on the indices
309      * ("base" and "top") to the slots themselves.
310      *
311      * Adding tasks then takes the form of a classic array push(task)
312      * in a circular buffer:
313      *    q.array[q.top++ % length] = task;
314      *
315      * (The actual code needs to null-check and size-check the array,
316      * uses masking, not mod, for indexing a power-of-two-sized array,
317      * adds a release fence for publication, and possibly signals
318      * waiting workers to start scanning -- see below.)  Both a
319      * successful pop and poll mainly entail a CAS of a slot from
320      * non-null to null.
321      *
322      * The pop operation (always performed by owner) is:
323      *   if ((the task at top slot is not null) and
324      *        (CAS slot to null))
325      *           decrement top and return task;
326      *
327      * And the poll operation (usually by a stealer) is
328      *    if ((the task at base slot is not null) and
329      *        (CAS slot to null))
330      *           increment base and return task;
331      *
332      * There are several variants of each of these. Most uses occur
333      * within operations that also interleave contention or emptiness
334      * tracking or inspection of elements before extracting them, so
335      * must interleave these with the above code. When performed by
336      * owner, getAndSet is used instead of CAS (see for example method
337      * nextLocalTask) which is usually more efficient, and possible
338      * because the top index cannot independently change during the
339      * operation.
340      *
341      * Memory ordering.  See "Correct and Efficient Work-Stealing for
342      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
343      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
344      * analysis of memory ordering requirements in work-stealing
345      * algorithms similar to (but different than) the one used here.
346      * Extracting tasks in array slots via (fully fenced) CAS provides
347      * primary synchronization. The base and top indices imprecisely
348      * guide where to extract from. We do not usually require strict
349      * orderings of array and index updates. Many index accesses use
350      * plain mode, with ordering constrained by surrounding context
351      * (usually with respect to element CASes or the two WorkQueue
352      * fields source and phase). When not otherwise already
353      * constrained, reads of "base" by queue owners use acquire-mode,
354      * and some externally callable methods preface accesses with
355      * acquire fences.  Additionally, to ensure that index update
356      * writes are not coalesced or postponed in loops etc, "opaque"
357      * mode is used in a few cases where timely writes are not
358      * otherwise ensured. The "locked" versions of push- and pop-
359      * based methods for shared queues differ from owned versions
360      * because locking already forces some of the ordering.
361      *
362      * Because indices and slot contents cannot always be consistent,
363      * a check that base == top indicates (momentary) emptiness, but
364      * otherwise may err on the side of possibly making the queue
365      * appear nonempty when a push, pop, or poll has not fully
366      * committed, or making it appear empty when an update of top has
367      * not yet been visibly written.  (Method isEmpty() checks the
368      * case of a partially completed removal of the last element.)
369      * Because of this, the poll operation, considered individually,
370      * is not wait-free. One thief cannot successfully continue until
371      * another in-progress one (or, if previously empty, a push)
372      * visibly completes.  This can stall threads when required to
373      * consume from a given queue (see method poll()).  However, in
374      * the aggregate, we ensure at least probabilistic
375      * non-blockingness.  If an attempted steal fails, a scanning
376      * thief chooses a different random victim target to try next. So,
377      * in order for one thief to progress, it suffices for any
378      * in-progress poll or new push on any empty queue to complete.
379      *
380      * This approach also enables support of a user mode in which
381      * local task processing is in FIFO, not LIFO order, simply by
382      * using poll rather than pop.  This can be useful in
383      * message-passing frameworks in which tasks are never joined.
384      *
385      * WorkQueues are also used in a similar way for tasks submitted
386      * to the pool. We cannot mix these tasks in the same queues used
387      * by workers. Instead, we randomly associate submission queues
388      * with submitting threads, using a form of hashing.  The
389      * ThreadLocalRandom probe value serves as a hash code for
390      * choosing existing queues, and may be randomly repositioned upon
391      * contention with other submitters.  In essence, submitters act
392      * like workers except that they are restricted to executing local
393      * tasks that they submitted.  Insertion of tasks in shared mode
394      * requires a lock but we use only a simple spinlock (using field
395      * phase), because submitters encountering a busy queue move to a
396      * different position to use or create other queues -- they block
397      * only when creating and registering new queues. Because it is
398      * used only as a spinlock, unlocking requires only a "releasing"
399      * store (using setRelease) unless otherwise signalling.
400      *
401      * Management
402      * ==========
403      *
404      * The main throughput advantages of work-stealing stem from
405      * decentralized control -- workers mostly take tasks from
406      * themselves or each other, at rates that can exceed a billion
407      * per second.  The pool itself creates, activates (enables
408      * scanning for and running tasks), deactivates, blocks, and
409      * terminates threads, all with minimal central information.
410      * There are only a few properties that we can globally track or
411      * maintain, so we pack them into a small number of variables,
412      * often maintaining atomicity without blocking or locking.
413      * Nearly all essentially atomic control state is held in a few
414      * variables that are by far most often read (not
415      * written) as status and consistency checks. We pack as much
416      * information into them as we can.
417      *
418      * Field "ctl" contains 64 bits holding information needed to
419      * atomically decide to add, enqueue (on an event queue), and
420      * dequeue and release workers.  To enable this packing, we
421      * restrict maximum parallelism to (1<<15)-1 (which is far in
422      * excess of normal operating range) to allow ids, counts, and
423      * their negations (used for thresholding) to fit into 16bit
424      * subfields.
425      *
426      * Field "mode" holds configuration parameters as well as lifetime
427      * status, atomically and monotonically setting SHUTDOWN, STOP,
428      * and finally TERMINATED bits.
429      *
430      * Field "workQueues" holds references to WorkQueues.  It is
431      * updated (only during worker creation and termination) under
432      * lock (using field workerNamePrefix as lock), but is otherwise
433      * concurrently readable, and accessed directly. We also ensure
434      * that uses of the array reference itself never become too stale
435      * in case of resizing, by arranging that (re-)reads are separated
436      * by at least one acquiring read access.  To simplify index-based
437      * operations, the array size is always a power of two, and all
438      * readers must tolerate null slots. Worker queues are at odd
439      * indices. Shared (submission) queues are at even indices, up to
440      * a maximum of 64 slots, to limit growth even if the array needs
441      * to expand to add more workers. Grouping them together in this
442      * way simplifies and speeds up task scanning.
443      *
444      * All worker thread creation is on-demand, triggered by task
445      * submissions, replacement of terminated workers, and/or
446      * compensation for blocked workers. However, all other support
447      * code is set up to work with other policies.  To ensure that we
448      * do not hold on to worker references that would prevent GC, all
449      * accesses to workQueues are via indices into the workQueues
450      * array (which is one source of some of the messy code
451      * constructions here). In essence, the workQueues array serves as
452      * a weak reference mechanism. Thus for example the stack top
453      * subfield of ctl stores indices, not references.
454      *
455      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
456      * cannot let workers spin indefinitely scanning for tasks when
457      * none can be found immediately, and we cannot start/resume
458      * workers unless there appear to be tasks available.  On the
459      * other hand, we must quickly prod them into action when new
460      * tasks are submitted or generated. In many usages, ramp-up time
461      * is the main limiting factor in overall performance, which is
462      * compounded at program start-up by JIT compilation and
463      * allocation. So we streamline this as much as possible.
464      *
465      * The "ctl" field atomically maintains total worker and
466      * "released" worker counts, plus the head of the available worker
467      * queue (actually stack, represented by the lower 32bit subfield
468      * of ctl).  Released workers are those known to be scanning for
469      * and/or running tasks. Unreleased ("available") workers are
470      * recorded in the ctl stack. These workers are made available for
471      * signalling by enqueuing in ctl (see method runWorker).  The
472      * "queue" is a form of Treiber stack. This is ideal for
473      * activating threads in most-recently used order, and improves
474      * performance and locality, outweighing the disadvantages of
475      * being prone to contention and inability to release a worker
476      * unless it is topmost on stack.  To avoid missed signal problems
477      * inherent in any wait/signal design, available workers rescan
478      * for (and if found run) tasks after enqueuing.  Normally their
479      * release status will be updated while doing so, but the released
480      * worker ctl count may underestimate the number of active
481      * threads. (However, it is still possible to determine quiescence
482      * via a validation traversal -- see isQuiescent).  After an
483      * unsuccessful rescan, available workers are blocked until
484      * signalled (see signalWork).  The top stack state holds the
485      * value of the "phase" field of the worker: its index and status,
486      * plus a version counter that, in addition to the count subfields
487      * (also serving as version stamps) provide protection against
488      * Treiber stack ABA effects.
489      *
490      * Creating workers. To create a worker, we pre-increment counts
491      * (serving as a reservation), and attempt to construct a
492      * ForkJoinWorkerThread via its factory. Upon construction, the
493      * new thread invokes registerWorker, where it constructs a
494      * WorkQueue and is assigned an index in the workQueues array
495      * (expanding the array if necessary). The thread is then started.
496      * Upon any exception across these steps, or null return from
497      * factory, deregisterWorker adjusts counts and records
498      * accordingly.  If a null return, the pool continues running with
499      * fewer than the target number of workers. If exceptional, the
500      * exception is propagated, generally to some external caller.
501      * Worker index assignment avoids the bias in scanning that would
502      * occur if entries were sequentially packed starting at the front
503      * of the workQueues array. We treat the array as a simple
504      * power-of-two hash table, expanding as needed. The seedIndex
505      * increment ensures no collisions until a resize is needed or a
506      * worker is deregistered and replaced, and thereafter keeps
507      * probability of collision low. We cannot use
508      * ThreadLocalRandom.getProbe() for similar purposes here because
509      * the thread has not started yet, but do so for creating
510      * submission queues for existing external threads (see
511      * externalPush).
512      *
513      * WorkQueue field "phase" is used by both workers and the pool to
514      * manage and track whether a worker is UNSIGNALLED (possibly
515      * blocked waiting for a signal).  When a worker is enqueued its
516      * phase field is set. Note that phase field updates lag queue CAS
517      * releases so usage requires care -- seeing a negative phase does
518      * not guarantee that the worker is available. When queued, the
519      * lower 16 bits of its phase field must hold its pool index. So we
520      * place the index there upon initialization and otherwise keep it
521      * there or restore it when necessary.
522      *
523      * The ctl field also serves as the basis for memory
524      * synchronization surrounding activation. This uses a more
525      * efficient version of a Dekker-like rule that task producers and
526      * consumers sync with each other by both writing/CASing ctl (even
527      * if to its current value).  This would be extremely costly. So
528      * we relax it in several ways: (1) Producers only signal when
529      * their queue is possibly empty at some point during a push
530      * operation (which requires conservatively checking size zero or
531      * one to cover races). (2) Other workers propagate this signal
532      * when they find tasks in a queue with size greater than one. (3)
533      * Workers only enqueue after scanning (see below) and not finding
534      * any tasks.  (4) Rather than CASing ctl to its current value in
535      * the common case where no action is required, we reduce write
536      * contention by equivalently prefacing signalWork when called by
537      * an external task producer using a memory access with
538      * full-semantics or a "fullFence".
539      *
540      * Almost always, too many signals are issued, in part because a
541      * task producer cannot tell if some existing worker is in the
542      * midst of finishing one task (or already scanning) and ready to
543      * take another without being signalled. So the producer might
544      * instead activate a different worker that does not find any
545      * work, and then inactivates. This scarcely matters in
546      * steady-state computations involving all workers, but can create
547      * contention and bookkeeping bottlenecks during ramp-up,
548      * ramp-down, and small computations involving only a few workers.
549      *
550      * Scanning. Method scan (from runWorker) performs top-level
551      * scanning for tasks. (Similar scans appear in helpQuiesce and
552      * pollScan.)  Each scan traverses and tries to poll from each
553      * queue starting at a random index. Scans are not performed in
554      * ideal random permutation order, to reduce cacheline
555      * contention. The pseudorandom generator need not have
556      * high-quality statistical properties in the long term, but just
557      * within computations; we use Marsaglia XorShifts (often via
558      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
559      * suffice. Scanning also includes contention reduction: When
560      * scanning workers fail to extract an apparently existing task,
561      * they soon restart at a different pseudorandom index.  This form
562      * of backoff improves throughput when many threads are trying to
563      * take tasks from few queues, which can be common in some usages.
564      * Scans do not otherwise explicitly take into account core
565      * affinities, loads, cache localities, etc. However, they do
566      * exploit temporal locality (which usually approximates these) by
567      * preferring to re-poll from the same queue after a successful
568      * poll before trying others (see method topLevelExec). However
569      * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
570      * against infinitely unfair looping under unbounded user task
571      * recursion, and also to reduce long-term contention when many
572      * threads poll few queues holding many small tasks. The bound is
573      * high enough to avoid much impact on locality and scheduling
574      * overhead.
575      *
576      * Trimming workers. To release resources after periods of lack of
577      * use, a worker starting to wait when the pool is quiescent will
578      * time out and terminate (see method runWorker) if the pool has
579      * remained quiescent for period given by field keepAlive.
580      *
581      * Shutdown and Termination. A call to shutdownNow invokes
582      * tryTerminate to atomically set a runState bit. The calling
583      * thread, as well as every other worker thereafter terminating,
584      * helps terminate others by cancelling their unprocessed tasks,
585      * and waking them up, doing so repeatedly until stable. Calls to
586      * non-abrupt shutdown() preface this by checking whether
587      * termination should commence by sweeping through queues (until
588      * stable) to ensure lack of in-flight submissions and workers
589      * about to process them before triggering the "STOP" phase of
590      * termination.
591      *
592      * Joining Tasks
593      * =============
594      *
595      * Any of several actions may be taken when one worker is waiting
596      * to join a task stolen (or always held) by another.  Because we
597      * are multiplexing many tasks on to a pool of workers, we can't
598      * always just let them block (as in Thread.join).  We also cannot
599      * just reassign the joiner's run-time stack with another and
600      * replace it later, which would be a form of "continuation", that
601      * even if possible is not necessarily a good idea since we may
602      * need both an unblocked task and its continuation to progress.
603      * Instead we combine two tactics:
604      *
605      *   Helping: Arranging for the joiner to execute some task that it
606      *      would be running if the steal had not occurred.
607      *
608      *   Compensating: Unless there are already enough live threads,
609      *      method tryCompensate() may create or re-activate a spare
610      *      thread to compensate for blocked joiners until they unblock.
611      *
612      * A third form (implemented in tryRemoveAndExec) amounts to
613      * helping a hypothetical compensator: If we can readily tell that
614      * a possible action of a compensator is to steal and execute the
615      * task being joined, the joining thread can do so directly,
616      * without the need for a compensation thread.
617      *
618      * The ManagedBlocker extension API can't use helping so relies
619      * only on compensation in method awaitBlocker.
620      *
621      * The algorithm in awaitJoin entails a form of "linear helping".
622      * Each worker records (in field source) the id of the queue from
623      * which it last stole a task.  The scan in method awaitJoin uses
624      * these markers to try to find a worker to help (i.e., steal back
625      * a task from and execute it) that could hasten completion of the
626      * actively joined task.  Thus, the joiner executes a task that
627      * would be on its own local deque if the to-be-joined task had
628      * not been stolen. This is a conservative variant of the approach
629      * described in Wagner & Calder "Leapfrogging: a portable
630      * technique for implementing efficient futures" SIGPLAN Notices,
631      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
632      * mainly in that we only record queue ids, not full dependency
633      * links.  This requires a linear scan of the workQueues array to
634      * locate stealers, but isolates cost to when it is needed, rather
     * than adding to per-task overhead. Searches can fail to locate
     * stealers when GC stalls and the like delay recording sources.
637      * Further, even when accurately identified, stealers might not
638      * ever produce a task that the joiner can in turn help with. So,
639      * compensation is tried upon failure to find tasks to run.
640      *
641      * Compensation does not by default aim to keep exactly the target
642      * parallelism number of unblocked threads running at any given
643      * time. Some previous versions of this class employed immediate
644      * compensations for any blocked join. However, in practice, the
645      * vast majority of blockages are byproducts of GC and
646      * other JVM or OS activities that are made worse by replacement
647      * when they cause longer-term oversubscription.  Rather than
648      * impose arbitrary policies, we allow users to override the
649      * default of only adding threads upon apparent starvation.  The
650      * compensation mechanism may also be bounded.  Bounds for the
651      * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
652      * with programming errors and abuse before running out of
653      * resources to do so.
654      *
655      * Common Pool
656      * ===========
657      *
658      * The static common pool always exists after static
659      * initialization.  Since it (or any other created pool) need
660      * never be used, we minimize initial construction overhead and
661      * footprint to the setup of about a dozen fields.
662      *
663      * When external threads submit to the common pool, they can
664      * perform subtask processing (see externalHelpComplete and
665      * related methods) upon joins.  This caller-helps policy makes it
666      * sensible to set common pool parallelism level to one (or more)
667      * less than the total number of available cores, or even zero for
668      * pure caller-runs.  We do not need to record whether external
669      * submissions are to the common pool -- if not, external help
670      * methods return quickly. These submitters would otherwise be
671      * blocked waiting for completion, so the extra effort (with
672      * liberally sprinkled task status checks) in inapplicable cases
673      * amounts to an odd form of limited spin-wait before blocking in
674      * ForkJoinTask.join.
675      *
676      * As a more appropriate default in managed environments, unless
677      * overridden by system properties, we use workers of subclass
678      * InnocuousForkJoinWorkerThread when there is a SecurityManager
679      * present. These workers have no permissions set, do not belong
680      * to any user-defined ThreadGroupEx, and erase all ThreadLocals
681      * after executing any top-level task (see
682      * WorkQueue.afterTopLevelExec).  The associated mechanics (mainly
683      * in ForkJoinWorkerThread) may be JVM-dependent and must access
684      * particular Thread class fields to achieve this effect.
685      *
686      * Memory placement
687      * ================
688      *
689      * Performance can be very sensitive to placement of instances of
690      * ForkJoinPool and WorkQueues and their queue arrays. To reduce
691      * false-sharing impact, the @Contended annotation isolates
692      * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
693      * field. WorkQueue arrays are allocated (by their threads) with
694      * larger initial sizes than most ever need, mostly to reduce
695      * false sharing with current garbage collectors that use cardmark
696      * tables.
697      *
698      * Style notes
699      * ===========
700      *
701      * Memory ordering relies mainly on VarHandles.  This can be
702      * awkward and ugly, but also reflects the need to control
703      * outcomes across the unusual cases that arise in very racy code
704      * with very few invariants. All fields are read into locals
705      * before use, and null-checked if they are references.  Array
706      * accesses using masked indices include checks (that are always
707      * true) that the array length is non-zero to avoid compilers
708      * inserting more expensive traps.  This is usually done in a
709      * "C"-like style of listing declarations at the heads of methods
710      * or blocks, and using inline assignments on first encounter.
711      * Nearly all explicit checks lead to bypass/return, not exception
712      * throws, because they may legitimately arise due to
713      * cancellation/revocation during shutdown.
714      *
715      * There is a lot of representation-level coupling among classes
716      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
717      * fields of WorkQueue maintain data structures managed by
718      * ForkJoinPool, so are directly accessed.  There is little point
719      * trying to reduce this, since any associated future changes in
720      * representations will need to be accompanied by algorithmic
721      * changes anyway. Several methods intrinsically sprawl because
722      * they must accumulate sets of consistent reads of fields held in
723      * local variables. Some others are artificially broken up to
724      * reduce producer/consumer imbalances due to dynamic compilation.
725      * There are also other coding oddities (including several
726      * unnecessary-looking hoisted null checks) that help some methods
727      * perform reasonably even when interpreted (not compiled).
728      *
729      * The order of declarations in this file is (with a few exceptions):
730      * (1) Static utility functions
731      * (2) Nested (static) classes
732      * (3) Static fields
733      * (4) Fields, along with constants used when unpacking some of them
734      * (5) Internal control methods
735      * (6) Callbacks and other support for ForkJoinTask methods
736      * (7) Exported methods
737      * (8) Static block initializing statics in minimally dependent order
738      */
739 
740     // Static utilities
741 
742     /**
743      * If there is a security manager, makes sure caller has
744      * permission to modify threads.
745      */
746     // private static void checkPermission() {
747     //     SecurityManager security = System.getSecurityManager();
748     //     if (security !is null)
749     //         security.checkPermission(modifyThreadPermission);
750     // }
751 
752 
    // static fields (presumably initialized in a static initializer later
    // in this file -- not visible here)

    /**
     * Default ForkJoinWorkerThreadFactory. Used to create worker
     * threads unless a different factory is supplied to a ForkJoinPool
     * constructor. Declared __gshared: one process-wide instance, not
     * protected by D's `shared` type checking.
     */
    __gshared ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads. Disabled in this port (no SecurityManager).
     */
    // __gshared RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for use unless a static
     * construction exception occurred, but internal usages null-check
     * on use to defensively avoid potential initialization
     * circularities as well as to simplify generated code.
     */
    __gshared ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    __gshared int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate.
     * NOTE(review): tryCompensate in this file consults `bounds`, not this
     * field directly; presumably it is folded into the common pool's
     * bounds at construction -- confirm.
     */
    private __gshared int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix; advanced
     * atomically by nextPoolId().
     */
    private shared static int poolNumberSequence;
792 
793     /**
794      * Returns the next sequence number. We don't expect this to
795      * ever contend, so use simple builtin sync.
796      */
797     private static int nextPoolId() {
798         return AtomicHelper.increment(poolNumberSequence);
799     }
800 
    // static configuration constants

    /**
     * Default idle timeout, in milliseconds (60 seconds): how long a
     * worker that parks while the pool is quiescent waits for new work
     * before it may be dropped.
     */
    private enum long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance, in milliseconds, for idle timeouts: runWorker
     * treats a parked worker as timed out if its deadline is within this
     * many milliseconds of the current time when it wakes.
     */
    private enum long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "hunt.concurrency.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows the runtime to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private enum int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment added to indexSeed each time registerWorker runs, used to
     * pick a starting slot for the new worker's queue. The value is
     * approximately 2^32 / golden ratio; it does not fit in a positive
     * int, so the stored int value is negative and additions wrap. See
     * class ThreadLocal for explanation.
     */
    private enum int SEED_INCREMENT = 0x9e3779b9;

    /*
     * Bits and masks for field ctl, packed with 4 16 bit subfields:
     * RC: Number of released (unqueued) workers minus target parallelism
     *     (bits 48-63)
     * TC: Number of total workers minus target parallelism (bits 32-47)
     * SS: version count and status of top waiting thread (bits 16-31)
     * ID: poolIndex of top of Treiber stack of waiters (bits 0-15)
     *
     * When convenient, we can extract the lower 32 stack top bits
     * (including version bits) as sp=cast(int)ctl.  The offsets of counts
     * by the target parallelism and the positionings of fields makes
     * it possible to perform the most common checks via sign tests of
     * fields: When rc is negative (and hence ctl itself is negative),
     * there are not enough unqueued workers; when tc is negative, there
     * are not enough total workers.  When sp is non-zero, there are
     * waiting workers.  To deal with possibly negative fields, we use
     * casts in and out of "short" and/or signed shifts to maintain
     * signedness.
     *
     * Because it occupies uppermost bits, we can add one release count
     * using getAndAddLong of RC_UNIT, rather than CAS, when returning
     * from a blocked join.  Other updates entail multiple subfields
     * and masking, requiring CAS.
     *
     * The limits packed in field "bounds" are also offset by the
     * parallelism level to make them comparable to the ctl rc and tc
     * fields.
     */

    // Lower and upper word masks: SP_MASK selects the SS+ID stack-top
    // word, UC_MASK selects the RC+TC count word.
    private enum long SP_MASK    = 0xffffffffL;
    private enum long UC_MASK    = ~SP_MASK;

    // Release counts (RC, bits 48-63)
    private enum int  RC_SHIFT   = 48;
    private enum long RC_UNIT    = 0x0001L << RC_SHIFT;
    private enum long RC_MASK    = 0xffffL << RC_SHIFT;

    // Total counts (TC, bits 32-47)
    private enum int  TC_SHIFT   = 32;
    private enum long TC_UNIT    = 0x0001L << TC_SHIFT;
    private enum long TC_MASK    = 0xffffL << TC_SHIFT;
    private enum long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of TC: set while too few total workers
871 
    // Instance fields

    long stealCount;            // accumulates nsteals of deregistered workers
    Duration keepAlive;                // idle time a quiescent parked worker waits before it may be dropped
    int indexSeed;                       // seed advanced by SEED_INCREMENT per registered worker
    int bounds;                    // min, max threads packed as shorts (low 16 bits: min, high: max)
    int mode;                   // parallelism (low SMASK bits), runstate (SHUTDOWN, negative = terminating), queue mode (FIFO)
    WorkQueue[] workQueues;              // main registry; worker queues live at odd indices
    string workerNamePrefix;       // prefix for worker thread names; queues are only registered when non-null
    Object workerNameLocker;       // NOTE(review): not used in this section; register/deregister lock on `this`
    ForkJoinWorkerThreadFactory factory;
    UncaughtExceptionHandler ueh;  // per-worker UEH
    Predicate!(ForkJoinPool) saturate; // if it returns true at the thread limit, tryCompensate lets the joiner block without a spare

    // @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
    shared long ctl;                   // main pool control: packed RC/TC/SS/ID, updated by CAS via AtomicHelper
888 
889     // Creating, registering and deregistering workers
890 
891     /**
892      * Tries to construct and start one worker. Assumes that total
893      * count has already been incremented as a reservation.  Invokes
894      * deregisterWorker on any failure.
895      *
896      * @return true if successful
897      */
898     private bool createWorker() {
899         ForkJoinWorkerThreadFactory fac = factory;
900         Throwable ex = null;
901         ForkJoinWorkerThread wt = null;
902         try {
903             if (fac !is null && (wt = fac.newThread(this)) !is null) {
904                 wt.start();
905                 return true;
906             }
907         } catch (Throwable rex) {
908             ex = rex;
909         }
910         deregisterWorker(wt, ex);
911         return false;
912     }
913 
914     /**
915      * Tries to add one worker, incrementing ctl counts before doing
916      * so, relying on createWorker to back out on failure.
917      *
918      * @param c incoming ctl value, with total count negative and no
919      * idle workers.  On CAS failure, c is refreshed and retried if
920      * this holds (otherwise, a new worker is not needed).
921      */
922     private void tryAddWorker(long c) {
923         do {
924             long nc = ((RC_MASK & (c + RC_UNIT)) |
925                        (TC_MASK & (c + TC_UNIT)));
926             if (ctl == c && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
927                 // version(HUNT_CONCURRENCY_DEBUG) tracef("nc=%d, ctl=%d, c=%d", nc, ctl, c);
928                 createWorker();
929                 break;
930             }
931         } while (((c = ctl) & ADD_WORKER) != 0L && cast(int)c == 0);
932     }
933 
934     /**
935      * Callback from ForkJoinWorkerThread constructor to establish and
936      * record its WorkQueue.
937      *
938      * @param wt the worker thread
939      * @return the worker's queue
940      */
941     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
942         UncaughtExceptionHandler handler;
943         wt.isDaemon(true);                             // configure thread
944         if ((handler = ueh) !is null)
945             wt.setUncaughtExceptionHandler(handler);
946         int tid = 0;                                    // for thread name
947         int idbits = mode & FIFO;
948         string prefix = workerNamePrefix;
949         WorkQueue w = new WorkQueue(this, wt);
950         if (prefix !is null) {
951             synchronized (this) {
952                 WorkQueue[] ws = workQueues; 
953                 int n;
954                 int s = indexSeed += SEED_INCREMENT;
955                 idbits |= (s & ~(SMASK | FIFO | DORMANT));
956                 if (ws !is null && (n = cast(int)ws.length) > 1) {
957                     int m = n - 1;
958                     tid = m & ((s << 1) | 1);           // odd-numbered indices
959                     for (int probes = n >>> 1;;) {      // find empty slot
960                         WorkQueue q;
961                         if ((q = ws[tid]) is null || q.phase == QUIET)
962                             break;
963                         else if (--probes == 0) {
964                             tid = n | 1;                // resize below
965                             break;
966                         }
967                         else
968                             tid = (tid + 2) & m;
969                     }
970                     w.phase = w.id = tid | idbits;      // now publishable
971 
972                     if (tid < n)
973                         ws[tid] = w;
974                     else {                              // expand array
975                         int an = n << 1;
976                         WorkQueue[] as = new WorkQueue[an];
977                         as[tid] = w;
978                         int am = an - 1;
979                         for (int j = 0; j < n; ++j) {
980                             WorkQueue v;                // copy external queue
981                             if ((v = ws[j]) !is null)    // position may change
982                                 as[v.id & am & SQMASK] = v;
983                             if (++j >= n)
984                                 break;
985                             as[j] = ws[j];              // copy worker
986                         }
987                         workQueues = as;
988                     }
989                 }
990             }
991             wt.name(prefix ~ tid.to!string());
992         }
993         return w;
994     }
995 
996     /**
997      * Final callback from terminating worker, as well as upon failure
998      * to construct or start a worker.  Removes record of worker from
999      * array, and adjusts counts. If pool is shutting down, tries to
1000      * complete termination.
1001      *
1002      * @param wt the worker thread, or null if construction failed
1003      * @param ex the exception causing failure, or null if none
1004      */
1005     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1006         WorkQueue w = null;
1007         int phase = 0;
1008         if (wt !is null && (w = wt.workQueue) !is null) {
1009             int wid = w.id;
1010             long ns = cast(long)w.nsteals & 0xffffffffL;
1011             if (!workerNamePrefix.empty()) {
1012                 synchronized (this) {
1013                     WorkQueue[] ws; size_t n, i;         // remove index from array
1014                     if ((ws = workQueues) !is null && (n = ws.length) > 0 &&
1015                         ws[i = wid & (n - 1)] == w)
1016                         ws[i] = null;
1017                     stealCount += ns;
1018                 }
1019             }
1020             phase = w.phase;
1021         }
1022         if (phase != QUIET) {                         // else pre-adjusted
1023             long c;                                   // decrement counts
1024             do {} while (!AtomicHelper.compareAndSet(this.ctl, c = ctl, 
1025                 ((RC_MASK & (c - RC_UNIT)) |
1026                 (TC_MASK & (c - TC_UNIT)) |
1027                 (SP_MASK & c))));
1028         }
1029         if (w !is null)
1030             w.cancelAll();                            // cancel remaining tasks
1031 
1032         if (!tryTerminate(false, false) &&            // possibly replace worker
1033             w !is null && w.array !is null)           // avoid repeated failures
1034             signalWork();
1035 
1036         if (ex !is null)                              // rethrow
1037             ForkJoinTaskHelper.rethrow(ex);
1038     }
1039 
1040     /**
1041      * Tries to create or release a worker if too few are running.
1042      */
1043     final void signalWork() {
1044         for (;;) {
1045             long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1046             if ((c = ctl) >= 0L)                      // enough workers
1047                 break;
1048             else if ((sp = cast(int)c) == 0) {            // no idle workers
1049                 if ((c & ADD_WORKER) != 0L)           // too few workers
1050                     tryAddWorker(c);
1051                 break;
1052             }
1053             else if ((ws = workQueues) is null)
1054                 break;                                // unstarted/terminated
1055             else if (ws.length <= (i = sp & SMASK))
1056                 break;                                // terminated
1057             else if ((v = ws[i]) is null)
1058                 break;                                // terminating
1059             else {
1060                 int np = sp & ~UNSIGNALLED;
1061                 int vp = v.phase;
1062                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1063                 Thread vt = v.owner;
1064                 if (sp == vp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
1065                     v.phase = np;
1066                     if (vt !is null && v.source < 0)
1067                         LockSupport.unpark(vt);
1068                     break;
1069                 }
1070             }
1071         }
1072     }
1073 
1074     /**
1075      * Tries to decrement counts (sometimes implicitly) and possibly
1076      * arrange for a compensating worker in preparation for blocking:
1077      * If not all core workers yet exist, creates one, else if any are
1078      * unreleased (possibly including caller) releases one, else if
1079      * fewer than the minimum allowed number of workers running,
1080      * checks to see that they are all active, and if so creates an
1081      * extra worker unless over maximum limit and policy is to
1082      * saturate.  Most of these steps can fail due to interference, in
1083      * which case 0 is returned so caller will retry. A negative
1084      * return value indicates that the caller doesn't need to
1085      * re-adjust counts when later unblocked.
1086      *
1087      * @return 1: block then adjust, -1: block without adjust, 0 : retry
1088      */
1089     private int tryCompensate(WorkQueue w) {
1090         int t, n, sp;
1091         long c = ctl;
1092         WorkQueue[] ws = workQueues;
1093         if ((t = cast(short)(c >>> TC_SHIFT)) >= 0) {
1094             if (ws is null || (n = cast(int)ws.length) <= 0 || w is null)
1095                 return 0;                        // disabled
1096             else if ((sp = cast(int)c) != 0) {       // replace or release
1097                 WorkQueue v = ws[sp & (n - 1)];
1098                 int wp = w.phase;
1099                 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1100                 int np = sp & ~UNSIGNALLED;
1101                 if (v !is null) {
1102                     int vp = v.phase;
1103                     Thread vt = v.owner;
1104                     long nc = (cast(long)v.stackPred & SP_MASK) | uc;
1105                     if (vp == sp && AtomicHelper.compareAndSet(this.ctl, c, nc)) {
1106                         v.phase = np;
1107                         if (vt !is null && v.source < 0)
1108                             LockSupport.unpark(vt);
1109                         return (wp < 0) ? -1 : 1;
1110                     }
1111                 }
1112                 return 0;
1113             }
1114             else if (cast(int)(c >> RC_SHIFT) -      // reduce parallelism
1115                      cast(short)(bounds & SMASK) > 0) {
1116                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1117                 return AtomicHelper.compareAndSet(this.ctl, c, nc) ? 1 : 0;
1118             }
1119             else {                               // validate
1120                 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1121                 bool unstable = false;
1122                 for (int i = 1; i < n; i += 2) {
1123                     WorkQueue q; ThreadEx wt; ThreadState ts;
1124                     if ((q = ws[i]) !is null) {
1125                         if (q.source == 0) {
1126                             unstable = true;
1127                             break;
1128                         }
1129                         else {
1130                             --tc;
1131                             if ((wt = q.owner) !is null &&
1132                                 ((ts = wt.getState()) == ThreadState.BLOCKED ||
1133                                  ts == ThreadState.WAITING))
1134                                 ++bc;            // worker is blocking
1135                         }
1136                     }
1137                 }
1138                 if (unstable || tc != 0 || ctl != c)
1139                     return 0;                    // inconsistent
1140                 else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1141                     Predicate!(ForkJoinPool) sat;
1142                     if ((sat = saturate) !is null && sat(this))
1143                         return -1;
1144                     else if (bc < pc) {          // lagging
1145                         Thread.yield();          // for retry spins
1146                         return 0;
1147                     }
1148                     else
1149                         throw new RejectedExecutionException(
1150                             "Thread limit exceeded replacing blocked worker");
1151                 }
1152             }
1153         }
1154 
1155         long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1156         return AtomicHelper.compareAndSet(this.ctl, c, nc) && createWorker() ? 1 : 0;
1157     }
1158 
1159     /**
1160      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1161      * See above for explanation.
1162      */
1163     final void runWorker(WorkQueue w) {
1164         int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1165         w.array = new IForkJoinTask[INITIAL_QUEUE_CAPACITY]; // initialize
1166         while(true) {
1167             int phase;
1168             if (scan(w, r)) {                     // scan until apparently empty
1169                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1170             }
1171             else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
1172                 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1173                 long c, nc;
1174                 do {
1175                     w.stackPred = cast(int)(c = ctl);
1176                     nc = ((c - RC_UNIT) & UC_MASK) | np;
1177                 } while (!AtomicHelper.compareAndSet(this.ctl, c, nc));
1178 
1179                 version(HUNT_CONCURRENCY_DEBUG) {
1180                     // infof("ctl=%d, c=%d, nc=%d, stackPred=%d", ctl, c, nc, w.stackPred);
1181                 }
1182             }
1183             else {                                // already queued
1184 
1185                 int pred = w.stackPred;
1186                 ThreadEx.interrupted();           // clear before park
1187                 w.source = DORMANT;               // enable signal
1188                 long c = ctl;
1189                 int md = mode, rc = (md & SMASK) + cast(int)(c >> RC_SHIFT);
1190 
1191                 version(HUNT_CONCURRENCY_DEBUG) {
1192                     // tracef("md=%d, rc=%d, c=%d, pred=%d, phase=%d", md, rc, c, pred, phase);
1193                 }
1194 
1195                 if (md < 0) {                      // terminating
1196                     break;
1197                 } else if (rc <= 0 && (md & SHUTDOWN) != 0 && tryTerminate(false, false)) {
1198                     break;                        // quiescent shutdown
1199                 } else if (rc <= 0 && pred != 0 && phase == cast(int)c) {
1200                     long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1201                     MonoTime d = MonoTime.currTime + keepAlive; // DateTime.currentTimeMillis();
1202                     LockSupport.parkUntil(this, d);
1203                     if (ctl == c &&               // drop on timeout if all idle
1204                         d - MonoTime.currTime <= TIMEOUT_SLOP.msecs &&
1205                         AtomicHelper.compareAndSet(this.ctl, c, nc)) {
1206                         w.phase = QUIET;
1207                         break;
1208                     }
1209                 } else if (w.phase < 0) {
1210                     LockSupport.park(this);       // OK if spuriously woken
1211             break;
1212                 }
1213                 w.source = 0;                     // disable signal
1214             }
1215         }
1216     }
1217 
1218     /**
1219      * Scans for and if found executes one or more top-level tasks from a queue.
1220      *
1221      * @return true if found an apparently non-empty queue, and
1222      * possibly ran task(s).
1223      */
1224     private bool scan(WorkQueue w, int r) {
1225         WorkQueue[] ws; int n;
1226         if ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 && w !is null) {
1227             for (int m = n - 1, j = r & m;;) {
1228                 WorkQueue q; int b;
1229                 if ((q = ws[j]) !is null && q.top != (b = q.base)) {
1230                     int qid = q.id;
1231                     IForkJoinTask[] a; 
1232                     size_t cap, k; 
1233                     IForkJoinTask t;
1234 
1235                     if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
1236                         k = (cap - 1) & b;
1237                         // import core.atomic;  
1238                         // auto ss =     core.atomic.atomicLoad((cast(shared)a[k]));   
1239                         // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:12:19 PM               
1240                         // IForkJoinTask tt = a[k];
1241                         // t = AtomicHelper.load(tt);
1242                         t = a[k];
1243                         // tracef("k=%d, t is null: %s", k, t is null);
1244                         if (q.base == b++ && t !is null && 
1245                                 AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
1246                             q.base = b;
1247                             w.source = qid;
1248                             if (q.top - b > 0) signalWork();
1249 
1250                             // infof("IForkJoinTask: %s", typeid(cast(Object)t));
1251                             w.topLevelExec(t, q,  // random fairness bound
1252                                            r & ((n << TOP_BOUND_SHIFT) - 1));
1253                         }
1254                     }
1255                     return true;
1256                 }
1257                 else if (--n > 0) {
1258                     j = (j + 1) & m;
1259                 }
1260                 else {
1261                     break;
1262                 }
1263             }
1264         }
1265         return false;
1266     }
1267 
1268     /**
1269      * Helps and/or blocks until the given task is done or timeout.
1270      * First tries locally helping, then scans other queues for a task
1271      * produced by one of w's stealers; compensating and blocking if
1272      * none are found (rescanning if tryCompensate fails).
1273      *
1274      * @param w caller
1275      * @param task the task
     * @param deadline for timed waits, if not {@code MonoTime.zero}
1277      * @return task status on exit
1278      */
1279     final int awaitJoin(WorkQueue w, IForkJoinTask task, MonoTime deadline) {
1280         if(w is null || task is null) {
1281             return 0;
1282         }
1283         int s = 0;
1284         int seed = ThreadLocalRandom.nextSecondarySeed();
1285         ICountedCompleter cc = cast(ICountedCompleter)task;
1286         if(cc !is null) {
1287             s = w.helpCC(cc, 0, false);
1288             if(s<0)
1289                 return 0;
1290         }
1291 
1292         w.tryRemoveAndExec(task);
1293         int src = w.source, id = w.id;
1294         int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1295         s = task.getStatus();
1296         while (s >= 0) {
1297             WorkQueue[] ws;
1298             int n = (ws = workQueues) is null ? 0 : cast(int)ws.length, m = n - 1;
1299             while (n > 0) {
1300                 WorkQueue q; int b;
1301                 if ((q = ws[r & m]) !is null && q.source == id &&
1302                     q.top != (b = q.base)) {
1303                     IForkJoinTask[] a; int cap, k;
1304                     int qid = q.id;
1305                     if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
1306                         k = (cap - 1) & b;
1307                         // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 5:13:08 PM
1308                         // 
1309                         // IForkJoinTask t = AtomicHelper.load(a[k]);
1310                         IForkJoinTask t = a[k];
1311                         if (q.source == id && q.base == b++ &&
1312                             t !is null && AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
1313                             q.base = b;
1314                             w.source = qid;
1315                             t.doExec();
1316                             w.source = src;
1317                         }
1318                     }
1319                     break;
1320                 }
1321                 else {
1322                     r += step;
1323                     --n;
1324                 }
1325             }
1326 
1327             if ((s = task.getStatus()) < 0)
1328                 break;
1329             else if (n == 0) { // empty scan
1330                 long ms; int block;
1331                 Duration ns;
1332                 if (deadline == MonoTime.zero())
1333                     ms = 0L;                       // untimed
1334                 else if ((ns = deadline - MonoTime.currTime) <= Duration.zero())
1335                     break;                         // timeout
1336                 else if ((ms = ns.total!(TimeUnit.Millisecond)()) <= 0L)
1337                     ms = 1L;                       // avoid 0 for timed wait
1338                 if ((block = tryCompensate(w)) != 0) {
1339                     task.internalWait(ms);
1340                     AtomicHelper.getAndAdd(this.ctl, (block > 0) ? RC_UNIT : 0L);
1341                 }
1342                 s = task.getStatus();
1343             }
1344         }
1345         return s;
1346     }
1347 
1348     /**
1349      * Runs tasks until {@code isQuiescent()}. Rather than blocking
1350      * when tasks cannot be found, rescans until all others cannot
1351      * find tasks either.
1352      */
    final void helpQuiescePool(WorkQueue w) {
        int prevSrc = w.source;          // restored on exit and after each stolen task
        int seed = ThreadLocalRandom.nextSecondarySeed();
        int r = seed >>> 16, step = r | 1;   // odd step through the queue array
        // released: -1 = not yet known; 1 = this worker is counted in ctl's RC
        // field; 0 = this call has subtracted one RC_UNIT that it must add back.
        for (int source = prevSrc, released = -1;;) { // -1 until known
            IForkJoinTask localTask; WorkQueue[] ws;
            // Drain the caller's own queue first.
            while ((localTask = w.nextLocalTask()) !is null)
                localTask.doExec();
            // NOTE(review): phase >= 0 presumably means w is active -- confirm.
            if (w.phase >= 0 && released == -1)
                released = 1;
            // empty: no scanned queue had tasks.
            // quiet: additionally, every scanned queue's source had QUIET set.
            bool quiet = true, empty = true;
            int n = (ws = workQueues) is null ? 0 : cast(int)ws.length;
            for (int m = n - 1; n > 0; r += step, --n) {
                WorkQueue q; int b;
                if ((q = ws[r & m]) !is null) {
                    int qs = q.source;
                    if (q.top != (b = q.base)) {
                        quiet = empty = false;
                        IForkJoinTask[] a; int cap, k;
                        int qid = q.id;
                        if ((a = q.array) !is null && (cap = cast(int)a.length) > 0) {
                            // Work exists again: re-add this worker to the RC count.
                            if (released == 0) {    // increment
                                released = 1;
                                AtomicHelper.getAndAdd(this.ctl, RC_UNIT);
                            }
                            k = (cap - 1) & b;
                            // NOTE(review): plain read; a commented-out variant used
                            // AtomicHelper.load here.
                            // FIXME: Needing refactor or cleanup -@zxp at 2/6/2019, 9:32:07 PM
                            IForkJoinTask t = a[k];
                            // Claim the base task only if base is unchanged and the
                            // slot CAS succeeds, then run it with source set to q.
                            if (q.base == b++ && t !is null) {
                                if(AtomicHelper.compareAndSet(a[k], t, null)) {
                                    q.base = b;
                                    w.source = qid;
                                    t.doExec();
                                    w.source = source = prevSrc;
                                }
                            }
                        }
                        break;              // rescan from the top after any found work
                    }
                    else if ((qs & QUIET) == 0)
                        quiet = false;
                }
            }
            if (quiet) {
                // Done: undo any outstanding RC decrement and restore source.
                if (released == 0) {
                    AtomicHelper.getAndAdd(this.ctl, RC_UNIT);
                }
                w.source = prevSrc;
                break;
            }
            else if (empty) {
                // No visible work but others may not be quiet yet: mark this
                // worker QUIET and drop it from the RC count while spinning.
                if (source != QUIET)
                    w.source = source = QUIET;
                if (released == 1) {                 // decrement
                    released = 0;
                    AtomicHelper.getAndAdd(this.ctl, RC_MASK & -RC_UNIT);
                }
            }
        }
    }
1415 
1416     /**
1417      * Scans for and returns a polled task, if available.
1418      * Used only for untracked polls.
1419      *
1420      * @param submissionsOnly if true, only scan submission queues
1421      */
    private IForkJoinTask pollScan(bool submissionsOnly) {
        WorkQueue[] ws; int n;
        // Keep scanning until STOP is set, there is no queue array, or a full
        // sweep finds nothing and looks stable (see checksum below).
        rescan: while ((mode & STOP) == 0 && (ws = workQueues) !is null &&
                      (n = cast(int)ws.length) > 0) {
            int m = n - 1;
            int r = ThreadLocalRandom.nextSecondarySeed();
            int h = r >>> 16;
            int origin, step;
            if (submissionsOnly) {
                // Even origin and even step visit only even slots (presumably
                // where submission queues live -- SQMASK not shown here).
                origin = (r & ~1) & m;         // even indices and steps
                step = (h & ~1) | 2;
            }
            else {
                // Odd step: with a power-of-two length this visits every slot.
                origin = r & m;
                step = h | 1;
            }
            bool nonempty = false;
            for (int i = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q;
                if ((q = ws[i]) !is null) {
                    int b; IForkJoinTask t;
                    if (q.top - (b = q.base) > 0) {
                        nonempty = true;
                        if ((t = q.poll()) !is null)
                            return t;
                    }
                    else
                        checkSum += b + q.id;   // fingerprint of empty queues
                }
                // Completed a sweep back to origin.
                if ((i = (i + step) & m) == origin) {
                    // Give up when nothing was seen and the fingerprint equals
                    // the previous sweep's (the == operands are evaluated left
                    // to right, so the old value is compared before the update).
                    if (!nonempty && oldSum == (oldSum = checkSum))
                        break rescan;
                    checkSum = 0;
                    nonempty = false;
                }
            }
        }
        return null;
    }
1461 
1462     /**
1463      * Gets and removes a local or stolen task for the given worker.
1464      *
1465      * @return a task, if available
1466      */
1467     final IForkJoinTask nextTaskFor(WorkQueue w) {
1468         IForkJoinTask t;
1469         if (w is null || (t = w.nextLocalTask()) is null)
1470             t = pollScan(false);
1471         return t;
1472     }
1473 
1474     // External operations
1475 
1476     /**
1477      * Adds the given task to a submission queue at submitter's
1478      * current queue, creating one if null or contended.
1479      *
1480      * @param task the task. Caller must ensure non-null.
1481      */
    final void externalPush(IForkJoinTask task) {
        // initialize caller's probe (0 means not yet initialized)
        int r = ThreadLocalRandom.getProbe();
        if (r == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }

        // Retry until the task is pushed or the pool rejects it.
        for (;;) {
            WorkQueue q;
            int md = mode, n;
            WorkQueue[] ws = workQueues;
            if ((md & SHUTDOWN) != 0 || ws is null || (n = cast(int)ws.length) <= 0)
                throw new RejectedExecutionException();
            else if ((q = ws[(n - 1) & r & SQMASK]) is null) { // add queue
                // No queue in the probed slot: build one and try to install
                // it; the next iteration then uses whichever queue is there.
                int qid = (r | QUIET) & ~(FIFO | OWNED);
                Object lock = workerNameLocker;
                IForkJoinTask[] qa =
                    new IForkJoinTask[INITIAL_QUEUE_CAPACITY];
                q = new WorkQueue(this, null);
                q.array = qa;
                q.id = qid;
                q.source = QUIET;
                // NOTE(review): if lock is null the new queue is never
                // installed and the loop keeps retrying -- confirm lock
                // cannot be null on this path.
                if (lock !is null) {     // unless disabled, lock pool to install
                    synchronized (lock) {
                        WorkQueue[] vs; int i, vn;
                        if ((vs = workQueues) !is null && (vn = cast(int)vs.length) > 0 &&
                            vs[i = qid & (vn - 1) & SQMASK] is null)
                            vs[i] = q;  // else another thread already installed
                    }
                }
            }
            else if (!q.tryLockPhase()) // move if busy
                r = ThreadLocalRandom.advanceProbe(r);   // probe another slot next time
            else {
                // Queue locked by us: push, and signal only if lockedPush says so.
                if (q.lockedPush(task))
                    signalWork();
                return;
            }
        }
    }
1523 
1524     /**
1525      * Pushes a possibly-external submission.
1526      */
1527     private IForkJoinTask externalSubmit(IForkJoinTask task) {
1528         if (task is null)
1529             throw new NullPointerException();
1530         ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 
1531         WorkQueue q;
1532         if ( w !is null && w.pool is this &&
1533             (q = w.workQueue) !is null)
1534             q.push(task);
1535         else
1536             externalPush(task);
1537         return task;
1538     }
1539 
1540     private ForkJoinTask!(T) externalSubmit(T)(ForkJoinTask!(T) task) {
1541         if (task is null)
1542             throw new NullPointerException();
1543         ForkJoinWorkerThread w = cast(ForkJoinWorkerThread)Thread.getThis(); 
1544         WorkQueue q;
1545         if ( w !is null && w.pool is this &&
1546             (q = w.workQueue) !is null)
1547             q.push(task);
1548         else
1549             externalPush(task);
1550         return task;
1551     }
1552 
1553     /**
1554      * Returns common pool queue for an external thread.
1555      */
1556     static WorkQueue commonSubmitterQueue() {
1557         ForkJoinPool p = common;
1558         int r = ThreadLocalRandom.getProbe();
1559         WorkQueue[] ws; int n;
1560         return (p !is null && (ws = p.workQueues) !is null &&
1561                 (n = cast(int)ws.length) > 0) ?
1562             ws[(n - 1) & r & SQMASK] : null;
1563     }
1564 
1565     /**
1566      * Performs tryUnpush for an external submitter.
1567      */
1568     final bool tryExternalUnpush(IForkJoinTask task) {
1569         int r = ThreadLocalRandom.getProbe();
1570         WorkQueue[] ws; WorkQueue w; int n;
1571         return ((ws = workQueues) !is null &&
1572                 (n = cast(int)ws.length) > 0 &&
1573                 (w = ws[(n - 1) & r & SQMASK]) !is null &&
1574                 w.tryLockedUnpush(task));
1575     }
1576 
1577     /**
1578      * Performs helpComplete for an external submitter.
1579      */
1580     final int externalHelpComplete(ICountedCompleter task, int maxTasks) {
1581         int r = ThreadLocalRandom.getProbe();
1582         WorkQueue[] ws; WorkQueue w; int n;
1583         return ((ws = workQueues) !is null && (n = cast(int)ws.length) > 0 &&
1584                 (w = ws[(n - 1) & r & SQMASK]) !is null) ?
1585             w.helpCC(task, maxTasks, true) : 0;
1586     }
1587 
1588     /**
1589      * Tries to steal and run tasks within the target's computation.
1590      * The maxTasks argument supports external usages; internal calls
1591      * use zero, allowing unbounded steps (external calls trap
1592      * non-positive values).
1593      *
1594      * @param w caller
1595      * @param maxTasks if non-zero, the maximum number of other tasks to run
1596      * @return task status on exit
1597      */
1598     final int helpComplete(WorkQueue w, ICountedCompleter task,
1599                            int maxTasks) {
1600         return (w is null) ? 0 : w.helpCC(task, maxTasks, false);
1601     }
1602 
1603     /**
1604      * Returns a cheap heuristic guide for task partitioning when
1605      * programmers, frameworks, tools, or languages have little or no
1606      * idea about task granularity.  In essence, by offering this
1607      * method, we ask users only about tradeoffs in overhead vs
1608      * expected throughput and its variance, rather than how finely to
1609      * partition tasks.
1610      *
1611      * In a steady state strict (tree-structured) computation, each
1612      * thread makes available for stealing enough tasks for other
1613      * threads to remain active. Inductively, if all threads play by
1614      * the same rules, each thread should make available only a
1615      * constant number of tasks.
1616      *
1617      * The minimum useful constant is just 1. But using a value of 1
1618      * would require immediate replenishment upon each steal to
1619      * maintain enough tasks, which is infeasible.  Further,
1620      * partitionings/granularities of offered tasks should minimize
1621      * steal rates, which in general means that threads nearer the top
1622      * of computation tree should generate more than those nearer the
1623      * bottom. In perfect steady state, each thread is at
1624      * approximately the same level of computation tree. However,
1625      * producing extra tasks amortizes the uncertainty of progress and
1626      * diffusion assumptions.
1627      *
1628      * So, users will want to use values larger (but not much larger)
1629      * than 1 to both smooth over shortages and hedge
1630      * against uneven progress; as traded off against the cost of
1631      * extra task overhead. We leave the user to pick a threshold
1632      * value to compare with the results of this call to guide
1633      * decisions, but recommend values such as 3.
1634      *
1635      * When all threads are active, it is on average OK to estimate
1636      * surplus strictly locally. In steady-state, if one thread is
1637      * maintaining say 2 surplus tasks, then so are others. So we can
1638      * just use estimated queue length.  However, this strategy alone
1639      * leads to serious mis-estimates in some non-steady-state
1640      * conditions (ramp-up, ramp-down, other stalls). We can detect
1641      * many of these by further considering the number of "idle"
1642      * threads, that are known to have zero queued tasks, so
1643      * compensate by a factor of (#idle/#active) threads.
1644      */
1645     static int getSurplusQueuedTaskCount() {
1646         Thread t = Thread.getThis(); 
1647         ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t; 
1648         ForkJoinPool pool; WorkQueue q;
1649 
1650         if (wt !is null && (pool = wt.pool) !is null &&
1651             (q = wt.workQueue) !is null) {
1652             int p = pool.mode & SMASK;
1653             int a = p + cast(int)(pool.ctl >> RC_SHIFT);
1654             int n = q.top - q.base;
1655             return n - (a > (p >>>= 1) ? 0 :
1656                         a > (p >>>= 1) ? 1 :
1657                         a > (p >>>= 1) ? 2 :
1658                         a > (p >>>= 1) ? 4 :
1659                         8);
1660         }
1661         return 0;
1662     }
1663 
1664     // Termination
1665 
1666     /**
1667      * Possibly initiates and/or completes termination.
1668      *
1669      * @param now if true, unconditionally terminate, else only
1670      * if no work and no active workers
1671      * @param enable if true, terminate when next possible
1672      * @return true if terminating or terminated
1673      */
1674     private bool tryTerminate(bool now, bool enable) {
1675         int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED
1676 
1677         while (((md = mode) & SHUTDOWN) == 0) {
1678             if (!enable || this == common)        // cannot shutdown
1679                 return false;
1680             else {
1681                 AtomicHelper.compareAndSet(this.mode, md, md | SHUTDOWN);
1682             }
1683                 
1684         }
1685 
1686         while (((md = mode) & STOP) == 0) {       // try to initiate termination
1687             if (!now) {                           // check if quiescent & empty
1688                 for (long oldSum = 0L;;) {        // repeat until stable
1689                     bool running = false;
1690                     long checkSum = ctl;
1691                     WorkQueue[] ws = workQueues;
1692                     if ((md & SMASK) + cast(int)(checkSum >> RC_SHIFT) > 0)
1693                         running = true;
1694                     else if (ws !is null) {
1695                         WorkQueue w;
1696                         for (int i = 0; i < ws.length; ++i) {
1697                             if ((w = ws[i]) !is null) {
1698                                 int s = w.source, p = w.phase;
1699                                 int d = w.id, b = w.base;
1700                                 if (b != w.top ||
1701                                     ((d & 1) == 1 && (s >= 0 || p >= 0))) {
1702                                     running = true;
1703                                     break;     // working, scanning, or have work
1704                                 }
1705                                 checkSum += ((cast(long)s << 48) + (cast(long)p << 32) +
1706                                              (cast(long)b << 16) + cast(long)d);
1707                             }
1708                         }
1709                     }
1710                     if (((md = mode) & STOP) != 0)
1711                         break;                 // already triggered
1712                     else if (running)
1713                         return false;
1714                     else if (workQueues == ws && oldSum == (oldSum = checkSum))
1715                         break;
1716                 }
1717             }
1718             if ((md & STOP) == 0)
1719                 AtomicHelper.compareAndSet(this.mode, md, md | STOP);
1720         }
1721 
1722         while (((md = mode) & TERMINATED) == 0) { // help terminate others
1723             for (long oldSum = 0L;;) {            // repeat until stable
1724                 WorkQueue[] ws; WorkQueue w;
1725                 long checkSum = ctl;
1726                 if ((ws = workQueues) !is null) {
1727                     for (int i = 0; i < ws.length; ++i) {
1728                         if ((w = ws[i]) !is null) {
1729                             ForkJoinWorkerThread wt = w.owner;
1730                             w.cancelAll();        // clear queues
1731                             if (wt !is null) {
1732                                 try {             // unblock join or park
1733                                     wt.interrupt();
1734                                 } catch (Throwable ignore) {
1735                                 }
1736                             }
1737                             checkSum += (cast(long)w.phase << 32) + w.base;
1738                         }
1739                     }
1740                 }
1741                 if (((md = mode) & TERMINATED) != 0 ||
1742                     (workQueues == ws && oldSum == (oldSum = checkSum)))
1743                     break;
1744             }
1745             if ((md & TERMINATED) != 0)
1746                 break;
1747             else if ((md & SMASK) + cast(short)(ctl >>> TC_SHIFT) > 0)
1748                 break;
1749             else if (AtomicHelper.compareAndSet(this.mode, md, md | TERMINATED)) {
1750                 synchronized (this) {
1751                     // notifyAll();                  // for awaitTermination
1752                     // TODO: Tasks pending completion -@zxp at 2/4/2019, 11:03:21 AM
1753                     // 
1754                 }
1755                 break;
1756             }
1757         }
1758         return true;
1759     }
1760 
1761     // Exported methods
1762 
1763     // Constructors
1764 
1765     /**
1766      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
1767      * java.lang.Runtime#availableProcessors}, using defaults for all
1768      * other parameters (see {@link #ForkJoinPool(int,
1769      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool,
     * int, int, int, Predicate, Duration)}).
1771      *
1772      * @throws SecurityException if a security manager exists and
1773      *         the caller is not permitted to modify threads
1774      *         because it does not hold {@link
1775      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1776      */
1777     this() {
1778         this(min(MAX_CAP, totalCPUs),
1779              defaultForkJoinWorkerThreadFactory, null, false,
1780              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1781     }
1782 
1783     /**
1784      * Creates a {@code ForkJoinPool} with the indicated parallelism
1785      * level, using defaults for all other parameters (see {@link
1786      * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
1787      * UncaughtExceptionHandler, bool, int, int, int, Predicate,
     * Duration)}).
1789      *
1790      * @param parallelism the parallelism level
1791      * @throws IllegalArgumentException if parallelism less than or
1792      *         equal to zero, or greater than implementation limit
1793      * @throws SecurityException if a security manager exists and
1794      *         the caller is not permitted to modify threads
1795      *         because it does not hold {@link
1796      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1797      */
1798     this(int parallelism) {
1799         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
1800              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1801     }
1802 
1803     /**
1804      * Creates a {@code ForkJoinPool} with the given parameters (using
1805      * defaults for others -- see {@link #ForkJoinPool(int,
1806      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, bool,
     * int, int, int, Predicate, Duration)}).
1808      *
1809      * @param parallelism the parallelism level. For default value,
1810      * use {@link java.lang.Runtime#availableProcessors}.
1811      * @param factory the factory for creating new threads. For default value,
1812      * use {@link #defaultForkJoinWorkerThreadFactory}.
1813      * @param handler the handler for internal worker threads that
1814      * terminate due to unrecoverable errors encountered while executing
1815      * tasks. For default value, use {@code null}.
1816      * @param asyncMode if true,
1817      * establishes local first-in-first-out scheduling mode for forked
1818      * tasks that are never joined. This mode may be more appropriate
1819      * than default locally stack-based mode in applications in which
1820      * worker threads only process event-style asynchronous tasks.
1821      * For default value, use {@code false}.
1822      * @throws IllegalArgumentException if parallelism less than or
1823      *         equal to zero, or greater than implementation limit
1824      * @throws NullPointerException if the factory is null
1825      * @throws SecurityException if a security manager exists and
1826      *         the caller is not permitted to modify threads
1827      *         because it does not hold {@link
1828      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1829      */
1830     this(int parallelism,
1831                         ForkJoinWorkerThreadFactory factory,
1832                         UncaughtExceptionHandler handler,
1833                         bool asyncMode) {
1834         this(parallelism, factory, handler, asyncMode,
1835              0, MAX_CAP, 1, null, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE));
1836     }
1837 
1838     /**
1839      * Creates a {@code ForkJoinPool} with the given parameters.
1840      *
1841      * @param parallelism the parallelism level. For default value,
1842      * use {@link java.lang.Runtime#availableProcessors}.
1843      *
1844      * @param factory the factory for creating new threads. For
1845      * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
1846      *
1847      * @param handler the handler for internal worker threads that
1848      * terminate due to unrecoverable errors encountered while
1849      * executing tasks. For default value, use {@code null}.
1850      *
1851      * @param asyncMode if true, establishes local first-in-first-out
1852      * scheduling mode for forked tasks that are never joined. This
1853      * mode may be more appropriate than default locally stack-based
1854      * mode in applications in which worker threads only process
1855      * event-style asynchronous tasks.  For default value, use {@code
1856      * false}.
1857      *
1858      * @param corePoolSize the number of threads to keep in the pool
1859      * (unless timed out after an elapsed keep-alive). Normally (and
1860      * by default) this is the same value as the parallelism level,
1861      * but may be set to a larger value to reduce dynamic overhead if
1862      * tasks regularly block. Using a smaller value (for example
1863      * {@code 0}) has the same effect as the default.
1864      *
1865      * @param maximumPoolSize the maximum number of threads allowed.
1866      * When the maximum is reached, attempts to replace blocked
1867      * threads fail.  (However, because creation and termination of
1868      * different threads may overlap, and may be managed by the given
1869      * thread factory, this value may be transiently exceeded.)  To
1870      * arrange the same value as is used by default for the common
1871      * pool, use {@code 256} plus the {@code parallelism} level. (By
1872      * default, the common pool allows a maximum of 256 spare
1873      * threads.)  Using a value (for example {@code
1874      * Integer.MAX_VALUE}) larger than the implementation's total
1875      * thread limit has the same effect as using this limit (which is
1876      * the default).
1877      *
1878      * @param minimumRunnable the minimum allowed number of core
1879      * threads not blocked by a join or {@link ManagedBlocker}.  To
1880      * ensure progress, when too few unblocked threads exist and
1881      * unexecuted tasks may exist, new threads are constructed, up to
1882      * the given maximumPoolSize.  For the default value, use {@code
1883      * 1}, that ensures liveness.  A larger value might improve
1884      * throughput in the presence of blocked activities, but might
1885      * not, due to increased overhead.  A value of zero may be
1886      * acceptable when submitted tasks cannot have dependencies
1887      * requiring additional threads.
1888      *
1889      * @param saturate if non-null, a predicate invoked upon attempts
1890      * to create more than the maximum total allowed threads.  By
1891      * default, when a thread is about to block on a join or {@link
1892      * ManagedBlocker}, but cannot be replaced because the
1893      * maximumPoolSize would be exceeded, a {@link
1894      * RejectedExecutionException} is thrown.  But if this predicate
1895      * returns {@code true}, then no exception is thrown, so the pool
1896      * continues to operate with fewer than the target number of
1897      * runnable threads, which might not ensure progress.
1898      *
     * @param keepAliveTime the elapsed time since last use before
     * a thread is terminated (and then later replaced if needed),
     * given as a {@code Duration}; there is no separate time-unit
     * argument. For the default value, use a duration of 60 seconds.
1904      *
1905      * @throws IllegalArgumentException if parallelism is less than or
1906      *         equal to zero, or is greater than implementation limit,
1907      *         or if maximumPoolSize is less than parallelism,
     *         or if the keepAliveTime is less than or equal to zero.
1909      * @throws NullPointerException if the factory is null
1910      * @throws SecurityException if a security manager exists and
1911      *         the caller is not permitted to modify threads
1912      *         because it does not hold {@link
1913      *         java.lang.RuntimePermission}{@code ("modifyThread")}
1914      */
1915     this(int parallelism,
1916                         ForkJoinWorkerThreadFactory factory,
1917                         UncaughtExceptionHandler handler,
1918                         bool asyncMode,
1919                         int corePoolSize,
1920                         int maximumPoolSize,
1921                         int minimumRunnable,
1922                         Predicate!(ForkJoinPool) saturate,
1923                         Duration keepAliveTime) {
1924         // check, encode, pack parameters
1925         if (parallelism <= 0 || parallelism > MAX_CAP ||
1926             maximumPoolSize < parallelism || keepAliveTime <= Duration.zero)
1927             throw new IllegalArgumentException();
1928         if (factory is null)
1929             throw new NullPointerException();
1930         long ms = max(keepAliveTime.total!(TimeUnit.Millisecond), TIMEOUT_SLOP);
1931 
1932         int corep = min(max(corePoolSize, parallelism), MAX_CAP);
1933         long c = (((cast(long)(-corep)       << TC_SHIFT) & TC_MASK) |
1934                   ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK));
1935         int m = parallelism | (asyncMode ? FIFO : 0);
1936         int maxSpares = min(maximumPoolSize, MAX_CAP) - parallelism;
1937         int minAvail = min(max(minimumRunnable, 0), MAX_CAP);
1938         int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
1939         int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
1940         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
1941         n = (n + 1) << 1; // power of two, including space for submission queues
1942 
1943         this.workerNamePrefix = "ForkJoinPool-" ~ nextPoolId().to!string() ~ "-worker-";
1944         this.workQueues = new WorkQueue[n];
1945         this.factory = factory;
1946         this.ueh = handler;
1947         this.saturate = saturate;
1948         this.keepAlive = ms.msecs;
1949         this.bounds = b;
1950         this.mode = m;
1951         this.ctl = c;
1952         // checkPermission();
1953         workerNameLocker = new Object(); 
1954     }
1955 
1956     private static Object newInstanceFrom(string className) {
1957         return (className.empty) ? null : Object.factory(className);
1958     }
1959 
1960     /**
1961      * Constructor for common pool using parameters possibly
1962      * overridden by system properties
1963      */
1964     private this(byte forCommonPoolOnly) {
1965         int parallelism = -1;
1966         ForkJoinWorkerThreadFactory fac = null;
1967         UncaughtExceptionHandler handler = null;
1968         try {  // ignore exceptions in accessing/parsing properties
1969             ConfigBuilder config = Environment.getProperties();
1970             string pp = config.getProperty("hunt.concurrency.ForkJoinPool.common.parallelism");
1971             if (!pp.empty) {
1972                 parallelism = pp.to!int();
1973             }
1974 
1975             string className = config.getProperty("hunt.concurrency.ForkJoinPool.common.threadFactory");
1976             fac = cast(ForkJoinWorkerThreadFactory) newInstanceFrom(className);
1977 
1978             className = config.getProperty("hunt.concurrency.ForkJoinPool.common.exceptionHandler");
1979             handler = cast(UncaughtExceptionHandler) newInstanceFrom(className);
1980         } catch (Exception ignore) {
1981             version(HUNT_DEBUG) {
1982                 warning(ignore);
1983             }
1984         }
1985 
1986         if (fac is null) {
1987             // if (System.getSecurityManager() is null)
1988                 fac = defaultForkJoinWorkerThreadFactory;
1989             // else // use security-managed default
1990             //     fac = new InnocuousForkJoinWorkerThreadFactory();
1991         }
1992         if (parallelism < 0 && // default 1 less than #cores
1993             (parallelism = totalCPUs - 1) <= 0)
1994             parallelism = 1;
1995         if (parallelism > MAX_CAP)
1996             parallelism = MAX_CAP;
1997 
1998         long c = (((cast(long)(-parallelism) << TC_SHIFT) & TC_MASK) |
1999                   ((cast(long)(-parallelism) << RC_SHIFT) & RC_MASK));
2000         int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
2001         int n = (parallelism > 1) ? parallelism - 1 : 1;
2002         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2003         n = (n + 1) << 1;
2004 
2005         this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
2006         this.workerNameLocker = new Object(); 
2007         this.workQueues = new WorkQueue[n];
2008         this.factory = fac;
2009         this.ueh = handler;
2010         this.saturate = null;
2011         this.keepAlive = DEFAULT_KEEPALIVE.msecs;
2012         this.bounds = b;
2013         this.mode = parallelism;
2014         this.ctl = c;
2015     }
2016 
2017     /**
2018      * Returns the common pool instance. This pool is statically
2019      * constructed; its run state is unaffected by attempts to {@link
2020      * #shutdown} or {@link #shutdownNow}. However this pool and any
2021      * ongoing processing are automatically terminated upon program
2022      * {@link System#exit}.  Any program that relies on asynchronous
2023      * task processing to complete before program termination should
2024      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2025      * before exit.
2026      *
2027      * @return the common pool instance
2028      */
2029     static ForkJoinPool commonPool() {
2030         return common;
2031     }
2032 
2033     // Execution methods
2034 
2035     /**
2036      * Performs the given task, returning its result upon completion.
2037      * If the computation encounters an unchecked Exception or Error,
2038      * it is rethrown as the outcome of this invocation.  Rethrown
2039      * exceptions behave in the same way as regular exceptions, but,
2040      * when possible, contain stack traces (as displayed for example
2041      * using {@code ex.printStackTrace()}) of both the current thread
2042      * as well as the thread actually encountering the exception;
2043      * minimally only the latter.
2044      *
2045      * @param task the task
2046      * @param (T) the type of the task's result
2047      * @return the task's result
2048      * @throws NullPointerException if the task is null
2049      * @throws RejectedExecutionException if the task cannot be
2050      *         scheduled for execution
2051      */
2052     T invoke(T)(ForkJoinTask!(T) task) {
2053         if (task is null)
2054             throw new NullPointerException();
2055         externalSubmit!(T)(task);
2056         version(HUNT_CONCURRENCY_DEBUG) {
2057             infof("waiting the result...");
2058             T c = task.join();
2059             infof("final result: %s", c);
2060             return c;
2061         } else {
2062             return task.join();
2063         }
2064     }
2065 
2066     /**
2067      * Arranges for (asynchronous) execution of the given task.
2068      *
2069      * @param task the task
2070      * @throws NullPointerException if the task is null
2071      * @throws RejectedExecutionException if the task cannot be
2072      *         scheduled for execution
2073      */
2074     void execute(IForkJoinTask task) {
2075         externalSubmit(task);
2076     }
2077 
2078     // AbstractExecutorService methods
2079 
2080     /**
2081      * @throws NullPointerException if the task is null
2082      * @throws RejectedExecutionException if the task cannot be
2083      *         scheduled for execution
2084      */
2085     void execute(Runnable task) {
2086         if (task is null)
2087             throw new NullPointerException();
2088         IForkJoinTask job = cast(IForkJoinTask) task;
2089         if (job is null) { // avoid re-wrap
2090             warning("job is null");
2091             job = new RunnableExecuteAction(task);
2092         }
2093         externalSubmit(job);
2094     }
2095 
2096     /**
2097      * Submits a ForkJoinTask for execution.
2098      *
2099      * @param task the task to submit
2100      * @param (T) the type of the task's result
2101      * @return the task
2102      * @throws NullPointerException if the task is null
2103      * @throws RejectedExecutionException if the task cannot be
2104      *         scheduled for execution
2105      */
2106     ForkJoinTask!(T) submitTask(T)(ForkJoinTask!(T) task) {
2107         return externalSubmit(task);
2108     }
2109 
2110     /**
2111      * @throws NullPointerException if the task is null
2112      * @throws RejectedExecutionException if the task cannot be
2113      *         scheduled for execution
2114      */
2115     ForkJoinTask!(T) submitTask(T)(Callable!(T) task) {
2116         return externalSubmit(new AdaptedCallable!(T)(task));
2117     }
2118 
2119     /**
2120      * @throws NullPointerException if the task is null
2121      * @throws RejectedExecutionException if the task cannot be
2122      *         scheduled for execution
2123      */
2124     ForkJoinTask!(T) submitTask(T)(Runnable task, T result) {
2125         return externalSubmit(new AdaptedRunnable!(T)(task, result));
2126     }
2127 
2128     /**
2129      * @throws NullPointerException if the task is null
2130      * @throws RejectedExecutionException if the task cannot be
2131      *         scheduled for execution
2132      */
2133 
2134     IForkJoinTask submitTask(Runnable task) {
2135         if (task is null)
2136             throw new NullPointerException();
2137         IForkJoinTask t = cast(IForkJoinTask)task;
2138         return externalSubmit(t !is null
2139             ? cast(ForkJoinTask!(void)) task // avoid re-wrap
2140             : new AdaptedRunnableAction(task));
2141     }
2142 
2143     /**
2144      * @throws NullPointerException       {@inheritDoc}
2145      * @throws RejectedExecutionException {@inheritDoc}
2146      */
2147     List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
2148         // In previous versions of this class, this method constructed
2149         // a task to run ForkJoinTask.invokeAll, but now external
2150         // invocation of multiple tasks is at least as efficient.
2151         ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
2152 
2153         try {
2154             foreach (Callable!(T) t ; tasks) {
2155                 ForkJoinTask!(T) f = new AdaptedCallable!(T)(t);
2156                 futures.add(f);
2157                 externalSubmit(f);
2158             }
2159             for (int i = 0, size = futures.size(); i < size; i++)
2160                 (cast(IForkJoinTask)(futures.get(i))).quietlyJoin();
2161             return futures;
2162         } catch (Throwable t) {
2163             for (int i = 0, size = futures.size(); i < size; i++)
2164                 futures.get(i).cancel(false);
2165             throw t;
2166         }
2167     }
2168 
2169     /**
2170      * Returns the factory used for constructing new workers.
2171      *
2172      * @return the factory used for constructing new workers
2173      */
2174     ForkJoinWorkerThreadFactory getFactory() {
2175         return factory;
2176     }
2177 
2178     /**
2179      * Returns the handler for internal worker threads that terminate
2180      * due to unrecoverable errors encountered while executing tasks.
2181      *
2182      * @return the handler, or {@code null} if none
2183      */
2184     UncaughtExceptionHandler getUncaughtExceptionHandler() {
2185         return ueh;
2186     }
2187 
2188     /**
2189      * Returns the targeted parallelism level of this pool.
2190      *
2191      * @return the targeted parallelism level of this pool
2192      */
2193     int getParallelism() {
2194         int par = mode & SMASK;
2195         return (par > 0) ? par : 1;
2196     }
2197 
2198     /**
2199      * Returns the targeted parallelism level of the common pool.
2200      *
2201      * @return the targeted parallelism level of the common pool
2202      */
2203     static int getCommonPoolParallelism() {
2204         return COMMON_PARALLELISM;
2205     }
2206 
2207     /**
2208      * Returns the number of worker threads that have started but not
2209      * yet terminated.  The result returned by this method may differ
2210      * from {@link #getParallelism} when threads are created to
2211      * maintain parallelism when others are cooperatively blocked.
2212      *
2213      * @return the number of worker threads
2214      */
2215     int getPoolSize() {
2216         return ((mode & SMASK) + cast(short)(ctl >>> TC_SHIFT));
2217     }
2218 
2219     /**
2220      * Returns {@code true} if this pool uses local first-in-first-out
2221      * scheduling mode for forked tasks that are never joined.
2222      *
2223      * @return {@code true} if this pool uses async mode
2224      */
2225     bool getAsyncMode() {
2226         return (mode & FIFO) != 0;
2227     }
2228 
2229     /**
2230      * Returns an estimate of the number of worker threads that are
2231      * not blocked waiting to join tasks or for other managed
2232      * synchronization. This method may overestimate the
2233      * number of running threads.
2234      *
2235      * @return the number of worker threads
2236      */
2237     int getRunningThreadCount() {
2238         WorkQueue[] ws; WorkQueue w;
2239         // VarHandle.acquireFence();
2240         int rc = 0;
2241         if ((ws = workQueues) !is null) {
2242             for (int i = 1; i < cast(int)ws.length; i += 2) {
2243                 if ((w = ws[i]) !is null && w.isApparentlyUnblocked())
2244                     ++rc;
2245             }
2246         }
2247         return rc;
2248     }
2249 
2250     /**
2251      * Returns an estimate of the number of threads that are currently
2252      * stealing or executing tasks. This method may overestimate the
2253      * number of active threads.
2254      *
2255      * @return the number of active threads
2256      */
2257     int getActiveThreadCount() {
2258         int r = (mode & SMASK) + cast(int)(ctl >> RC_SHIFT);
2259         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2260     }
2261 
2262     /**
2263      * Returns {@code true} if all worker threads are currently idle.
2264      * An idle worker is one that cannot obtain a task to execute
2265      * because none are available to steal from other threads, and
2266      * there are no pending submissions to the pool. This method is
2267      * conservative; it might not return {@code true} immediately upon
2268      * idleness of all threads, but will eventually become true if
2269      * threads remain inactive.
2270      *
2271      * @return {@code true} if all threads are currently idle
2272      */
    bool isQuiescent() {
        // Retries until a consistent answer is reached: the final check
        // re-reads ctl and only trusts the queue scan if ctl is unchanged.
        for (;;) {
            long c = ctl;
            int md = mode, pc = md & SMASK;
            // tc decodes like getPoolSize(); rc decodes like
            // getActiveThreadCount() (both are offsets from pc packed in ctl).
            int tc = pc + cast(short)(c >>> TC_SHIFT);
            int rc = pc + cast(int)(c >> RC_SHIFT);
            if ((md & (STOP | TERMINATED)) != 0)
                return true;   // stopping/terminated pools count as quiescent
            else if (rc > 0)
                return false;  // some worker is still counted as active
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) !is null) {
                    // Scan odd slots; each queue found is subtracted from tc,
                    // so tc == 0 afterwards means every counted worker was seen.
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) !is null) {
                            // NOTE(review): a positive `source` is treated as
                            // "still busy"; confirm this matches how idle
                            // workers encode `source` elsewhere in this port.
                            if (v.source > 0)
                                return false;
                            --tc;
                        }
                    }
                }
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2299 
2300     /**
2301      * Returns an estimate of the total number of tasks stolen from
2302      * one thread's work queue by another. The reported value
2303      * underestimates the actual total number of steals when the pool
2304      * is not quiescent. This value may be useful for monitoring and
2305      * tuning fork/join programs: in general, steal counts should be
2306      * high enough to keep threads busy, but low enough to avoid
2307      * overhead and contention across threads.
2308      *
2309      * @return the number of steals
2310      */
2311     long getStealCount() {
2312         long count = stealCount;
2313         WorkQueue[] ws; WorkQueue w;
2314         if ((ws = workQueues) !is null) {
2315             for (int i = 1; i < ws.length; i += 2) {
2316                 if ((w = ws[i]) !is null)
2317                     count += cast(long)w.nsteals & 0xffffffffL;
2318             }
2319         }
2320         return count;
2321     }
2322 
2323     /**
2324      * Returns an estimate of the total number of tasks currently held
2325      * in queues by worker threads (but not including tasks submitted
2326      * to the pool that have not begun executing). This value is only
2327      * an approximation, obtained by iterating across all threads in
2328      * the pool. This method may be useful for tuning task
2329      * granularities.
2330      *
2331      * @return the number of queued tasks
2332      */
2333     long getQueuedTaskCount() {
2334         WorkQueue[] ws; WorkQueue w;
2335         // VarHandle.acquireFence();
2336         int count = 0;
2337         if ((ws = workQueues) !is null) {
2338             for (int i = 1; i < cast(int)ws.length; i += 2) {
2339                 if ((w = ws[i]) !is null)
2340                     count += w.queueSize();
2341             }
2342         }
2343         return count;
2344     }
2345 
2346     /**
2347      * Returns an estimate of the number of tasks submitted to this
2348      * pool that have not yet begun executing.  This method may take
2349      * time proportional to the number of submissions.
2350      *
2351      * @return the number of queued submissions
2352      */
2353     int getQueuedSubmissionCount() {
2354         WorkQueue[] ws; WorkQueue w;
2355         // VarHandle.acquireFence();
2356         int count = 0;
2357         if ((ws = workQueues) !is null) {
2358             for (int i = 0; i < cast(int)ws.length; i += 2) {
2359                 if ((w = ws[i]) !is null)
2360                     count += w.queueSize();
2361             }
2362         }
2363         return count;
2364     }
2365 
2366     /**
2367      * Returns {@code true} if there are any tasks submitted to this
2368      * pool that have not yet begun executing.
2369      *
2370      * @return {@code true} if there are any queued submissions
2371      */
2372     bool hasQueuedSubmissions() {
2373         WorkQueue[] ws; WorkQueue w;
2374         // VarHandle.acquireFence();
2375         if ((ws = workQueues) !is null) {
2376             for (int i = 0; i < cast(int)ws.length; i += 2) {
2377                 if ((w = ws[i]) !is null && !w.isEmpty())
2378                     return true;
2379             }
2380         }
2381         return false;
2382     }
2383 
2384     /**
2385      * Removes and returns the next unexecuted submission if one is
2386      * available.  This method may be useful in extensions to this
2387      * class that re-assign work in systems with multiple pools.
2388      *
2389      * @return the next submission, or {@code null} if none
2390      */
2391     protected IForkJoinTask pollSubmission() {
2392         return pollScan(true);
2393     }
2394 
2395     /**
2396      * Removes all available unexecuted submitted and forked tasks
2397      * from scheduling queues and adds them to the given collection,
2398      * without altering their execution status. These may include
2399      * artificially generated or wrapped tasks. This method is
2400      * designed to be invoked only when the pool is known to be
2401      * quiescent. Invocations at other times may not remove all
2402      * tasks. A failure encountered while attempting to add elements
2403      * to collection {@code c} may result in elements being in
2404      * neither, either or both collections when the associated
2405      * exception is thrown.  The behavior of this operation is
2406      * undefined if the specified collection is modified while the
2407      * operation is in progress.
2408      *
2409      * @param c the collection to transfer elements into
2410      * @return the number of elements transferred
2411      */
2412     protected int drainTasksTo(Collection!IForkJoinTask c) {
2413         WorkQueue[] ws; WorkQueue w; IForkJoinTask t;
2414         // VarHandle.acquireFence();
2415         int count = 0;
2416         if ((ws = workQueues) !is null) {
2417             for (int i = 0; i < ws.length; ++i) {
2418                 if ((w = ws[i]) !is null) {
2419                     while ((t = w.poll()) !is null) {
2420                         c.add(t);
2421                         ++count;
2422                     }
2423                 }
2424             }
2425         }
2426         return count;
2427     }
2428 
2429     /**
2430      * Returns a string identifying this pool, as well as its state,
2431      * including indications of run state, parallelism level, and
2432      * worker and task counts.
2433      *
2434      * @return a string identifying this pool, as well as its state
2435      */
2436     override string toString() {
2437         // Use a single pass through workQueues to collect counts
2438         int md = mode; // read fields first
2439         long c = ctl;
2440         long st = stealCount;
2441         long qt = 0L, qs = 0L; int rc = 0;
2442         WorkQueue[] ws; WorkQueue w;
2443         if ((ws = workQueues) !is null) {
2444             for (int i = 0; i < ws.length; ++i) {
2445                 if ((w = ws[i]) !is null) {
2446                     int size = w.queueSize();
2447                     if ((i & 1) == 0)
2448                         qs += size;
2449                     else {
2450                         qt += size;
2451                         st += cast(long)w.nsteals & 0xffffffffL;
2452                         if (w.isApparentlyUnblocked())
2453                             ++rc;
2454                     }
2455                 }
2456             }
2457         }
2458 
2459         int pc = (md & SMASK);
2460         int tc = pc + cast(short)(c >>> TC_SHIFT);
2461         int ac = pc + cast(int)(c >> RC_SHIFT);
2462         if (ac < 0) // ignore negative
2463             ac = 0;
2464         string level = ((md & TERMINATED) != 0 ? "Terminated" :
2465                         (md & STOP)       != 0 ? "Terminating" :
2466                         (md & SHUTDOWN)   != 0 ? "Shutting down" :
2467                         "Running");
2468         return super.toString() ~
2469             "[" ~ level ~
2470             ", parallelism = " ~ pc.to!string() ~
2471             ", size = " ~ tc.to!string() ~
2472             ", active = " ~ ac.to!string() ~
2473             ", running = " ~ rc.to!string() ~
2474             ", steals = " ~ st.to!string() ~
2475             ", tasks = " ~ qt.to!string() ~
2476             ", submissions = " ~ qs.to!string() ~
2477             "]";
2478     }
2479 
2480     /**
2481      * Possibly initiates an orderly shutdown in which previously
2482      * submitted tasks are executed, but no new tasks will be
2483      * accepted. Invocation has no effect on execution state if this
2484      * is the {@link #commonPool()}, and no additional effect if
2485      * already shut down.  Tasks that are in the process of being
2486      * submitted concurrently during the course of this method may or
2487      * may not be rejected.
2488      *
2489      * @throws SecurityException if a security manager exists and
2490      *         the caller is not permitted to modify threads
2491      *         because it does not hold {@link
2492      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2493      */
2494     void shutdown() {
2495         // checkPermission();
2496         tryTerminate(false, true);
2497     }
2498 
2499     /**
2500      * Possibly attempts to cancel and/or stop all tasks, and reject
2501      * all subsequently submitted tasks.  Invocation has no effect on
2502      * execution state if this is the {@link #commonPool()}, and no
2503      * additional effect if already shut down. Otherwise, tasks that
2504      * are in the process of being submitted or executed concurrently
2505      * during the course of this method may or may not be
2506      * rejected. This method cancels both existing and unexecuted
2507      * tasks, in order to permit termination in the presence of task
2508      * dependencies. So the method always returns an empty list
2509      * (unlike the case for some other Executors).
2510      *
2511      * @return an empty list
2512      * @throws SecurityException if a security manager exists and
2513      *         the caller is not permitted to modify threads
2514      *         because it does not hold {@link
2515      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2516      */
2517     List!(Runnable) shutdownNow() {
2518         // checkPermission();
2519         tryTerminate(true, true);
2520         return Collections.emptyList!(Runnable)();
2521     }
2522 
2523     /**
2524      * Returns {@code true} if all tasks have completed following shut down.
2525      *
2526      * @return {@code true} if all tasks have completed following shut down
2527      */
2528     bool isTerminated() {
2529         return (mode & TERMINATED) != 0;
2530     }
2531 
2532     /**
2533      * Returns {@code true} if the process of termination has
2534      * commenced but not yet completed.  This method may be useful for
2535      * debugging. A return of {@code true} reported a sufficient
2536      * period after shutdown may indicate that submitted tasks have
2537      * ignored or suppressed interruption, or are waiting for I/O,
2538      * causing this executor not to properly terminate. (See the
2539      * advisory notes for class {@link ForkJoinTask} stating that
2540      * tasks should not normally entail blocking operations.  But if
2541      * they do, they must abort them on interrupt.)
2542      *
2543      * @return {@code true} if terminating but not yet terminated
2544      */
2545     bool isTerminating() {
2546         int md = mode;
2547         return (md & STOP) != 0 && (md & TERMINATED) == 0;
2548     }
2549 
2550     /**
2551      * Returns {@code true} if this pool has been shut down.
2552      *
2553      * @return {@code true} if this pool has been shut down
2554      */
2555     bool isShutdown() {
2556         return (mode & SHUTDOWN) != 0;
2557     }
2558 
2559     /**
2560      * Blocks until all tasks have completed execution after a
2561      * shutdown request, or the timeout occurs, or the current thread
2562      * is interrupted, whichever happens first. Because the {@link
2563      * #commonPool()} never terminates until program shutdown, when
     * applied to the common pool, this method is equivalent to {@link
     * #awaitQuiescence(Duration)} but always returns {@code false}.
     *
     * @param timeout the maximum time to wait
2569      * @return {@code true} if this executor terminated and
2570      *         {@code false} if the timeout elapsed before termination
2571      * @throws InterruptedException if interrupted while waiting
2572      */
2573     bool awaitTermination(Duration timeout)
2574         {
2575         if (ThreadEx.interrupted())
2576             throw new InterruptedException();
2577         if (this == common) {
2578             awaitQuiescence(timeout);
2579             return false;
2580         }
2581         long nanos = timeout.total!(TimeUnit.HectoNanosecond);
2582         if (isTerminated())
2583             return true;
2584         if (nanos <= 0L)
2585             return false;
2586         long deadline = Clock.currStdTime + nanos;
2587         synchronized (this) {
2588             for (;;) {
2589                 if (isTerminated())
2590                     return true;
2591                 if (nanos <= 0L)
2592                     return false;
2593                 // long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
2594                 // wait(millis > 0L ? millis : 1L);
2595                 // ThreadEx.currentThread().par
2596                 ThreadEx.sleep(dur!(TimeUnit.HectoNanosecond)(nanos));
2597                 nanos = deadline - Clock.currStdTime;
2598             }
2599         }
2600     }
2601 
2602     /**
2603      * If called by a ForkJoinTask operating in this pool, equivalent
2604      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2605      * waits and/or attempts to assist performing tasks until this
2606      * pool {@link #isQuiescent} or the indicated timeout elapses.
2607      *
     * @param timeout the maximum time to wait
2610      * @return {@code true} if quiescent; {@code false} if the
2611      * timeout elapsed.
2612      */
2613     bool awaitQuiescence(Duration timeout) {
2614         long nanos = timeout.total!(TimeUnit.HectoNanosecond)();
2615         Thread thread = Thread.getThis();
2616         ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread;
2617         if (wt !is null && wt.pool is this) {
2618             helpQuiescePool(wt.workQueue);
2619             return true;
2620         }
2621         else {
2622             for (long startTime = Clock.currStdTime;;) {
2623                 IForkJoinTask t;
2624                 if ((t = pollScan(false)) !is null)
2625                     t.doExec();
2626                 else if (isQuiescent())
2627                     return true;
2628                 else if ((Clock.currStdTime - startTime) > nanos)
2629                     return false;
2630                 else
2631                     Thread.yield(); // cannot block
2632             }
2633         }
2634     }
2635 
2636     /**
2637      * Waits and/or attempts to assist performing tasks indefinitely
2638      * until the {@link #commonPool()} {@link #isQuiescent}.
2639      */
2640     static void quiesceCommonPool() {
2641         common.awaitQuiescence(Duration.max);
2642     }
2643 
2644     /**
2645      * Runs the given possibly blocking task.  When {@linkplain
2646      * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
2647      * method possibly arranges for a spare thread to be activated if
2648      * necessary to ensure sufficient parallelism while the current
2649      * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
2650      *
2651      * <p>This method repeatedly calls {@code blocker.isReleasable()} and
2652      * {@code blocker.block()} until either method returns {@code true}.
2653      * Every call to {@code blocker.block()} is preceded by a call to
2654      * {@code blocker.isReleasable()} that returned {@code false}.
2655      *
2656      * <p>If not running in a ForkJoinPool, this method is
2657      * behaviorally equivalent to
2658      * <pre> {@code
2659      * while (!blocker.isReleasable())
2660      *   if (blocker.block())
2661      *     break;}</pre>
2662      *
2663      * If running in a ForkJoinPool, the pool may first be expanded to
2664      * ensure sufficient parallelism available during the call to
2665      * {@code blocker.block()}.
2666      *
2667      * @param blocker the blocker task
2668      * @throws InterruptedException if {@code blocker.block()} did so
2669      */
2670     static void managedBlock(ManagedBlocker blocker) {
2671         if (blocker is null) throw new NullPointerException();
2672         ForkJoinPool p;
2673         WorkQueue w;
2674         Thread t = Thread.getThis();
2675         ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)t;
2676         if (wt !is null && (p = wt.pool) !is null &&
2677             (w = wt.workQueue) !is null) {
2678             int block;
2679             while (!blocker.isReleasable()) {
2680                 if ((block = p.tryCompensate(w)) != 0) {
2681                     try {
2682                         do {} while (!blocker.isReleasable() &&
2683                                      !blocker.block());
2684                     } finally {
2685                         AtomicHelper.getAndAdd(p.ctl, (block > 0) ? RC_UNIT : 0L);
2686                     }
2687                     break;
2688                 }
2689             }
2690         }
2691         else {
2692             do {} while (!blocker.isReleasable() &&
2693                          !blocker.block());
2694         }
2695     }
2696 
2697     /**
2698      * If the given executor is a ForkJoinPool, poll and execute
2699      * AsynchronousCompletionTasks from worker's queue until none are
2700      * available or blocker is released.
2701      */
2702     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
2703         ForkJoinPool p = cast(ForkJoinPool)e;
2704         if (p !is null) {
2705             WorkQueue w; WorkQueue[] ws; int r, n;
2706             
2707             Thread thread = Thread.getThis();
2708             ForkJoinWorkerThread wt = cast(ForkJoinWorkerThread)thread; 
2709             if (wt !is null && wt.pool is p)
2710                 w = wt.workQueue;
2711             else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
2712                      (ws = p.workQueues) !is null && (n = cast(int)ws.length) > 0)
2713                 w = ws[(n - 1) & r & SQMASK];
2714             else
2715                 w = null;
2716             if (w !is null)
2717                 w.helpAsyncBlocker(blocker);
2718         }
2719     }
2720 
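    // Task factories mirroring AbstractExecutorService.newTaskFor. They wrap
    // the argument directly in AdaptedRunnable / AdaptedCallable, which must
    // implement RunnableFuture for these to compile. NOTE(review): template
    // member functions are never virtual in D, so despite the Java heritage
    // these do not override anything in AbstractExecutorService.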
2724 
2725     protected RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) {
2726         return new AdaptedRunnable!(T)(runnable, value);
2727     }
2728 
2729     protected RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
2730         return new AdaptedCallable!(T)(callable);
2731     }
2732 
2733     shared static this() {
2734         int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
2735         try {
2736             ConfigBuilder config = Environment.getProperties();
2737             string p = config.getProperty
2738                 ("hunt.concurrency.ForkJoinPool.common.maximumSpares");
2739             if (!p.empty())
2740                 commonMaxSpares = p.to!int();
2741         } catch (Exception ignore) {
2742             version(HUNT_DEBUG) warning(ignore.toString());
2743         }
2744         COMMON_MAX_SPARES = commonMaxSpares;
2745 
2746         defaultForkJoinWorkerThreadFactory =
2747             new DefaultForkJoinWorkerThreadFactory();
2748         common = new ForkJoinPool(cast(byte)0);
2749         COMMON_PARALLELISM = max(common.mode & SMASK, 1);
2750     }
2751 }
2752 
2753 
2754 /**
2755  * Factory for innocuous worker threads.
2756  */
2757 private final class InnocuousForkJoinWorkerThreadFactory
2758     : ForkJoinWorkerThreadFactory {
2759 
2760     /**
2761      * An ACC to restrict permissions for the factory itself.
2762      * The constructed workers have no permissions set.
2763      */
2764     // private static final AccessControlContext ACC = contextWithPermissions(
2765     //     modifyThreadPermission,
2766     //     new RuntimePermission("enableContextClassLoaderOverride"),
2767     //     new RuntimePermission("modifyThreadGroup"),
2768     //     new RuntimePermission("getClassLoader"),
2769     //     new RuntimePermission("setContextClassLoader"));
2770 
2771     final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
2772         return new InnocuousForkJoinWorkerThread(pool);
2773         // return AccessController.doPrivileged(
2774         //     new PrivilegedAction<>() {
2775         //         ForkJoinWorkerThread run() {
2776         //             return new ForkJoinWorkerThread.
2777         //                 InnocuousForkJoinWorkerThread(pool); }},
2778         //     ACC);
2779     }
2780 }
2781 
2782 
2783 /**
2784  * Factory for creating new {@link ForkJoinWorkerThread}s.
2785  * A {@code ForkJoinWorkerThreadFactory} must be defined and used
2786  * for {@code ForkJoinWorkerThread} subclasses that extend base
2787  * functionality or initialize threads with different contexts.
2788  */
2789 interface ForkJoinWorkerThreadFactory {
2790     /**
2791      * Returns a new worker thread operating in the given pool.
2792      * Returning null or throwing an exception may result in tasks
2793      * never being executed.  If this method throws an exception,
2794      * it is relayed to the caller of the method (for example
2795      * {@code execute}) causing attempted thread creation. If this
2796      * method returns null or throws an exception, it is not
2797      * retried until the next attempted creation (for example
2798      * another call to {@code execute}).
2799      *
2800      * @param pool the pool this thread works in
2801      * @return the new worker thread, or {@code null} if the request
2802      *         to create a thread is rejected
2803      * @throws NullPointerException if the pool is null
2804      */
2805     ForkJoinWorkerThread newThread(ForkJoinPool pool);
2806 }    
2807 
// Helper types (nested classes of ForkJoinPool in the Java original)
2809 
2810 // static AccessControlContext contextWithPermissions(Permission ... perms) {
2811 //     Permissions permissions = new Permissions();
2812 //     for (Permission perm : perms)
2813 //         permissions.add(perm);
2814 //     return new AccessControlContext(
2815 //         new ProtectionDomain[] { new ProtectionDomain(null, permissions) });
2816 // }
2817 
2818 /**
2819  * Default ForkJoinWorkerThreadFactory implementation; creates a
2820  * new ForkJoinWorkerThread using the system class loader as the
2821  * thread context class loader.
2822  */
2823 private final class DefaultForkJoinWorkerThreadFactory : ForkJoinWorkerThreadFactory {
2824     // private static final AccessControlContext ACC = contextWithPermissions(
2825     //     new RuntimePermission("getClassLoader"),
2826     //     new RuntimePermission("setContextClassLoader"));
2827 
2828     final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
2829         return new ForkJoinWorkerThread(pool);
2830         // return AccessController.doPrivileged(
2831         //     new PrivilegedAction<>() {
2832         //         ForkJoinWorkerThread run() {
2833         //             return new ForkJoinWorkerThread(
2834         //                 pool, ClassLoader.getSystemClassLoader()); }},
2835         //     ACC);
2836     }
2837 }
2838 
2839 
2840 /**
2841  * Interface for extending managed parallelism for tasks running
2842  * in {@link ForkJoinPool}s.
2843  *
2844  * <p>A {@code ManagedBlocker} provides two methods.  Method
2845  * {@link #isReleasable} must return {@code true} if blocking is
2846  * not necessary. Method {@link #block} blocks the current thread
2847  * if necessary (perhaps internally invoking {@code isReleasable}
2848  * before actually blocking). These actions are performed by any
2849  * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
2850  * The unusual methods in this API accommodate synchronizers that
2851  * may, but don't usually, block for long periods. Similarly, they
2852  * allow more efficient internal handling of cases in which
2853  * additional workers may be, but usually are not, needed to
2854  * ensure sufficient parallelism.  Toward this end,
2855  * implementations of method {@code isReleasable} must be amenable
2856  * to repeated invocation.
2857  *
2858  * <p>For example, here is a ManagedBlocker based on a
2859  * ReentrantLock:
2860  * <pre> {@code
2861  * class ManagedLocker implements ManagedBlocker {
2862  *   final ReentrantLock lock;
2863  *   bool hasLock = false;
2864  *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
2865  *   bool block() {
2866  *     if (!hasLock)
2867  *       lock.lock();
2868  *     return true;
2869  *   }
2870  *   bool isReleasable() {
2871  *     return hasLock || (hasLock = lock.tryLock());
2872  *   }
2873  * }}</pre>
2874  *
2875  * <p>Here is a class that possibly blocks waiting for an
2876  * item on a given queue:
2877  * <pre> {@code
2878  * class QueueTaker!(E) : ManagedBlocker {
2879  *   final BlockingQueue!(E) queue;
2880  *   E item = null;
2881  *   QueueTaker(BlockingQueue!(E) q) { this.queue = q; }
2882  *   bool block() {
2883  *     if (item is null)
2884  *       item = queue.take();
2885  *     return true;
2886  *   }
2887  *   bool isReleasable() {
2888  *     return item !is null || (item = queue.poll()) !is null;
2889  *   }
2890  *   E getItem() { // call after pool.managedBlock completes
2891  *     return item;
2892  *   }
2893  * }}</pre>
2894  */
2895 static interface ManagedBlocker {
2896     /**
2897      * Possibly blocks the current thread, for example waiting for
2898      * a lock or condition.
2899      *
2900      * @return {@code true} if no additional blocking is necessary
2901      * (i.e., if isReleasable would return true)
2902      * @throws InterruptedException if interrupted while waiting
2903      * (the method is not required to do so, but is allowed to)
2904      */
2905     bool block();
2906 
2907     /**
2908      * Returns {@code true} if blocking is unnecessary.
2909      * @return {@code true} if blocking is unnecessary
2910      */
2911     bool isReleasable();
2912 }
2913 
2914 
2915 
2916 /**
2917  * A thread managed by a {@link ForkJoinPool}, which executes
2918  * {@link ForkJoinTask}s.
2919  * This class is subclassable solely for the sake of adding
2920  * functionality -- there are no overridable methods dealing with
2921  * scheduling or execution.  However, you can override initialization
2922  * and termination methods surrounding the main task processing loop.
2923  * If you do create such a subclass, you will also need to supply a
2924  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
2925  * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2926  * UncaughtExceptionHandler, bool, int, int, int, Predicate, long, TimeUnit)
2927  * use it} in a {@code ForkJoinPool}.
2928  *
2929  * @author Doug Lea
2930  */
2931 class ForkJoinWorkerThread : ThreadEx {
2932     /*
2933      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
2934      * ForkJoinTasks. For explanation, see the internal documentation
2935      * of class ForkJoinPool.
2936      *
2937      * This class just maintains links to its pool and WorkQueue.  The
2938      * pool field is set immediately upon construction, but the
2939      * workQueue field is not set until a call to registerWorker
2940      * completes. This leads to a visibility race, that is tolerated
2941      * by requiring that the workQueue field is only accessed by the
2942      * owning thread.
2943      *
2944      * Support for (non-public) subclass InnocuousForkJoinWorkerThread
2945      * requires that we break quite a lot of encapsulation (via helper
2946      * methods in ThreadLocalRandom) both here and in the subclass to
2947      * access and set Thread fields.
2948      */
2949 
2950     ForkJoinPool pool;                // the pool this thread works in
2951     WorkQueue workQueue;              // work-stealing mechanics
2952 
2953     /**
2954      * Creates a ForkJoinWorkerThread operating in the given pool.
2955      *
2956      * @param pool the pool this thread works in
2957      * @throws NullPointerException if pool is null
2958      */
2959     protected this(ForkJoinPool pool) {
2960         // Use a placeholder until a useful name can be set in registerWorker
2961         super("aForkJoinWorkerThread");
2962         this.pool = pool;
2963         this.workQueue = pool.registerWorker(this);
2964     }
2965 
2966     /**
2967      * Version for use by the default pool.  Supports setting the
2968      * context class loader.  This is a separate constructor to avoid
2969      * affecting the protected constructor.
2970      */
2971     // this(ForkJoinPool pool, ClassLoader ccl) {
2972     //     super("aForkJoinWorkerThread");
2973     //     super.setContextClassLoader(ccl);
2974     //     this.pool = pool;
2975     //     this.workQueue = pool.registerWorker(this);
2976     // }
2977 
2978     /**
2979      * Version for InnocuousForkJoinWorkerThread.
2980      */
2981     this(ForkJoinPool pool,
2982                         //  ClassLoader ccl,
2983                          ThreadGroupEx threadGroup,
2984                         ) { //  AccessControlContext acc
2985         super(threadGroup, null, "aForkJoinWorkerThread");
2986         // super.setContextClassLoader(ccl);
2987         // ThreadLocalRandom.setInheritedAccessControlContext(this, acc);
2988         // ThreadLocalRandom.eraseThreadLocals(this); // clear before registering
2989         this.pool = pool;
2990         this.workQueue = pool.registerWorker(this);
2991     }
2992 
2993     /**
2994      * Returns the pool hosting this thread.
2995      *
2996      * @return the pool
2997      */
2998     ForkJoinPool getPool() {
2999         return pool;
3000     }
3001 
3002     /**
3003      * Returns the unique index number of this thread in its pool.
3004      * The returned value ranges from zero to the maximum number of
3005      * threads (minus one) that may exist in the pool, and does not
3006      * change during the lifetime of the thread.  This method may be
3007      * useful for applications that track status or collect results
3008      * per-worker-thread rather than per-task.
3009      *
3010      * @return the index number
3011      */
3012     int getPoolIndex() {
3013         return workQueue.getPoolIndex();
3014     }
3015 
3016     /**
3017      * Initializes internal state after construction but before
3018      * processing any tasks. If you override this method, you must
3019      * invoke {@code super.onStart()} at the beginning of the method.
3020      * Initialization requires care: Most fields must have legal
3021      * default values, to ensure that attempted accesses from other
3022      * threads work correctly even before this thread starts
3023      * processing tasks.
3024      */
3025     protected void onStart() {
3026     }
3027 
3028     /**
3029      * Performs cleanup associated with termination of this worker
3030      * thread.  If you override this method, you must invoke
3031      * {@code super.onTermination} at the end of the overridden method.
3032      *
3033      * @param exception the exception causing this thread to abort due
3034      * to an unrecoverable error, or {@code null} if completed normally
3035      */
3036     protected void onTermination(Throwable exception) {
3037     }
3038 
3039     /**
3040      * This method is required to be public, but should never be
3041      * called explicitly. It performs the main run loop to execute
3042      * {@link ForkJoinTask}s.
3043      */
3044     override void run() {
3045         if (workQueue.array !is null)
3046             return;
3047         // only run once
3048         Throwable exception = null;
3049 
3050         try {
3051             onStart();
3052             pool.runWorker(workQueue);
3053         } catch (Throwable ex) {
3054             version(HUNT_DEBUG) {
3055                 warning(ex);
3056             } else {
3057                 warning(ex.msg);
3058             }
3059             exception = ex;
3060         }
3061 
3062         try {
3063             onTermination(exception);
3064         } catch(Throwable ex) {
3065             version(HUNT_DEBUG) {
3066                 warning(ex);
3067             } else {
3068                 warning(ex.msg);
3069             }
3070             if (exception is null)
3071                 exception = ex;
3072         }
3073         pool.deregisterWorker(this, exception);  
3074     }
3075 
3076     /**
3077      * Non-hook method for InnocuousForkJoinWorkerThread.
3078      */
3079     void afterTopLevelExec() {
3080     }
3081 
3082 
3083     int awaitJoin(IForkJoinTask task) {
3084         int s = task.getStatus(); 
3085         WorkQueue w = workQueue;
3086             if(w.tryUnpush(task) && (s = task.doExec()) < 0 )
3087                 return s;
3088             else
3089                 return pool.awaitJoin(w, task, MonoTime.zero());
3090     }
3091 
3092     /**
3093      * If the current thread is operating in a ForkJoinPool,
3094      * unschedules and returns, without executing, a task externally
3095      * submitted to the pool, if one is available. Availability may be
3096      * transient, so a {@code null} result does not necessarily imply
3097      * quiescence of the pool.  This method is designed primarily to
3098      * support extensions, and is unlikely to be useful otherwise.
3099      *
3100      * @return a task, or {@code null} if none are available
3101      */
3102     protected static IForkJoinTask pollSubmission() {
3103         ForkJoinWorkerThread t = cast(ForkJoinWorkerThread)Thread.getThis();
3104         return t !is null ? t.pool.pollSubmission() : null;
3105     }
3106 }
3107 
3108 
3109 /**
3110  * A worker thread that has no permissions, is not a member of any
3111  * user-defined ThreadGroupEx, uses the system class loader as
3112  * thread context class loader, and erases all ThreadLocals after
3113  * running each top-level task.
3114  */
3115 final class InnocuousForkJoinWorkerThread : ForkJoinWorkerThread {
3116     /** The ThreadGroupEx for all InnocuousForkJoinWorkerThreads */
3117     private __gshared ThreadGroupEx innocuousThreadGroup;
3118         // AccessController.doPrivileged(new PrivilegedAction<>() {
3119         //     ThreadGroupEx run() {
3120         //         ThreadGroupEx group = Thread.getThis().getThreadGroup();
3121         //         for (ThreadGroupEx p; (p = group.getParent()) !is null; )
3122         //             group = p;
3123         //         return new ThreadGroupEx(
3124         //             group, "InnocuousForkJoinWorkerThreadGroup");
3125         //     }});
3126 
3127     /** An AccessControlContext supporting no privileges */
3128     // private static final AccessControlContext INNOCUOUS_ACC =
3129     //     new AccessControlContext(
3130     //         new ProtectionDomain[] { new ProtectionDomain(null, null) });
3131 
3132     shared static this() {
3133         // ThreadGroupEx group = Thread.getThis().getThreadGroup();
3134         // for (ThreadGroupEx p; (p = group.getParent()) !is null; )
3135         //     group = p;
3136         innocuousThreadGroup = new ThreadGroupEx(
3137                     null, "InnocuousForkJoinWorkerThreadGroup");
3138     }
3139 
3140     this(ForkJoinPool pool) {
3141         super(pool,
3142             //   ClassLoader.getSystemClassLoader(),
3143               innocuousThreadGroup,
3144             //   INNOCUOUS_ACC
3145               );
3146     }
3147 
3148     override // to erase ThreadLocals
3149     void afterTopLevelExec() {
3150         // ThreadLocalRandom.eraseThreadLocals(this);
3151     }
3152 
3153     override // to silently fail
3154     void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
3155 
3156     // override // paranoically
3157     // void setContextClassLoader(ClassLoader cl) {
3158     //     throw new SecurityException("setContextClassLoader");
3159     // }
3160 }
3161 
3162 
3163 /**
3164  * Queues supporting work-stealing as well as external task
3165  * submission. See above for descriptions and algorithms.
3166  */
3167 final class WorkQueue {
    // Queue state. base, top and phase are declared shared because they
    // are read/CASed from other threads (see poll, tryLockPhase).
    int source;       // source queue id, or sentinel (reset to 0 by topLevelExec)
    int id;                    // pool index, mode, tag (see getPoolIndex; FIFO bit read by peek/nextLocalTask)
    shared int base;                  // index of next slot for poll
    shared int top;                   // index of next slot for push
    shared int phase;        // versioned, negative: queued, 1: locked, 0: unlocked
    int stackPred;             // pool stack (ctl) predecessor link
    int nsteals;               // number of steals (accumulated in topLevelExec)
    IForkJoinTask[] array;   // the queued tasks; power of 2 size; null until allocated
    ForkJoinPool pool;   // the containing pool (may be null)
    ForkJoinWorkerThread owner; // owning thread or null if shared
3178 
3179     this(ForkJoinPool pool, ForkJoinWorkerThread owner) {
3180         this.pool = pool;
3181         this.owner = owner;
3182         // Place indices in the center of array (that is not yet allocated)
3183         base = top = INITIAL_QUEUE_CAPACITY >>> 1;
3184     }
3185 
3186     /**
3187      * Tries to lock shared queue by CASing phase field.
3188      */
3189     final bool tryLockPhase() {
3190         return cas(&this.phase, 0, 1);
3191         // return PHASE.compareAndSet(this, 0, 1);
3192     }
3193 
3194     final void releasePhaseLock() {
3195         // PHASE.setRelease(this, 0);
3196         atomicStore(this.phase, 0);
3197     }
3198 
3199     /**
3200      * Returns an exportable index (used by ForkJoinWorkerThread).
3201      */
3202     final int getPoolIndex() {
3203         return (id & 0xffff) >>> 1; // ignore odd/even tag bit
3204     }
3205 
3206     /**
3207      * Returns the approximate number of tasks in the queue.
3208      */
3209     final int queueSize() {
3210         // int n = cast(int)BASE.getAcquire(this) - top;
3211         int n = atomicLoad(this.base) - top;
3212         return (n >= 0) ? 0 : -n; // ignore negative
3213     }
3214 
3215     /**
3216      * Provides a more accurate estimate of whether this queue has
3217      * any tasks than does queueSize, by checking whether a
3218      * near-empty queue has at least one unclaimed task.
3219      */
3220     final bool isEmpty() {
3221         IForkJoinTask[] a; int n, cap, b;
3222         // VarHandle.acquireFence(); // needed by external callers
3223         return ((n = (b = base) - top) >= 0 || // possibly one task
3224                 (n == -1 && ((a = array) is null ||
3225                              (cap = cast(int)a.length) == 0 ||
3226                              a[(cap - 1) & b] is null)));
3227     }
3228 
3229     /**
3230      * Pushes a task. Call only by owner in unshared queues.
3231      *
3232      * @param task the task. Caller must ensure non-null.
3233      * @throws RejectedExecutionException if array cannot be resized
3234      */
3235     final void push(IForkJoinTask task) {
3236         IForkJoinTask[] a;
3237         int s = top, d, cap, m;
3238         ForkJoinPool p = pool;
3239         if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
3240             m = cap - 1;
3241             // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:40:55
3242             // 
3243             // AtomicHelper.store(a[m & s], task);
3244             a[m & s] = task;
3245             // QA.setRelease(a, (m = cap - 1) & s, task);
3246             top = s + 1;
3247             if (((d = s - atomicLoad(this.base)) & ~1) == 0 &&
3248                 p !is null) {                 // size 0 or 1
3249                 // VarHandle.fullFence();
3250                 p.signalWork();
3251             }
3252             else if (d == m)
3253                 growArray(false);
3254         }
3255     }
3256 
3257     /**
3258      * Version of push for shared queues. Call only with phase lock held.
3259      * @return true if should signal work
3260      */
3261     final bool lockedPush(IForkJoinTask task) {
3262         IForkJoinTask[] a;
3263         bool signal = false;
3264         int s = top, b = base, cap, d;
3265         if ((a = array) !is null && (cap = cast(int)a.length) > 0) {
3266             a[(cap - 1) & s] = task;
3267             top = s + 1;
3268             if (b - s + cap - 1 == 0)
3269                 growArray(true);
3270             else {
3271                 phase = 0; // full unlock
3272                 if (((s - base) & ~1) == 0) // size 0 or 1
3273                     signal = true;
3274             }
3275         }
3276         return signal;
3277     }
3278 
3279     /**
3280      * Doubles the capacity of array. Call either by owner or with
3281      * lock held -- it is OK for base, but not top, to move while
3282      * resizings are in progress.
3283      */
3284     final void growArray(bool locked) {
3285         IForkJoinTask[] newA = null;
3286         try {
3287             IForkJoinTask[] oldA; int oldSize, newSize;
3288             if ((oldA = array) !is null && (oldSize = cast(int)oldA.length) > 0 &&
3289                 (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
3290                 newSize > 0) {
3291                 try {
3292                     newA = new IForkJoinTask[newSize];
3293                 } catch (OutOfMemoryError ex) {
3294                 }
3295                 if (newA !is null) { // poll from old array, push to new
3296                     int oldMask = oldSize - 1, newMask = newSize - 1;
3297                     for (int s = top - 1, k = oldMask; k >= 0; --k) {
3298                         // IForkJoinTask x = AtomicHelper.getAndSet(oldA[s & oldMask], null);
3299                         // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 下午8:57:26
3300                         // 
3301                         IForkJoinTask x = oldA[s & oldMask];
3302                         oldA[s & oldMask] = null;
3303 
3304                         if (x !is null)
3305                             newA[s-- & newMask] = x;
3306                         else
3307                             break;
3308                     }
3309                     array = newA;
3310                     // VarHandle.releaseFence();
3311                 }
3312             }
3313         } finally {
3314             if (locked)
3315                 phase = 0;
3316         }
3317         if (newA is null)
3318             throw new RejectedExecutionException("Queue capacity exceeded");
3319     }
3320 
3321     /**
3322      * Takes next task, if one exists, in FIFO order.
3323      */
3324     final IForkJoinTask poll() {
3325         int b, k, cap; 
3326         IForkJoinTask[] a;
3327         while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3328                top - (b = base) > 0) {
3329             k = (cap - 1) & b;
3330             // FIXME: Needing refactor or cleanup -@zxp at 2019/2/9 8:42:05
3331             // 
3332             
3333             // IForkJoinTask t = AtomicHelper.load(a[k]);
3334             IForkJoinTask t = a[k];
3335             if (base == b++) {
3336                 if (t is null)
3337                     Thread.yield(); // await index advance
3338                 else if (AtomicHelper.compareAndSet(a[k], t, null)) {
3339                 // else if (QA.compareAndSet(a, k, t, null)) {
3340                     AtomicHelper.store(this.base, b);
3341                     return t;
3342                 }
3343             }
3344         }
3345         return null;
3346     }
3347 
3348     /**
3349      * Takes next task, if one exists, in order specified by mode.
3350      */
3351     final IForkJoinTask nextLocalTask() {
3352         IForkJoinTask t = null;
3353         int md = id, b, s, d, cap; IForkJoinTask[] a;
3354         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3355             (d = (s = top) - (b = base)) > 0) {
3356             if ((md & FIFO) == 0 || d == 1) {
3357                 auto index = (cap - 1) & --s;
3358                 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null);
3359                 if(t !is null) {
3360                     AtomicHelper.store(this.top, s);
3361                 }
3362             } else {
3363                 auto index = (cap - 1) & b++;
3364                 t = AtomicHelper.getAndSet(a[index], cast(IForkJoinTask)null);
3365                 if (t !is null) {
3366                     AtomicHelper.store(this.base, b);
3367                 }
3368                 else // on contention in FIFO mode, use regular poll
3369                     t = poll();
3370             } 
3371         }
3372         return t;
3373     }
3374 
3375     /**
3376      * Returns next task, if one exists, in order specified by mode.
3377      */
3378     final IForkJoinTask peek() {
3379         int cap; IForkJoinTask[] a;
3380         return ((a = array) !is null && (cap = cast(int)a.length) > 0) ?
3381             a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
3382     }
3383 
3384     /**
3385      * Pops the given task only if it is at the current top.
3386      */
3387     final bool tryUnpush(IForkJoinTask task) {
3388         bool popped = false;
3389         int s, cap; IForkJoinTask[] a;
3390         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3391             (s = top) != base) {
3392             popped = AtomicHelper.compareAndSet(a[(cap - 1) & --s], task, null);
3393             if(popped) {
3394                 AtomicHelper.store(this.top, s);
3395             }
3396         }
3397         return popped;
3398     }
3399 
3400     /**
3401      * Shared version of tryUnpush.
3402      */
3403     final bool tryLockedUnpush(IForkJoinTask task) {
3404         bool popped = false;
3405         int s = top - 1, k, cap; IForkJoinTask[] a;
3406         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3407             a[k = (cap - 1) & s] == task && tryLockPhase()) {
3408             if (top == s + 1 && array == a) {
3409                 popped = AtomicHelper.compareAndSet(a[k], task, null);
3410                 if(popped) top = s;
3411             }
3412             releasePhaseLock();
3413         }
3414         return popped;
3415     }
3416 
3417     /**
3418      * Removes and cancels all known tasks, ignoring any exceptions.
3419      */
3420     final void cancelAll() {
3421         for (IForkJoinTask t; (t = poll()) !is null; )
3422             IForkJoinTask.cancelIgnoringExceptions(t);
3423     }
3424 
3425     // Specialized execution methods
3426 
3427     /**
3428      * Runs the given (stolen) task if nonnull, as well as
3429      * remaining local tasks and others available from the given
3430      * queue, up to bound n (to avoid infinite unfairness).
3431      */
3432     final void topLevelExec(IForkJoinTask t, WorkQueue q, int n) {
3433         if (t !is null && q !is null) { // hoist checks
3434             int nstolen = 1;
3435             for (;;) {
3436                 t.doExec();
3437                 if (n-- < 0)
3438                     break;
3439                 else if ((t = nextLocalTask()) is null) {
3440                     if ((t = q.poll()) is null)
3441                         break;
3442                     else
3443                         ++nstolen;
3444                 }
3445             }
3446             ForkJoinWorkerThread thread = owner;
3447             nsteals += nstolen;
3448             source = 0;
3449             if (thread !is null)
3450                 thread.afterTopLevelExec();
3451         }
3452     }
3453 
3454     /**
3455      * If present, removes task from queue and executes it.
3456      */
3457     final void tryRemoveAndExec(IForkJoinTask task) {
3458         IForkJoinTask[] a; int s, cap;
3459         if ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3460             (s = top) - base > 0) { // traverse from top
3461             for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
3462                 int index = i & m;
3463                 IForkJoinTask t = a[index]; //QA.get(a, index);
3464                 if (t is null)
3465                     break;
3466                 else if (t == task) {
3467                     // if (AtomicHelper.compareAndSet(a[index], t, null)) 
3468                     if(a[index] == t) {
3469                         a[index] = null;
3470                         top = ns;   // safely shift down
3471                         for (int j = i; j != ns; ++j) {
3472                             IForkJoinTask f;
3473                             int pindex = (j + 1) & m;
3474                             f = a[pindex];// QA.get(a, pindex);
3475                             a[pindex] = null;
3476                             // AtomicHelper.store(a[pindex], null);
3477                             // QA.setVolatile(a, pindex, null);
3478                             int jindex = j & m;
3479                             a[jindex] = f;
3480                             // AtomicHelper.store(a[jindex], f);
3481                             // QA.setRelease(a, jindex, f);
3482                         }
3483                         // VarHandle.releaseFence();
3484                         t.doExec();
3485                     }
3486                     break;
3487                 }
3488             }
3489         }
3490     }
3491 
3492     /**
3493      * Tries to pop and run tasks within the target's computation
3494      * until done, not found, or limit exceeded.
3495      *
3496      * @param task root of CountedCompleter computation
3497      * @param limit max runs, or zero for no limit
3498      * @param shared true if must lock to extract task
3499      * @return task status on exit
3500      */
3501     final int helpCC(ICountedCompleter task, int limit, bool isShared) {
3502         int status = 0;
3503         if (task !is null && (status = task.getStatus()) >= 0) {
3504             int s, k, cap; IForkJoinTask[] a;
3505             while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3506                    (s = top) - base > 0) {
3507                 ICountedCompleter v = null;
3508                 IForkJoinTask o = a[k = (cap - 1) & (s - 1)];
3509                 ICountedCompleter t = cast(ICountedCompleter)o;
3510                 if (t !is null) {
3511                     for (ICountedCompleter f = t;;) {
3512                         if (f != task) {
3513                             if ((f = f.getCompleter()) is null)
3514                                 break;
3515                         }
3516                         else if (isShared) {
3517                             if (tryLockPhase()) {
3518                                 if (top == s && array == a &&
3519                                     AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
3520                                     top = s - 1;
3521                                     v = t;
3522                                 }
3523                                 releasePhaseLock();
3524                             }
3525                             break;
3526                         }
3527                         else {
3528                             if (AtomicHelper.compareAndSet(a[k], t, cast(IForkJoinTask)null)) {
3529                                 top = s - 1;
3530                                 v = t;
3531                             }
3532                             break;
3533                         }
3534                     }
3535                 }
3536                 if (v !is null)
3537                     v.doExec();
3538                 if ((status = task.getStatus()) < 0 || v is null ||
3539                     (limit != 0 && --limit == 0))
3540                     break;
3541             }
3542         }
3543         return status;
3544     }
3545 
3546     /**
3547      * Tries to poll and run AsynchronousCompletionTasks until
3548      * none found or blocker is released
3549      *
3550      * @param blocker the blocker
3551      */
3552     final void helpAsyncBlocker(ManagedBlocker blocker) {
3553         if (blocker !is null) {
3554             int b, k, cap; IForkJoinTask[] a; IForkJoinTask t;
3555             while ((a = array) !is null && (cap = cast(int)a.length) > 0 &&
3556                    top - (b = base) > 0) {
3557                 k = (cap - 1) & b;
3558                 // t = AtomicHelper.load(a[k]);
3559                 t = a[k];
3560                 if (blocker.isReleasable())
3561                     break;
3562                 else if (base == b++ && t !is null) {
3563                     AsynchronousCompletionTask at = cast(AsynchronousCompletionTask)t;
3564                     if (at is null)
3565                         break;
3566                     else if (AtomicHelper.compareAndSet(a[k], t, null)) {
3567                         AtomicHelper.store(this.base, b);
3568                         t.doExec();
3569                     }
3570                 }
3571             }
3572         }
3573     }
3574 
3575     /**
3576      * Returns true if owned and not known to be blocked.
3577      */
3578     final bool isApparentlyUnblocked() {
3579         ThreadEx wt; ThreadState s;
3580         return ((wt = owner) !is null &&
3581                 (s = wt.getState()) != ThreadState.BLOCKED &&
3582                 s != ThreadState.WAITING &&
3583                 s != ThreadState.TIMED_WAITING);
3584     }
3585 
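    // VarHandle mechanics from the Java original, kept for reference only.
    // This port performs its atomic slot/field operations via AtomicHelper.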
3587     // static final VarHandle PHASE;
3588     // static final VarHandle BASE;
3589     // static final VarHandle TOP;
3590     // static {
3591     //     try {
3592     //         MethodHandles.Lookup l = MethodHandles.lookup();
3593     //         PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
3594     //         BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
3595     //         TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
3596     //     } catch (ReflectiveOperationException e) {
3597     //         throw new ExceptionInInitializerError(e);
3598     //     }
3599     // }
3600 }
3601 
3602 
3603 
3604 /**
3605  * A marker interface identifying asynchronous tasks produced by
3606  * {@code async} methods. This may be useful for monitoring,
3607  * debugging, and tracking asynchronous activities.
3608  *
3609  */
3610 interface AsynchronousCompletionTask {
3611 }