1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.Executors; 13 14 import hunt.concurrency.AbstractExecutorService; 15 import hunt.concurrency.atomic.AtomicHelper; 16 import hunt.concurrency.Delayed; 17 import hunt.concurrency.Exceptions; 18 import hunt.concurrency.ExecutorService; 19 // import hunt.concurrency.ForkJoinPool; 20 import hunt.concurrency.Future; 21 import hunt.concurrency.LinkedBlockingQueue; 22 import hunt.concurrency.ScheduledExecutorService; 23 import hunt.concurrency.ScheduledThreadPoolExecutor; 24 import hunt.concurrency.ThreadFactory; 25 import hunt.concurrency.ThreadPoolExecutor; 26 27 import hunt.collection.List; 28 import hunt.Exceptions; 29 import hunt.util.Common; 30 import hunt.util.DateTime; 31 32 import core.time; 33 import std.conv; 34 35 // import static java.lang.ref.Reference.reachabilityFence; 36 // import java.security.AccessControlContext; 37 // import java.security.AccessControlException; 38 // import java.security.AccessController; 39 // import java.security.PrivilegedAction; 40 // import java.security.PrivilegedActionException; 41 // import java.security.PrivilegedExceptionAction; 42 // import hunt.collection.Collection; 43 // import java.util.List; 44 // import sun.security.util.SecurityConstants; 45 46 /** 47 * Factory and utility methods for {@link Executor}, {@link 48 * ExecutorService}, {@link ScheduledExecutorService}, {@link 49 * ThreadFactory}, and {@link Callable} classes defined in this 50 * package. This class supports the following kinds of methods: 51 * 52 * <ul> 53 * <li>Methods that create and return an {@link ExecutorService} 54 * set up with commonly useful configuration settings. 55 * <li>Methods that create and return a {@link ScheduledExecutorService} 56 * set up with commonly useful configuration settings. 
57 * <li>Methods that create and return a "wrapped" ExecutorService, that 58 * disables reconfiguration by making implementation-specific methods 59 * inaccessible. 60 * <li>Methods that create and return a {@link ThreadFactory} 61 * that sets newly created threads to a known state. 62 * <li>Methods that create and return a {@link Callable} 63 * out of other closure-like forms, so they can be used 64 * in execution methods requiring {@code Callable}. 65 * </ul> 66 * 67 * @author Doug Lea 68 */ 69 class Executors { 70 71 /** 72 * Creates a thread pool that reuses a fixed number of threads 73 * operating off a shared unbounded queue. At any point, at most 74 * {@code nThreads} threads will be active processing tasks. 75 * If additional tasks are submitted when all threads are active, 76 * they will wait in the queue until a thread is available. 77 * If any thread terminates due to a failure during execution 78 * prior to shutdown, a new one will take its place if needed to 79 * execute subsequent tasks. The threads in the pool will exist 80 * until it is explicitly {@link ExecutorService#shutdown shutdown}. 81 * 82 * @param nThreads the number of threads in the pool 83 * @return the newly created thread pool 84 * @throws IllegalArgumentException if {@code nThreads <= 0} 85 */ 86 static ThreadPoolExecutor newFixedThreadPool(int nThreads) { 87 return new ThreadPoolExecutor(nThreads, nThreads, 0.hnsecs, 88 new LinkedBlockingQueue!(Runnable)()); 89 } 90 91 // /** 92 // * Creates a thread pool that maintains enough threads to support 93 // * the given parallelism level, and may use multiple queues to 94 // * reduce contention. The parallelism level corresponds to the 95 // * maximum number of threads actively engaged in, or available to 96 // * engage in, task processing. The actual number of threads may 97 // * grow and shrink dynamically. A work-stealing pool makes no 98 // * guarantees about the order in which submitted tasks are 99 // * executed. 
100 // * 101 // * @param parallelism the targeted parallelism level 102 // * @return the newly created thread pool 103 // * @throws IllegalArgumentException if {@code parallelism <= 0} 104 // */ 105 // static ExecutorService newWorkStealingPool(int parallelism) { 106 // return new ForkJoinPool 107 // (parallelism, 108 // ForkJoinPool.defaultForkJoinWorkerThreadFactory, 109 // null, true); 110 // } 111 112 // /** 113 // * Creates a work-stealing thread pool using the number of 114 // * {@linkplain Runtime#availableProcessors available processors} 115 // * as its target parallelism level. 116 // * 117 // * @return the newly created thread pool 118 // * @see #newWorkStealingPool(int) 119 // */ 120 // static ExecutorService newWorkStealingPool() { 121 // return new ForkJoinPool 122 // (Runtime.getRuntime().availableProcessors(), 123 // ForkJoinPool.defaultForkJoinWorkerThreadFactory, 124 // null, true); 125 // } 126 127 /** 128 * Creates a thread pool that reuses a fixed number of threads 129 * operating off a shared unbounded queue, using the provided 130 * ThreadFactory to create new threads when needed. At any point, 131 * at most {@code nThreads} threads will be active processing 132 * tasks. If additional tasks are submitted when all threads are 133 * active, they will wait in the queue until a thread is 134 * available. If any thread terminates due to a failure during 135 * execution prior to shutdown, a new one will take its place if 136 * needed to execute subsequent tasks. The threads in the pool will 137 * exist until it is explicitly {@link ExecutorService#shutdown 138 * shutdown}. 
139 * 140 * @param nThreads the number of threads in the pool 141 * @param threadFactory the factory to use when creating new threads 142 * @return the newly created thread pool 143 * @throws NullPointerException if threadFactory is null 144 * @throws IllegalArgumentException if {@code nThreads <= 0} 145 */ 146 static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { 147 return new ThreadPoolExecutor(nThreads, nThreads, 0.msecs, 148 new LinkedBlockingQueue!(Runnable)(), 149 threadFactory); 150 } 151 152 /** 153 * Creates an Executor that uses a single worker thread operating 154 * off an unbounded queue. (Note however that if this single 155 * thread terminates due to a failure during execution prior to 156 * shutdown, a new one will take its place if needed to execute 157 * subsequent tasks.) Tasks are guaranteed to execute 158 * sequentially, and no more than one task will be active at any 159 * given time. Unlike the otherwise equivalent 160 * {@code newFixedThreadPool(1)} the returned executor is 161 * guaranteed not to be reconfigurable to use additional threads. 162 * 163 * @return the newly created single-threaded Executor 164 */ 165 // static ExecutorService newSingleThreadExecutor() { 166 // return new FinalizableDelegatedExecutorService 167 // (new ThreadPoolExecutor(1, 1, 168 // 0L, TimeUnit.MILLISECONDS, 169 // new LinkedBlockingQueue!(Runnable)())); 170 // } 171 172 // /** 173 // * Creates an Executor that uses a single worker thread operating 174 // * off an unbounded queue, and uses the provided ThreadFactory to 175 // * create a new thread when needed. Unlike the otherwise 176 // * equivalent {@code newFixedThreadPool(1, threadFactory)} the 177 // * returned executor is guaranteed not to be reconfigurable to use 178 // * additional threads. 
    //  *
    //  * @param threadFactory the factory to use when creating new threads
    //  * @return the newly created single-threaded Executor
    //  * @throws NullPointerException if threadFactory is null
    //  */
    // static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    //     return new FinalizableDelegatedExecutorService
    //         (new ThreadPoolExecutor(1, 1,
    //                                 0L, TimeUnit.MILLISECONDS,
    //                                 new LinkedBlockingQueue!(Runnable)(),
    //                                 threadFactory));
    // }

    // /**
    //  * Creates a thread pool that creates new threads as needed, but
    //  * will reuse previously constructed threads when they are
    //  * available.  These pools will typically improve the performance
    //  * of programs that execute many short-lived asynchronous tasks.
    //  * Calls to {@code execute} will reuse previously constructed
    //  * threads if available. If no existing thread is available, a new
    //  * thread will be created and added to the pool. Threads that have
    //  * not been used for sixty seconds are terminated and removed from
    //  * the cache. Thus, a pool that remains idle for long enough will
    //  * not consume any resources. Note that pools with similar
    //  * properties but different details (for example, timeout parameters)
    //  * may be created using {@link ThreadPoolExecutor} constructors.
    //  *
    //  * @return the newly created thread pool
    //  */
    // static ExecutorService newCachedThreadPool() {
    //     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    //                                   60L, TimeUnit.SECONDS,
    //                                   new SynchronousQueue!(Runnable)());
    // }

    // /**
    //  * Creates a thread pool that creates new threads as needed, but
    //  * will reuse previously constructed threads when they are
    //  * available, and uses the provided
    //  * ThreadFactory to create new threads when needed.
219 // * 220 // * @param threadFactory the factory to use when creating new threads 221 // * @return the newly created thread pool 222 // * @throws NullPointerException if threadFactory is null 223 // */ 224 // static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { 225 // return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 226 // 60L, TimeUnit.SECONDS, 227 // new SynchronousQueue!(Runnable)(), 228 // threadFactory); 229 // } 230 231 /** 232 * Creates a single-threaded executor that can schedule commands 233 * to run after a given delay, or to execute periodically. 234 * (Note however that if this single 235 * thread terminates due to a failure during execution prior to 236 * shutdown, a new one will take its place if needed to execute 237 * subsequent tasks.) Tasks are guaranteed to execute 238 * sequentially, and no more than one task will be active at any 239 * given time. Unlike the otherwise equivalent 240 * {@code newScheduledThreadPool(1)} the returned executor is 241 * guaranteed not to be reconfigurable to use additional threads. 242 * 243 * @return the newly created scheduled executor 244 */ 245 static ScheduledExecutorService newSingleThreadScheduledExecutor() { 246 return new DelegatedScheduledExecutorService!ScheduledThreadPoolExecutor 247 (new ScheduledThreadPoolExecutor(1)); 248 } 249 250 /** 251 * Creates a single-threaded executor that can schedule commands 252 * to run after a given delay, or to execute periodically. (Note 253 * however that if this single thread terminates due to a failure 254 * during execution prior to shutdown, a new one will take its 255 * place if needed to execute subsequent tasks.) Tasks are 256 * guaranteed to execute sequentially, and no more than one task 257 * will be active at any given time. Unlike the otherwise 258 * equivalent {@code newScheduledThreadPool(1, threadFactory)} 259 * the returned executor is guaranteed not to be reconfigurable to 260 * use additional threads. 
261 * 262 * @param threadFactory the factory to use when creating new threads 263 * @return the newly created scheduled executor 264 * @throws NullPointerException if threadFactory is null 265 */ 266 static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { 267 return new DelegatedScheduledExecutorService!ScheduledThreadPoolExecutor 268 (new ScheduledThreadPoolExecutor(1, threadFactory)); 269 } 270 271 /** 272 * Creates a thread pool that can schedule commands to run after a 273 * given delay, or to execute periodically. 274 * @param corePoolSize the number of threads to keep in the pool, 275 * even if they are idle 276 * @return the newly created scheduled thread pool 277 * @throws IllegalArgumentException if {@code corePoolSize < 0} 278 */ 279 static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 280 return new ScheduledThreadPoolExecutor(corePoolSize); 281 } 282 283 /** 284 * Creates a thread pool that can schedule commands to run after a 285 * given delay, or to execute periodically. 286 * @param corePoolSize the number of threads to keep in the pool, 287 * even if they are idle 288 * @param threadFactory the factory to use when the executor 289 * creates a new thread 290 * @return the newly created scheduled thread pool 291 * @throws IllegalArgumentException if {@code corePoolSize < 0} 292 * @throws NullPointerException if threadFactory is null 293 */ 294 static ScheduledExecutorService newScheduledThreadPool( 295 int corePoolSize, ThreadFactory threadFactory) { 296 return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); 297 } 298 299 // /** 300 // * Returns an object that delegates all defined {@link 301 // * ExecutorService} methods to the given executor, but not any 302 // * other methods that might otherwise be accessible using 303 // * casts. This provides a way to safely "freeze" configuration and 304 // * disallow tuning of a given concrete implementation. 
305 // * @param executor the underlying implementation 306 // * @return an {@code ExecutorService} instance 307 // * @throws NullPointerException if executor null 308 // */ 309 // static ExecutorService unconfigurableExecutorService(ExecutorService executor) { 310 // if (executor is null) 311 // throw new NullPointerException(); 312 // return new DelegatedExecutorService(executor); 313 // } 314 315 // /** 316 // * Returns an object that delegates all defined {@link 317 // * ScheduledExecutorService} methods to the given executor, but 318 // * not any other methods that might otherwise be accessible using 319 // * casts. This provides a way to safely "freeze" configuration and 320 // * disallow tuning of a given concrete implementation. 321 // * @param executor the underlying implementation 322 // * @return a {@code ScheduledExecutorService} instance 323 // * @throws NullPointerException if executor null 324 // */ 325 // static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { 326 // if (executor is null) 327 // throw new NullPointerException(); 328 // return new DelegatedScheduledExecutorService(executor); 329 // } 330 331 /** 332 * Returns a default thread factory used to create new threads. 333 * This factory creates all new threads used by an Executor in the 334 * same {@link ThreadGroupEx}. If there is a {@link 335 * java.lang.SecurityManager}, it uses the group of {@link 336 * System#getSecurityManager}, else the group of the thread 337 * invoking this {@code defaultThreadFactory} method. Each new 338 * thread is created as a non-daemon thread with priority set to 339 * the smaller of {@code Thread.PRIORITY_DEFAULT} and the maximum 340 * priority permitted in the thread group. 
New threads have names 341 * accessible via {@link Thread#getName} of 342 * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence 343 * number of this factory, and <em>M</em> is the sequence number 344 * of the thread created by this factory. 345 * @return a thread factory 346 */ 347 static ThreadFactory defaultThreadFactory() { 348 return ThreadFactory.defaultThreadFactory(); 349 } 350 351 // /** 352 // * Returns a thread factory used to create new threads that 353 // * have the same permissions as the current thread. 354 // * This factory creates threads with the same settings as {@link 355 // * Executors#defaultThreadFactory}, additionally setting the 356 // * AccessControlContext and contextClassLoader of new threads to 357 // * be the same as the thread invoking this 358 // * {@code privilegedThreadFactory} method. A new 359 // * {@code privilegedThreadFactory} can be created within an 360 // * {@link AccessController#doPrivileged AccessController.doPrivileged} 361 // * action setting the current thread's access control context to 362 // * create threads with the selected permission settings holding 363 // * within that action. 364 // * 365 // * <p>Note that while tasks running within such threads will have 366 // * the same access control and class loader settings as the 367 // * current thread, they need not have the same {@link 368 // * java.lang.ThreadLocal} or {@link 369 // * java.lang.InheritableThreadLocal} values. If necessary, 370 // * particular values of thread locals can be set or reset before 371 // * any task runs in {@link ThreadPoolExecutor} subclasses using 372 // * {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}. 373 // * Also, if it is necessary to initialize worker threads to have 374 // * the same InheritableThreadLocal settings as some other 375 // * designated thread, you can create a custom ThreadFactory in 376 // * which that thread waits for and services requests to create 377 // * others that will inherit its values. 
378 // * 379 // * @return a thread factory 380 // * @throws AccessControlException if the current access control 381 // * context does not have permission to both get and set context 382 // * class loader 383 // */ 384 // static ThreadFactory privilegedThreadFactory() { 385 // return new PrivilegedThreadFactory(); 386 // } 387 388 /** 389 * Returns a {@link Callable} object that, when 390 * called, runs the given task and returns the given result. This 391 * can be useful when applying methods requiring a 392 * {@code Callable} to an otherwise resultless action. 393 * @param task the task to run 394 * @param result the result to return 395 * @param (T) the type of the result 396 * @return a callable object 397 * @throws NullPointerException if task null 398 */ 399 static Callable!(void) callable(Runnable task) { 400 if (task is null) 401 throw new NullPointerException(); 402 return new RunnableAdapter!(void)(task); 403 } 404 405 static Callable!(T) callable(T)(Runnable task, T result) if(!is(T == void)) { 406 if (task is null) 407 throw new NullPointerException(); 408 return new RunnableAdapter!(T)(task, result); 409 } 410 411 /** 412 * Returns a {@link Callable} object that, when 413 * called, runs the given task and returns {@code null}. 414 * @param task the task to run 415 * @return a callable object 416 * @throws NullPointerException if task null 417 */ 418 // static Callable!(Object) callable(Runnable task) { 419 // if (task is null) 420 // throw new NullPointerException(); 421 // return new RunnableAdapter!(Object)(task, null); 422 // } 423 424 // /** 425 // * Returns a {@link Callable} object that, when 426 // * called, runs the given privileged action and returns its result. 
427 // * @param action the privileged action to run 428 // * @return a callable object 429 // * @throws NullPointerException if action null 430 // */ 431 // static Callable!(Object) callable(PrivilegedAction<?> action) { 432 // if (action is null) 433 // throw new NullPointerException(); 434 // return new Callable!(Object)() { 435 // Object call() { return action.run(); }}; 436 // } 437 438 // /** 439 // * Returns a {@link Callable} object that, when 440 // * called, runs the given privileged exception action and returns 441 // * its result. 442 // * @param action the privileged exception action to run 443 // * @return a callable object 444 // * @throws NullPointerException if action null 445 // */ 446 // static Callable!(Object) callable(PrivilegedExceptionAction<?> action) { 447 // if (action is null) 448 // throw new NullPointerException(); 449 // return new Callable!(Object)() { 450 // Object call() throws Exception { return action.run(); }}; 451 // } 452 453 // /** 454 // * Returns a {@link Callable} object that will, when called, 455 // * execute the given {@code callable} under the current access 456 // * control context. This method should normally be invoked within 457 // * an {@link AccessController#doPrivileged AccessController.doPrivileged} 458 // * action to create callables that will, if possible, execute 459 // * under the selected permission settings holding within that 460 // * action; or if not possible, throw an associated {@link 461 // * AccessControlException}. 
462 // * @param callable the underlying task 463 // * @param (T) the type of the callable's result 464 // * @return a callable object 465 // * @throws NullPointerException if callable null 466 // */ 467 // static !(T) Callable!(T) privilegedCallable(Callable!(T) callable) { 468 // if (callable is null) 469 // throw new NullPointerException(); 470 // return new PrivilegedCallable!(T)(callable); 471 // } 472 473 // /** 474 // * Returns a {@link Callable} object that will, when called, 475 // * execute the given {@code callable} under the current access 476 // * control context, with the current context class loader as the 477 // * context class loader. This method should normally be invoked 478 // * within an 479 // * {@link AccessController#doPrivileged AccessController.doPrivileged} 480 // * action to create callables that will, if possible, execute 481 // * under the selected permission settings holding within that 482 // * action; or if not possible, throw an associated {@link 483 // * AccessControlException}. 484 // * 485 // * @param callable the underlying task 486 // * @param (T) the type of the callable's result 487 // * @return a callable object 488 // * @throws NullPointerException if callable null 489 // * @throws AccessControlException if the current access control 490 // * context does not have permission to both set and get context 491 // * class loader 492 // */ 493 // static !(T) Callable!(T) privilegedCallableUsingCurrentClassLoader(Callable!(T) callable) { 494 // if (callable is null) 495 // throw new NullPointerException(); 496 // return new PrivilegedCallableUsingCurrentClassLoader!(T)(callable); 497 // } 498 499 500 // Methods for ExecutorService 501 502 /** 503 * Submits a Runnable task for execution and returns a Future 504 * representing that task. The Future's {@code get} method will 505 * return {@code null} upon <em>successful</em> completion. 
506 * 507 * @param task the task to submit 508 * @return a Future representing pending completion of the task 509 * @throws RejectedExecutionException if the task cannot be 510 * scheduled for execution 511 * @throws NullPointerException if the task is null 512 */ 513 static Future!(void) submit(ExecutorService es, Runnable task) { 514 515 AbstractExecutorService aes = cast(AbstractExecutorService)es; 516 if(aes is null) 517 throw new RejectedExecutionException("ExecutorService is null"); 518 else 519 return aes.submit(task); 520 521 // TypeInfo typeInfo = typeid(cast(Object)es); 522 // if(typeInfo == typeid(ThreadPoolExecutor)) { 523 // AbstractExecutorService aes = cast(AbstractExecutorService)es; 524 // return aes.submit(task); 525 // } else { 526 // implementationMissing(false); 527 // } 528 } 529 530 /** 531 * Submits a Runnable task for execution and returns a Future 532 * representing that task. The Future's {@code get} method will 533 * return the given result upon successful completion. 
534 * 535 * @param task the task to submit 536 * @param result the result to return 537 * @param (T) the type of the result 538 * @return a Future representing pending completion of the task 539 * @throws RejectedExecutionException if the task cannot be 540 * scheduled for execution 541 * @throws NullPointerException if the task is null 542 */ 543 static Future!(T) submit(T)(ExecutorService es, Runnable task, T result) { 544 AbstractExecutorService aes = cast(AbstractExecutorService)es; 545 if(aes is null) 546 throw new RejectedExecutionException("ExecutorService is null"); 547 else 548 return aes.submit!T(task, result); 549 550 // TypeInfo typeInfo = typeid(cast(Object)es); 551 // if(typeInfo == typeid(ThreadPoolExecutor)) { 552 // AbstractExecutorService aes = cast(AbstractExecutorService)es; 553 // if(aes is null) 554 // throw new RejectedExecutionException("ExecutorService is null"); 555 // else 556 // return aes.submit!T(task, result); 557 // } else { 558 // implementationMissing(false); 559 // } 560 } 561 562 /** 563 * Submits a value-returning task for execution and returns a 564 * Future representing the pending results of the task. The 565 * Future's {@code get} method will return the task's result upon 566 * successful completion. 567 * 568 * <p> 569 * If you would like to immediately block waiting 570 * for a task, you can use constructions of the form 571 * {@code result = exec.submit(aCallable).get();} 572 * 573 * <p>Note: The {@link Executors} class includes a set of methods 574 * that can convert some other common closure-like objects, 575 * for example, {@link java.security.PrivilegedAction} to 576 * {@link Callable} form so they can be submitted. 
577 * 578 * @param task the task to submit 579 * @param (T) the type of the task's result 580 * @return a Future representing pending completion of the task 581 * @throws RejectedExecutionException if the task cannot be 582 * scheduled for execution 583 * @throws NullPointerException if the task is null 584 */ 585 static Future!(T) submit(T)(ExecutorService es, Callable!(T) task) { 586 AbstractExecutorService aes = cast(AbstractExecutorService)es; 587 if(aes is null) 588 throw new RejectedExecutionException("ExecutorService is null"); 589 else 590 return aes.submit!(T)(task); 591 592 // TypeInfo typeInfo = typeid(cast(Object)es); 593 // if(typeInfo == typeid(ThreadPoolExecutor)) { 594 // AbstractExecutorService aes = cast(AbstractExecutorService)es; 595 // if(aes is null) 596 // throw new RejectedExecutionException("ExecutorService is null"); 597 // else 598 // return aes.submit!(T)(task); 599 // } else { 600 // implementationMissing(false); 601 // } 602 } 603 604 /** 605 * Executes the given tasks, returning a list of Futures holding 606 * their status and results when all complete. 607 * {@link Future#isDone} is {@code true} for each 608 * element of the returned list. 609 * Note that a <em>completed</em> task could have 610 * terminated either normally or by throwing an exception. 611 * The results of this method are undefined if the given 612 * collection is modified while this operation is in progress. 
613 * 614 * @param tasks the collection of tasks 615 * @param (T) the type of the values returned from the tasks 616 * @return a list of Futures representing the tasks, in the same 617 * sequential order as produced by the iterator for the 618 * given task list, each of which has completed 619 * @throws InterruptedException if interrupted while waiting, in 620 * which case unfinished tasks are cancelled 621 * @throws NullPointerException if tasks or any of its elements are {@code null} 622 * @throws RejectedExecutionException if any task cannot be 623 * scheduled for execution 624 */ 625 static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks) { 626 627 AbstractExecutorService aes = cast(AbstractExecutorService)es; 628 if(aes is null) 629 throw new RejectedExecutionException("ExecutorService is null"); 630 else { 631 aes.invokeAll!(T)(tasks); 632 } 633 634 } 635 636 /** 637 * Executes the given tasks, returning a list of Futures holding 638 * their status and results 639 * when all complete or the timeout expires, whichever happens first. 640 * {@link Future#isDone} is {@code true} for each 641 * element of the returned list. 642 * Upon return, tasks that have not completed are cancelled. 643 * Note that a <em>completed</em> task could have 644 * terminated either normally or by throwing an exception. 645 * The results of this method are undefined if the given 646 * collection is modified while this operation is in progress. 647 * 648 * @param tasks the collection of tasks 649 * @param timeout the maximum time to wait 650 * @param unit the time unit of the timeout argument 651 * @param (T) the type of the values returned from the tasks 652 * @return a list of Futures representing the tasks, in the same 653 * sequential order as produced by the iterator for the 654 * given task list. If the operation did not time out, 655 * each task will have completed. If it did time out, some 656 * of these tasks will not have completed. 
657 * @throws InterruptedException if interrupted while waiting, in 658 * which case unfinished tasks are cancelled 659 * @throws NullPointerException if tasks, any of its elements, or 660 * unit are {@code null} 661 * @throws RejectedExecutionException if any task cannot be scheduled 662 * for execution 663 */ 664 static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks, 665 Duration timeout) { 666 AbstractExecutorService aes = cast(AbstractExecutorService)es; 667 if(aes is null) 668 throw new RejectedExecutionException("ExecutorService is null"); 669 else { 670 aes.invokeAll!(T)(tasks, timeout); 671 } 672 } 673 674 /** 675 * Executes the given tasks, returning the result 676 * of one that has completed successfully (i.e., without throwing 677 * an exception), if any do. Upon normal or exceptional return, 678 * tasks that have not completed are cancelled. 679 * The results of this method are undefined if the given 680 * collection is modified while this operation is in progress. 
681 * 682 * @param tasks the collection of tasks 683 * @param (T) the type of the values returned from the tasks 684 * @return the result returned by one of the tasks 685 * @throws InterruptedException if interrupted while waiting 686 * @throws NullPointerException if tasks or any element task 687 * subject to execution is {@code null} 688 * @throws IllegalArgumentException if tasks is empty 689 * @throws ExecutionException if no task successfully completes 690 * @throws RejectedExecutionException if tasks cannot be scheduled 691 * for execution 692 */ 693 static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks) { 694 AbstractExecutorService aes = cast(AbstractExecutorService)es; 695 if(aes is null) 696 throw new RejectedExecutionException("ExecutorService is null"); 697 else { 698 aes.invokeAny!(T)(tasks); 699 } 700 } 701 702 /** 703 * Executes the given tasks, returning the result 704 * of one that has completed successfully (i.e., without throwing 705 * an exception), if any do before the given timeout elapses. 706 * Upon normal or exceptional return, tasks that have not 707 * completed are cancelled. 708 * The results of this method are undefined if the given 709 * collection is modified while this operation is in progress. 
710 * 711 * @param tasks the collection of tasks 712 * @param timeout the maximum time to wait 713 * @param unit the time unit of the timeout argument 714 * @param (T) the type of the values returned from the tasks 715 * @return the result returned by one of the tasks 716 * @throws InterruptedException if interrupted while waiting 717 * @throws NullPointerException if tasks, or unit, or any element 718 * task subject to execution is {@code null} 719 * @throws TimeoutException if the given timeout elapses before 720 * any task successfully completes 721 * @throws ExecutionException if no task successfully completes 722 * @throws RejectedExecutionException if tasks cannot be scheduled 723 * for execution 724 */ 725 static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks, 726 Duration timeout) { 727 AbstractExecutorService aes = cast(AbstractExecutorService)es; 728 if(aes is null) 729 throw new RejectedExecutionException("ExecutorService is null"); 730 else { 731 aes.invokeAny!(T)(tasks, timeout); 732 } 733 } 734 735 /** Cannot instantiate. */ 736 private this() {} 737 } 738 739 // Non-classes supporting the methods 740 741 /** 742 * A callable that runs given task and returns given result. 
743 */ 744 private final class RunnableAdapter(T) : Callable!(T) if(is(T == void)) { 745 private Runnable task; 746 this(Runnable task) { 747 this.task = task; 748 } 749 750 T call() { 751 task.run(); 752 } 753 754 override string toString() { 755 return super.toString() ~ "[Wrapped task = " ~ (cast(Object)task).toString() ~ "]"; 756 } 757 } 758 759 private final class RunnableAdapter(T) : Callable!(T) if(!is(T == void)) { 760 private Runnable task; 761 private T result; 762 763 this(Runnable task, T result) { 764 this.task = task; 765 this.result = result; 766 } 767 768 T call() { 769 task.run(); 770 return result; 771 } 772 773 override string toString() { 774 return super.toString() ~ "[Wrapped task = " ~ (cast(Object)task).toString() ~ "]"; 775 } 776 } 777 778 // /** 779 // * A callable that runs under established access control settings. 780 // */ 781 // private final class PrivilegedCallable!(T) : Callable!(T) { 782 // Callable!(T) task; 783 // AccessControlContext acc; 784 785 // PrivilegedCallable(Callable!(T) task) { 786 // this.task = task; 787 // this.acc = AccessController.getContext(); 788 // } 789 790 // T call() throws Exception { 791 // try { 792 // return AccessController.doPrivileged( 793 // new PrivilegedExceptionAction!(T)() { 794 // T run() throws Exception { 795 // return task.call(); 796 // } 797 // }, acc); 798 // } catch (PrivilegedActionException e) { 799 // throw e.getException(); 800 // } 801 // } 802 803 // string toString() { 804 // return super.toString() ~ "[Wrapped task = " ~ task ~ "]"; 805 // } 806 // } 807 808 // /** 809 // * A callable that runs under established access control settings and 810 // * current ClassLoader. 
//  */
// private final class PrivilegedCallableUsingCurrentClassLoader(T)
//         : Callable!(T) {
//     Callable!(T) task;
//     AccessControlContext acc;
//     ClassLoader ccl;

