1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.ScheduledThreadPoolExecutor;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.BlockingQueue;
16 import hunt.concurrency.Delayed;
17 import hunt.concurrency.Future;
18 import hunt.concurrency.FutureTask;
19 import hunt.concurrency.ScheduledExecutorService;
20 import hunt.concurrency.thread;
21 import hunt.concurrency.ThreadFactory;
22 import hunt.concurrency.ThreadPoolExecutor;
23 
24 import hunt.collection;
25 import hunt.Exceptions;
26 import hunt.util.Common;
27 import hunt.util.DateTime;
28 import hunt.Object;
29 import hunt.logging.ConsoleLogger;
30 // import core.time;
31 
32 import core.atomic;
33 import core.sync.condition;
34 import core.sync.mutex;
35 
36 import std.datetime;
37 // import hunt.collection.AbstractQueue;
38 // import java.util.Arrays;
39 // import hunt.collection.Collection;
40 // import hunt.collection.Iterator;
41 // import java.util.List;
42 // import java.util.NoSuchElementException;
43 // import java.util.Objects;
44 // import hunt.concurrency.atomic.AtomicLong;
45 // import hunt.concurrency.locks.Condition;
46 // import hunt.concurrency.locks.ReentrantLock;
47 
// The name ReentrantLock (see the commented-out hunt.concurrency.locks import
// above) is mapped onto the druntime mutex class.
alias ReentrantLock = core.sync.mutex.Mutex;
49 
/**
 * Getter/setter pair for the heap index of a scheduled task.
 * NOTE(review): presumably the index into the DelayedWorkQueue heap;
 * the queue is not visible here, so confirm against its implementation.
 */
interface IScheduledFutureTask {
    /// Returns the currently stored heap index.
    int heapIndex();

