/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.ScheduledThreadPoolExecutor;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.BlockingQueue;
import hunt.concurrency.Delayed;
import hunt.concurrency.Future;
import hunt.concurrency.FutureTask;
import hunt.concurrency.ScheduledExecutorService;
import hunt.concurrency.thread;
import hunt.concurrency.ThreadFactory;
import hunt.concurrency.ThreadPoolExecutor;

import hunt.collection;
import hunt.Exceptions;
import hunt.util.Common;
import hunt.util.DateTime;
import hunt.Object;
import hunt.logging.ConsoleLogger;
// import core.time;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;

import std.datetime;
// import hunt.collection.AbstractQueue;
// import java.util.Arrays;
// import hunt.collection.Collection;
// import hunt.collection.Iterator;
// import java.util.List;
// import java.util.NoSuchElementException;
// import java.util.Objects;
// import hunt.concurrency.atomic.AtomicLong;
// import hunt.concurrency.locks.Condition;
// import hunt.concurrency.locks.ReentrantLock;

alias ReentrantLock = Mutex;

interface IScheduledFutureTask {
    void heapIndex(int index);
    int heapIndex();
}

/**
 * A {@link ThreadPoolExecutor} that can additionally schedule
 * commands to run after a given delay, or to execute periodically.
 * This class is preferable to {@link java.util.Timer} when multiple
 * worker threads are needed, or when the additional flexibility or
 * capabilities of {@link ThreadPoolExecutor} (which this class
 * extends) are required.
 *
 * <p>Delayed tasks execute no sooner than they are enabled, but
 * without any real-time guarantees about when, after they are
 * enabled, they will commence. Tasks scheduled for exactly the same
 * execution time are enabled in first-in-first-out (FIFO) order of
 * submission.
 *
 * <p>When a submitted task is cancelled before it is run, execution
 * is suppressed. By default, such a cancelled task is not
 * automatically removed from the work queue until its delay elapses.
 * While this enables further inspection and monitoring, it may also
 * cause unbounded retention of cancelled tasks. To avoid this, use
 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
 * removed from the work queue at time of cancellation.
 *
 * <p>Successive executions of a periodic task scheduled via
 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
 * do not overlap. While different executions may be performed by
 * different threads, the effects of prior executions
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those of subsequent ones.
 *
 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
 * of the inherited tuning methods are not useful for it. In
 * particular, because it acts as a fixed-sized pool using
 * {@code corePoolSize} threads and an unbounded queue, adjustments
 * to {@code maximumPoolSize} have no useful effect.
 * Additionally, it is almost never a good idea to set {@code corePoolSize}
 * to zero or use {@code allowCoreThreadTimeOut} because this may leave
 * the pool without threads to handle tasks once they become eligible to run.
 *
 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
 * this class uses {@link Executors#defaultThreadFactory} as the
 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
 * as the default rejected execution handler.
 *
 * <p><b>Extension notes:</b> This class overrides the
 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
 * {@link AbstractExecutorService#submit(Runnable) submit}
 * methods to generate internal {@link ScheduledFuture} objects to
 * control per-task delays and scheduling. To preserve
 * functionality, any further overrides of these methods in
 * subclasses must invoke superclass versions, which effectively
 * disables additional task customization. However, this class
 * provides alternative protected extension methods
 * {@code decorateTask} (one version each for {@code Runnable} and
 * {@code Callable}) that can be used to customize the concrete task
 * types used to execute commands entered via {@code execute},
 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
 * and {@code scheduleWithFixedDelay}. By default, a
 * {@code ScheduledThreadPoolExecutor} uses a task type extending
 * {@link FutureTask}. However, this may be modified or replaced using
 * subclasses of the form:
 *
 * <pre> {@code
 * class CustomScheduledExecutor : ScheduledThreadPoolExecutor {
 *
 *   static class CustomTask(V) : RunnableScheduledFuture!(V) { ... }
 *
 *   protected RunnableScheduledFuture!(V) decorateTask(V)(
 *                Runnable r, RunnableScheduledFuture!(V) task) {
 *       return new CustomTask!(V)(r, task);
 *   }
 *
 *   protected RunnableScheduledFuture!(V) decorateTask(V)(
 *                Callable!(V) c, RunnableScheduledFuture!(V) task) {
 *       return new CustomTask!(V)(c, task);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @author Doug Lea
 */
class ScheduledThreadPoolExecutor : ThreadPoolExecutor, ScheduledExecutorService {

    /*
     * This class specializes ThreadPoolExecutor implementation by
     *
     * 1. Using a custom task type ScheduledFutureTask, even for tasks
     *    that don't require scheduling because they are submitted
     *    using ExecutorService rather than ScheduledExecutorService
     *    methods, which are treated as tasks with a delay of zero.
     *
     * 2. Using a custom queue (DelayedWorkQueue), a variant of
     *    unbounded DelayQueue. The lack of capacity constraint and
     *    the fact that corePoolSize and maximumPoolSize are
     *    effectively identical simplifies some execution mechanics
     *    (see delayedExecute) compared to ThreadPoolExecutor.
     *
     * 3. Supporting optional run-after-shutdown parameters, which
     *    leads to overrides of shutdown methods to remove and cancel
     *    tasks that should NOT be run after shutdown, as well as
     *    different recheck logic when task (re)submission overlaps
     *    with a shutdown.
     *
     * 4. Task decoration methods to allow interception and
     *    instrumentation, which are needed because subclasses cannot
     *    otherwise override submit methods to get this effect. These
     *    don't have any impact on pool control logic though.
     */

    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     */
    private bool continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic not-yet-expired tasks on shutdown.
     */
    private bool executeExistingDelayedTasksAfterShutdown = true;

    /**
     * True if ScheduledFutureTask.cancel should remove from queue.
     */
    bool removeOnCancel;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     */
    private shared static long sequencer; // = new AtomicLong();

    /**
     * Returns true if can run a task given current run state and
     * run-after-shutdown parameters.
     */
    bool canRunInCurrentRunState(V)(RunnableScheduledFuture!V task) {
        if (!isShutdown())
            return true;
        if (isStopped())
            return false;
        return task.isPeriodic()
            ? continueExistingPeriodicTasksAfterShutdown
            : (executeExistingDelayedTasksAfterShutdown
                || task.getDelay() <= Duration.zero);
    }

    /**
     * Main execution method for delayed or periodic tasks. If pool
     * is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it. (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet.) If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(V)(RunnableScheduledFuture!V task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

    /**
     * Requeues a periodic task unless current run state precludes it.
     * Same idea as delayedExecute except drops task rather than rejecting.
     *
     * @param task the task
     */
    void reExecutePeriodic(V)(RunnableScheduledFuture!V task) {
        if (canRunInCurrentRunState(task)) {
            super.getQueue().add(task);
            if (canRunInCurrentRunState(task) || !remove(task)) {
                ensurePrestart();
                return;
            }
        }
        task.cancel(false);
    }

    /**
     * Cancels and clears the queue of all tasks that should not be run
     * due to shutdown policy. Invoked within super.shutdown.
     */
    override void onShutdown() {
        BlockingQueue!(Runnable) q = super.getQueue();
        bool keepDelayed =
            getExecuteExistingDelayedTasksAfterShutdownPolicy();
        bool keepPeriodic =
            getContinueExistingPeriodicTasksAfterShutdownPolicy();
        // Traverse snapshot to avoid iterator exceptions
        // TODO: implement and use efficient removeIf
        // super.getQueue().removeIf(...);
        version(HUNT_DEBUG) tracef("Shutting down..., BlockingQueue size: %d", q.size());
        foreach (Runnable e ; q.toArray()) {
            if(e is null) {
                warning("e is null");
            } else {
                version(HUNT_DEBUG) trace(typeid(cast(Object)e));
                IRunnableScheduledFuture t = cast(IRunnableScheduledFuture)e;
                if (t !is null) {
                    if ((t.isPeriodic()
                            ? !keepPeriodic
                            : (!keepDelayed && t.getDelay() > Duration.zero))
                        || t.isCancelled()) { // also remove if already cancelled
                        if (q.remove(t))
                            t.cancel(false);
                    }
                } else {
                    warning("t is null");
                }
            }
        }
        tryTerminate();
    }

    /**
     * Modifies or replaces the task used to execute a runnable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param runnable the submitted Runnable
     * @param task the task created to execute the runnable
     * @param (V) the type of the task's result
     * @return a task that can execute the runnable
     */
    protected RunnableScheduledFuture!(V) decorateTask(V)(
            Runnable runnable, RunnableScheduledFuture!(V) task) {
        return task;
    }

    /**
     * Modifies or replaces the task used to execute a callable.
     * This method can be used to override the concrete
     * class used for managing internal tasks.
     * The default implementation simply returns the given task.
     *
     * @param callable the submitted Callable
     * @param task the task created to execute the callable
     * @param (V) the type of the task's result
     * @return a task that can execute the callable
     */
    protected RunnableScheduledFuture!(V) decorateTask(V)(
            Callable!(V) callable, RunnableScheduledFuture!(V) task) {
        return task;
    }

    /**
     * The default keep-alive time for pool threads.
     *
     * Normally, this value is unused because all pool threads will be
     * core threads, but if a user creates a pool with a corePoolSize
     * of zero (against our advice), we keep a thread alive as long as
     * there are queued tasks. If the keep alive time is zero (the
     * historic value), we end up hot-spinning in getTask, wasting a
     * CPU. But on the other hand, if we set the value too high, and
     * users create a one-shot pool which they don't cleanly shutdown,
     * the pool's non-daemon threads will prevent JVM termination. A
     * small but non-zero value (relative to a JVM's lifetime) seems
     * best.
     */
    private enum long DEFAULT_KEEPALIVE_MILLIS = 10L;

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    this(int corePoolSize) {
        super(corePoolSize, int.max, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
            new DelayedWorkQueue());
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    this(int corePoolSize, ThreadFactory threadFactory) {
        super(corePoolSize, int.max,
            dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
            new DelayedWorkQueue(), threadFactory);
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    this(int corePoolSize, RejectedExecutionHandler handler) {
        super(corePoolSize, int.max,
            dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
            new DelayedWorkQueue(), handler);
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    this(int corePoolSize, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, int.max,
            dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS),
            new DelayedWorkQueue(), threadFactory, handler);
    }

    /**
     * Returns the nanoTime-based trigger time of a delayed action.
     */
    private long triggerTime(Duration delay) {
        return triggerTime(delay.isNegative ? 0 : delay.total!(TimeUnit.HectoNanosecond)());
    }

    /**
     * Returns the nanoTime-based trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return Clock.currStdTime +
            ((delay < (long.max >> 1)) ? delay : overflowFree(delay));
    }

    /**
     * Constrains the values of all delays in the queue to be within
     * long.max of each other, to avoid overflow in compareTo.
     * This may occur if a task is eligible to be dequeued, but has
     * not yet been, while some other task is added with a delay of
     * long.max.
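     *
     * Illustration (added note, not from the original comment): if the head
     * task is overdue by h, its headDelay is -h, so comparing it against a
     * newly added delay evaluates delay - (-h) = delay + h; with delay near
     * long.max that sum wraps around. Clamping the new delay to
     * long.max + headDelay (that is, long.max - h) keeps every pairwise
     * difference representable in a long.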
     */
    private long overflowFree(long delay) {
        Delayed head = cast(Delayed) super.getQueue().peek();
        if (head !is null) {
            long headDelay = head.getDelay().total!(TimeUnit.HectoNanosecond)();
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = long.max + headDelay;
        }
        return delay;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    ScheduledFuture!(void) schedule(Runnable command, Duration delay) {
        if (command is null)
            throw new NullPointerException();
        long n = atomicOp!"+="(sequencer, 1);
        n--;
        // RunnableScheduledFuture!(void) t = decorateTask(command,
        //     new ScheduledFutureTask!(void)(command, cast(void)null, triggerTime(delay), n, this));
        RunnableScheduledFuture!(void) t = decorateTask(command,
            new ScheduledFutureTask!(void)(command, triggerTime(delay), n, this));
        delayedExecute!(void)(t);
        return t;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
        if (callable is null)
            throw new NullPointerException();
        RunnableScheduledFuture!(V) t = decorateTask(callable,
            new ScheduledFutureTask!(V)(callable,
                triggerTime(delay),
                cast(long)AtomicHelper.getAndIncrement(sequencer), this));
        delayedExecute(t);
        return t;
    }

    /**
     * Submits a periodic action that becomes enabled first after the
     * given initial delay, and subsequently with the given period;
     * that is, executions will commence after
     * {@code initialDelay}, then {@code initialDelay + period}, then
     * {@code initialDelay + 2 * period}, and so on.
     *
     * <p>The sequence of task executions continues indefinitely until
     * one of the following exceptional completions occur:
     * <ul>
     * <li>The task is {@linkplain Future#cancel explicitly cancelled}
     * via the returned future.
     * <li>Method {@link #shutdown} is called and the {@linkplain
     * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
     * whether to continue after shutdown} is not set true, or method
     * {@link #shutdownNow} is called; also resulting in task
     * cancellation.
     * <li>An execution of the task throws an exception. In this case
     * calling {@link Future#get() get} on the returned future will throw
     * {@link ExecutionException}, holding the exception as its cause.
     * </ul>
     * Subsequent executions are suppressed. Subsequent calls to
     * {@link Future#isDone isDone()} on the returned future will
     * return {@code true}.
     *
     * <p>If any execution of this task takes longer than its period, then
     * subsequent executions may start late, but will not concurrently
     * execute.
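     *
     * <p>A minimal usage sketch (illustrative only; the two-thread pool,
     * the delays and the logged message are assumptions, not part of
     * this API):
     *
     * <pre> {@code
     * auto pool = new ScheduledThreadPoolExecutor(2);
     * // first run after 1 second, then every 5 seconds measured from each start
     * pool.scheduleAtFixedRate(new class Runnable {
     *         void run() { trace("tick"); }
     *     }, dur!"seconds"(1), dur!"seconds"(5));
     * }</pre>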
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    ScheduledFuture!void scheduleAtFixedRate(Runnable command,
                                             Duration initialDelay,
                                             Duration period) {
        if (command is null)
            throw new NullPointerException();
        if (period <= Duration.zero)
            throw new IllegalArgumentException();

        ScheduledFutureTask!(void) sft =
            new ScheduledFutureTask!(void)(command, // cast(void)null,
                triggerTime(initialDelay),
                period.total!(TimeUnit.HectoNanosecond)(),
                cast(long)AtomicHelper.getAndIncrement(sequencer), this);
        RunnableScheduledFuture!(void) t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    /**
     * Submits a periodic action that becomes enabled first after the
     * given initial delay, and subsequently with the given delay
     * between the termination of one execution and the commencement of
     * the next.
     *
     * <p>The sequence of task executions continues indefinitely until
     * one of the following exceptional completions occur:
     * <ul>
     * <li>The task is {@linkplain Future#cancel explicitly cancelled}
     * via the returned future.
     * <li>Method {@link #shutdown} is called and the {@linkplain
     * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
     * whether to continue after shutdown} is not set true, or method
     * {@link #shutdownNow} is called; also resulting in task
     * cancellation.
     * <li>An execution of the task throws an exception. In this case
     * calling {@link Future#get() get} on the returned future will throw
     * {@link ExecutionException}, holding the exception as its cause.
     * </ul>
     * Subsequent executions are suppressed. Subsequent calls to
     * {@link Future#isDone isDone()} on the returned future will
     * return {@code true}.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    ScheduledFuture!(void) scheduleWithFixedDelay(Runnable command,
                                                  Duration initialDelay,
                                                  Duration delay) {
        if (command is null)
            throw new NullPointerException();
        if (delay <= Duration.zero)
            throw new IllegalArgumentException();
        ScheduledFutureTask!(void) sft =
            new ScheduledFutureTask!(void)(command, // cast(void)null,
                triggerTime(initialDelay),
                -delay.total!(TimeUnit.HectoNanosecond)(),
                cast(long)AtomicHelper.getAndIncrement(sequencer), this);
        RunnableScheduledFuture!(void) t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    /**
     * Executes {@code command} with zero required delay.
     * This has effect equivalent to
     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
     * Note that inspections of the queue and of the list returned by
     * {@code shutdownNow} will access the zero-delayed
     * {@link ScheduledFuture}, not the {@code command} itself.
     *
     * <p>A consequence of the use of {@code ScheduledFuture} objects is
     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
     * called with a null second {@code Throwable} argument, even if the
     * {@code command} terminated abruptly. Instead, the {@code Throwable}
     * thrown by such a task can be obtained via {@link Future#get}.
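     *
     * <p>Sketch of that equivalence ({@code pool} and {@code task} are
     * placeholders, not part of this API):
     *
     * <pre> {@code
     * // both calls enqueue the same kind of zero-delay ScheduledFuture
     * pool.execute(task);
     * ScheduledFuture!(void) f = pool.schedule(task, Duration.zero);
     * // a Throwable thrown by task is reported by f.get(), not by afterExecute
     * }</pre>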
     *
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution because the
     *         executor has been shut down
     * @throws NullPointerException {@inheritDoc}
     */
    override void execute(Runnable command) {
        schedule(command, Duration.zero);
    }

    // Override AbstractExecutorService methods

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    override Future!void submit(Runnable task) {
        return schedule(task, Duration.zero);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    Future!(T) submit(T)(Runnable task, T result) {
        return schedule(Executors.callable(task, result), Duration.zero);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    Future!(T) submit(T)(Callable!(T) task) {
        return schedule(task, Duration.zero);
    }

    /**
     * Sets the policy on whether to continue executing existing
     * periodic tasks even when this executor has been {@code shutdown}.
     * In this case, executions will continue until {@code shutdownNow}
     * or the policy is set to {@code false} when already shutdown.
     * This value is by default {@code false}.
     *
     * @param value if {@code true}, continue after shutdown, else don't
     * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
     */
    void setContinueExistingPeriodicTasksAfterShutdownPolicy(bool value) {
        continueExistingPeriodicTasksAfterShutdown = value;
        if (!value && isShutdown())
            onShutdown();
    }

    /**
     * Gets the policy on whether to continue executing existing
     * periodic tasks even when this executor has been {@code shutdown}.
     * In this case, executions will continue until {@code shutdownNow}
     * or the policy is set to {@code false} when already shutdown.
     * This value is by default {@code false}.
     *
     * @return {@code true} if will continue after shutdown
     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
     */
    bool getContinueExistingPeriodicTasksAfterShutdownPolicy() {
        return continueExistingPeriodicTasksAfterShutdown;
    }

    /**
     * Sets the policy on whether to execute existing delayed
     * tasks even when this executor has been {@code shutdown}.
     * In this case, these tasks will only terminate upon
     * {@code shutdownNow}, or after setting the policy to
     * {@code false} when already shutdown.
     * This value is by default {@code true}.
     *
     * @param value if {@code true}, execute after shutdown, else don't
     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
     */
    void setExecuteExistingDelayedTasksAfterShutdownPolicy(bool value) {
        executeExistingDelayedTasksAfterShutdown = value;
        if (!value && isShutdown())
            onShutdown();
    }

    /**
     * Gets the policy on whether to execute existing delayed
     * tasks even when this executor has been {@code shutdown}.
     * In this case, these tasks will only terminate upon
     * {@code shutdownNow}, or after setting the policy to
     * {@code false} when already shutdown.
     * This value is by default {@code true}.
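     *
     * <p>Typical configuration sketch (illustrative only; the single-thread
     * pool is an assumption):
     *
     * <pre> {@code
     * auto pool = new ScheduledThreadPoolExecutor(1);
     * // drop still-delayed tasks and stop periodic tasks at shutdown()
     * pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     * pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
     * }</pre>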
     *
     * @return {@code true} if will execute after shutdown
     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
     */
    bool getExecuteExistingDelayedTasksAfterShutdownPolicy() {
        return executeExistingDelayedTasksAfterShutdown;
    }

    /**
     * Sets the policy on whether cancelled tasks should be immediately
     * removed from the work queue at time of cancellation. This value is
     * by default {@code false}.
     *
     * @param value if {@code true}, remove on cancellation, else don't
     * @see #getRemoveOnCancelPolicy
     */
    void setRemoveOnCancelPolicy(bool value) {
        removeOnCancel = value;
    }

    /**
     * Gets the policy on whether cancelled tasks should be immediately
     * removed from the work queue at time of cancellation. This value is
     * by default {@code false}.
     *
     * @return {@code true} if cancelled tasks are immediately removed
     *         from the queue
     * @see #setRemoveOnCancelPolicy
     */
    bool getRemoveOnCancelPolicy() {
        return removeOnCancel;
    }

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution. Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
     * has been set {@code false}, existing delayed tasks whose delays
     * have not yet elapsed are cancelled. And unless the {@code
     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
     * {@code true}, future executions of existing periodic tasks will
     * be cancelled.
     *
     * @throws SecurityException {@inheritDoc}
     */
    override void shutdown() {
        super.shutdown();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate. Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks. This implementation
     * interrupts tasks via {@link Thread#interrupt}; any task that
     * fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution.
     *         Each element of this list is a {@link ScheduledFuture}.
     *         For tasks submitted via one of the {@code schedule}
     *         methods, the element will be identical to the returned
     *         {@code ScheduledFuture}. For tasks submitted using
     *         {@link #execute execute}, the element will be a
     *         zero-delay {@code ScheduledFuture}.
     * @throws SecurityException {@inheritDoc}
     */
    override List!(Runnable) shutdownNow() {
        return super.shutdownNow();
    }

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use. Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * <p>Each element of this queue is a {@link ScheduledFuture}.
     * For tasks submitted via one of the {@code schedule} methods, the
     * element will be identical to the returned {@code ScheduledFuture}.
     * For tasks submitted using {@link #execute execute}, the element
     * will be a zero-delay {@code ScheduledFuture}.
     *
     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
     * tasks in the order in which they will execute.
     *
     * @return the task queue
     */
    override BlockingQueue!(Runnable) getQueue() {
        return super.getQueue();
    }
}


/**
 */
private class ScheduledFutureTask(V) : FutureTask!(V),
    RunnableScheduledFuture!(V), IScheduledFutureTask {

    /** Sequence number to break ties FIFO */
    private long sequenceNumber;

    /** The nanoTime-based time when the task is enabled to execute. */
    private long time;

    /**
     * Period for repeating tasks, in hnsecs (the unit used by
     * Clock.currStdTime and triggerTime).
     * A positive value indicates fixed-rate execution.
     * A negative value indicates fixed-delay execution.
     * A value of 0 indicates a non-repeating (one-shot) task.
     */
    private long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture!(V) outerTask; // = this;
    ScheduledThreadPoolExecutor poolExecutor;

    /**
     * Index into delay queue, to support faster cancellation.
     */
    int _heapIndex;

    static if(is(V == void)) {
        this(Runnable r, long triggerTime,
                long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }

        /**
         * Creates a periodic action with given nanoTime-based initial
         * trigger time and period.
         */
        this(Runnable r, long triggerTime,
                long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r);
            this.time = triggerTime;
            this.period = period;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }
    } else {

        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
         */
        this(Runnable r, V result, long triggerTime,
                long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r, result);
            this.time = triggerTime;
            this.period = 0;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }

        /**
         * Creates a periodic action with given nanoTime-based initial
         * trigger time and period.
         */
        this(Runnable r, V result, long triggerTime,
                long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
            super(r, result);
            this.time = triggerTime;
            this.period = period;
            this.sequenceNumber = sequenceNumber;
            this.poolExecutor = poolExecutor;
            initializeMembers();
        }
    }

    /**
     * Creates a one-shot action with given nanoTime-based trigger time.
     */
    this(Callable!(V) callable, long triggerTime,
            long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) {
        super(callable);
        this.time = triggerTime;
        this.period = 0;
        this.sequenceNumber = sequenceNumber;
        this.poolExecutor = poolExecutor;
        initializeMembers();
    }

    private void initializeMembers() {
        outerTask = this;
    }

    void heapIndex(int index) {
        _heapIndex = index;
    }

    int heapIndex() {
        return _heapIndex;
    }

    Duration getDelay() {
        return dur!(TimeUnit.HectoNanosecond)(time - Clock.currStdTime());
    }

    int opCmp(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        ScheduledFutureTask!V x = cast(ScheduledFutureTask!V)other;
        if (x !is null) {
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        Duration diff = getDelay() - other.getDelay();
        return (diff.isNegative) ? -1 : (diff > Duration.zero) ? 1 : 0;
    }

    /**
     * Returns {@code true} if this is a periodic (not a one-shot) action.
     *
     * @return {@code true} if periodic
     */
    bool isPeriodic() {
        return period != 0;
    }

    /**
     * Sets the next time to run for a periodic task.
     */
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = poolExecutor.triggerTime(-p);
    }

    override bool cancel(bool mayInterruptIfRunning) {
        // The racy read of heapIndex below is benign:
        // if heapIndex < 0, then OOTA guarantees that we have surely
        // been removed; else we recheck under lock in remove()
        bool cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && poolExecutor.removeOnCancel && heapIndex >= 0)
            poolExecutor.remove(this);
        return cancelled;
    }

    /**
     * Overrides FutureTask version so as to reset/requeue if periodic.
     */
    override void run() {
        if (!poolExecutor.canRunInCurrentRunState(this))
            cancel(false);
        else if (!isPeriodic())
            super.run();
        else if (super.runAndReset()) {
            setNextRunTime();
            poolExecutor.reExecutePeriodic(outerTask);
        }
    }

    // alias from FutureTask
    // alias isCancelled = FutureTask!V.isCancelled;
    // alias isDone = FutureTask!V.isDone;
    alias get = FutureTask!V.get;

    override bool isCancelled() {
        return super.isCancelled();
    }

    override bool isDone() {
        return super.isDone();
    }

    override V get() {
        return super.get();
    }

    override V get(Duration timeout) {
        return super.get(timeout);
    }
}


/**
 * Specialized delay queue. To mesh with TPE declarations, this
 * class must be declared as a BlockingQueue!(Runnable) even though
 * it can only hold RunnableScheduledFutures.
 */
class DelayedWorkQueue : AbstractQueue!(Runnable), BlockingQueue!(Runnable) {

    /*
     * A DelayedWorkQueue is based on a heap-based data structure
     * like those in DelayQueue and PriorityQueue, except that
     * every ScheduledFutureTask also records its index into the
     * heap array. This eliminates the need to find a task upon
     * cancellation, greatly speeding up removal (down from O(n)
     * to O(log n)), and reducing garbage retention that would
     * otherwise occur by waiting for the element to rise to top
     * before clearing.
     * But because the queue may also hold
     * RunnableScheduledFutures that are not ScheduledFutureTasks,
     * we are not guaranteed to have such indices available, in
     * which case we fall back to linear search. (We expect that
     * most tasks will not be decorated, and that the faster cases
     * will be much more common.)
     *
     * All heap operations must record index changes -- mainly
     * within siftUp and siftDown. Upon removal, a task's
     * heapIndex is set to -1. Note that ScheduledFutureTasks can
     * appear at most once in the queue (this need not be true for
     * other kinds of tasks or work queues), so are uniquely
     * identified by heapIndex.
     */

    private enum int INITIAL_CAPACITY = 16;
    private IRunnableScheduledFuture[] queue;
    private ReentrantLock lock;
    private int _size;

    /**
     * Thread designated to wait for the task at the head of the
     * queue. This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting. When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely. The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim. Whenever the head of the queue is replaced with a
     * task with an earlier expiration time, the leader field is
     * invalidated by being reset to null, and some waiting
     * thread, but not necessarily the current leader, is
     * signalled. So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private ThreadEx leader;

    /**
     * Condition signalled when a newer task becomes available at the
     * head of the queue or a new thread may need to become leader.
     */
    private Condition available;

    this() {
        initializeMembers();
    }

    private void initializeMembers() {
        lock = new ReentrantLock();
        available = new Condition(lock);
        queue = new IRunnableScheduledFuture[INITIAL_CAPACITY];
    }

    /**
     * Sets f's heapIndex if it is a ScheduledFutureTask.
     */
    private static void setIndex(IRunnableScheduledFuture f, int idx) {
        IScheduledFutureTask t = cast(IScheduledFutureTask)f;
        // tracef("index=%d, type: %s", idx, typeid(cast(Object)t));
        if (t !is null)
            t.heapIndex = idx;
    }

    /**
     * Sifts element added at bottom up to its heap-ordered spot.
     * Call only when holding lock.
     */
    private void siftUp(int k, IRunnableScheduledFuture key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            IRunnableScheduledFuture e = queue[parent];
            if (key >= e)
                break;
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        }
        // tracef("k=%d, key is null: %s", k, key is null);
        queue[k] = key;
        setIndex(key, k);
    }

    /**
     * Sifts element added at top down to its heap-ordered spot.
     * Call only when holding lock.
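     * (Added note: these sift routines use the standard binary-heap
     * layout, with the children of slot k at 2*k + 1 and 2*k + 2 and
     * its parent at (k - 1) >>> 1.)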
     */
    private void siftDown(int k, IRunnableScheduledFuture key) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            IRunnableScheduledFuture c = queue[child];
            int right = child + 1;
            if (right < size && c.opCmp(queue[right]) > 0)
                c = queue[child = right];
            if (key.opCmp(c) <= 0)
                break;
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        queue[k] = key;
        setIndex(key, k);
    }

    /**
     * Resizes the heap array. Call only when holding lock.
     */
    private void grow() {
        size_t oldCapacity = queue.length;
        size_t newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
        if (newCapacity < 0) // overflow
            newCapacity = int.max;
        queue.length = newCapacity;
    }

    /**
     * Finds index of given object, or -1 if absent.
     */
    private int indexOf(Runnable x) {
        if (x !is null) {
            IScheduledFutureTask sf = cast(IScheduledFutureTask) x;
            if (sf !is null) {
                int i = sf.heapIndex;
                // Sanity check; x could conceivably be a
                // ScheduledFutureTask from some other pool.
                if (i >= 0 && i < size && queue[i] == x)
                    return i;
            } else {
                for (int i = 0; i < size; i++) {
                    // if (x.opEquals(cast(Object)queue[i]))
                    if(x is queue[i])
                        return i;
                }
            }
        }
        return -1;
    }

    override bool contains(Runnable x) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return indexOf(x) != -1;
        } finally {
            lock.unlock();
        }
    }

    override bool remove(Runnable x) {
        ReentrantLock lock = this.lock;
        trace(cast(Object)x);
        lock.lock();
        try {
            int i = indexOf(x);
            if (i < 0)
                return false;

            setIndex(queue[i], -1);
            int s = --_size;
            IRunnableScheduledFuture replacement = queue[s];
            queue[s] = null;
            if (s != i) {
                siftDown(i, replacement);
                if (queue[i] == replacement)
                    siftUp(i, replacement);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    override int size() {
        // return _size;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return _size;
        } finally {
            lock.unlock();
        }
    }

    override bool isEmpty() {
        return size() == 0;
    }

    int remainingCapacity() {
        return int.max;
    }

    IRunnableScheduledFuture peek() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return queue[0];
        } finally {
            lock.unlock();
        }
    }

    bool offer(Runnable x) {
        if (x is null)
            throw new NullPointerException();
        IRunnableScheduledFuture e = cast(IRunnableScheduledFuture)x;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = _size;
            if (i >= queue.length)
                grow();
            _size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.notify();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    override void put(Runnable e) {
        offer(e);
    }

    override bool add(Runnable e) {
        return offer(e);
    }

    bool offer(Runnable e, Duration timeout) {
        return offer(e);
    }

    /**
     * Performs common bookkeeping for poll and take: Replaces
     * first element with last and sifts it down. Call only when
     * holding lock.
     *
     * @param f the task to remove and return
     */
    private IRunnableScheduledFuture finishPoll(IRunnableScheduledFuture f) {
        int s = --_size;
        IRunnableScheduledFuture x = queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        setIndex(f, -1);
        return f;
    }

    IRunnableScheduledFuture poll() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            IRunnableScheduledFuture first = queue[0];
            return (first is null || first.getDelay() > Duration.zero)
                ? null
                : finishPoll(first);
        } finally {
            lock.unlock();
        }
    }

    IRunnableScheduledFuture take() {
        ReentrantLock lock = this.lock;
        // lock.lockInterruptibly();
        lock.lock();
        try {
            for (;;) {
                IRunnableScheduledFuture first = queue[0];
                if (first is null)
                    available.wait();
                else {
                    Duration delay = first.getDelay();
                    if (delay <= Duration.zero)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader !is null)
                        available.wait();
                    else {
                        ThreadEx thisThread = ThreadEx.currentThread();
                        leader = thisThread;
                        try {
                            available.wait(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader is null && queue[0] !is null)
                available.notify();
            lock.unlock();
        }
    }

    IRunnableScheduledFuture poll(Duration timeout) {
        // long nanos = total!(TimeUnit.HectoNanosecond)(timeout);
        Duration nanos = timeout;
        ReentrantLock lock = this.lock;
        // lock.lockInterruptibly();
        lock.lock();
        try {
            for (;;) {
                IRunnableScheduledFuture first = queue[0];
                if (first is null) {
                    if (nanos <= Duration.zero)
                        return null;
                    else
                        available.wait(nanos); // nanos =
                } else {
                    Duration delay = first.getDelay();
                    if (delay <= Duration.zero)
                        return finishPoll(first);
                    if (nanos <= Duration.zero)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader !is null)
                        available.wait(nanos); // nanos =
                    else {
                        ThreadEx thisThread = ThreadEx.currentThread();
                        leader = thisThread;
                        try {
                            available.wait(delay);
                            nanos -= delay;
                            // long timeLeft = available.wait(delay);
                            // nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader is null && queue[0] !is null)
                available.notify();
            lock.unlock();
        }
    }

    override void clear() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (int i = 0; i < size; i++) {
                IRunnableScheduledFuture t = queue[i];
                if (t !is null) {
                    queue[i] = null;
                    setIndex(t, -1);
                }
            }
            _size = 0;
        } finally {
            lock.unlock();
        }
    }

    int drainTo(Collection!(Runnable) c) {
        return drainTo(c, int.max);
    }

    int drainTo(Collection!(Runnable) c, int maxElements) {
        // Objects.requireNonNull(c);

        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (IRunnableScheduledFuture first;
                    n < maxElements
                        && (first = queue[0]) !is null
                        && first.getDelay() <= Duration.zero;) {
                c.add(first); // In this order, in case add() throws.
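                // the head is unlinked only after add() above has succeeded,
                // so a throwing collection cannot lose a task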
                finishPoll(first);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    override Runnable[] toArray() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Runnable[] r = new Runnable[_size];
            for(int i = 0; i < _size; i++) {
                r[i] = queue[i];
            }
            return r;
        } finally {
            lock.unlock();
        }
    }

    override int opApply(scope int delegate(ref Runnable) dg) {
        if(dg is null)
            throw new NullPointerException();
        ReentrantLock lock = this.lock;
        lock.lock();
        scope(exit) lock.unlock();

        int result = 0;
        foreach(int i; 0.._size) {
            Runnable v = queue[i];
            result = dg(v);
            if(result != 0) return result;
        }
        return result;
    }


    // Iterator!(Runnable) iterator() {
    //     ReentrantLock lock = this.lock;
    //     lock.lock();
    //     try {
    //         return new Itr(Arrays.copyOf(queue, size));
    //     } finally {
    //         lock.unlock();
    //     }
    // }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    // private class Itr : Iterator!(Runnable) {
    //     final IRunnableScheduledFuture[] array;
    //     int cursor;       // index of next element to return; initially 0
    //     int lastRet = -1; // index of last element returned; -1 if no such

    //     this(IRunnableScheduledFuture[] array) {
    //         this.array = array;
    //     }

    //     bool hasNext() {
    //         return cursor < array.length;
    //     }

    //     Runnable next() {
    //         if (cursor >= array.length)
    //             throw new NoSuchElementException();
    //         return array[lastRet = cursor++];
    //     }

    //     void remove() {
    //         if (lastRet < 0)
    //             throw new IllegalStateException();
    //         DelayedWorkQueue.this.remove(array[lastRet]);
    //         lastRet = -1;
    //     }
    // }

    override bool opEquals(IObject o) {
        return opEquals(cast(Object) o);
    }

    override bool opEquals(Object o) {
        return super.opEquals(o);
    }

    override string toString() {
        return super.toString();
    }

    override size_t toHash() @trusted nothrow {
        return super.toHash();
    }
}