//     this(Callable!(T) task) {
//         SecurityManager sm = System.getSecurityManager();
//         if (sm !is null) {
//             // Calls to getContextClassLoader from this class
//             // never trigger a security check, but we check
//             // whether our callers have this permission anyways.
//             sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

//             // Whether setContextClassLoader turns out to be necessary
//             // or not, we fail fast if permission is not available.
//             sm.checkPermission(new RuntimePermission("setContextClassLoader"));
//         }
//         this.task = task;
//         this.acc = AccessController.getContext();
//         this.ccl = Thread.getThis().getContextClassLoader();
//     }

//     T call() throws Exception {
//         try {
//             return AccessController.doPrivileged(
//                 new PrivilegedExceptionAction!(T)() {
//                     T run() throws Exception {
//                         Thread t = Thread.getThis();
//                         ClassLoader cl = t.getContextClassLoader();
//                         if (ccl == cl) {
//                             return task.call();
//                         } else {
//                             t.setContextClassLoader(ccl);
//                             try {
//                                 return task.call();
//                             } finally {
//                                 t.setContextClassLoader(cl);
//                             }
//                         }
//                     }
//                 }, acc);
//         } catch (PrivilegedActionException e) {
//             throw e.getException();
//         }
//     }

//     string toString() {
//         return super.toString() ~ "[Wrapped task = " ~ task ~ "]";
//     }
// }