    /// Stores a new heap index.
    void heapIndex(int index);
}
54 
55 /**
56  * A {@link ThreadPoolExecutor} that can additionally schedule
57  * commands to run after a given delay, or to execute periodically.
58  * This class is preferable to {@link java.util.Timer} when multiple
59  * worker threads are needed, or when the additional flexibility or
60  * capabilities of {@link ThreadPoolExecutor} (which this class
61  * extends) are required.
62  *
63  * <p>Delayed tasks execute no sooner than they are enabled, but
64  * without any real-time guarantees about when, after they are
65  * enabled, they will commence. Tasks scheduled for exactly the same
66  * execution time are enabled in first-in-first-out (FIFO) order of
67  * submission.
68  *
69  * <p>When a submitted task is cancelled before it is run, execution
70  * is suppressed.  By default, such a cancelled task is not
71  * automatically removed from the work queue until its delay elapses.
72  * While this enables further inspection and monitoring, it may also
73  * cause unbounded retention of cancelled tasks.  To avoid this, use
74  * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
75  * removed from the work queue at time of cancellation.
76  *
77  * <p>Successive executions of a periodic task scheduled via
78  * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
79  * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
80  * do not overlap. While different executions may be performed by
81  * different threads, the effects of prior executions
82  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
83  * those of subsequent ones.
84  *
85  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
86  * of the inherited tuning methods are not useful for it. In
87  * particular, because it acts as a fixed-sized pool using
88  * {@code corePoolSize} threads and an unbounded queue, adjustments
89  * to {@code maximumPoolSize} have no useful effect. Additionally, it
90  * is almost never a good idea to set {@code corePoolSize} to zero or
91  * use {@code allowCoreThreadTimeOut} because this may leave the pool
92  * without threads to handle tasks once they become eligible to run.
93  *
94  * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
95  * this class uses {@link Executors#defaultThreadFactory} as the
96  * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
97  * as the default rejected execution handler.
98  *
99  * <p><b>Extension notes:</b> This class overrides the
100  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
101  * {@link AbstractExecutorService#submit(Runnable) submit}
102  * methods to generate internal {@link ScheduledFuture} objects to
103  * control per-task delays and scheduling.  To preserve
104  * functionality, any further overrides of these methods in
105  * subclasses must invoke superclass versions, which effectively
106  * disables additional task customization.  However, this class
107  * provides alternative protected extension method
108  * {@code decorateTask} (one version each for {@code Runnable} and
109  * {@code Callable}) that can be used to customize the concrete task
110  * types used to execute commands entered via {@code execute},
111  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
112  * and {@code scheduleWithFixedDelay}.  By default, a
113  * {@code ScheduledThreadPoolExecutor} uses a task type extending
114  * {@link FutureTask}. However, this may be modified or replaced using
115  * subclasses of the form:
116  *
117  * <pre> {@code
 * class CustomScheduledExecutor : ScheduledThreadPoolExecutor {
 *
 *   static class CustomTask(V) : RunnableScheduledFuture!(V) { ... }
 *
 *   protected RunnableScheduledFuture!(V) decorateTask(V)(
 *                Runnable r, RunnableScheduledFuture!(V) task) {
 *       return new CustomTask!(V)(r, task);
 *   }
 *
 *   protected RunnableScheduledFuture!(V) decorateTask(V)(
 *                Callable!(V) c, RunnableScheduledFuture!(V) task) {
 *       return new CustomTask!(V)(c, task);
 *   }
131  *   // ... add constructors, etc.
132  * }}</pre>
133  *
134  * @author Doug Lea
135  */
136 class ScheduledThreadPoolExecutor : ThreadPoolExecutor, ScheduledExecutorService {
137 
138     /*
139      * This class specializes ThreadPoolExecutor implementation by
140      *
141      * 1. Using a custom task type ScheduledFutureTask, even for tasks
142      *    that don't require scheduling because they are submitted
143      *    using ExecutorService rather than ScheduledExecutorService
144      *    methods, which are treated as tasks with a delay of zero.
145      *
146      * 2. Using a custom queue (DelayedWorkQueue), a variant of
147      *    unbounded DelayQueue. The lack of capacity constraint and
148      *    the fact that corePoolSize and maximumPoolSize are
149      *    effectively identical simplifies some execution mechanics
150      *    (see delayedExecute) compared to ThreadPoolExecutor.
151      *
152      * 3. Supporting optional run-after-shutdown parameters, which
153      *    leads to overrides of shutdown methods to remove and cancel
154      *    tasks that should NOT be run after shutdown, as well as
155      *    different recheck logic when task (re)submission overlaps
156      *    with a shutdown.
157      *
158      * 4. Task decoration methods to allow interception and
159      *    instrumentation, which are needed because subclasses cannot
160      *    otherwise override submit methods to get this effect. These
161      *    don't have any impact on pool control logic though.
162      */
163 
164     /**
165      * False if should cancel/suppress periodic tasks on shutdown.
166      */
167     private bool continueExistingPeriodicTasksAfterShutdown;
168 
169     /**
170      * False if should cancel non-periodic not-yet-expired tasks on shutdown.
171      */
172     private bool executeExistingDelayedTasksAfterShutdown = true;
173 
174     /**
175      * True if ScheduledFutureTask.cancel should remove from queue.
176      */
177     bool removeOnCancel;
178 
179     /**
180      * Sequence number to break scheduling ties, and in turn to
181      * guarantee FIFO order among tied entries.
182      */
183     private shared static long sequencer; //= new AtomicLong();
184 
185     /**
186      * Returns true if can run a task given current run state and
187      * run-after-shutdown parameters.
188      */
189     bool canRunInCurrentRunState(V)(RunnableScheduledFuture!V task) {
190         if (!isShutdown())
191             return true;
192         if (isStopped())
193             return false;
194         return task.isPeriodic()
195             ? continueExistingPeriodicTasksAfterShutdown
196             : (executeExistingDelayedTasksAfterShutdown
197                || task.getDelay() <= Duration.zero);
198     }
199 
200     /**
201      * Main execution method for delayed or periodic tasks.  If pool
202      * is shut down, rejects the task. Otherwise adds task to queue
203      * and starts a thread, if necessary, to run it.  (We cannot
204      * prestart the thread to run the task because the task (probably)
205      * shouldn't be run yet.)  If the pool is shut down while the task
206      * is being added, cancel and remove it if required by state and
207      * run-after-shutdown parameters.
208      *
209      * @param task the task
210      */
211     private void delayedExecute(V)(RunnableScheduledFuture!V task) {
212         if (isShutdown())
213             reject(task);
214         else {
215             super.getQueue().add(task);
216             if (!canRunInCurrentRunState(task) && remove(task))
217                 task.cancel(false);
218             else
219                 ensurePrestart();
220         }
221     }
222 
223     /**
224      * Requeues a periodic task unless current run state precludes it.
225      * Same idea as delayedExecute except drops task rather than rejecting.
226      *
227      * @param task the task
228      */
229     void reExecutePeriodic(V)(RunnableScheduledFuture!V task) {
230         if (canRunInCurrentRunState(task)) {
231             super.getQueue().add(task);
232             if (canRunInCurrentRunState(task) || !remove(task)) {
233                 ensurePrestart();
234                 return;
235             }
236         }
237         task.cancel(false);
238     }
239 
240     /**
241      * Cancels and clears the queue of all tasks that should not be run
242      * due to shutdown policy.  Invoked within super.shutdown.
243      */
244     override void onShutdown() {
245         BlockingQueue!(Runnable) q = super.getQueue();
246         bool keepDelayed =
247             getExecuteExistingDelayedTasksAfterShutdownPolicy();
248         bool keepPeriodic =
249             getContinueExistingPeriodicTasksAfterShutdownPolicy();
250         // Traverse snapshot to avoid iterator exceptions
251         // TODO: implement and use efficient removeIf
252         // super.getQueue().removeIf(...);
253         version(HUNT_DEBUG) tracef("Shuting down..., BlockingQueue size: %d", q.size());
254         foreach (Runnable e ; q.toArray()) {
255             if(e is null) {
256                 warning("e is null");
257             } else {
258                 version(HUNT_DEBUG) trace(typeid(cast(Object)e));
259                 IRunnableScheduledFuture t = cast(IRunnableScheduledFuture)e;
260                 if (t !is null) {
261                     if ((t.isPeriodic()
262                          ? !keepPeriodic
263                          : (!keepDelayed && t.getDelay() > Duration.zero))
264                         || t.isCancelled()) { // also remove if already cancelled
265                         if (q.remove(t))
266                             t.cancel(false);
267                     }
268                 } else {
269                     warning("t is null");
270                 }
271             }
272                 
273         }
274         tryTerminate();
275     }
276 
277     /**
278      * Modifies or replaces the task used to execute a runnable.
279      * This method can be used to override the concrete
280      * class used for managing internal tasks.
281      * The default implementation simply returns the given task.
282      *
283      * @param runnable the submitted Runnable
284      * @param task the task created to execute the runnable
285      * @param (V) the type of the task's result
286      * @return a task that can execute the runnable
287      */
288     protected RunnableScheduledFuture!(V) decorateTask(V) (
289         Runnable runnable, RunnableScheduledFuture!(V) task) {
290         return task;
291     }
292 
293     /**
294      * Modifies or replaces the task used to execute a callable.
295      * This method can be used to override the concrete
296      * class used for managing internal tasks.
297      * The default implementation simply returns the given task.
298      *
299      * @param callable the submitted Callable
300      * @param task the task created to execute the callable
301      * @param (V) the type of the task's result
302      * @return a task that can execute the callable
303      */
304     protected RunnableScheduledFuture!(V) decorateTask(V)(
305         Callable!(V) callable, RunnableScheduledFuture!(V) task) {
306         return task;
307     }
308 
309     /**
310      * The default keep-alive time for pool threads.
311      *
312      * Normally, this value is unused because all pool threads will be
313      * core threads, but if a user creates a pool with a corePoolSize
314      * of zero (against our advice), we keep a thread alive as long as
315      * there are queued tasks.  If the keep alive time is zero (the
316      * historic value), we end up hot-spinning in getTask, wasting a
317      * CPU.  But on the other hand, if we set the value too high, and
318      * users create a one-shot pool which they don't cleanly shutdown,
319      * the pool's non-daemon threads will prevent JVM termination.  A
320      * small but non-zero value (relative to a JVM's lifetime) seems
321      * best.
322      */
323     private enum long DEFAULT_KEEPALIVE_MILLIS = 10L;
324 
325     /**
326      * Creates a new {@code ScheduledThreadPoolExecutor} with the
327      * given core pool size.
328      *
329      * @param corePoolSize the number of threads to keep in the pool, even
330      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
331      * @throws IllegalArgumentException if {@code corePoolSize < 0}
332      */
333     this(int corePoolSize) {
334         super(corePoolSize, int.max, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
335               new DelayedWorkQueue());
336     }
337 
338     /**
339      * Creates a new {@code ScheduledThreadPoolExecutor} with the
340      * given initial parameters.
341      *
342      * @param corePoolSize the number of threads to keep in the pool, even
343      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
344      * @param threadFactory the factory to use when the executor
345      *        creates a new thread
346      * @throws IllegalArgumentException if {@code corePoolSize < 0}
347      * @throws NullPointerException if {@code threadFactory} is null
348      */
349     this(int corePoolSize, ThreadFactory threadFactory) {
350         super(corePoolSize, int.max,
351               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
352               new DelayedWorkQueue(), threadFactory);
353     }
354 
355     /**
356      * Creates a new {@code ScheduledThreadPoolExecutor} with the
357      * given initial parameters.
358      *
359      * @param corePoolSize the number of threads to keep in the pool, even
360      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
361      * @param handler the handler to use when execution is blocked
362      *        because the thread bounds and queue capacities are reached
363      * @throws IllegalArgumentException if {@code corePoolSize < 0}
364      * @throws NullPointerException if {@code handler} is null
365      */
366     this(int corePoolSize, RejectedExecutionHandler handler) {
367         super(corePoolSize, int.max,
368               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
369               new DelayedWorkQueue(), handler);
370     }
371 
372     /**
373      * Creates a new {@code ScheduledThreadPoolExecutor} with the
374      * given initial parameters.
375      *
376      * @param corePoolSize the number of threads to keep in the pool, even
377      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
378      * @param threadFactory the factory to use when the executor
379      *        creates a new thread
380      * @param handler the handler to use when execution is blocked
381      *        because the thread bounds and queue capacities are reached
382      * @throws IllegalArgumentException if {@code corePoolSize < 0}
383      * @throws NullPointerException if {@code threadFactory} or
384      *         {@code handler} is null
385      */
386     this(int corePoolSize, ThreadFactory threadFactory,
387                                        RejectedExecutionHandler handler) {
388         super(corePoolSize, int.max,
389               dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
390               new DelayedWorkQueue(), threadFactory, handler);
391     }
392 
393     /**
     * Returns the trigger time of a delayed action as a hecto-nanosecond
     * count on the {@code Clock.currStdTime} scale. Negative delays are
     * treated as zero.
395      */
396     private long triggerTime(Duration delay) {
397         return triggerTime(delay.isNegative ? 0 : delay.total!(TimeUnit.HectoNanosecond)());
398     }
399 
400     /**
     * Returns the trigger time of a delayed action: the current
     * {@code Clock.currStdTime} value plus the given delay (both in
     * hecto-nanoseconds).
402      */
403     long triggerTime(long delay) {
404         return Clock.currStdTime +
405             ((delay < (long.max >> 1)) ? delay : overflowFree(delay));
406     }
407 
408     /**
409      * Constrains the values of all delays in the queue to be within
410      * long.max of each other, to avoid overflow in compareTo.
411      * This may occur if a task is eligible to be dequeued, but has
412      * not yet been, while some other task is added with a delay of
413      * long.max.
414      */
415     private long overflowFree(long delay) {
416         Delayed head = cast(Delayed) super.getQueue().peek();
417         if (head !is null) {
418             long headDelay = head.getDelay().total!(TimeUnit.HectoNanosecond)();
419             if (headDelay < 0 && (delay - headDelay < 0))
420                 delay = long.max + headDelay;
421         }
422         return delay;
423     }
424 
425     /**
426      * @throws RejectedExecutionException {@inheritDoc}
427      * @throws NullPointerException       {@inheritDoc}
428      */
429     ScheduledFuture!(void) schedule(Runnable command, Duration delay) {
430         if (command is null)
431             throw new NullPointerException();
432         long n = atomicOp!"+="(sequencer, 1);
433         n--;
434         // RunnableScheduledFuture!(void) t = decorateTask(command,
435         //     new ScheduledFutureTask!(void)(command, cast(void)null, triggerTime(delay), n, this));
436         RunnableScheduledFuture!(void) t = decorateTask(command,
437             new ScheduledFutureTask!(void)(command, triggerTime(delay), n, this));        
438         delayedExecute!(void)(t);
439         return t;
440     }
441 
442     /**
443      * @throws RejectedExecutionException {@inheritDoc}
444      * @throws NullPointerException       {@inheritDoc}
445      */
446     ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
447         if (callable is null)
448             throw new NullPointerException();
449         RunnableScheduledFuture!(V) t = decorateTask(callable,
450             new ScheduledFutureTask!(V)(callable,
451                                        triggerTime(delay),
452                                        cast(long)AtomicHelper.getAndIncrement(sequencer), this));
453         delayedExecute(t);
454         return t;
455     }
456 
457     /**
458      * Submits a periodic action that becomes enabled first after the
459      * given initial delay, and subsequently with the given period;
460      * that is, executions will commence after
461      * {@code initialDelay}, then {@code initialDelay + period}, then
462      * {@code initialDelay + 2 * period}, and so on.
463      *
464      * <p>The sequence of task executions continues indefinitely until
465      * one of the following exceptional completions occur:
466      * <ul>
467      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
468      * via the returned future.
469      * <li>Method {@link #shutdown} is called and the {@linkplain
470      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
471      * whether to continue after shutdown} is not set true, or method
472      * {@link #shutdownNow} is called; also resulting in task
473      * cancellation.
474      * <li>An execution of the task throws an exception.  In this case
475      * calling {@link Future#get() get} on the returned future will throw
476      * {@link ExecutionException}, holding the exception as its cause.
477      * </ul>
478      * Subsequent executions are suppressed.  Subsequent calls to
479      * {@link Future#isDone isDone()} on the returned future will
480      * return {@code true}.
481      *
482      * <p>If any execution of this task takes longer than its period, then
483      * subsequent executions may start late, but will not concurrently
484      * execute.
485      *
486      * @throws RejectedExecutionException {@inheritDoc}
487      * @throws NullPointerException       {@inheritDoc}
488      * @throws IllegalArgumentException   {@inheritDoc}
489      */
490     ScheduledFuture!void scheduleAtFixedRate(Runnable command,
491                                                   Duration initialDelay,
492                                                   Duration period) {
493         if (command is null)
494             throw new NullPointerException();
495         if (period <= Duration.zero)
496             throw new IllegalArgumentException();
497 
498         ScheduledFutureTask!(void) sft =
499             new ScheduledFutureTask!(void)(command, // cast(void)null,
500                                           triggerTime(initialDelay),
501                                           period.total!(TimeUnit.HectoNanosecond)(), 
502                                           cast(long)AtomicHelper.getAndIncrement(sequencer), this);        
503         RunnableScheduledFuture!(void) t = decorateTask(command, sft);
504         sft.outerTask = t;
505         delayedExecute(t);
506         return t;
507     }
508 
509     /**
510      * Submits a periodic action that becomes enabled first after the
511      * given initial delay, and subsequently with the given delay
512      * between the termination of one execution and the commencement of
513      * the next.
514      *
515      * <p>The sequence of task executions continues indefinitely until
516      * one of the following exceptional completions occur:
517      * <ul>
518      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
519      * via the returned future.
520      * <li>Method {@link #shutdown} is called and the {@linkplain
521      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
522      * whether to continue after shutdown} is not set true, or method
523      * {@link #shutdownNow} is called; also resulting in task
524      * cancellation.
525      * <li>An execution of the task throws an exception.  In this case
526      * calling {@link Future#get() get} on the returned future will throw
527      * {@link ExecutionException}, holding the exception as its cause.
528      * </ul>
529      * Subsequent executions are suppressed.  Subsequent calls to
530      * {@link Future#isDone isDone()} on the returned future will
531      * return {@code true}.
532      *
533      * @throws RejectedExecutionException {@inheritDoc}
534      * @throws NullPointerException       {@inheritDoc}
535      * @throws IllegalArgumentException   {@inheritDoc}
536      */
537     ScheduledFuture!(void) scheduleWithFixedDelay(Runnable command,
538                                                      Duration initialDelay,
539                                                      Duration delay) {
540         if (command is null)
541             throw new NullPointerException();
542         if (delay <= Duration.zero)
543             throw new IllegalArgumentException();
544         ScheduledFutureTask!(void) sft =
545             new ScheduledFutureTask!(void)(command, // cast(void)null,
546                                           triggerTime(initialDelay),
547                                           -delay.total!(TimeUnit.HectoNanosecond)(),
548                                           cast(long)AtomicHelper.getAndIncrement(sequencer), this);
549         RunnableScheduledFuture!(void) t = decorateTask(command, sft);
550         sft.outerTask = t;
551         delayedExecute(t);
552         return t;
553     }
554 
555     /**
556      * Executes {@code command} with zero required delay.
     * This has effect equivalent to
     * {@code schedule(command, Duration.zero)}.
559      * Note that inspections of the queue and of the list returned by
560      * {@code shutdownNow} will access the zero-delayed
561      * {@link ScheduledFuture}, not the {@code command} itself.
562      *
563      * <p>A consequence of the use of {@code ScheduledFuture} objects is
564      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
565      * called with a null second {@code Throwable} argument, even if the
566      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
567      * thrown by such a task can be obtained via {@link Future#get}.
568      *
569      * @throws RejectedExecutionException at discretion of
570      *         {@code RejectedExecutionHandler}, if the task
571      *         cannot be accepted for execution because the
572      *         executor has been shut down
573      * @throws NullPointerException {@inheritDoc}
574      */
575     override void execute(Runnable command) {
576         schedule(command, Duration.zero);
577     }
578 
579     // Override AbstractExecutorService methods
580 
581     /**
582      * @throws RejectedExecutionException {@inheritDoc}
583      * @throws NullPointerException       {@inheritDoc}
584      */
585     override Future!void submit(Runnable task) {
586         return schedule(task, Duration.zero);
587     }
588 
589     /**
590      * @throws RejectedExecutionException {@inheritDoc}
591      * @throws NullPointerException       {@inheritDoc}
592      */
593     Future!(T) submit(T)(Runnable task, T result) {
594         return schedule(Executors.callable(task, result), Duration.zero);
595     }
596 
597     /**
598      * @throws RejectedExecutionException {@inheritDoc}
599      * @throws NullPointerException       {@inheritDoc}
600      */
601     Future!(T) submit(T)(Callable!(T) task) {
602         return schedule(task, Duration.zero);
603     }
604 
605     /**
606      * Sets the policy on whether to continue executing existing
607      * periodic tasks even when this executor has been {@code shutdown}.
608      * In this case, executions will continue until {@code shutdownNow}
609      * or the policy is set to {@code false} when already shutdown.
610      * This value is by default {@code false}.
611      *
612      * @param value if {@code true}, continue after shutdown, else don't
613      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
614      */
615     void setContinueExistingPeriodicTasksAfterShutdownPolicy(bool value) {
616         continueExistingPeriodicTasksAfterShutdown = value;
617         if (!value && isShutdown())
618             onShutdown();
619     }
620 
621     /**
622      * Gets the policy on whether to continue executing existing
623      * periodic tasks even when this executor has been {@code shutdown}.
624      * In this case, executions will continue until {@code shutdownNow}
625      * or the policy is set to {@code false} when already shutdown.
626      * This value is by default {@code false}.
627      *
628      * @return {@code true} if will continue after shutdown
629      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
630      */
631     bool getContinueExistingPeriodicTasksAfterShutdownPolicy() {
632         return continueExistingPeriodicTasksAfterShutdown;
633     }
634 
635     /**
636      * Sets the policy on whether to execute existing delayed
637      * tasks even when this executor has been {@code shutdown}.
638      * In this case, these tasks will only terminate upon
639      * {@code shutdownNow}, or after setting the policy to
640      * {@code false} when already shutdown.
641      * This value is by default {@code true}.
642      *
643      * @param value if {@code true}, execute after shutdown, else don't
644      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
645      */
646     void setExecuteExistingDelayedTasksAfterShutdownPolicy(bool value) {
647         executeExistingDelayedTasksAfterShutdown = value;
648         if (!value && isShutdown())
649             onShutdown();
650     }
651 
652     /**
653      * Gets the policy on whether to execute existing delayed
654      * tasks even when this executor has been {@code shutdown}.
655      * In this case, these tasks will only terminate upon
656      * {@code shutdownNow}, or after setting the policy to
657      * {@code false} when already shutdown.
658      * This value is by default {@code true}.
659      *
660      * @return {@code true} if will execute after shutdown
661      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
662      */
663     bool getExecuteExistingDelayedTasksAfterShutdownPolicy() {
664         return executeExistingDelayedTasksAfterShutdown;
665     }
666 
667     /**
668      * Sets the policy on whether cancelled tasks should be immediately
669      * removed from the work queue at time of cancellation.  This value is
670      * by default {@code false}.
671      *
672      * @param value if {@code true}, remove on cancellation, else don't
673      * @see #getRemoveOnCancelPolicy
674      */
675     void setRemoveOnCancelPolicy(bool value) {
676         removeOnCancel = value;
677     }
678 
679     /**
680      * Gets the policy on whether cancelled tasks should be immediately
681      * removed from the work queue at time of cancellation.  This value is
682      * by default {@code false}.
683      *
684      * @return {@code true} if cancelled tasks are immediately removed
685      *         from the queue
686      * @see #setRemoveOnCancelPolicy
687      */
688     bool getRemoveOnCancelPolicy() {
689         return removeOnCancel;
690     }
691 
692     /**
693      * Initiates an orderly shutdown in which previously submitted
694      * tasks are executed, but no new tasks will be accepted.
695      * Invocation has no additional effect if already shut down.
696      *
697      * <p>This method does not wait for previously submitted tasks to
698      * complete execution.  Use {@link #awaitTermination awaitTermination}
699      * to do that.
700      *
701      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
702      * has been set {@code false}, existing delayed tasks whose delays
703      * have not yet elapsed are cancelled.  And unless the {@code
704      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
705      * {@code true}, future executions of existing periodic tasks will
706      * be cancelled.
707      *
708      * @throws SecurityException {@inheritDoc}
709      */
710     override void shutdown() {
711         super.shutdown();
712     }
713 
714     /**
715      * Attempts to stop all actively executing tasks, halts the
716      * processing of waiting tasks, and returns a list of the tasks
717      * that were awaiting execution. These tasks are drained (removed)
718      * from the task queue upon return from this method.
719      *
720      * <p>This method does not wait for actively executing tasks to
721      * terminate.  Use {@link #awaitTermination awaitTermination} to
722      * do that.
723      *
724      * <p>There are no guarantees beyond best-effort attempts to stop
725      * processing actively executing tasks.  This implementation
726      * interrupts tasks via {@link Thread#interrupt}; any task that
727      * fails to respond to interrupts may never terminate.
728      *
729      * @return list of tasks that never commenced execution.
730      *         Each element of this list is a {@link ScheduledFuture}.
731      *         For tasks submitted via one of the {@code schedule}
732      *         methods, the element will be identical to the returned
733      *         {@code ScheduledFuture}.  For tasks submitted using
734      *         {@link #execute execute}, the element will be a
735      *         zero-delay {@code ScheduledFuture}.
736      * @throws SecurityException {@inheritDoc}
737      */
738     override List!(Runnable) shutdownNow() {
739         return super.shutdownNow();
740     }
741 
742     /**
743      * Returns the task queue used by this executor.  Access to the
744      * task queue is intended primarily for debugging and monitoring.
745      * This queue may be in active use.  Retrieving the task queue
746      * does not prevent queued tasks from executing.
747      *
748      * <p>Each element of this queue is a {@link ScheduledFuture}.
749      * For tasks submitted via one of the {@code schedule} methods, the
750      * element will be identical to the returned {@code ScheduledFuture}.
751      * For tasks submitted using {@link #execute execute}, the element
752      * will be a zero-delay {@code ScheduledFuture}.
753      *
754      * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
755      * tasks in the order in which they will execute.
756      *
757      * @return the task queue
758      */
759     override BlockingQueue!(Runnable) getQueue() {
760         return super.getQueue();
761     }
762 }
763 
764 
/**
 * A FutureTask that additionally carries the scheduling state used by
 * ScheduledThreadPoolExecutor: the time at which it becomes runnable, an
 * optional repeat period, a sequence number used to break ties, and the
 * slot it currently occupies in the DelayedWorkQueue heap.
 */
