1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.ScheduledThreadPoolExecutor; 13 14 import hunt.concurrency.atomic.AtomicHelper; 15 import hunt.concurrency.BlockingQueue; 16 import hunt.concurrency.Delayed; 17 import hunt.concurrency.Future; 18 import hunt.concurrency.FutureTask; 19 import hunt.concurrency.ScheduledExecutorService; 20 import hunt.concurrency.thread; 21 import hunt.concurrency.ThreadFactory; 22 import hunt.concurrency.ThreadPoolExecutor; 23 24 import hunt.collection; 25 import hunt.Exceptions; 26 import hunt.util.Common; 27 import hunt.util.DateTime; 28 import hunt.Object; 29 import hunt.logging.ConsoleLogger; 30 // import core.time; 31 32 import core.atomic; 33 import core.sync.condition; 34 import core.sync.mutex; 35 36 import std.datetime; 37 // import hunt.collection.AbstractQueue; 38 // import java.util.Arrays; 39 // import hunt.collection.Collection; 40 // import hunt.collection.Iterator; 41 // import java.util.List; 42 // import java.util.NoSuchElementException; 43 // import java.util.Objects; 44 // import hunt.concurrency.atomic.AtomicLong; 45 // import hunt.concurrency.locks.Condition; 46 // import hunt.concurrency.locks.ReentrantLock; 47 48 alias ReentrantLock = Mutex; 49 50 interface IScheduledFutureTask { 51 void heapIndex(int index); 52 int heapIndex(); 53 } 54 55 /** 56 * A {@link ThreadPoolExecutor} that can additionally schedule 57 * commands to run after a given delay, or to execute periodically. 58 * This class is preferable to {@link java.util.Timer} when multiple 59 * worker threads are needed, or when the additional flexibility or 60 * capabilities of {@link ThreadPoolExecutor} (which this class 61 * extends) are required. 62 * 63 * <p>Delayed tasks execute no sooner than they are enabled, but 64 * without any real-time guarantees about when, after they are 65 * enabled, they will commence. Tasks scheduled for exactly the same 66 * execution time are enabled in first-in-first-out (FIFO) order of 67 * submission. 68 * 69 * <p>When a submitted task is cancelled before it is run, execution 70 * is suppressed. By default, such a cancelled task is not 71 * automatically removed from the work queue until its delay elapses. 72 * While this enables further inspection and monitoring, it may also 73 * cause unbounded retention of cancelled tasks. To avoid this, use 74 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately 75 * removed from the work queue at time of cancellation. 76 * 77 * <p>Successive executions of a periodic task scheduled via 78 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or 79 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay} 80 * do not overlap. While different executions may be performed by 81 * different threads, the effects of prior executions 82 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 83 * those of subsequent ones. 84 * 85 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 86 * of the inherited tuning methods are not useful for it. In 87 * particular, because it acts as a fixed-sized pool using 88 * {@code corePoolSize} threads and an unbounded queue, adjustments 89 * to {@code maximumPoolSize} have no useful effect. 
Additionally, it 90 * is almost never a good idea to set {@code corePoolSize} to zero or 91 * use {@code allowCoreThreadTimeOut} because this may leave the pool 92 * without threads to handle tasks once they become eligible to run. 93 * 94 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified, 95 * this class uses {@link Executors#defaultThreadFactory} as the 96 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy} 97 * as the default rejected execution handler. 98 * 99 * <p><b>Extension notes:</b> This class overrides the 100 * {@link ThreadPoolExecutor#execute(Runnable) execute} and 101 * {@link AbstractExecutorService#submit(Runnable) submit} 102 * methods to generate internal {@link ScheduledFuture} objects to 103 * control per-task delays and scheduling. To preserve 104 * functionality, any further overrides of these methods in 105 * subclasses must invoke superclass versions, which effectively 106 * disables additional task customization. However, this class 107 * provides alternative protected extension methods 108 * {@code decorateTask} (one version each for {@code Runnable} and 109 * {@code Callable}) that can be used to customize the concrete task 110 * types used to execute commands entered via {@code execute}, 111 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 112 * and {@code scheduleWithFixedDelay}. By default, a 113 * {@code ScheduledThreadPoolExecutor} uses a task type extending 114 * {@link FutureTask}. However, this may be modified or replaced using 115 * subclasses of the form: 116 * 117 * <pre> {@code 118 * class CustomScheduledExecutor : ScheduledThreadPoolExecutor { 119 * 120 * static class CustomTask(V) : RunnableScheduledFuture!(V) { ... } 121 * 122 * protected RunnableScheduledFuture!(V) decorateTask(V)( 123 * Runnable r, RunnableScheduledFuture!(V) task) { 124 * return new CustomTask!(V)(r, task); 125 * } 126 * 127 * protected RunnableScheduledFuture!(V) decorateTask(V)( 128 * Callable!(V) c, RunnableScheduledFuture!(V) task) { 129 * return new CustomTask!(V)(c, task); 130 * } 131 * // ... add constructors, etc. 132 * }}</pre> 133 * 134 * @since 1.5 135 * @author Doug Lea 136 */ 137 class ScheduledThreadPoolExecutor : ThreadPoolExecutor, ScheduledExecutorService { 138 139 /* 140 * This class specializes ThreadPoolExecutor implementation by 141 * 142 * 1. Using a custom task type ScheduledFutureTask, even for tasks 143 * that don't require scheduling because they are submitted 144 * using ExecutorService rather than ScheduledExecutorService 145 * methods, which are treated as tasks with a delay of zero. 146 * 147 * 2. Using a custom queue (DelayedWorkQueue), a variant of 148 * unbounded DelayQueue. The lack of capacity constraint and 149 * the fact that corePoolSize and maximumPoolSize are 150 * effectively identical simplifies some execution mechanics 151 * (see delayedExecute) compared to ThreadPoolExecutor. 152 * 153 * 3. Supporting optional run-after-shutdown parameters, which 154 * leads to overrides of shutdown methods to remove and cancel 155 * tasks that should NOT be run after shutdown, as well as 156 * different recheck logic when task (re)submission overlaps 157 * with a shutdown. 158 * 159 * 4. Task decoration methods to allow interception and 160 * instrumentation, which are needed because subclasses cannot 161 * otherwise override submit methods to get this effect. These 162 * don't have any impact on pool control logic though.
163 */ 164 165 /** 166 * False if should cancel/suppress periodic tasks on shutdown. 167 */ 168 private bool continueExistingPeriodicTasksAfterShutdown; 169 170 /** 171 * False if should cancel non-periodic not-yet-expired tasks on shutdown. 172 */ 173 private bool executeExistingDelayedTasksAfterShutdown = true; 174 175 /** 176 * True if ScheduledFutureTask.cancel should remove from queue. 177 */ 178 bool removeOnCancel; 179 180 /** 181 * Sequence number to break scheduling ties, and in turn to 182 * guarantee FIFO order among tied entries. 183 */ 184 private shared static long sequencer; //= new AtomicLong(); 185 186 /** 187 * Returns true if can run a task given current run state and 188 * run-after-shutdown parameters. 189 */ 190 bool canRunInCurrentRunState(V)(RunnableScheduledFuture!V task) { 191 if (!isShutdown()) 192 return true; 193 if (isStopped()) 194 return false; 195 return task.isPeriodic() 196 ? continueExistingPeriodicTasksAfterShutdown 197 : (executeExistingDelayedTasksAfterShutdown 198 || task.getDelay() <= Duration.zero); 199 } 200 201 /** 202 * Main execution method for delayed or periodic tasks. If pool 203 * is shut down, rejects the task. Otherwise adds task to queue 204 * and starts a thread, if necessary, to run it. (We cannot 205 * prestart the thread to run the task because the task (probably) 206 * shouldn't be run yet.) If the pool is shut down while the task 207 * is being added, cancel and remove it if required by state and 208 * run-after-shutdown parameters. 209 * 210 * @param task the task 211 */ 212 private void delayedExecute(V)(RunnableScheduledFuture!V task) { 213 if (isShutdown()) 214 reject(task); 215 else { 216 super.getQueue().add(task); 217 if (!canRunInCurrentRunState(task) && remove(task)) 218 task.cancel(false); 219 else 220 ensurePrestart(); 221 } 222 } 223 224 /** 225 * Requeues a periodic task unless current run state precludes it. 226 * Same idea as delayedExecute except drops task rather than rejecting. 227 * 228 * @param task the task 229 */ 230 void reExecutePeriodic(V)(RunnableScheduledFuture!V task) { 231 if (canRunInCurrentRunState(task)) { 232 super.getQueue().add(task); 233 if (canRunInCurrentRunState(task) || !remove(task)) { 234 ensurePrestart(); 235 return; 236 } 237 } 238 task.cancel(false); 239 } 240 241 /** 242 * Cancels and clears the queue of all tasks that should not be run 243 * due to shutdown policy. Invoked within super.shutdown. 244 */ 245 override void onShutdown() { 246 BlockingQueue!(Runnable) q = super.getQueue(); 247 bool keepDelayed = 248 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 249 bool keepPeriodic = 250 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 251 // Traverse snapshot to avoid iterator exceptions 252 // TODO: implement and use efficient removeIf 253 // super.getQueue().removeIf(...); 254 version(HUNT_DEBUG) tracef("Shutting down..., BlockingQueue size: %d", q.size()); 255 foreach (Runnable e ; q.toArray()) { 256 if(e is null) { 257 warning("e is null"); 258 } else { 259 version(HUNT_DEBUG) trace(typeid(cast(Object)e)); 260 IRunnableScheduledFuture t = cast(IRunnableScheduledFuture)e; 261 if (t !is null) { 262 if ((t.isPeriodic() 263 ? !keepPeriodic 264 : (!keepDelayed && t.getDelay() > Duration.zero)) 265 || t.isCancelled()) { // also remove if already cancelled 266 if (q.remove(t)) 267 t.cancel(false); 268 } 269 } else { 270 warning("t is null"); 271 } 272 } 273 274 } 275 tryTerminate(); 276 } 277 278 /** 279 * Modifies or replaces the task used to execute a runnable.
280 * This method can be used to override the concrete 281 * class used for managing internal tasks. 282 * The default implementation simply returns the given task. 283 * 284 * @param runnable the submitted Runnable 285 * @param task the task created to execute the runnable 286 * @param (V) the type of the task's result 287 * @return a task that can execute the runnable 288 * @since 1.6 289 */ 290 protected RunnableScheduledFuture!(V) decorateTask(V) ( 291 Runnable runnable, RunnableScheduledFuture!(V) task) { 292 return task; 293 } 294 295 /** 296 * Modifies or replaces the task used to execute a callable. 297 * This method can be used to override the concrete 298 * class used for managing internal tasks. 299 * The default implementation simply returns the given task. 300 * 301 * @param callable the submitted Callable 302 * @param task the task created to execute the callable 303 * @param (V) the type of the task's result 304 * @return a task that can execute the callable 305 * @since 1.6 306 */ 307 protected RunnableScheduledFuture!(V) decorateTask(V)( 308 Callable!(V) callable, RunnableScheduledFuture!(V) task) { 309 return task; 310 } 311 312 /** 313 * The default keep-alive time for pool threads. 314 * 315 * Normally, this value is unused because all pool threads will be 316 * core threads, but if a user creates a pool with a corePoolSize 317 * of zero (against our advice), we keep a thread alive as long as 318 * there are queued tasks. If the keep alive time is zero (the 319 * historic value), we end up hot-spinning in getTask, wasting a 320 * CPU. But on the other hand, if we set the value too high, and 321 * users create a one-shot pool which they don't cleanly shutdown, 322 * the pool's non-daemon threads will prevent JVM termination. A 323 * small but non-zero value (relative to a JVM's lifetime) seems 324 * best. 325 */ 326 private enum long DEFAULT_KEEPALIVE_MILLIS = 10L; 327 328 /** 329 * Creates a new {@code ScheduledThreadPoolExecutor} with the 330 * given core pool size. 331 * 332 * @param corePoolSize the number of threads to keep in the pool, even 333 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 334 * @throws IllegalArgumentException if {@code corePoolSize < 0} 335 */ 336 this(int corePoolSize) { 337 super(corePoolSize, int.max, dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS), 338 new DelayedWorkQueue()); 339 } 340 341 /** 342 * Creates a new {@code ScheduledThreadPoolExecutor} with the 343 * given initial parameters. 344 * 345 * @param corePoolSize the number of threads to keep in the pool, even 346 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 347 * @param threadFactory the factory to use when the executor 348 * creates a new thread 349 * @throws IllegalArgumentException if {@code corePoolSize < 0} 350 * @throws NullPointerException if {@code threadFactory} is null 351 */ 352 this(int corePoolSize, ThreadFactory threadFactory) { 353 super(corePoolSize, int.max, 354 dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS), 355 new DelayedWorkQueue(), threadFactory); 356 } 357 358 /** 359 * Creates a new {@code ScheduledThreadPoolExecutor} with the 360 * given initial parameters. 
361 * 362 * @param corePoolSize the number of threads to keep in the pool, even 363 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 364 * @param handler the handler to use when execution is blocked 365 * because the thread bounds and queue capacities are reached 366 * @throws IllegalArgumentException if {@code corePoolSize < 0} 367 * @throws NullPointerException if {@code handler} is null 368 */ 369 this(int corePoolSize, RejectedExecutionHandler handler) { 370 super(corePoolSize, int.max, 371 dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS), 372 new DelayedWorkQueue(), handler); 373 } 374 375 /** 376 * Creates a new {@code ScheduledThreadPoolExecutor} with the 377 * given initial parameters. 378 * 379 * @param corePoolSize the number of threads to keep in the pool, even 380 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 381 * @param threadFactory the factory to use when the executor 382 * creates a new thread 383 * @param handler the handler to use when execution is blocked 384 * because the thread bounds and queue capacities are reached 385 * @throws IllegalArgumentException if {@code corePoolSize < 0} 386 * @throws NullPointerException if {@code threadFactory} or 387 * {@code handler} is null 388 */ 389 this(int corePoolSize, ThreadFactory threadFactory, 390 RejectedExecutionHandler handler) { 391 super(corePoolSize, int.max, 392 dur!(TimeUnit.Millisecond)(DEFAULT_KEEPALIVE_MILLIS), 393 new DelayedWorkQueue(), threadFactory, handler); 394 } 395 396 /** 397 * Returns the nanoTime-based trigger time of a delayed action. 398 */ 399 private long triggerTime(Duration delay) { 400 return triggerTime(delay.isNegative ? 0 : delay.total!(TimeUnit.HectoNanosecond)()); 401 } 402 403 /** 404 * Returns the nanoTime-based trigger time of a delayed action. 405 */ 406 long triggerTime(long delay) { 407 return Clock.currStdTime + 408 ((delay < (long.max >> 1)) ? delay : overflowFree(delay)); 409 } 410 411 /** 412 * Constrains the values of all delays in the queue to be within 413 * long.max of each other, to avoid overflow in compareTo. 414 * This may occur if a task is eligible to be dequeued, but has 415 * not yet been, while some other task is added with a delay of 416 * long.max. 
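 * For example, if the head task's delay has already drifted slightly
 * negative, say headDelay = -10, and a new task arrives with
 * delay = long.max, then delay - headDelay would overflow in compareTo;
 * clamping the new delay to long.max + headDelay (here long.max - 10)
 * keeps every pairwise difference representable.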
417 */ 418 private long overflowFree(long delay) { 419 Delayed head = cast(Delayed) super.getQueue().peek(); 420 if (head !is null) { 421 long headDelay = head.getDelay().total!(TimeUnit.HectoNanosecond)(); 422 if (headDelay < 0 && (delay - headDelay < 0)) 423 delay = long.max + headDelay; 424 } 425 return delay; 426 } 427 428 /** 429 * @throws RejectedExecutionException {@inheritDoc} 430 * @throws NullPointerException {@inheritDoc} 431 */ 432 ScheduledFuture!(void) schedule(Runnable command, Duration delay) { 433 if (command is null) 434 throw new NullPointerException(); 435 long n = atomicOp!"+="(sequencer, 1); 436 n--; 437 RunnableScheduledFuture!(void) t = decorateTask(command, 438 new ScheduledFutureTask!(void)(command, triggerTime(delay), n, this)); 439 delayedExecute!(void)(t); 440 return t; 441 } 442 443 /** 444 * @throws RejectedExecutionException {@inheritDoc} 445 * @throws NullPointerException {@inheritDoc} 446 */ 447 ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) { 448 if (callable is null) 449 throw new NullPointerException(); 450 RunnableScheduledFuture!(V) t = decorateTask(callable, 451 new ScheduledFutureTask!(V)(callable, 452 triggerTime(delay), 453 cast(long)AtomicHelper.getAndIncrement(sequencer), this)); 454 delayedExecute(t); 455 return t; 456 } 457 458 /** 459 * Submits a periodic action that becomes enabled first after the 460 * given initial delay, and subsequently with the given period; 461 * that is, executions will commence after 462 * {@code initialDelay}, then {@code initialDelay + period}, then 463 * {@code initialDelay + 2 * period}, and so on. 464 * 465 * <p>The sequence of task executions continues indefinitely until 466 * one of the following exceptional completions occur: 467 * <ul> 468 * <li>The task is {@linkplain Future#cancel explicitly cancelled} 469 * via the returned future. 470 * <li>Method {@link #shutdown} is called and the {@linkplain 471 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on 472 * whether to continue after shutdown} is not set true, or method 473 * {@link #shutdownNow} is called; also resulting in task 474 * cancellation. 475 * <li>An execution of the task throws an exception. In this case 476 * calling {@link Future#get() get} on the returned future will throw 477 * {@link ExecutionException}, holding the exception as its cause. 478 * </ul> 479 * Subsequent executions are suppressed. Subsequent calls to 480 * {@link Future#isDone isDone()} on the returned future will 481 * return {@code true}. 482 * 483 * <p>If any execution of this task takes longer than its period, then 484 * subsequent executions may start late, but will not concurrently 485 * execute. 
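 *
 * <p>A minimal usage sketch (illustrative only; {@code heartbeat} stands for
 * any user-supplied {@code Runnable}):
 * <pre> {@code
 * auto pool = new ScheduledThreadPoolExecutor(1);
 * // first run after 1 second, then every 10 seconds
 * ScheduledFuture!(void) handle = pool.scheduleAtFixedRate(heartbeat,
 *         dur!"seconds"(1), dur!"seconds"(10));
 * // later: stop the periodic task without interrupting a running execution
 * handle.cancel(false);
 * }</pre>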
486 * 487 * @throws RejectedExecutionException {@inheritDoc} 488 * @throws NullPointerException {@inheritDoc} 489 * @throws IllegalArgumentException {@inheritDoc} 490 */ 491 ScheduledFuture!void scheduleAtFixedRate(Runnable command, 492 Duration initialDelay, 493 Duration period) { 494 if (command is null) 495 throw new NullPointerException(); 496 if (period <= Duration.zero) 497 throw new IllegalArgumentException(); 498 ScheduledFutureTask!(void) sft = 499 new ScheduledFutureTask!(void)(command, 500 triggerTime(initialDelay), 501 period.total!(TimeUnit.HectoNanosecond)(), 502 cast(long)AtomicHelper.getAndIncrement(sequencer), this); 503 RunnableScheduledFuture!(void) t = decorateTask(command, sft); 504 sft.outerTask = t; 505 delayedExecute(t); 506 return t; 507 } 508 509 /** 510 * Submits a periodic action that becomes enabled first after the 511 * given initial delay, and subsequently with the given delay 512 * between the termination of one execution and the commencement of 513 * the next. 514 * 515 * <p>The sequence of task executions continues indefinitely until 516 * one of the following exceptional completions occur: 517 * <ul> 518 * <li>The task is {@linkplain Future#cancel explicitly cancelled} 519 * via the returned future. 520 * <li>Method {@link #shutdown} is called and the {@linkplain 521 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on 522 * whether to continue after shutdown} is not set true, or method 523 * {@link #shutdownNow} is called; also resulting in task 524 * cancellation. 525 * <li>An execution of the task throws an exception. In this case 526 * calling {@link Future#get() get} on the returned future will throw 527 * {@link ExecutionException}, holding the exception as its cause. 528 * </ul> 529 * Subsequent executions are suppressed. Subsequent calls to 530 * {@link Future#isDone isDone()} on the returned future will 531 * return {@code true}. 532 * 533 * @throws RejectedExecutionException {@inheritDoc} 534 * @throws NullPointerException {@inheritDoc} 535 * @throws IllegalArgumentException {@inheritDoc} 536 */ 537 ScheduledFuture!(void) scheduleWithFixedDelay(Runnable command, 538 Duration initialDelay, 539 Duration delay) { 540 if (command is null) 541 throw new NullPointerException(); 542 if (delay <= Duration.zero) 543 throw new IllegalArgumentException(); 544 ScheduledFutureTask!(void) sft = 545 new ScheduledFutureTask!(void)(command, 546 triggerTime(initialDelay), 547 -delay.total!(TimeUnit.HectoNanosecond)(), 548 cast(long)AtomicHelper.getAndIncrement(sequencer), this); 549 RunnableScheduledFuture!(void) t = decorateTask(command, sft); 550 sft.outerTask = t; 551 delayedExecute(t); 552 return t; 553 } 554 555 /** 556 * Executes {@code command} with zero required delay. 557 * This has effect equivalent to 558 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 559 * Note that inspections of the queue and of the list returned by 560 * {@code shutdownNow} will access the zero-delayed 561 * {@link ScheduledFuture}, not the {@code command} itself. 562 * 563 * <p>A consequence of the use of {@code ScheduledFuture} objects is 564 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 565 * called with a null second {@code Throwable} argument, even if the 566 * {@code command} terminated abruptly. Instead, the {@code Throwable} 567 * thrown by such a task can be obtained via {@link Future#get}. 
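 *
 * <p>Illustrative sketch (not from the original documentation; {@code pool} and
 * {@code task} stand for any executor and user-supplied {@code Runnable}): to
 * observe a failure, prefer {@code schedule} or {@code submit}, which return
 * the zero-delay future:
 * <pre> {@code
 * pool.execute(task);                          // fire and forget
 * auto f = pool.schedule(task, Duration.zero); // same timing as execute
 * f.get();                                     // rethrows the task's exception, if any
 * }</pre>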
568 * 569 * @throws RejectedExecutionException at discretion of 570 * {@code RejectedExecutionHandler}, if the task 571 * cannot be accepted for execution because the 572 * executor has been shut down 573 * @throws NullPointerException {@inheritDoc} 574 */ 575 override void execute(Runnable command) { 576 schedule(command, Duration.zero); 577 } 578 579 // Override AbstractExecutorService methods 580 581 /** 582 * @throws RejectedExecutionException {@inheritDoc} 583 * @throws NullPointerException {@inheritDoc} 584 */ 585 override Future!void submit(Runnable task) { 586 return schedule(task, Duration.zero); 587 } 588 589 /** 590 * @throws RejectedExecutionException {@inheritDoc} 591 * @throws NullPointerException {@inheritDoc} 592 */ 593 Future!(T) submit(T)(Runnable task, T result) { 594 return schedule(Executors.callable(task, result), Duration.zero); 595 } 596 597 /** 598 * @throws RejectedExecutionException {@inheritDoc} 599 * @throws NullPointerException {@inheritDoc} 600 */ 601 Future!(T) submit(T)(Callable!(T) task) { 602 return schedule(task, Duration.zero); 603 } 604 605 /** 606 * Sets the policy on whether to continue executing existing 607 * periodic tasks even when this executor has been {@code shutdown}. 608 * In this case, executions will continue until {@code shutdownNow} 609 * or the policy is set to {@code false} when already shutdown. 610 * This value is by default {@code false}. 611 * 612 * @param value if {@code true}, continue after shutdown, else don't 613 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 614 */ 615 void setContinueExistingPeriodicTasksAfterShutdownPolicy(bool value) { 616 continueExistingPeriodicTasksAfterShutdown = value; 617 if (!value && isShutdown()) 618 onShutdown(); 619 } 620 621 /** 622 * Gets the policy on whether to continue executing existing 623 * periodic tasks even when this executor has been {@code shutdown}. 624 * In this case, executions will continue until {@code shutdownNow} 625 * or the policy is set to {@code false} when already shutdown. 626 * This value is by default {@code false}. 627 * 628 * @return {@code true} if will continue after shutdown 629 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 630 */ 631 bool getContinueExistingPeriodicTasksAfterShutdownPolicy() { 632 return continueExistingPeriodicTasksAfterShutdown; 633 } 634 635 /** 636 * Sets the policy on whether to execute existing delayed 637 * tasks even when this executor has been {@code shutdown}. 638 * In this case, these tasks will only terminate upon 639 * {@code shutdownNow}, or after setting the policy to 640 * {@code false} when already shutdown. 641 * This value is by default {@code true}. 642 * 643 * @param value if {@code true}, execute after shutdown, else don't 644 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 645 */ 646 void setExecuteExistingDelayedTasksAfterShutdownPolicy(bool value) { 647 executeExistingDelayedTasksAfterShutdown = value; 648 if (!value && isShutdown()) 649 onShutdown(); 650 } 651 652 /** 653 * Gets the policy on whether to execute existing delayed 654 * tasks even when this executor has been {@code shutdown}. 655 * In this case, these tasks will only terminate upon 656 * {@code shutdownNow}, or after setting the policy to 657 * {@code false} when already shutdown. 658 * This value is by default {@code true}. 
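 *
 * <p>Illustrative sketch: disabling the policy before {@link #shutdown}
 * cancels delayed tasks whose delays have not yet elapsed instead of
 * letting them run out:
 * <pre> {@code
 * pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 * pool.shutdown();   // pending delayed tasks are cancelled and removed
 * }</pre>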
659 * 660 * @return {@code true} if will execute after shutdown 661 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 662 */ 663 bool getExecuteExistingDelayedTasksAfterShutdownPolicy() { 664 return executeExistingDelayedTasksAfterShutdown; 665 } 666 667 /** 668 * Sets the policy on whether cancelled tasks should be immediately 669 * removed from the work queue at time of cancellation. This value is 670 * by default {@code false}. 671 * 672 * @param value if {@code true}, remove on cancellation, else don't 673 * @see #getRemoveOnCancelPolicy 674 * @since 1.7 675 */ 676 void setRemoveOnCancelPolicy(bool value) { 677 removeOnCancel = value; 678 } 679 680 /** 681 * Gets the policy on whether cancelled tasks should be immediately 682 * removed from the work queue at time of cancellation. This value is 683 * by default {@code false}. 684 * 685 * @return {@code true} if cancelled tasks are immediately removed 686 * from the queue 687 * @see #setRemoveOnCancelPolicy 688 * @since 1.7 689 */ 690 bool getRemoveOnCancelPolicy() { 691 return removeOnCancel; 692 } 693 694 /** 695 * Initiates an orderly shutdown in which previously submitted 696 * tasks are executed, but no new tasks will be accepted. 697 * Invocation has no additional effect if already shut down. 698 * 699 * <p>This method does not wait for previously submitted tasks to 700 * complete execution. Use {@link #awaitTermination awaitTermination} 701 * to do that. 702 * 703 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 704 * has been set {@code false}, existing delayed tasks whose delays 705 * have not yet elapsed are cancelled. And unless the {@code 706 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 707 * {@code true}, future executions of existing periodic tasks will 708 * be cancelled. 709 * 710 * @throws SecurityException {@inheritDoc} 711 */ 712 override void shutdown() { 713 super.shutdown(); 714 } 715 716 /** 717 * Attempts to stop all actively executing tasks, halts the 718 * processing of waiting tasks, and returns a list of the tasks 719 * that were awaiting execution. These tasks are drained (removed) 720 * from the task queue upon return from this method. 721 * 722 * <p>This method does not wait for actively executing tasks to 723 * terminate. Use {@link #awaitTermination awaitTermination} to 724 * do that. 725 * 726 * <p>There are no guarantees beyond best-effort attempts to stop 727 * processing actively executing tasks. This implementation 728 * interrupts tasks via {@link Thread#interrupt}; any task that 729 * fails to respond to interrupts may never terminate. 730 * 731 * @return list of tasks that never commenced execution. 732 * Each element of this list is a {@link ScheduledFuture}. 733 * For tasks submitted via one of the {@code schedule} 734 * methods, the element will be identical to the returned 735 * {@code ScheduledFuture}. For tasks submitted using 736 * {@link #execute execute}, the element will be a 737 * zero-delay {@code ScheduledFuture}. 738 * @throws SecurityException {@inheritDoc} 739 */ 740 override List!(Runnable) shutdownNow() { 741 return super.shutdownNow(); 742 } 743 744 /** 745 * Returns the task queue used by this executor. Access to the 746 * task queue is intended primarily for debugging and monitoring. 747 * This queue may be in active use. Retrieving the task queue 748 * does not prevent queued tasks from executing. 749 * 750 * <p>Each element of this queue is a {@link ScheduledFuture}. 
751 * For tasks submitted via one of the {@code schedule} methods, the 752 * element will be identical to the returned {@code ScheduledFuture}. 753 * For tasks submitted using {@link #execute execute}, the element 754 * will be a zero-delay {@code ScheduledFuture}. 755 * 756 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse 757 * tasks in the order in which they will execute. 758 * 759 * @return the task queue 760 */ 761 override BlockingQueue!(Runnable) getQueue() { 762 return super.getQueue(); 763 } 764 } 765 766 767 /** 768 */ 769 private class ScheduledFutureTask(V) : FutureTask!(V) , 770 RunnableScheduledFuture!(V), IScheduledFutureTask { 771 772 /** Sequence number to break ties FIFO */ 773 private long sequenceNumber; 774 775 /** The nanoTime-based time when the task is enabled to execute. */ 776 private long time; 777 778 /** 779 * Period for repeating tasks, in nanoseconds. 780 * A positive value indicates fixed-rate execution. 781 * A negative value indicates fixed-delay execution. 782 * A value of 0 indicates a non-repeating (one-shot) task. 783 */ 784 private long period; 785 786 /** The actual task to be re-enqueued by reExecutePeriodic */ 787 RunnableScheduledFuture!(V) outerTask; // = this; 788 ScheduledThreadPoolExecutor poolExecutor; 789 790 /** 791 * Index into delay queue, to support faster cancellation. 792 */ 793 int _heapIndex; 794 795 static if(is(V == void)) { 796 this(Runnable r, long triggerTime, 797 long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) { 798 super(r); 799 this.time = triggerTime; 800 this.period = 0; 801 this.sequenceNumber = sequenceNumber; 802 this.poolExecutor = poolExecutor; 803 initializeMembers(); 804 } 805 806 /** 807 * Creates a periodic action with given nanoTime-based initial 808 * trigger time and period. 809 */ 810 this(Runnable r, long triggerTime, 811 long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) { 812 super(r); 813 this.time = triggerTime; 814 this.period = period; 815 this.sequenceNumber = sequenceNumber; 816 this.poolExecutor = poolExecutor; 817 initializeMembers(); 818 } 819 } else { 820 821 /** 822 * Creates a one-shot action with given nanoTime-based trigger time. 823 */ 824 this(Runnable r, V result, long triggerTime, 825 long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) { 826 super(r, result); 827 this.time = triggerTime; 828 this.period = 0; 829 this.sequenceNumber = sequenceNumber; 830 this.poolExecutor = poolExecutor; 831 initializeMembers(); 832 } 833 834 /** 835 * Creates a periodic action with given nanoTime-based initial 836 * trigger time and period. 837 */ 838 this(Runnable r, V result, long triggerTime, 839 long period, long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) { 840 super(r, result); 841 this.time = triggerTime; 842 this.period = period; 843 this.sequenceNumber = sequenceNumber; 844 this.poolExecutor = poolExecutor; 845 initializeMembers(); 846 } 847 } 848 849 /** 850 * Creates a one-shot action with given nanoTime-based trigger time. 
851 */ 852 this(Callable!(V) callable, long triggerTime, 853 long sequenceNumber, ScheduledThreadPoolExecutor poolExecutor) { 854 super(callable); 855 this.time = triggerTime; 856 this.period = 0; 857 this.sequenceNumber = sequenceNumber; 858 this.poolExecutor = poolExecutor; 859 initializeMembers(); 860 } 861 862 private void initializeMembers() { 863 outerTask = this; 864 } 865 866 void heapIndex(int index) { 867 _heapIndex = index; 868 } 869 870 int heapIndex() { 871 return _heapIndex; 872 } 873 874 Duration getDelay() { 875 return dur!(TimeUnit.HectoNanosecond)(time - Clock.currStdTime()); 876 } 877 878 int opCmp(Delayed other) { 879 if (other == this) // compare zero if same object 880 return 0; 881 ScheduledFutureTask!V x = cast(ScheduledFutureTask!V)other; 882 if (x !is null) { 883 long diff = time - x.time; 884 if (diff < 0) 885 return -1; 886 else if (diff > 0) 887 return 1; 888 else if (sequenceNumber < x.sequenceNumber) 889 return -1; 890 else 891 return 1; 892 } 893 Duration diff = getDelay() - other.getDelay(); 894 return (diff.isNegative) ? -1 : (diff > Duration.zero) ? 1 : 0; 895 } 896 897 /** 898 * Returns {@code true} if this is a periodic (not a one-shot) action. 899 * 900 * @return {@code true} if periodic 901 */ 902 bool isPeriodic() { 903 return period != 0; 904 } 905 906 /** 907 * Sets the next time to run for a periodic task. 908 */ 909 private void setNextRunTime() { 910 long p = period; 911 if (p > 0) 912 time += p; 913 else 914 time = poolExecutor.triggerTime(-p); 915 } 916 917 override bool cancel(bool mayInterruptIfRunning) { 918 // The racy read of heapIndex below is benign: 919 // if heapIndex < 0, then OOTA guarantees that we have surely 920 // been removed; else we recheck under lock in remove() 921 bool cancelled = super.cancel(mayInterruptIfRunning); 922 if (cancelled && poolExecutor.removeOnCancel && heapIndex >= 0) 923 poolExecutor.remove(this); 924 return cancelled; 925 } 926 927 /** 928 * Overrides FutureTask version so as to reset/requeue if periodic. 929 */ 930 override void run() { 931 if (!poolExecutor.canRunInCurrentRunState(this)) 932 cancel(false); 933 else if (!isPeriodic()) 934 super.run(); 935 else if (super.runAndReset()) { 936 setNextRunTime(); 937 poolExecutor.reExecutePeriodic(outerTask); 938 } 939 } 940 941 // alias from FutureTask 942 // alias isCancelled = FutureTask!V.isCancelled; 943 // alias isDone = FutureTask!V.isDone; 944 alias get = FutureTask!V.get; 945 946 override bool isCancelled() { 947 return super.isCancelled(); 948 } 949 950 override bool isDone() { 951 return super.isDone(); 952 } 953 954 override V get() { 955 return super.get(); 956 } 957 958 override V get(Duration timeout) { 959 return super.get(timeout); 960 } 961 } 962 963 964 /** 965 * Specialized delay queue. To mesh with TPE declarations, this 966 * class must be declared as a BlockingQueue!(Runnable) even though 967 * it can only hold RunnableScheduledFutures. 968 */ 969 class DelayedWorkQueue : AbstractQueue!(Runnable), BlockingQueue!(Runnable) { 970 971 /* 972 * A DelayedWorkQueue is based on a heap-based data structure 973 * like those in DelayQueue and PriorityQueue, except that 974 * every ScheduledFutureTask also records its index into the 975 * heap array. This eliminates the need to find a task upon 976 * cancellation, greatly speeding up removal (down from O(n) 977 * to O(log n)), and reducing garbage retention that would 978 * otherwise occur by waiting for the element to rise to top 979 * before clearing. 
But because the queue may also hold 980 * RunnableScheduledFutures that are not ScheduledFutureTasks, 981 * we are not guaranteed to have such indices available, in 982 * which case we fall back to linear search. (We expect that 983 * most tasks will not be decorated, and that the faster cases 984 * will be much more common.) 985 * 986 * All heap operations must record index changes -- mainly 987 * within siftUp and siftDown. Upon removal, a task's 988 * heapIndex is set to -1. Note that ScheduledFutureTasks can 989 * appear at most once in the queue (this need not be true for 990 * other kinds of tasks or work queues), so are uniquely 991 * identified by heapIndex. 992 */ 993 994 private enum int INITIAL_CAPACITY = 16; 995 private IRunnableScheduledFuture[] queue; 996 private ReentrantLock lock; 997 private int _size; 998 999 /** 1000 * Thread designated to wait for the task at the head of the 1001 * queue. This variant of the Leader-Follower pattern 1002 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 1003 * minimize unnecessary timed waiting. When a thread becomes 1004 * the leader, it waits only for the next delay to elapse, but 1005 * other threads await indefinitely. The leader thread must 1006 * signal some other thread before returning from take() or 1007 * poll(...), unless some other thread becomes leader in the 1008 * interim. Whenever the head of the queue is replaced with a 1009 * task with an earlier expiration time, the leader field is 1010 * invalidated by being reset to null, and some waiting 1011 * thread, but not necessarily the current leader, is 1012 * signalled. So waiting threads must be prepared to acquire 1013 * and lose leadership while waiting. 1014 */ 1015 private ThreadEx leader; 1016 1017 /** 1018 * Condition signalled when a newer task becomes available at the 1019 * head of the queue or a new thread may need to become leader. 1020 */ 1021 private Condition available; 1022 1023 this() { 1024 initializeMembers(); 1025 } 1026 1027 private void initializeMembers() { 1028 lock = new ReentrantLock(); 1029 available = new Condition(lock); 1030 queue = new IRunnableScheduledFuture[INITIAL_CAPACITY]; 1031 } 1032 1033 /** 1034 * Sets f's heapIndex if it is a ScheduledFutureTask. 1035 */ 1036 private static void setIndex(IRunnableScheduledFuture f, int idx) { 1037 IScheduledFutureTask t = cast(IScheduledFutureTask)f; 1038 // tracef("index=%d, type: %s", idx, typeid(cast(Object)t)); 1039 if (t !is null) 1040 t.heapIndex = idx; 1041 } 1042 1043 /** 1044 * Sifts element added at bottom up to its heap-ordered spot. 1045 * Call only when holding lock. 1046 */ 1047 private void siftUp(int k, IRunnableScheduledFuture key) { 1048 while (k > 0) { 1049 int parent = (k - 1) >>> 1; 1050 IRunnableScheduledFuture e = queue[parent]; 1051 if (key >= e) 1052 break; 1053 queue[k] = e; 1054 setIndex(e, k); 1055 k = parent; 1056 } 1057 // tracef("k=%d, key is null: %s", k, key is null); 1058 queue[k] = key; 1059 setIndex(key, k); 1060 } 1061 1062 /** 1063 * Sifts element added at top down to its heap-ordered spot. 1064 * Call only when holding lock. 
1065 */ 1066 private void siftDown(int k, IRunnableScheduledFuture key) { 1067 int half = size >>> 1; 1068 while (k < half) { 1069 int child = (k << 1) + 1; 1070 IRunnableScheduledFuture c = queue[child]; 1071 int right = child + 1; 1072 if (right < size && c.opCmp(queue[right]) > 0) 1073 c = queue[child = right]; 1074 if (key.opCmp(c) <= 0) 1075 break; 1076 queue[k] = c; 1077 setIndex(c, k); 1078 k = child; 1079 } 1080 queue[k] = key; 1081 setIndex(key, k); 1082 } 1083 1084 /** 1085 * Resizes the heap array. Call only when holding lock. 1086 */ 1087 private void grow() { 1088 size_t oldCapacity = queue.length; 1089 size_t newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 1090 if (newCapacity < 0) // overflow 1091 newCapacity = int.max; 1092 queue.length = newCapacity; 1093 } 1094 1095 /** 1096 * Finds index of given object, or -1 if absent. 1097 */ 1098 private int indexOf(Runnable x) { 1099 if (x !is null) { 1100 IScheduledFutureTask sf = cast(IScheduledFutureTask) x; 1101 if (sf !is null) { 1102 int i = sf.heapIndex; 1103 // Sanity check; x could conceivably be a 1104 // ScheduledFutureTask from some other pool. 1105 if (i >= 0 && i < size && queue[i] == x) 1106 return i; 1107 } else { 1108 for (int i = 0; i < size; i++) { 1109 // if (x.opEquals(cast(Object)queue[i])) 1110 if(x is queue[i]) 1111 return i; 1112 } 1113 } 1114 } 1115 return -1; 1116 } 1117 1118 override bool contains(Runnable x) { 1119 ReentrantLock lock = this.lock; 1120 lock.lock(); 1121 try { 1122 return indexOf(x) != -1; 1123 } finally { 1124 lock.unlock(); 1125 } 1126 } 1127 1128 override bool remove(Runnable x) { 1129 ReentrantLock lock = this.lock; 1130 trace(cast(Object)x); 1131 lock.lock(); 1132 try { 1133 int i = indexOf(x); 1134 if (i < 0) 1135 return false; 1136 1137 setIndex(queue[i], -1); 1138 int s = --_size; 1139 IRunnableScheduledFuture replacement = queue[s]; 1140 queue[s] = null; 1141 if (s != i) { 1142 siftDown(i, replacement); 1143 if (queue[i] == replacement) 1144 siftUp(i, replacement); 1145 } 1146 return true; 1147 } finally { 1148 lock.unlock(); 1149 } 1150 } 1151 1152 override int size() { 1153 // return _size; 1154 ReentrantLock lock = this.lock; 1155 lock.lock(); 1156 try { 1157 return _size; 1158 } finally { 1159 lock.unlock(); 1160 } 1161 } 1162 1163 override bool isEmpty() { 1164 return size() == 0; 1165 } 1166 1167 int remainingCapacity() { 1168 return int.max; 1169 } 1170 1171 IRunnableScheduledFuture peek() { 1172 ReentrantLock lock = this.lock; 1173 lock.lock(); 1174 try { 1175 return queue[0]; 1176 } finally { 1177 lock.unlock(); 1178 } 1179 } 1180 1181 bool offer(Runnable x) { 1182 if (x is null) 1183 throw new NullPointerException(); 1184 IRunnableScheduledFuture e = cast(IRunnableScheduledFuture)x; 1185 ReentrantLock lock = this.lock; 1186 lock.lock(); 1187 try { 1188 int i = _size; 1189 if (i >= queue.length) 1190 grow(); 1191 _size = i + 1; 1192 if (i == 0) { 1193 queue[0] = e; 1194 setIndex(e, 0); 1195 } else { 1196 siftUp(i, e); 1197 } 1198 if (queue[0] == e) { 1199 leader = null; 1200 available.notify(); 1201 } 1202 } finally { 1203 lock.unlock(); 1204 } 1205 return true; 1206 } 1207 1208 override void put(Runnable e) { 1209 offer(e); 1210 } 1211 1212 override bool add(Runnable e) { 1213 return offer(e); 1214 } 1215 1216 bool offer(Runnable e, Duration timeout) { 1217 return offer(e); 1218 } 1219 1220 /** 1221 * Performs common bookkeeping for poll and take: Replaces 1222 * first element with last and sifts it down. Call only when 1223 * holding lock. 
1224 * @param f the task to remove and return 1225 */ 1226 private IRunnableScheduledFuture finishPoll(IRunnableScheduledFuture f) { 1227 int s = --_size; 1228 IRunnableScheduledFuture x = queue[s]; 1229 queue[s] = null; 1230 if (s != 0) 1231 siftDown(0, x); 1232 setIndex(f, -1); 1233 return f; 1234 } 1235 1236 IRunnableScheduledFuture poll() { 1237 ReentrantLock lock = this.lock; 1238 lock.lock(); 1239 try { 1240 IRunnableScheduledFuture first = queue[0]; 1241 return (first is null || first.getDelay() > Duration.zero) 1242 ? null 1243 : finishPoll(first); 1244 } finally { 1245 lock.unlock(); 1246 } 1247 } 1248 1249 IRunnableScheduledFuture take() { 1250 ReentrantLock lock = this.lock; 1251 // lock.lockInterruptibly(); 1252 lock.lock(); 1253 try { 1254 for (;;) { 1255 IRunnableScheduledFuture first = queue[0]; 1256 if (first is null) 1257 available.wait(); 1258 else { 1259 Duration delay = first.getDelay(); 1260 if (delay <= Duration.zero) 1261 return finishPoll(first); 1262 first = null; // don't retain ref while waiting 1263 if (leader !is null) 1264 available.wait(); 1265 else { 1266 ThreadEx thisThread = ThreadEx.currentThread(); 1267 leader = thisThread; 1268 try { 1269 available.wait(delay); 1270 } finally { 1271 if (leader == thisThread) 1272 leader = null; 1273 } 1274 } 1275 } 1276 } 1277 } finally { 1278 if (leader is null && queue[0] !is null) 1279 available.notify(); 1280 lock.unlock(); 1281 } 1282 } 1283 1284 IRunnableScheduledFuture poll(Duration timeout) { 1285 // long nanos = total!(TimeUnit.HectoNanosecond)(timeout); 1286 Duration nanos = timeout; 1287 ReentrantLock lock = this.lock; 1288 // lock.lockInterruptibly(); 1289 lock.lock(); 1290 try { 1291 for (;;) { 1292 IRunnableScheduledFuture first = queue[0]; 1293 if (first is null) { 1294 if (nanos <= Duration.zero) 1295 return null; 1296 else 1297 available.wait(nanos); // nanos = 1298 } else { 1299 Duration delay = first.getDelay(); 1300 if (delay <= Duration.zero) 1301 return finishPoll(first); 1302 if (nanos <= Duration.zero) 1303 return null; 1304 first = null; // don't retain ref while waiting 1305 if (nanos < delay || leader !is null) 1306 available.wait(nanos); // nanos = 1307 else { 1308 ThreadEx thisThread = ThreadEx.currentThread(); 1309 leader = thisThread; 1310 try { 1311 available.wait(delay); 1312 nanos -= delay; 1313 // long timeLeft = available.wait(delay); 1314 // nanos -= delay - timeLeft; 1315 } finally { 1316 if (leader == thisThread) 1317 leader = null; 1318 } 1319 } 1320 } 1321 } 1322 } finally { 1323 if (leader is null && queue[0] !is null) 1324 available.notify(); 1325 lock.unlock(); 1326 } 1327 } 1328 1329 override void clear() { 1330 ReentrantLock lock = this.lock; 1331 lock.lock(); 1332 try { 1333 for (int i = 0; i < size; i++) { 1334 IRunnableScheduledFuture t = queue[i]; 1335 if (t !is null) { 1336 queue[i] = null; 1337 setIndex(t, -1); 1338 } 1339 } 1340 _size = 0; 1341 } finally { 1342 lock.unlock(); 1343 } 1344 } 1345 1346 int drainTo(Collection!(Runnable) c) { 1347 return drainTo(c, int.max); 1348 } 1349 1350 int drainTo(Collection!(Runnable) c, int maxElements) { 1351 // Objects.requireNonNull(c); 1352 1353 if (c == this) 1354 throw new IllegalArgumentException(); 1355 if (maxElements <= 0) 1356 return 0; 1357 ReentrantLock lock = this.lock; 1358 lock.lock(); 1359 try { 1360 int n = 0; 1361 for (IRunnableScheduledFuture first; 1362 n < maxElements 1363 && (first = queue[0]) !is null 1364 && first.getDelay() <= Duration.zero;) { 1365 c.add(first); // In this order, in case add() throws. 
1366 finishPoll(first); 1367 ++n; 1368 } 1369 return n; 1370 } finally { 1371 lock.unlock(); 1372 } 1373 } 1374 1375 override Runnable[] toArray() { 1376 ReentrantLock lock = this.lock; 1377 lock.lock(); 1378 try { 1379 Runnable[] r = new Runnable[_size]; 1380 for(int i=0; i<_size; i++) { 1381 r[i] = queue[i]; 1382 } 1383 return r; 1384 1385 } finally { 1386 lock.unlock(); 1387 } 1388 } 1389 1390 override int opApply(scope int delegate(ref Runnable) dg) { 1391 if(dg is null) 1392 throw new NullPointerException(); 1393 ReentrantLock lock = this.lock; 1394 lock.lock(); 1395 scope(exit) lock.unlock(); 1396 1397 int result = 0; 1398 foreach(int i; 0.._size) { 1399 Runnable v = queue[i]; 1400 result = dg(v); 1401 if(result != 0) return result; 1402 } 1403 return result; 1404 } 1405 1406 1407 // Iterator!(Runnable) iterator() { 1408 // ReentrantLock lock = this.lock; 1409 // lock.lock(); 1410 // try { 1411 // return new Itr(Arrays.copyOf(queue, size)); 1412 // } finally { 1413 // lock.unlock(); 1414 // } 1415 // } 1416 1417 /** 1418 * Snapshot iterator that works off copy of underlying q array. 1419 */ 1420 // private class Itr : Iterator!(Runnable) { 1421 // final IRunnableScheduledFuture[] array; 1422 // int cursor; // index of next element to return; initially 0 1423 // int lastRet = -1; // index of last element returned; -1 if no such 1424 1425 // this(IRunnableScheduledFuture[] array) { 1426 // this.array = array; 1427 // } 1428 1429 // bool hasNext() { 1430 // return cursor < array.length; 1431 // } 1432 1433 // Runnable next() { 1434 // if (cursor >= array.length) 1435 // throw new NoSuchElementException(); 1436 // return array[lastRet = cursor++]; 1437 // } 1438 1439 // void remove() { 1440 // if (lastRet < 0) 1441 // throw new IllegalStateException(); 1442 // DelayedWorkQueue.this.remove(array[lastRet]); 1443 // lastRet = -1; 1444 // } 1445 // } 1446 1447 override bool opEquals(IObject o) { 1448 return opEquals(cast(Object) o); 1449 } 1450 1451 override bool opEquals(Object o) { 1452 return super.opEquals(o); 1453 } 1454 1455 override string toString() { 1456 return super.toString(); 1457 } 1458 1459 override size_t toHash() @trusted nothrow { 1460 return super.toHash(); 1461 } 1462 }
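
/*
 * Usage sketch (illustrative only, not part of the library itself; {@code job}
 * stands for any user-supplied Runnable and the durations are arbitrary):
 *
 *     auto pool = new ScheduledThreadPoolExecutor(2);
 *
 *     // one-shot task after 500 milliseconds
 *     auto once = pool.schedule(job, dur!"msecs"(500));
 *
 *     // fixed-rate task: first run after 1 second, then every 5 seconds
 *     auto periodic = pool.scheduleAtFixedRate(job, dur!"seconds"(1), dur!"seconds"(5));
 *
 *     // drop cancelled tasks from the work queue immediately
 *     pool.setRemoveOnCancelPolicy(true);
 *     periodic.cancel(false);
 *
 *     pool.shutdown();
 */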