/**
 * No-op stand-in for Java's {@code Reference.reachabilityFence}.
 *
 * The argument is accepted and ignored; the function exists so that the
 * delegating wrappers below can keep their {@code finally} blocks.
 */
void reachabilityFence(ExecutorService) {
    // Intentionally empty.
    // TODO: Tasks pending completion -@zxp at 5/10/2019, 10:50:31 AM
    // Remove this function.
}

/**
 * A wrapper class that exposes only the ExecutorService methods
 * of an ExecutorService implementation.
*/
private class DelegatedExecutorService(U) : ExecutorService
        if(is(U : ExecutorService)) {

    // The wrapped executor; every method below forwards to it.
    private U e;

    this(U executor) { e = executor; }

    // Each forwarding method (except shutdown) ends with reachabilityFence(this),
    // which is currently a no-op defined in this module.

    void execute(Runnable command) {
        try {
            e.execute(command);
        } finally { reachabilityFence(this); }
    }

    void shutdown() { e.shutdown(); }

    List!(Runnable) shutdownNow() {
        try {
            return e.shutdownNow();
        } finally { reachabilityFence(this); }
    }

    bool isShutdown() {
        try {
            return e.isShutdown();
        } finally { reachabilityFence(this); }
    }

    bool isTerminated() {
        try {
            return e.isTerminated();
        } finally { reachabilityFence(this); }
    }

    bool awaitTermination(Duration timeout) {
        try {
            return e.awaitTermination(timeout);
        } finally { reachabilityFence(this); }
    }

    Future!void submit(Runnable task) {
        try {
            return e.submit(task);
        } finally { reachabilityFence(this); }
    }

    Future!(T) submit(T)(Callable!(T) task) {
        try {
            return e.submit(task);
        } finally { reachabilityFence(this); }
    }

    Future!(T) submit(T)(Runnable task, T result) {
        try {
            return e.submit(task, result);
        } finally { reachabilityFence(this); }
    }

    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        try {
            return e.invokeAll(tasks);
        } finally { reachabilityFence(this); }
    }

    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
                                         Duration timeout) {
        try {
            // The timeout is a single Duration. The call used to pass a third
            // argument `unit`, an identifier that is declared nowhere in this
            // method, so the template could never be instantiated.
            // NOTE(review): assumes the wrapped type offers
            // invokeAll(tasks, Duration) - confirm against U.
            return e.invokeAll(tasks, timeout);
        } finally { reachabilityFence(this); }
    }

    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return e.invokeAny(tasks);
        } finally { reachabilityFence(this); }
    }

    T invokeAny(T)(Collection!(Callable!(T)) tasks,
                           Duration timeout) {
        try {
            // Same correction as the timed invokeAll: the undeclared `unit`
            // argument is dropped so the call takes (tasks, timeout).
            return e.invokeAny(tasks, timeout);
        } finally { reachabilityFence(this); }
    }
}