private class ScheduledFutureTask(V) : FutureTask!(V),
    RunnableScheduledFuture!(V), IScheduledFutureTask {

    /** Sequence number to break ties FIFO */
    private long sequenceNumber;

    /**
     * The time at which the task is enabled to execute. getDelay() subtracts
     * Clock.currStdTime() from it, so it lives on that same time scale.
     */
    private long time;

    /**
     * Period for repeating tasks. It is added directly to {@code time} for
     * fixed-rate tasks, so it is expressed in the same unit as {@code time}.
     * A positive value indicates fixed-rate execution.
     * A negative value indicates fixed-delay execution.
     * A value of 0 indicates a non-repeating (one-shot) task.
     */
    private long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture!(V) outerTask; // = this;
    ScheduledThreadPoolExecutor poolExecutor;

    /**
     * Index into delay queue, to support faster cancellation.
     */
    int _heapIndex;

static if(is(V == void)) {
    /**
     * Creates a one-shot action with given trigger time.
     */
    this(Runnable r, long triggerTime,
                        long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
        super(r);
        initSchedule(triggerTime, 0, sequenceNumber, poolExecutor);
    }

    /**
     * Creates a periodic action with given initial trigger time and period.
     */
    this(Runnable r, long triggerTime,
                        long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
        super(r);
        initSchedule(triggerTime, period, sequenceNumber, poolExecutor);
    }
} else {

    /**
     * Creates a one-shot action with given trigger time.
     */
    this(Runnable r, V result, long triggerTime,
                        long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
        super(r, result);
        initSchedule(triggerTime, 0, sequenceNumber, poolExecutor);
    }

    /**
     * Creates a periodic action with given initial trigger time and period.
     */
    this(Runnable r, V result, long triggerTime,
                        long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
        super(r, result);
        initSchedule(triggerTime, period, sequenceNumber, poolExecutor);
    }
}

    /**
     * Creates a one-shot action with given trigger time.
     */
    this(Callable!(V) callable, long triggerTime,
                        long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
        super(callable);
        initSchedule(triggerTime, 0, sequenceNumber, poolExecutor);
    }

    /**
     * Shared tail of every constructor: records the scheduling state and
     * then finishes member initialisation.
     */
    private void initSchedule(long triggerTime, long period, long sequenceNumber,
                        ScheduledThreadPoolExecutor poolExecutor) {
        this.time = triggerTime;
        this.period = period;
        this.sequenceNumber = sequenceNumber;
        this.poolExecutor = poolExecutor;
        initializeMembers();
    }

    private void initializeMembers() {
        // By default the task re-enqueues itself when it is periodic.
        outerTask = this;
    }

    /** Records the heap slot assigned by the delay queue. */
    void heapIndex(int index) {
        _heapIndex = index;
    }

    /** Returns the heap slot last recorded through the setter. */
    int heapIndex() {
        return _heapIndex;
    }

    /**
     * Time remaining until the task is enabled: the trigger time minus the
     * current Clock.currStdTime(). Zero or negative once the task is due.
     */
    Duration getDelay() {
        return dur!(TimeUnit.HectoNanosecond)(time - Clock.currStdTime());
    }

    /**
     * Orders tasks by trigger time, then by sequence number. A Delayed that
     * is not a ScheduledFutureTask!V is ordered by comparing remaining delays.
     */
    int opCmp(Delayed other) {
        // Identity, not opEquals: go through Object so the test does not
        // depend on the interface type the reference is held by.
        if (cast(Object) other is this) // compare zero if same object
            return 0;
        ScheduledFutureTask!V x = cast(ScheduledFutureTask!V)other;
        if (x !is null) {
            long timeDiff = time - x.time;
            if (timeDiff < 0)
                return -1;
            else if (timeDiff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        Duration delayDiff = getDelay() - other.getDelay();
        return (delayDiff.isNegative) ? -1 : (delayDiff > Duration.zero) ? 1 : 0;
    }

    /**
     * Returns {@code true} if this is a periodic (not a one-shot) action.
     *
     * @return {@code true} if periodic
     */
    bool isPeriodic() {
        return period != 0;
    }

    /**
     * Sets the next time to run for a periodic task.
     */
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;                           // fixed rate: advance from the previous trigger time
        else
            time = poolExecutor.triggerTime(-p); // fixed delay: let the pool compute the next trigger
    }

    override bool cancel(bool mayInterruptIfRunning) {
        // The racy read of heapIndex below is benign:
        // if heapIndex < 0, then OOTA guarantees that we have surely
        // been removed; else we recheck under lock in remove()
        bool cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && poolExecutor.removeOnCancel && _heapIndex >= 0)
            poolExecutor.remove(this);
        return cancelled;
    }

    /**
     * Overrides FutureTask version so as to reset/requeue if periodic.
     */
    override void run() {
        if (!poolExecutor.canRunInCurrentRunState(this))
            cancel(false);                 // the pool refuses to run it in its current state
        else if (!isPeriodic())
            super.run();                   // one-shot: ordinary FutureTask execution
        else if (super.runAndReset()) {
            // Periodic and the run succeeded: schedule the next execution.
            setNextRunTime();
            poolExecutor.reExecutePeriodic(outerTask);
        }
    }

    // alias from FutureTask
    // alias isCancelled = FutureTask!V.isCancelled;
    // alias isDone = FutureTask!V.isDone;
    alias get = FutureTask!V.get;

    override bool isCancelled() {
        return super.isCancelled();
    }

    override bool isDone() {
        return super.isDone();
    }

    override V get() {
        return super.get();
    }

    override V get(Duration timeout) {
        return super.get(timeout);
    }
}
960 
961 
962 /**
963  * Specialized delay queue. To mesh with TPE declarations, this
964  * class must be declared as a BlockingQueue!(Runnable) even though
965  * it can only hold RunnableScheduledFutures.
966  */
class DelayedWorkQueue : AbstractQueue!(Runnable), BlockingQueue!(Runnable) {

