1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.ScheduledThreadPoolExecutor;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.BlockingQueue;
16 import hunt.concurrency.Delayed;
17 import hunt.concurrency.Future;
18 import hunt.concurrency.FutureTask;
19 import hunt.concurrency.ScheduledExecutorService;
20 import hunt.concurrency.thread;
21 import hunt.concurrency.ThreadFactory;
22 import hunt.concurrency.ThreadPoolExecutor;
23 
24 import hunt.collection;
25 import hunt.Exceptions;
26 import hunt.util.Common;
27 import hunt.util.DateTime;
28 import hunt.Object;
29 import hunt.logging.ConsoleLogger;
30 // import core.time;
31 
32 import core.atomic;
33 import core.sync.condition;
34 import core.sync.mutex;
35 
36 import std.datetime;
37 // import hunt.collection.AbstractQueue;
38 // import java.util.Arrays;
39 // import hunt.collection.Collection;
40 // import hunt.collection.Iterator;
41 // import java.util.List;
42 // import java.util.NoSuchElementException;
43 // import java.util.Objects;
44 // import hunt.concurrency.atomic.AtomicLong;
45 // import hunt.concurrency.locks.Condition;
46 // import hunt.concurrency.locks.ReentrantLock;
47 
48 alias ReentrantLock = Mutex;
49 
50 interface IScheduledFutureTask {
51     void heapIndex(int index);
52     int heapIndex();
53 }
54 
55 /**
56  * A {@link ThreadPoolExecutor} that can additionally schedule
57  * commands to run after a given delay, or to execute periodically.
58  * This class is preferable to {@link java.util.Timer} when multiple
59  * worker threads are needed, or when the additional flexibility or
60  * capabilities of {@link ThreadPoolExecutor} (which this class
61  * extends) are required.
62  *
63  * <p>Delayed tasks execute no sooner than they are enabled, but
64  * without any real-time guarantees about when, after they are
65  * enabled, they will commence. Tasks scheduled for exactly the same
66  * execution time are enabled in first-in-first-out (FIFO) order of
67  * submission.
68  *
69  * <p>When a submitted task is cancelled before it is run, execution
70  * is suppressed.  By default, such a cancelled task is not
71  * automatically removed from the work queue until its delay elapses.
72  * While this enables further inspection and monitoring, it may also
73  * cause unbounded retention of cancelled tasks.  To avoid this, use
74  * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
75  * removed from the work queue at time of cancellation.
76  *
77  * <p>Successive executions of a periodic task scheduled via
78  * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
79  * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
80  * do not overlap. While different executions may be performed by
81  * different threads, the effects of prior executions
82  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
83  * those of subsequent ones.
84  *
85  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
86  * of the inherited tuning methods are not useful for it. In
87  * particular, because it acts as a fixed-sized pool using
88  * {@code corePoolSize} threads and an unbounded queue, adjustments
89  * to {@code maximumPoolSize} have no useful effect. Additionally, it
90  * is almost never a good idea to set {@code corePoolSize} to zero or
91  * use {@code allowCoreThreadTimeOut} because this may leave the pool
92  * without threads to handle tasks once they become eligible to run.
93  *
94  * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
95  * this class uses {@link Executors#defaultThreadFactory} as the
96  * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
97  * as the default rejected execution handler.
98  *
99  * <p><b>Extension notes:</b> This class overrides the
100  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
101  * {@link AbstractExecutorService#submit(Runnable) submit}
102  * methods to generate internal {@link ScheduledFuture} objects to
103  * control per-task delays and scheduling.  To preserve
104  * functionality, any further overrides of these methods in
105  * subclasses must invoke superclass versions, which effectively
106  * disables additional task customization.  However, this class
107  * provides alternative protected extension method
108  * {@code decorateTask} (one version each for {@code Runnable} and
109  * {@code Callable}) that can be used to customize the concrete task
110  * types used to execute commands entered via {@code execute},
111  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
112  * and {@code scheduleWithFixedDelay}.  By default, a
113  * {@code ScheduledThreadPoolExecutor} uses a task type extending
114  * {@link FutureTask}. However, this may be modified or replaced using
115  * subclasses of the form:
116  *
117  * <pre> {@code
118  * class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
119  *
120  *   static class CustomTask!(V) : RunnableScheduledFuture!(V) { ... }
121  *
122  *   protected !(V) RunnableScheduledFuture!(V) decorateTask(
123  *                Runnable r, RunnableScheduledFuture!(V) task) {
124  *       return new CustomTask!(V)(r, task);
125  *   }
126  *
127  *   protected !(V) RunnableScheduledFuture!(V) decorateTask(
128  *                Callable!(V) c, RunnableScheduledFuture!(V) task) {
129  *       return new CustomTask!(V)(c, task);
130  *   }
131  *   // ... add constructors, etc.
132  * }}</pre>
133  *
134  * @since 1.5
135  * @author Doug Lea
136  */
137 class ScheduledThreadPoolExecutor : ThreadPoolExecutor, ScheduledExecutorService {
138 
139     /*
140      * This class specializes ThreadPoolExecutor implementation by
141      *
142      * 1. Using a custom task type ScheduledFutureTask, even for tasks
143      *    that don't require scheduling because they are submitted
144      *    using ExecutorService rather than ScheduledExecutorService
145      *    methods, which are treated as tasks with a delay of zero.
146      *
147      * 2. Using a custom queue (DelayedWorkQueue), a variant of
148      *    unbounded DelayQueue. The lack of capacity constraint and
149      *    the fact that corePoolSize and maximumPoolSize are
150      *    effectively identical simplifies some execution mechanics
151      *    (see delayedExecute) compared to ThreadPoolExecutor.
152      *
153      * 3. Supporting optional run-after-shutdown parameters, which
154      *    leads to overrides of shutdown methods to remove and cancel
155      *    tasks that should NOT be run after shutdown, as well as
156      *    different recheck logic when task (re)submission overlaps
157      *    with a shutdown.
158      *
159      * 4. Task decoration methods to allow interception and
160      *    instrumentation, which are needed because subclasses cannot
161      *    otherwise override submit methods to get this effect. These
162      *    don't have any impact on pool control logic though.
163      */
164 
165     /**
166      * False if should cancel/suppress periodic tasks on shutdown.
167      */
168     private bool continueExistingPeriodicTasksAfterShutdown;
169 
170     /**
171      * False if should cancel non-periodic not-yet-expired tasks on shutdown.
172      */
173     private bool executeExistingDelayedTasksAfterShutdown = true;
174 
175     /**
176      * True if ScheduledFutureTask.cancel should remove from queue.
177      */
178     bool removeOnCancel;
179 
180     /**
181      * Sequence number to break scheduling ties, and in turn to
182      * guarantee FIFO order among tied entries.
183      */
184     private shared static long sequencer; //= new AtomicLong();
185 
186     /**
187      * Returns true if can run a task given current run state and
188      * run-after-shutdown parameters.
189      */
190     bool canRunInCurrentRunState(V)(RunnableScheduledFuture!V task) {
191         if (!isShutdown())
192             return true;
193         if (isStopped())
194             return false;
195         return task.isPeriodic()
196             ? continueExistingPeriodicTasksAfterShutdown
197             : (executeExistingDelayedTasksAfterShutdown
198                || task.getDelay() <= Duration.zero);
199     }
200 
201     /**
202      * Main execution method for delayed or periodic tasks.  If pool
203      * is shut down, rejects the task. Otherwise adds task to queue
204      * and starts a thread, if necessary, to run it.  (We cannot
205      * prestart the thread to run the task because the task (probably)
206      * shouldn't be run yet.)  If the pool is shut down while the task
207      * is being added, cancel and remove it if required by state and
208      * run-after-shutdown parameters.
209      *
210      * @param task the task
211      */
212     private void delayedExecute(V)(RunnableScheduledFuture!V task) {
213         if (isShutdown())
214             reject(task);
215         else {
216             super.getQueue().add(task);
217             if (!canRunInCurrentRunState(task) && remove(task))
218                 task.cancel(false);
219             else
220                 ensurePrestart();
221         }
222     }
223 
224     /**
225      * Requeues a periodic task unless current run state precludes it.
226      * Same idea as delayedExecute except drops task rather than rejecting.
227      *
228      * @param task the task
229      */
230     void reExecutePeriodic(V)(RunnableScheduledFuture!V task) {
231         if (canRunInCurrentRunState(task)) {
232             super.getQueue().add(task);
233             if (canRunInCurrentRunState(task) || !remove(task)) {
234                 ensurePrestart();
235                 return;
236             }
237         }
238         task.cancel(false);
239     }
240 
241     /**
242      * Cancels and clears the queue of all tasks that should not be run
243      * due to shutdown policy.  Invoked within super.shutdown.
244      */
245     override void onShutdown() {
246         BlockingQueue!(Runnable) q = super.getQueue();
247         bool keepDelayed =
248             getExecuteExistingDelayedTasksAfterShutdownPolicy();
249         bool keepPeriodic =
250             getContinueExistingPeriodicTasksAfterShutdownPolicy();
251         // Traverse snapshot to avoid iterator exceptions
252         // TODO: implement and use efficient removeIf
253         // super.getQueue().removeIf(...);
254         version(HUNT_DEBUG) tracef("Shuting down..., BlockingQueue size: %d", q.size());
255         foreach (Runnable e ; q.toArray()) {
256             if(e is null) {
257                 warning("e is null");
258             } else {
259                 version(HUNT_DEBUG) trace(typeid(cast(Object)e));
260                 IRunnableScheduledFuture t = cast(IRunnableScheduledFuture)e;
261                 if (t !is null) {
262                     if ((t.isPeriodic()
263                          ? !keepPeriodic
264                          : (!keepDelayed && t.getDelay() > Duration.zero))
265                         || t.isCancelled()) { // also remove if already cancelled
266                         if (q.remove(t))
267                             t.cancel(false);
268                     }
269                 } else {
270                     warning("t is null");
271                 }
272             }
273                 
274         }
275         tryTerminate();
276     }
277 
278     /**
279      * Modifies or replaces the task used to execute a runnable.
280      * This method can be used to override the concrete
281      * class used for managing internal tasks.
282      * The default implementation simply returns the given task.
283      *
284      * @param runnable the submitted Runnable
285      * @param task the task created to execute the runnable
286      * @param (V) the type of the task's result
287      * @return a task that can execute the runnable
288      * @since 1.6
289      */
290     protected RunnableScheduledFuture!(V) decorateTask(V) (
291         Runnable runnable, RunnableScheduledFuture!(V) task) {
292         return task;
293     }
294 
295     /**
296      * Modifies or replaces the task used to execute a callable.
297      * This method can be used to override the concrete
298      * class used for managing internal tasks.
299      * The default implementation simply returns the given task.
300      *
301      * @param callable the submitted Callable
302      * @param task the task created to execute the callable
303      * @param (V) the type of the task's result
304      * @return a task that can execute the callable
305      * @since 1.6
306      */
307     protected RunnableScheduledFuture!(V) decorateTask(V)(
308         Callable!(V) callable, RunnableScheduledFuture!(V) task) {
309         return task;
310     }
311 
312     /**
313      * The default keep-alive time for pool threads.
314      *
315      * Normally, this value is unused because all pool threads will be
316      * core threads, but if a user creates a pool with a corePoolSize
317      * of zero (against our advice), we keep a thread alive as long as
318      * there are queued tasks.  If the keep alive time is zero (the
319      * historic value), we end up hot-spinning in getTask, wasting a
320      * CPU.  But on the other hand, if we set the value too high, and
321      * users create a one-shot pool which they don't cleanly shutdown,
322      * the pool's non-daemon threads will prevent JVM termination.  A
323      * small but non-zero value (relative to a JVM's lifetime) seems
324      * best.
325      */
326     private enum long DEFAULT_KEEPALIVE_MILLIS = 10L;
327 
328     /**
329      * Creates a new {@code ScheduledThreadPoolExecutor} with the
330      * given core pool size.
331      *
332      * @param corePoolSize the number of threads to keep in the pool, even
333      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
334      * @throws IllegalArgumentException if {@code corePoolSize < 0}
335      */
336     this(int corePoolSize) {
337         super(corePoolSize, int.max, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
338               new DelayedWorkQueue());
339     }
340 
341     /**
342      * Creates a new {@code ScheduledThreadPoolExecutor} with the
343      * given initial parameters.
344      *
345      * @param corePoolSize the number of threads to keep in the pool, even
346      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
347      * @param threadFactory the factory to use when the executor
348      *        creates a new thread
349      * @throws IllegalArgumentException if {@code corePoolSize < 0}
350      * @throws NullPointerException if {@code threadFactory} is null
351      */
352     this(int corePoolSize, ThreadFactory threadFactory) {
353         super(corePoolSize, int.max,
354               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
355               new DelayedWorkQueue(), threadFactory);
356     }
357 
358     /**
359      * Creates a new {@code ScheduledThreadPoolExecutor} with the
360      * given initial parameters.
361      *
362      * @param corePoolSize the number of threads to keep in the pool, even
363      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
364      * @param handler the handler to use when execution is blocked
365      *        because the thread bounds and queue capacities are reached
366      * @throws IllegalArgumentException if {@code corePoolSize < 0}
367      * @throws NullPointerException if {@code handler} is null
368      */
369     this(int corePoolSize, RejectedExecutionHandler handler) {
370         super(corePoolSize, int.max,
371               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
372               new DelayedWorkQueue(), handler);
373     }
374 
375     /**
376      * Creates a new {@code ScheduledThreadPoolExecutor} with the
377      * given initial parameters.
378      *
379      * @param corePoolSize the number of threads to keep in the pool, even
380      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
381      * @param threadFactory the factory to use when the executor
382      *        creates a new thread
383      * @param handler the handler to use when execution is blocked
384      *        because the thread bounds and queue capacities are reached
385      * @throws IllegalArgumentException if {@code corePoolSize < 0}
386      * @throws NullPointerException if {@code threadFactory} or
387      *         {@code handler} is null
388      */
389     this(int corePoolSize, ThreadFactory threadFactory,
390                                        RejectedExecutionHandler handler) {
391         super(corePoolSize, int.max,
392               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
393               new DelayedWorkQueue(), threadFactory, handler);
394     }
395 
396     /**
397      * Returns the nanoTime-based trigger time of a delayed action.
398      */
399     private long triggerTime(Duration delay) {
400         return triggerTime(delay.isNegative ? 0 : delay.total!(TimeUnit.HectoNanosecond)());
401     }
402 
403     /**
404      * Returns the nanoTime-based trigger time of a delayed action.
405      */
406     long triggerTime(long delay) {
407         return Clock.currStdTime +
408             ((delay < (long.max >> 1)) ? delay : overflowFree(delay));
409     }
410 
411     /**
412      * Constrains the values of all delays in the queue to be within
413      * long.max of each other, to avoid overflow in compareTo.
414      * This may occur if a task is eligible to be dequeued, but has
415      * not yet been, while some other task is added with a delay of
416      * long.max.
417      */
418     private long overflowFree(long delay) {
419         Delayed head = cast(Delayed) super.getQueue().peek();
420         if (head !is null) {
421             long headDelay = head.getDelay().total!(TimeUnit.HectoNanosecond)();
422             if (headDelay < 0 && (delay - headDelay < 0))
423                 delay = long.max + headDelay;
424         }
425         return delay;
426     }
427 
428     /**
429      * @throws RejectedExecutionException {@inheritDoc}
430      * @throws NullPointerException       {@inheritDoc}
431      */
432     ScheduledFuture!(void) schedule(Runnable command, Duration delay) {
433         if (command is null)
434             throw new NullPointerException();
435         long n = atomicOp!"+="(sequencer, 1);
436         n--;
437         RunnableScheduledFuture!(void) t = decorateTask(command,
438             new ScheduledFutureTask!(void)(command, triggerTime(delay), n, this));
439         delayedExecute!(void)(t);
440         return t;
441     }
442 
443     /**
444      * @throws RejectedExecutionException {@inheritDoc}
445      * @throws NullPointerException       {@inheritDoc}
446      */
447     ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
448         if (callable is null)
449             throw new NullPointerException();
450         RunnableScheduledFuture!(V) t = decorateTask(callable,
451             new ScheduledFutureTask!(V)(callable,
452                                        triggerTime(delay),
453                                        cast(long)AtomicHelper.getAndIncrement(sequencer), this));
454         delayedExecute(t);
455         return t;
456     }
457 
458     /**
459      * Submits a periodic action that becomes enabled first after the
460      * given initial delay, and subsequently with the given period;
461      * that is, executions will commence after
462      * {@code initialDelay}, then {@code initialDelay + period}, then
463      * {@code initialDelay + 2 * period}, and so on.
464      *
465      * <p>The sequence of task executions continues indefinitely until
466      * one of the following exceptional completions occur:
467      * <ul>
468      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
469      * via the returned future.
470      * <li>Method {@link #shutdown} is called and the {@linkplain
471      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
472      * whether to continue after shutdown} is not set true, or method
473      * {@link #shutdownNow} is called; also resulting in task
474      * cancellation.
475      * <li>An execution of the task throws an exception.  In this case
476      * calling {@link Future#get() get} on the returned future will throw
477      * {@link ExecutionException}, holding the exception as its cause.
478      * </ul>
479      * Subsequent executions are suppressed.  Subsequent calls to
480      * {@link Future#isDone isDone()} on the returned future will
481      * return {@code true}.
482      *
483      * <p>If any execution of this task takes longer than its period, then
484      * subsequent executions may start late, but will not concurrently
485      * execute.
486      *
487      * @throws RejectedExecutionException {@inheritDoc}
488      * @throws NullPointerException       {@inheritDoc}
489      * @throws IllegalArgumentException   {@inheritDoc}
490      */
491     ScheduledFuture!void scheduleAtFixedRate(Runnable command,
492                                                   Duration initialDelay,
493                                                   Duration period) {
494         if (command is null)
495             throw new NullPointerException();
496         if (period <= Duration.zero)
497             throw new IllegalArgumentException();
498         ScheduledFutureTask!(void) sft =
499             new ScheduledFutureTask!(void)(command,
500                                           triggerTime(initialDelay),
501                                           period.total!(TimeUnit.HectoNanosecond)(), 
502                                           cast(long)AtomicHelper.getAndIncrement(sequencer), this);
503         RunnableScheduledFuture!(void) t = decorateTask(command, sft);
504         sft.outerTask = t;
505         delayedExecute(t);
506         return t;
507     }
508 
509     /**
510      * Submits a periodic action that becomes enabled first after the
511      * given initial delay, and subsequently with the given delay
512      * between the termination of one execution and the commencement of
513      * the next.
514      *
515      * <p>The sequence of task executions continues indefinitely until
516      * one of the following exceptional completions occur:
517      * <ul>
518      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
519      * via the returned future.
520      * <li>Method {@link #shutdown} is called and the {@linkplain
521      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
522      * whether to continue after shutdown} is not set true, or method
523      * {@link #shutdownNow} is called; also resulting in task
524      * cancellation.
525      * <li>An execution of the task throws an exception.  In this case
526      * calling {@link Future#get() get} on the returned future will throw
527      * {@link ExecutionException}, holding the exception as its cause.
528      * </ul>
529      * Subsequent executions are suppressed.  Subsequent calls to
530      * {@link Future#isDone isDone()} on the returned future will
531      * return {@code true}.
532      *
533      * @throws RejectedExecutionException {@inheritDoc}
534      * @throws NullPointerException       {@inheritDoc}
535      * @throws IllegalArgumentException   {@inheritDoc}
536      */
537     ScheduledFuture!(void) scheduleWithFixedDelay(Runnable command,
538                                                      Duration initialDelay,
539                                                      Duration delay) {
540         if (command is null)
541             throw new NullPointerException();
542         if (delay <= Duration.zero)
543             throw new IllegalArgumentException();
544         ScheduledFutureTask!(void) sft =
545             new ScheduledFutureTask!(void)(command,
546                                           triggerTime(initialDelay),
547                                           -delay.total!(TimeUnit.HectoNanosecond)(),
548                                           cast(long)AtomicHelper.getAndIncrement(sequencer), this);
549         RunnableScheduledFuture!(void) t = decorateTask(command, sft);
550         sft.outerTask = t;
551         delayedExecute(t);
552         return t;
553     }
554 
555     /**
556      * Executes {@code command} with zero required delay.
557      * This has effect equivalent to
558      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
559      * Note that inspections of the queue and of the list returned by
560      * {@code shutdownNow} will access the zero-delayed
561      * {@link ScheduledFuture}, not the {@code command} itself.
562      *
563      * <p>A consequence of the use of {@code ScheduledFuture} objects is
564      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
565      * called with a null second {@code Throwable} argument, even if the
566      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
567      * thrown by such a task can be obtained via {@link Future#get}.
568      *
569      * @throws RejectedExecutionException at discretion of
570      *         {@code RejectedExecutionHandler}, if the task
571      *         cannot be accepted for execution because the
572      *         executor has been shut down
573      * @throws NullPointerException {@inheritDoc}
574      */
575     override void execute(Runnable command) {
576         schedule(command, Duration.zero);
577     }
578 
579     // Override AbstractExecutorService methods
580 
581     /**
582      * @throws RejectedExecutionException {@inheritDoc}
583      * @throws NullPointerException       {@inheritDoc}
584      */
585     override Future!void submit(Runnable task) {
586         return schedule(task, Duration.zero);
587     }
588 
589     /**
590      * @throws RejectedExecutionException {@inheritDoc}
591      * @throws NullPointerException       {@inheritDoc}
592      */
593     Future!(T) submit(T)(Runnable task, T result) {
594         return schedule(Executors.callable(task, result), Duration.zero);
595     }
596 
597     /**
598      * @throws RejectedExecutionException {@inheritDoc}
599      * @throws NullPointerException       {@inheritDoc}
600      */
601     Future!(T) submit(T)(Callable!(T) task) {
602         return schedule(task, Duration.zero);
603     }
604 
605     /**
606      * Sets the policy on whether to continue executing existing
607      * periodic tasks even when this executor has been {@code shutdown}.
608      * In this case, executions will continue until {@code shutdownNow}
609      * or the policy is set to {@code false} when already shutdown.
610      * This value is by default {@code false}.
611      *
612      * @param value if {@code true}, continue after shutdown, else don't
613      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
614      */
615     void setContinueExistingPeriodicTasksAfterShutdownPolicy(bool value) {
616         continueExistingPeriodicTasksAfterShutdown = value;
617         if (!value && isShutdown())
618             onShutdown();
619     }
620 
621     /**
622      * Gets the policy on whether to continue executing existing
623      * periodic tasks even when this executor has been {@code shutdown}.
624      * In this case, executions will continue until {@code shutdownNow}
625      * or the policy is set to {@code false} when already shutdown.
626      * This value is by default {@code false}.
627      *
628      * @return {@code true} if will continue after shutdown
629      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
630      */
631     bool getContinueExistingPeriodicTasksAfterShutdownPolicy() {
632         return continueExistingPeriodicTasksAfterShutdown;
633     }
634 
635     /**
636      * Sets the policy on whether to execute existing delayed
637      * tasks even when this executor has been {@code shutdown}.
638      * In this case, these tasks will only terminate upon
639      * {@code shutdownNow}, or after setting the policy to
640      * {@code false} when already shutdown.
641      * This value is by default {@code true}.
642      *
643      * @param value if {@code true}, execute after shutdown, else don't
644      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
645      */
646     void setExecuteExistingDelayedTasksAfterShutdownPolicy(bool value) {
647         executeExistingDelayedTasksAfterShutdown = value;
648         if (!value && isShutdown())
649             onShutdown();
650     }
651 
652     /**
653      * Gets the policy on whether to execute existing delayed
654      * tasks even when this executor has been {@code shutdown}.
655      * In this case, these tasks will only terminate upon
656      * {@code shutdownNow}, or after setting the policy to
657      * {@code false} when already shutdown.
658      * This value is by default {@code true}.
659      *
660      * @return {@code true} if will execute after shutdown
661      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
662      */
663     bool getExecuteExistingDelayedTasksAfterShutdownPolicy() {
664         return executeExistingDelayedTasksAfterShutdown;
665     }
666 
667     /**
668      * Sets the policy on whether cancelled tasks should be immediately
669      * removed from the work queue at time of cancellation.  This value is
670      * by default {@code false}.
671      *
672      * @param value if {@code true}, remove on cancellation, else don't
673      * @see #getRemoveOnCancelPolicy
674      * @since 1.7
675      */
676     void setRemoveOnCancelPolicy(bool value) {
677         removeOnCancel = value;
678     }
679 
680     /**
681      * Gets the policy on whether cancelled tasks should be immediately
682      * removed from the work queue at time of cancellation.  This value is
683      * by default {@code false}.
684      *
685      * @return {@code true} if cancelled tasks are immediately removed
686      *         from the queue
687      * @see #setRemoveOnCancelPolicy
688      * @since 1.7
689      */
690     bool getRemoveOnCancelPolicy() {
691         return removeOnCancel;
692     }
693 
694     /**
695      * Initiates an orderly shutdown in which previously submitted
696      * tasks are executed, but no new tasks will be accepted.
697      * Invocation has no additional effect if already shut down.
698      *
699      * <p>This method does not wait for previously submitted tasks to
700      * complete execution.  Use {@link #awaitTermination awaitTermination}
701      * to do that.
702      *
703      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
704      * has been set {@code false}, existing delayed tasks whose delays
705      * have not yet elapsed are cancelled.  And unless the {@code
706      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
707      * {@code true}, future executions of existing periodic tasks will
708      * be cancelled.
709      *
710      * @throws SecurityException {@inheritDoc}
711      */
712     override void shutdown() {
713         super.shutdown();
714     }
715 
716     /**
717      * Attempts to stop all actively executing tasks, halts the
718      * processing of waiting tasks, and returns a list of the tasks
719      * that were awaiting execution. These tasks are drained (removed)
720      * from the task queue upon return from this method.
721      *
722      * <p>This method does not wait for actively executing tasks to
723      * terminate.  Use {@link #awaitTermination awaitTermination} to
724      * do that.
725      *
726      * <p>There are no guarantees beyond best-effort attempts to stop
727      * processing actively executing tasks.  This implementation
728      * interrupts tasks via {@link Thread#interrupt}; any task that
729      * fails to respond to interrupts may never terminate.
730      *
731      * @return list of tasks that never commenced execution.
732      *         Each element of this list is a {@link ScheduledFuture}.
733      *         For tasks submitted via one of the {@code schedule}
734      *         methods, the element will be identical to the returned
735      *         {@code ScheduledFuture}.  For tasks submitted using
736      *         {@link #execute execute}, the element will be a
737      *         zero-delay {@code ScheduledFuture}.
738      * @throws SecurityException {@inheritDoc}
739      */
740     override List!(Runnable) shutdownNow() {
741         return super.shutdownNow();
742     }
743 
744     /**
745      * Returns the task queue used by this executor.  Access to the
746      * task queue is intended primarily for debugging and monitoring.
747      * This queue may be in active use.  Retrieving the task queue
748      * does not prevent queued tasks from executing.
749      *
750      * <p>Each element of this queue is a {@link ScheduledFuture}.
751      * For tasks submitted via one of the {@code schedule} methods, the
752      * element will be identical to the returned {@code ScheduledFuture}.
753      * For tasks submitted using {@link #execute execute}, the element
754      * will be a zero-delay {@code ScheduledFuture}.
755      *
756      * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
757      * tasks in the order in which they will execute.
758      *
759      * @return the task queue
760      */
761     override BlockingQueue!(Runnable) getQueue() {
762         return super.getQueue();
763     }
764 }
765 
766 
767 /**
768 */
769 private class ScheduledFutureTask(V) : FutureTask!(V) , 
770     RunnableScheduledFuture!(V), IScheduledFutureTask {
771 
772     /** Sequence number to break ties FIFO */
773     private long sequenceNumber;
774 
775     /** The nanoTime-based time when the task is enabled to execute. */
776     private long time;
777 
778     /**
779      * Period for repeating tasks, in nanoseconds.
780      * A positive value indicates fixed-rate execution.
781      * A negative value indicates fixed-delay execution.
782      * A value of 0 indicates a non-repeating (one-shot) task.
783      */
784     private long period;
785 
786     /** The actual task to be re-enqueued by reExecutePeriodic */
787     RunnableScheduledFuture!(V) outerTask; // = this;
788     ScheduledThreadPoolExecutor poolExecutor;
789 
790     /**
791      * Index into delay queue, to support faster cancellation.
792      */
793     int _heapIndex;
794 
795 static if(is(V == void)) {         
796     this(Runnable r, long triggerTime,
797                         long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
798         super(r);
799         this.time = triggerTime;
800         this.period = 0;
801         this.sequenceNumber = sequenceNumber;
802         this.poolExecutor = poolExecutor;
803         initializeMembers();
804     }        
805 
806     /**
807      * Creates a periodic action with given nanoTime-based initial
808      * trigger time and period.
809      */
810     this(Runnable r, long triggerTime,
811                         long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
812         super(r);
813         this.time = triggerTime;
814         this.period = period;
815         this.sequenceNumber = sequenceNumber;
816         this.poolExecutor = poolExecutor;
817         initializeMembers();
818     }
819 } else {
820 
821     /**
822      * Creates a one-shot action with given nanoTime-based trigger time.
823      */
824     this(Runnable r, V result, long triggerTime,
825                         long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
826         super(r, result);
827         this.time = triggerTime;
828         this.period = 0;
829         this.sequenceNumber = sequenceNumber;
830         this.poolExecutor = poolExecutor;
831         initializeMembers();
832     }           
833 
834     /**
835      * Creates a periodic action with given nanoTime-based initial
836      * trigger time and period.
837      */
838     this(Runnable r, V result, long triggerTime,
839                         long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
840         super(r, result);
841         this.time = triggerTime;
842         this.period = period;
843         this.sequenceNumber = sequenceNumber;
844         this.poolExecutor = poolExecutor;
845         initializeMembers();
846     } 
847 }
848 
849     /**
850      * Creates a one-shot action with given nanoTime-based trigger time.
851      */
852     this(Callable!(V) callable, long triggerTime,
853                         long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
854         super(callable);
855         this.time = triggerTime;
856         this.period = 0;
857         this.sequenceNumber = sequenceNumber;
858         this.poolExecutor = poolExecutor;
859         initializeMembers();
860     }
861 
862     private void initializeMembers() {
863         outerTask = this;
864     }
865     
866     void heapIndex(int index) {
867         _heapIndex = index;
868     }
869 
870     int heapIndex() {
871         return _heapIndex;
872     }
873 
874     Duration getDelay() {
875         return dur!(TimeUnit.HectoNanosecond)(time - Clock.currStdTime()); 
876     }
877 
878     int opCmp(Delayed other) {
879         if (other == this) // compare zero if same object
880             return 0;
881         ScheduledFutureTask!V x = cast(ScheduledFutureTask!V)other;
882         if (x !is null) {
883             long diff = time - x.time;
884             if (diff < 0)
885                 return -1;
886             else if (diff > 0)
887                 return 1;
888             else if (sequenceNumber < x.sequenceNumber)
889                 return -1;
890             else
891                 return 1;
892         }
893         Duration diff = getDelay() - other.getDelay();
894         return (diff.isNegative) ? -1 : (diff > Duration.zero) ? 1 : 0;
895     }
896 
897     /**
898      * Returns {@code true} if this is a periodic (not a one-shot) action.
899      *
900      * @return {@code true} if periodic
901      */
902     bool isPeriodic() {
903         return period != 0;
904     }
905 
906     /**
907      * Sets the next time to run for a periodic task.
908      */
909     private void setNextRunTime() {
910         long p = period;
911         if (p > 0)
912             time += p;
913         else
914             time = poolExecutor.triggerTime(-p);
915     }
916 
917     override bool cancel(bool mayInterruptIfRunning) {
918         // The racy read of heapIndex below is benign:
919         // if heapIndex < 0, then OOTA guarantees that we have surely
920         // been removed; else we recheck under lock in remove()
921         bool cancelled = super.cancel(mayInterruptIfRunning);
922         if (cancelled && poolExecutor.removeOnCancel && heapIndex >= 0)
923             poolExecutor.remove(this);
924         return cancelled;
925     }
926 
927     /**
928      * Overrides FutureTask version so as to reset/requeue if periodic.
929      */
930     override void run() {
931         if (!poolExecutor.canRunInCurrentRunState(this))
932             cancel(false);
933         else if (!isPeriodic())
934             super.run();
935         else if (super.runAndReset()) {
936             setNextRunTime();
937             poolExecutor.reExecutePeriodic(outerTask);
938         }
939     }
940 
941     // alias from FutureTask
942     // alias isCancelled = FutureTask!V.isCancelled;
943     // alias isDone = FutureTask!V.isDone;
944     alias get = FutureTask!V.get;
945     
946     override bool isCancelled() {
947         return super.isCancelled();
948     }
949 
950     override bool isDone() {
951         return super.isDone();
952     }
953 
954     override V get() {
955         return super.get();
956     }
957 
958     override V get(Duration timeout) {
959         return super.get(timeout);
960     }
961 }
962 
963 
964 /**
965  * Specialized delay queue. To mesh with TPE declarations, this
966  * class must be declared as a BlockingQueue!(Runnable) even though
967  * it can only hold RunnableScheduledFutures.
968  */
969 class DelayedWorkQueue : AbstractQueue!(Runnable), BlockingQueue!(Runnable) {
970 
971     /*
972      * A DelayedWorkQueue is based on a heap-based data structure
973      * like those in DelayQueue and PriorityQueue, except that
974      * every ScheduledFutureTask also records its index into the
975      * heap array. This eliminates the need to find a task upon
976      * cancellation, greatly speeding up removal (down from O(n)
977      * to O(log n)), and reducing garbage retention that would
978      * otherwise occur by waiting for the element to rise to top
979      * before clearing. But because the queue may also hold
980      * RunnableScheduledFutures that are not ScheduledFutureTasks,
981      * we are not guaranteed to have such indices available, in
982      * which case we fall back to linear search. (We expect that
983      * most tasks will not be decorated, and that the faster cases
984      * will be much more common.)
985      *
986      * All heap operations must record index changes -- mainly
987      * within siftUp and siftDown. Upon removal, a task's
988      * heapIndex is set to -1. Note that ScheduledFutureTasks can
989      * appear at most once in the queue (this need not be true for
990      * other kinds of tasks or work queues), so are uniquely
991      * identified by heapIndex.
992      */
993 
994     private enum int INITIAL_CAPACITY = 16;
995     private IRunnableScheduledFuture[] queue;
996     private ReentrantLock lock;
997     private int _size;
998 
999     /**
1000      * Thread designated to wait for the task at the head of the
1001      * queue.  This variant of the Leader-Follower pattern
1002      * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
1003      * minimize unnecessary timed waiting.  When a thread becomes
1004      * the leader, it waits only for the next delay to elapse, but
1005      * other threads await indefinitely.  The leader thread must
1006      * signal some other thread before returning from take() or
1007      * poll(...), unless some other thread becomes leader in the
1008      * interim.  Whenever the head of the queue is replaced with a
1009      * task with an earlier expiration time, the leader field is
1010      * invalidated by being reset to null, and some waiting
1011      * thread, but not necessarily the current leader, is
1012      * signalled.  So waiting threads must be prepared to acquire
1013      * and lose leadership while waiting.
1014      */
1015     private ThreadEx leader;
1016 
1017     /**
1018      * Condition signalled when a newer task becomes available at the
1019      * head of the queue or a new thread may need to become leader.
1020      */
1021     private Condition available;
1022 
1023     this() {
1024         initializeMembers();
1025     }
1026 
1027     private void initializeMembers() {
1028         lock = new ReentrantLock();
1029         available = new Condition(lock);
1030         queue = new IRunnableScheduledFuture[INITIAL_CAPACITY];
1031     }
1032 
1033     /**
1034      * Sets f's heapIndex if it is a ScheduledFutureTask.
1035      */
1036     private static void setIndex(IRunnableScheduledFuture f, int idx) {
1037         IScheduledFutureTask t = cast(IScheduledFutureTask)f;
1038         // tracef("index=%d, type: %s", idx, typeid(cast(Object)t));
1039         if (t !is null)
1040             t.heapIndex = idx;
1041     }
1042 
1043     /**
1044      * Sifts element added at bottom up to its heap-ordered spot.
1045      * Call only when holding lock.
1046      */
1047     private void siftUp(int k, IRunnableScheduledFuture key) {
1048         while (k > 0) {
1049             int parent = (k - 1) >>> 1;
1050             IRunnableScheduledFuture e = queue[parent];
1051             if (key >= e)
1052                 break;
1053             queue[k] = e;
1054             setIndex(e, k);
1055             k = parent;
1056         }
1057         // tracef("k=%d, key is null: %s", k, key is null);
1058         queue[k] = key;
1059         setIndex(key, k);
1060     }
1061 
1062     /**
1063      * Sifts element added at top down to its heap-ordered spot.
1064      * Call only when holding lock.
1065      */
1066     private void siftDown(int k, IRunnableScheduledFuture key) {
1067         int half = size >>> 1;
1068         while (k < half) {
1069             int child = (k << 1) + 1;
1070             IRunnableScheduledFuture c = queue[child];
1071             int right = child + 1;
1072             if (right < size && c.opCmp(queue[right]) > 0)
1073                 c = queue[child = right];
1074             if (key.opCmp(c) <= 0)
1075                 break;
1076             queue[k] = c;
1077             setIndex(c, k);
1078             k = child;
1079         }
1080         queue[k] = key;
1081         setIndex(key, k);
1082     }
1083 
1084     /**
1085      * Resizes the heap array.  Call only when holding lock.
1086      */
1087     private void grow() {
1088         size_t oldCapacity = queue.length;
1089         size_t newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
1090         if (newCapacity < 0) // overflow
1091             newCapacity = int.max;
1092         queue.length = newCapacity;
1093     }
1094 
1095     /**
1096      * Finds index of given object, or -1 if absent.
1097      */
1098     private int indexOf(Runnable x) {
1099         if (x !is null) {
1100             IScheduledFutureTask sf = cast(IScheduledFutureTask) x;
1101             if (sf !is null) {
1102                 int i = sf.heapIndex;
1103                 // Sanity check; x could conceivably be a
1104                 // ScheduledFutureTask from some other pool.
1105                 if (i >= 0 && i < size && queue[i] == x)
1106                     return i;
1107             } else {
1108                 for (int i = 0; i < size; i++) {
1109                     // if (x.opEquals(cast(Object)queue[i]))
1110                     if(x is queue[i])
1111                         return i;
1112                 }
1113             }
1114         }
1115         return -1;
1116     }
1117 
1118     override bool contains(Runnable x) {
1119         ReentrantLock lock = this.lock;
1120         lock.lock();
1121         try {
1122             return indexOf(x) != -1;
1123         } finally {
1124             lock.unlock();
1125         }
1126     }
1127 
1128     override bool remove(Runnable x) {
1129         ReentrantLock lock = this.lock;
1130         trace(cast(Object)x);
1131         lock.lock();
1132         try {
1133             int i = indexOf(x);
1134             if (i < 0)
1135                 return false;
1136 
1137             setIndex(queue[i], -1);
1138             int s = --_size;
1139             IRunnableScheduledFuture replacement = queue[s];
1140             queue[s] = null;
1141             if (s != i) {
1142                 siftDown(i, replacement);
1143                 if (queue[i] == replacement)
1144                     siftUp(i, replacement);
1145             }
1146             return true;
1147         } finally {
1148             lock.unlock();
1149         }
1150     }
1151 
1152     override int size() {
1153         // return _size;
1154         ReentrantLock lock = this.lock;
1155         lock.lock();
1156         try {
1157             return _size;
1158         } finally {
1159             lock.unlock();
1160         }
1161     }
1162 
1163     override bool isEmpty() {
1164         return size() == 0;
1165     }
1166 
1167     int remainingCapacity() {
1168         return int.max;
1169     }
1170 
1171     IRunnableScheduledFuture peek() {
1172         ReentrantLock lock = this.lock;
1173         lock.lock();
1174         try {
1175             return queue[0];
1176         } finally {
1177             lock.unlock();
1178         }
1179     }
1180 
1181     bool offer(Runnable x) {
1182         if (x is null)
1183             throw new NullPointerException();
1184         IRunnableScheduledFuture e = cast(IRunnableScheduledFuture)x;
1185         ReentrantLock lock = this.lock;
1186         lock.lock();
1187         try {
1188             int i = _size;
1189             if (i >= queue.length)
1190                 grow();
1191             _size = i + 1;
1192             if (i == 0) {
1193                 queue[0] = e;
1194                 setIndex(e, 0);
1195             } else {
1196                 siftUp(i, e);
1197             }
1198             if (queue[0] == e) {
1199                 leader = null;
1200                 available.notify();
1201             }
1202         } finally {
1203             lock.unlock();
1204         }
1205         return true;
1206     }
1207 
1208     override void put(Runnable e) {
1209         offer(e);
1210     }
1211 
1212     override bool add(Runnable e) {
1213         return offer(e);
1214     }
1215 
1216     bool offer(Runnable e, Duration timeout) {
1217         return offer(e);
1218     }
1219 
1220     /**
1221      * Performs common bookkeeping for poll and take: Replaces
1222      * first element with last and sifts it down.  Call only when
1223      * holding lock.
1224      * @param f the task to remove and return
1225      */
1226     private IRunnableScheduledFuture finishPoll(IRunnableScheduledFuture f) {
1227         int s = --_size;
1228         IRunnableScheduledFuture x = queue[s];
1229         queue[s] = null;
1230         if (s != 0)
1231             siftDown(0, x);
1232         setIndex(f, -1);
1233         return f;
1234     }
1235 
1236     IRunnableScheduledFuture poll() {
1237         ReentrantLock lock = this.lock;
1238         lock.lock();
1239         try {
1240             IRunnableScheduledFuture first = queue[0];
1241             return (first is null || first.getDelay() > Duration.zero)
1242                 ? null
1243                 : finishPoll(first);
1244         } finally {
1245             lock.unlock();
1246         }
1247     }
1248 
1249     IRunnableScheduledFuture take() {
1250         ReentrantLock lock = this.lock;
1251         // lock.lockInterruptibly();
1252         lock.lock();
1253         try {
1254             for (;;) {
1255                 IRunnableScheduledFuture first = queue[0];
1256                 if (first is null)
1257                     available.wait();
1258                 else {
1259                     Duration delay = first.getDelay();
1260                     if (delay <= Duration.zero)
1261                         return finishPoll(first);
1262                     first = null; // don't retain ref while waiting
1263                     if (leader !is null)
1264                         available.wait();
1265                     else {
1266                         ThreadEx thisThread = ThreadEx.currentThread();
1267                         leader = thisThread;
1268                         try {
1269                             available.wait(delay);
1270                         } finally {
1271                             if (leader == thisThread)
1272                                 leader = null;
1273                         }
1274                     }
1275                 }
1276             }
1277         } finally {
1278             if (leader is null && queue[0] !is null)
1279                 available.notify();
1280             lock.unlock();
1281         }
1282     }
1283 
1284     IRunnableScheduledFuture poll(Duration timeout) {
1285         // long nanos = total!(TimeUnit.HectoNanosecond)(timeout);
1286         Duration nanos = timeout;
1287         ReentrantLock lock = this.lock;
1288         // lock.lockInterruptibly();
1289         lock.lock();
1290         try {
1291             for (;;) {
1292                 IRunnableScheduledFuture first = queue[0];
1293                 if (first is null) {
1294                     if (nanos <= Duration.zero)
1295                         return null;
1296                     else
1297                         available.wait(nanos); // nanos = 
1298                 } else {
1299                     Duration delay = first.getDelay();
1300                     if (delay <= Duration.zero)
1301                         return finishPoll(first);
1302                     if (nanos <= Duration.zero)
1303                         return null;
1304                     first = null; // don't retain ref while waiting
1305                     if (nanos < delay || leader !is null)
1306                         available.wait(nanos); // nanos = 
1307                     else {
1308                         ThreadEx thisThread = ThreadEx.currentThread();
1309                         leader = thisThread;
1310                         try {
1311                             available.wait(delay);
1312                             nanos -= delay;
1313                             // long timeLeft = available.wait(delay);
1314                             // nanos -= delay - timeLeft;
1315                         } finally {
1316                             if (leader == thisThread)
1317                                 leader = null;
1318                         }
1319                     }
1320                 }
1321             }
1322         } finally {
1323             if (leader is null && queue[0] !is null)
1324                 available.notify();
1325             lock.unlock();
1326         }
1327     }
1328 
1329     override void clear() {
1330         ReentrantLock lock = this.lock;
1331         lock.lock();
1332         try {
1333             for (int i = 0; i < size; i++) {
1334                 IRunnableScheduledFuture t = queue[i];
1335                 if (t !is null) {
1336                     queue[i] = null;
1337                     setIndex(t, -1);
1338                 }
1339             }
1340             _size = 0;
1341         } finally {
1342             lock.unlock();
1343         }
1344     }
1345 
1346     int drainTo(Collection!(Runnable) c) {
1347         return drainTo(c, int.max);
1348     }
1349 
1350     int drainTo(Collection!(Runnable) c, int maxElements) {
1351         // Objects.requireNonNull(c);
1352 
1353         if (c == this)
1354             throw new IllegalArgumentException();
1355         if (maxElements <= 0)
1356             return 0;
1357         ReentrantLock lock = this.lock;
1358         lock.lock();
1359         try {
1360             int n = 0;
1361             for (IRunnableScheduledFuture first;
1362                  n < maxElements
1363                      && (first = queue[0]) !is null
1364                      && first.getDelay() <= Duration.zero;) {
1365                 c.add(first);   // In this order, in case add() throws.
1366                 finishPoll(first);
1367                 ++n;
1368             }
1369             return n;
1370         } finally {
1371             lock.unlock();
1372         }
1373     }
1374 
1375     override Runnable[] toArray() {
1376         ReentrantLock lock = this.lock;
1377         lock.lock();
1378         try {
1379             Runnable[] r = new Runnable[_size];
1380             for(int i=0; i<_size; i++) {
1381                 r[i] = queue[i];
1382             }
1383             return r;
1384 
1385         } finally {
1386             lock.unlock();
1387         }
1388     }
1389 
1390     override int opApply(scope int delegate(ref Runnable) dg) {
1391        if(dg is null)
1392             throw new NullPointerException();
1393         ReentrantLock lock = this.lock;
1394         lock.lock();
1395         scope(exit) lock.unlock();
1396 
1397         int result = 0;
1398         foreach(int i; 0.._size) {
1399             Runnable v = queue[i];
1400             result = dg(v);
1401             if(result != 0) return result;
1402         }
1403         return result;
1404     }
1405 
1406 
1407     // Iterator!(Runnable) iterator() {
1408     //     ReentrantLock lock = this.lock;
1409     //     lock.lock();
1410     //     try {
1411     //         return new Itr(Arrays.copyOf(queue, size));
1412     //     } finally {
1413     //         lock.unlock();
1414     //     }
1415     // }
1416 
1417     /**
1418      * Snapshot iterator that works off copy of underlying q array.
1419      */
1420     // private class Itr : Iterator!(Runnable) {
1421     //     final IRunnableScheduledFuture[] array;
1422     //     int cursor;        // index of next element to return; initially 0
1423     //     int lastRet = -1;  // index of last element returned; -1 if no such
1424 
1425     //     this(IRunnableScheduledFuture[] array) {
1426     //         this.array = array;
1427     //     }
1428 
1429     //     bool hasNext() {
1430     //         return cursor < array.length;
1431     //     }
1432 
1433     //     Runnable next() {
1434     //         if (cursor >= array.length)
1435     //             throw new NoSuchElementException();
1436     //         return array[lastRet = cursor++];
1437     //     }
1438 
1439     //     void remove() {
1440     //         if (lastRet < 0)
1441     //             throw new IllegalStateException();
1442     //         DelayedWorkQueue.this.remove(array[lastRet]);
1443     //         lastRet = -1;
1444     //     }
1445     // }
1446 
1447     override bool opEquals(IObject o) {
1448         return opEquals(cast(Object) o);
1449     }
1450 
1451     override bool opEquals(Object o) {
1452         return super.opEquals(o);
1453     }
1454     
1455     override string toString() {
1456         return super.toString();
1457     }
1458 
1459     override size_t toHash() @trusted nothrow {
1460         return super.toHash();
1461     }
1462 }