/**
 * A delegating executor service that additionally offers finalize(),
 * which shuts the wrapped executor down.
 */
private class FinalizableDelegatedExecutorService(T) : DelegatedExecutorService!T {
    this(T executor) {
        super(executor);
    }

    // NOTE(review): nothing in the visible part of this module calls
    // finalize(); it looks like a carry-over of the Java finalizer hook.
    // Confirm whether a caller is expected to invoke it explicitly.
    protected void finalize() {
        super.shutdown();
    }
}

/**
 * A wrapper class that exposes only the ScheduledExecutorService
 * methods of a ScheduledExecutorService implementation.
 */
private class DelegatedScheduledExecutorService(T) : DelegatedExecutorService!T,
    ScheduledExecutorService if(is(T : ScheduledExecutorService)){

    // Same executor that is handed to the base class, kept here as well
    // for the scheduling calls below.
    private T e;

    this(T executor) {
        super(executor);
        e = executor;
    }

    ScheduledFuture!void schedule(Runnable command, Duration delay) {
        return e.schedule(command, delay);
    }

    ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
        return e.schedule!V(callable, delay);
    }

    ScheduledFuture!void scheduleAtFixedRate(Runnable command, Duration initialDelay, Duration period) {
        return e.scheduleAtFixedRate(command, initialDelay, period);
    }

    ScheduledFuture!void scheduleWithFixedDelay(Runnable command, Duration initialDelay, Duration delay) {
        return e.scheduleWithFixedDelay(command, initialDelay, delay);
    }
}