    /*
     * A DelayedWorkQueue is based on a heap-based data structure
     * like those in DelayQueue and PriorityQueue, except that
     * every ScheduledFutureTask also records its index into the
     * heap array. This eliminates the need to find a task upon
     * cancellation, greatly speeding up removal (down from O(n)
     * to O(log n)), and reducing garbage retention that would
     * otherwise occur by waiting for the element to rise to top
     * before clearing. But because the queue may also hold
     * RunnableScheduledFutures that are not ScheduledFutureTasks,
     * we are not guaranteed to have such indices available, in
     * which case we fall back to linear search. (We expect that
     * most tasks will not be decorated, and that the faster cases
     * will be much more common.)
     *
     * All heap operations must record index changes -- mainly
     * within siftUp and siftDown. Upon removal, a task's
     * heapIndex is set to -1. Note that ScheduledFutureTasks can
     * appear at most once in the queue (this need not be true for
     * other kinds of tasks or work queues), so are uniquely
     * identified by heapIndex.
     */

    private enum int INITIAL_CAPACITY = 16;
    private IRunnableScheduledFuture[] queue;
    private ReentrantLock lock;
    private int _size;

    /**
     * Thread designated to wait for the task at the head of the
     * queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with a
     * task with an earlier expiration time, the leader field is
     * invalidated by being reset to null, and some waiting
     * thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private ThreadEx leader;

    /**
     * Condition signalled when a newer task becomes available at the
     * head of the queue or a new thread may need to become leader.
     */
    private Condition available;

    this() {
        initializeMembers();
    }

    private void initializeMembers() {
        lock = new ReentrantLock();
        available = new Condition(lock);
        queue = new IRunnableScheduledFuture[INITIAL_CAPACITY];
    }

    /**
     * Sets f's heapIndex if it is a ScheduledFutureTask.
     */
    private static void setIndex(IRunnableScheduledFuture f, int idx) {
        IScheduledFutureTask t = cast(IScheduledFutureTask)f;
        // tracef("index=%d, type: %s", idx, typeid(cast(Object)t));
        if (t !is null)
            t.heapIndex = idx;
    }

    /**
     * Sifts element added at bottom up to its heap-ordered spot.
     * Call only when holding lock.
     */
    private void siftUp(int k, IRunnableScheduledFuture key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            IRunnableScheduledFuture e = queue[parent];
            // Same explicit opCmp form as siftDown.
            if (key.opCmp(e) >= 0)
                break;
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        }
        // tracef("k=%d, key is null: %s", k, key is null);
        queue[k] = key;
        setIndex(key, k);
    }

    /**
     * Sifts element added at top down to its heap-ordered spot.
     * Call only when holding lock.
     */
    private void siftDown(int k, IRunnableScheduledFuture key) {
        // The caller already holds the lock, so read the counter directly
        // rather than going through the locking size() on every iteration.
        immutable int count = _size;
        int half = count >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            IRunnableScheduledFuture c = queue[child];
            int right = child + 1;
            if (right < count && c.opCmp(queue[right]) > 0)
                c = queue[child = right];
            if (key.opCmp(c) <= 0)
                break;
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        queue[k] = key;
        setIndex(key, k);
    }

    /**
     * Resizes the heap array.  Call only when holding lock.
     */
    private void grow() {
        size_t oldCapacity = queue.length;
        size_t newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
        // size_t is unsigned, so a "< 0" overflow test can never fire.
        // Heap slots are addressed with int indices, so cap at int.max.
        if (newCapacity > int.max)
            newCapacity = int.max;
        queue.length = newCapacity;
    }

    /**
     * Finds index of given object, or -1 if absent.
     * Call only when holding lock.
     */
    private int indexOf(Runnable x) {
        if (x is null)
            return -1;

        // Identity is compared through Object so that the result does not
        // depend on which interface type each reference is held by.
        Object target = cast(Object) x;
        IScheduledFutureTask sf = cast(IScheduledFutureTask) x;
        if (sf !is null) {
            int i = sf.heapIndex;
            // Sanity check; x could conceivably be a
            // ScheduledFutureTask from some other pool.
            if (i >= 0 && i < _size && cast(Object) queue[i] is target)
                return i;
        } else {
            for (int i = 0; i < _size; i++) {
                if (cast(Object) queue[i] is target)
                    return i;
            }
        }
        return -1;
    }

    override bool contains(Runnable x) {
        lock.lock();
        scope(exit) lock.unlock();
        return indexOf(x) != -1;
    }

    /**
     * Removes the given task if it is queued.
     *
     * @return {@code true} if the task was found and removed
     */
    override bool remove(Runnable x) {
        lock.lock();
        scope(exit) lock.unlock();

        int i = indexOf(x);
        if (i < 0)
            return false;

        setIndex(queue[i], -1);
        int s = --_size;
        IRunnableScheduledFuture replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            // Move the former last element into the hole; if it did not sink,
            // it may need to rise instead.
            siftDown(i, replacement);
            if (queue[i] is replacement)
                siftUp(i, replacement);
        }
        return true;
    }

    override int size() {
        lock.lock();
        scope(exit) lock.unlock();
        return _size;
    }

    override bool isEmpty() {
        return size() == 0;
    }

    int remainingCapacity() {
        return int.max;
    }

    /**
     * Returns the head of the heap without removing it, or null when the
     * queue is empty. The head is returned even if its delay has not elapsed.
     */
    IRunnableScheduledFuture peek() {
        lock.lock();
        scope(exit) lock.unlock();
        return queue[0];
    }

    /**
     * Inserts the task into the heap. Always returns {@code true}.
     *
     * @throws NullPointerException if x is null
     * @throws IllegalArgumentException if x is not an IRunnableScheduledFuture
     */
    bool offer(Runnable x) {
        if (x is null)
            throw new NullPointerException();
        IRunnableScheduledFuture e = cast(IRunnableScheduledFuture)x;
        // A failed cast would otherwise put null into the heap and break
        // every later comparison.
        if (e is null)
            throw new IllegalArgumentException("task is not an IRunnableScheduledFuture");

        lock.lock();
        scope(exit) lock.unlock();

        int i = _size;
        if (i >= queue.length)
            grow();
        _size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] is e) {
            // New head: invalidate the leader and wake one waiter so it can
            // re-evaluate how long to wait.
            leader = null;
            available.notify();
        }
        return true;
    }

    override void put(Runnable e) {
        offer(e);
    }

    override bool add(Runnable e) {
        return offer(e);
    }

    /** The timeout is ignored: insertion never blocks. */
    bool offer(Runnable e, Duration timeout) {
        return offer(e);
    }

    /**
     * Performs common bookkeeping for poll and take: Replaces
     * first element with last and sifts it down.  Call only when
     * holding lock.
     * @param f the task to remove and return
     */
    private IRunnableScheduledFuture finishPoll(IRunnableScheduledFuture f) {
        int s = --_size;
        IRunnableScheduledFuture x = queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        setIndex(f, -1);
        return f;
    }

    /**
     * Removes and returns the head if its delay has elapsed; otherwise
     * returns null without waiting.
     */
    IRunnableScheduledFuture poll() {
        lock.lock();
        scope(exit) lock.unlock();

        IRunnableScheduledFuture first = queue[0];
        if (first is null || first.getDelay() > Duration.zero)
            return null;
        return finishPoll(first);
    }

    /**
     * Blocks until the head of the queue is due, then removes and returns it.
     */
    IRunnableScheduledFuture take() {
        // lock.lockInterruptibly();
        lock.lock();
        try {
            for (;;) {
                IRunnableScheduledFuture first = queue[0];
                if (first is null)
                    available.wait();
                else {
                    Duration delay = first.getDelay();
                    if (delay <= Duration.zero)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader !is null)
                        available.wait();
                    else {
                        ThreadEx thisThread = ThreadEx.currentThread();
                        leader = thisThread;
                        try {
                            available.wait(delay);
                        } finally {
                            if (leader is thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand over to another waiter if nobody is leading and work remains.
            if (leader is null && queue[0] !is null)
                available.notify();
            lock.unlock();
        }
    }

    /**
     * Like take(), but gives up and returns null once {@code timeout} has
     * elapsed without the head becoming due.
     */
    IRunnableScheduledFuture poll(Duration timeout) {
        Duration nanos = timeout;
        // Reference point for recomputing the remaining budget after every
        // wait, so early or spurious wake-ups cannot stretch the total wait.
        immutable MonoTime started = MonoTime.currTime;
        // lock.lockInterruptibly();
        lock.lock();
        try {
            for (;;) {
                IRunnableScheduledFuture first = queue[0];
                if (first is null) {
                    if (nanos <= Duration.zero)
                        return null;
                    available.wait(nanos);
                    nanos = timeout - (MonoTime.currTime - started);
                } else {
                    Duration delay = first.getDelay();
                    if (delay <= Duration.zero)
                        return finishPoll(first);
                    if (nanos <= Duration.zero)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader !is null) {
                        available.wait(nanos);
                        nanos = timeout - (MonoTime.currTime - started);
                    } else {
                        ThreadEx thisThread = ThreadEx.currentThread();
                        leader = thisThread;
                        try {
                            available.wait(delay);
                            // Charge only the time that actually passed, not
                            // the full delay.
                            nanos = timeout - (MonoTime.currTime - started);
                        } finally {
                            if (leader is thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader is null && queue[0] !is null)
                available.notify();
            lock.unlock();
        }
    }

    override void clear() {
        lock.lock();
        scope(exit) lock.unlock();

        for (int i = 0; i < _size; i++) {
            IRunnableScheduledFuture t = queue[i];
            if (t !is null) {
                queue[i] = null;
                setIndex(t, -1);
            }
        }
        _size = 0;
    }

    int drainTo(Collection!(Runnable) c) {
        return drainTo(c, int.max);
    }

    /**
     * Moves up to {@code maxElements} tasks whose delay has elapsed into
     * {@code c}, in heap order, and returns how many were moved.
     *
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    int drainTo(Collection!(Runnable) c, int maxElements) {
        if (c is null)
            throw new NullPointerException();
        // Self-drain is an identity question; opEquals must not be consulted.
        if (cast(Object) c is this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;

        lock.lock();
        scope(exit) lock.unlock();

        int n = 0;
        for (IRunnableScheduledFuture first;
             n < maxElements
                 && (first = queue[0]) !is null
                 && first.getDelay() <= Duration.zero;) {
            c.add(first);   // In this order, in case add() throws.
            finishPoll(first);
            ++n;
        }
        return n;
    }

    /** Returns a copy of the queued tasks in heap-array order. */
    override Runnable[] toArray() {
        lock.lock();
        scope(exit) lock.unlock();

        Runnable[] r = new Runnable[_size];
        for (int i = 0; i < _size; i++) {
            r[i] = queue[i];
        }
        return r;
    }

    /**
     * foreach support. The lock is held for the whole traversal, including
     * while the delegate runs.
     */
    override int opApply(scope int delegate(ref Runnable) dg) {
        if (dg is null)
            throw new NullPointerException();
        lock.lock();
        scope(exit) lock.unlock();

        int result = 0;
        foreach(int i; 0.._size) {
            Runnable v = queue[i];
            result = dg(v);
            if(result != 0) return result;
        }
        return result;
    }


    // Iterator!(Runnable) iterator() {
    //     ReentrantLock lock = this.lock;
    //     lock.lock();
    //     try {
    //         return new Itr(Arrays.copyOf(queue, size));
    //     } finally {
    //         lock.unlock();
    //     }
    // }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    // private class Itr : Iterator!(Runnable) {
    //     final IRunnableScheduledFuture[] array;
    //     int cursor;        // index of next element to return; initially 0
    //     int lastRet = -1;  // index of last element returned; -1 if no such

    //     this(IRunnableScheduledFuture[] array) {
    //         this.array = array;
    //     }

    //     bool hasNext() {
    //         return cursor < array.length;
    //     }

    //     Runnable next() {
    //         if (cursor >= array.length)
    //             throw new NoSuchElementException();
    //         return array[lastRet = cursor++];
    //     }

    //     void remove() {
    //         if (lastRet < 0)
    //             throw new IllegalStateException();
    //         DelayedWorkQueue.this.remove(array[lastRet]);
    //         lastRet = -1;
    //     }
    // }

    override bool opEquals(IObject o) {
        return opEquals(cast(Object) o);
    }

    override bool opEquals(Object o) {
        return super.opEquals(o);
    }

    override string toString() {
        return super.toString();
    }

    override size_t toHash() @trusted nothrow {
        return super.toHash();
    }
}