1 /* 2 * Hunt - A refined core library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.concurrency.Executors; 13 14 import hunt.concurrency.AbstractExecutorService; 15 import hunt.concurrency.atomic.AtomicHelper; 16 import hunt.concurrency.Exceptions; 17 import hunt.concurrency.ExecutorService; 18 // import hunt.concurrency.ForkJoinPool; 19 import hunt.concurrency.Future; 20 import hunt.concurrency.LinkedBlockingQueue; 21 import hunt.concurrency.ScheduledExecutorService; 22 import hunt.concurrency.ScheduledThreadPoolExecutor; 23 import hunt.concurrency.ThreadFactory; 24 import hunt.concurrency.ThreadPoolExecutor; 25 26 import hunt.util.DateTime; 27 import hunt.Exceptions; 28 import hunt.util.Common; 29 30 import core.thread; 31 import core.time; 32 import std.conv; 33 34 // import static java.lang.ref.Reference.reachabilityFence; 35 // import java.security.AccessControlContext; 36 // import java.security.AccessControlException; 37 // import java.security.AccessController; 38 // import java.security.PrivilegedAction; 39 // import java.security.PrivilegedActionException; 40 // import java.security.PrivilegedExceptionAction; 41 // import hunt.collection.Collection; 42 // import java.util.List; 43 // import sun.security.util.SecurityConstants; 44 45 /** 46 * Factory and utility methods for {@link Executor}, {@link 47 * ExecutorService}, {@link ScheduledExecutorService}, {@link 48 * ThreadFactory}, and {@link Callable} classes defined in this 49 * package. This class supports the following kinds of methods: 50 * 51 * <ul> 52 * <li>Methods that create and return an {@link ExecutorService} 53 * set up with commonly useful configuration settings. 54 * <li>Methods that create and return a {@link ScheduledExecutorService} 55 * set up with commonly useful configuration settings. 56 * <li>Methods that create and return a "wrapped" ExecutorService, that 57 * disables reconfiguration by making implementation-specific methods 58 * inaccessible. 59 * <li>Methods that create and return a {@link ThreadFactory} 60 * that sets newly created threads to a known state. 61 * <li>Methods that create and return a {@link Callable} 62 * out of other closure-like forms, so they can be used 63 * in execution methods requiring {@code Callable}. 64 * </ul> 65 * 66 * @since 1.5 67 * @author Doug Lea 68 */ 69 class Executors { 70 71 /** 72 * Creates a thread pool that reuses a fixed number of threads 73 * operating off a shared unbounded queue. At any point, at most 74 * {@code nThreads} threads will be active processing tasks. 75 * If additional tasks are submitted when all threads are active, 76 * they will wait in the queue until a thread is available. 77 * If any thread terminates due to a failure during execution 78 * prior to shutdown, a new one will take its place if needed to 79 * execute subsequent tasks. The threads in the pool will exist 80 * until it is explicitly {@link ExecutorService#shutdown shutdown}. 81 * 82 * @param nThreads the number of threads in the pool 83 * @return the newly created thread pool 84 * @throws IllegalArgumentException if {@code nThreads <= 0} 85 */ 86 static ThreadPoolExecutor newFixedThreadPool(int nThreads) { 87 return new ThreadPoolExecutor(nThreads, nThreads, 0.hnsecs, 88 new LinkedBlockingQueue!(Runnable)()); 89 } 90 91 // /** 92 // * Creates a thread pool that maintains enough threads to support 93 // * the given parallelism level, and may use multiple queues to 94 // * reduce contention. The parallelism level corresponds to the 95 // * maximum number of threads actively engaged in, or available to 96 // * engage in, task processing. The actual number of threads may 97 // * grow and shrink dynamically. A work-stealing pool makes no 98 // * guarantees about the order in which submitted tasks are 99 // * executed. 100 // * 101 // * @param parallelism the targeted parallelism level 102 // * @return the newly created thread pool 103 // * @throws IllegalArgumentException if {@code parallelism <= 0} 104 // * @since 1.8 105 // */ 106 // static ExecutorService newWorkStealingPool(int parallelism) { 107 // return new ForkJoinPool 108 // (parallelism, 109 // ForkJoinPool.defaultForkJoinWorkerThreadFactory, 110 // null, true); 111 // } 112 113 // /** 114 // * Creates a work-stealing thread pool using the number of 115 // * {@linkplain Runtime#availableProcessors available processors} 116 // * as its target parallelism level. 117 // * 118 // * @return the newly created thread pool 119 // * @see #newWorkStealingPool(int) 120 // * @since 1.8 121 // */ 122 // static ExecutorService newWorkStealingPool() { 123 // return new ForkJoinPool 124 // (Runtime.getRuntime().availableProcessors(), 125 // ForkJoinPool.defaultForkJoinWorkerThreadFactory, 126 // null, true); 127 // } 128 129 /** 130 * Creates a thread pool that reuses a fixed number of threads 131 * operating off a shared unbounded queue, using the provided 132 * ThreadFactory to create new threads when needed. At any point, 133 * at most {@code nThreads} threads will be active processing 134 * tasks. If additional tasks are submitted when all threads are 135 * active, they will wait in the queue until a thread is 136 * available. If any thread terminates due to a failure during 137 * execution prior to shutdown, a new one will take its place if 138 * needed to execute subsequent tasks. The threads in the pool will 139 * exist until it is explicitly {@link ExecutorService#shutdown 140 * shutdown}. 141 * 142 * @param nThreads the number of threads in the pool 143 * @param threadFactory the factory to use when creating new threads 144 * @return the newly created thread pool 145 * @throws NullPointerException if threadFactory is null 146 * @throws IllegalArgumentException if {@code nThreads <= 0} 147 */ 148 static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { 149 return new ThreadPoolExecutor(nThreads, nThreads, 0.msecs, 150 new LinkedBlockingQueue!(Runnable)(), 151 threadFactory); 152 } 153 154 /** 155 * Creates an Executor that uses a single worker thread operating 156 * off an unbounded queue. (Note however that if this single 157 * thread terminates due to a failure during execution prior to 158 * shutdown, a new one will take its place if needed to execute 159 * subsequent tasks.) Tasks are guaranteed to execute 160 * sequentially, and no more than one task will be active at any 161 * given time. Unlike the otherwise equivalent 162 * {@code newFixedThreadPool(1)} the returned executor is 163 * guaranteed not to be reconfigurable to use additional threads. 164 * 165 * @return the newly created single-threaded Executor 166 */ 167 // static ExecutorService newSingleThreadExecutor() { 168 // return new FinalizableDelegatedExecutorService 169 // (new ThreadPoolExecutor(1, 1, 170 // 0L, TimeUnit.MILLISECONDS, 171 // new LinkedBlockingQueue!(Runnable)())); 172 // } 173 174 // /** 175 // * Creates an Executor that uses a single worker thread operating 176 // * off an unbounded queue, and uses the provided ThreadFactory to 177 // * create a new thread when needed. Unlike the otherwise 178 // * equivalent {@code newFixedThreadPool(1, threadFactory)} the 179 // * returned executor is guaranteed not to be reconfigurable to use 180 // * additional threads. 181 // * 182 // * @param threadFactory the factory to use when creating new threads 183 // * @return the newly created single-threaded Executor 184 // * @throws NullPointerException if threadFactory is null 185 // */ 186 // static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { 187 // return new FinalizableDelegatedExecutorService 188 // (new ThreadPoolExecutor(1, 1, 189 // 0L, TimeUnit.MILLISECONDS, 190 // new LinkedBlockingQueue!(Runnable)(), 191 // threadFactory)); 192 // } 193 194 /** 195 * Creates a thread pool that creates new threads as needed, but 196 * will reuse previously constructed threads when they are 197 * available. These pools will typically improve the performance 198 * of programs that execute many short-lived asynchronous tasks. 199 * Calls to {@code execute} will reuse previously constructed 200 * threads if available. If no existing thread is available, a new 201 * thread will be created and added to the pool. Threads that have 202 * not been used for sixty seconds are terminated and removed from 203 * the cache. Thus, a pool that remains idle for long enough will 204 * not consume any resources. Note that pools with similar 205 * properties but different details (for example, timeout parameters) 206 * may be created using {@link ThreadPoolExecutor} constructors. 207 * 208 * @return the newly created thread pool 209 */ 210 // static ExecutorService newCachedThreadPool() { 211 // return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 212 // 60L, TimeUnit.SECONDS, 213 // new SynchronousQueue!(Runnable)()); 214 // } 215 216 // /** 217 // * Creates a thread pool that creates new threads as needed, but 218 // * will reuse previously constructed threads when they are 219 // * available, and uses the provided 220 // * ThreadFactory to create new threads when needed. 221 // * 222 // * @param threadFactory the factory to use when creating new threads 223 // * @return the newly created thread pool 224 // * @throws NullPointerException if threadFactory is null 225 // */ 226 // static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { 227 // return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 228 // 60L, TimeUnit.SECONDS, 229 // new SynchronousQueue!(Runnable)(), 230 // threadFactory); 231 // } 232 233 // /** 234 // * Creates a single-threaded executor that can schedule commands 235 // * to run after a given delay, or to execute periodically. 236 // * (Note however that if this single 237 // * thread terminates due to a failure during execution prior to 238 // * shutdown, a new one will take its place if needed to execute 239 // * subsequent tasks.) Tasks are guaranteed to execute 240 // * sequentially, and no more than one task will be active at any 241 // * given time. Unlike the otherwise equivalent 242 // * {@code newScheduledThreadPool(1)} the returned executor is 243 // * guaranteed not to be reconfigurable to use additional threads. 244 // * 245 // * @return the newly created scheduled executor 246 // */ 247 // static ScheduledExecutorService newSingleThreadScheduledExecutor() { 248 // return new DelegatedScheduledExecutorService 249 // (new ScheduledThreadPoolExecutor(1)); 250 // } 251 252 // /** 253 // * Creates a single-threaded executor that can schedule commands 254 // * to run after a given delay, or to execute periodically. (Note 255 // * however that if this single thread terminates due to a failure 256 // * during execution prior to shutdown, a new one will take its 257 // * place if needed to execute subsequent tasks.) Tasks are 258 // * guaranteed to execute sequentially, and no more than one task 259 // * will be active at any given time. Unlike the otherwise 260 // * equivalent {@code newScheduledThreadPool(1, threadFactory)} 261 // * the returned executor is guaranteed not to be reconfigurable to 262 // * use additional threads. 263 // * 264 // * @param threadFactory the factory to use when creating new threads 265 // * @return the newly created scheduled executor 266 // * @throws NullPointerException if threadFactory is null 267 // */ 268 // static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { 269 // return new DelegatedScheduledExecutorService 270 // (new ScheduledThreadPoolExecutor(1, threadFactory)); 271 // } 272 273 /** 274 * Creates a thread pool that can schedule commands to run after a 275 * given delay, or to execute periodically. 276 * @param corePoolSize the number of threads to keep in the pool, 277 * even if they are idle 278 * @return the newly created scheduled thread pool 279 * @throws IllegalArgumentException if {@code corePoolSize < 0} 280 */ 281 static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 282 return new ScheduledThreadPoolExecutor(corePoolSize); 283 } 284 285 /** 286 * Creates a thread pool that can schedule commands to run after a 287 * given delay, or to execute periodically. 288 * @param corePoolSize the number of threads to keep in the pool, 289 * even if they are idle 290 * @param threadFactory the factory to use when the executor 291 * creates a new thread 292 * @return the newly created scheduled thread pool 293 * @throws IllegalArgumentException if {@code corePoolSize < 0} 294 * @throws NullPointerException if threadFactory is null 295 */ 296 static ScheduledExecutorService newScheduledThreadPool( 297 int corePoolSize, ThreadFactory threadFactory) { 298 return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); 299 } 300 301 // /** 302 // * Returns an object that delegates all defined {@link 303 // * ExecutorService} methods to the given executor, but not any 304 // * other methods that might otherwise be accessible using 305 // * casts. This provides a way to safely "freeze" configuration and 306 // * disallow tuning of a given concrete implementation. 307 // * @param executor the underlying implementation 308 // * @return an {@code ExecutorService} instance 309 // * @throws NullPointerException if executor null 310 // */ 311 // static ExecutorService unconfigurableExecutorService(ExecutorService executor) { 312 // if (executor is null) 313 // throw new NullPointerException(); 314 // return new DelegatedExecutorService(executor); 315 // } 316 317 // /** 318 // * Returns an object that delegates all defined {@link 319 // * ScheduledExecutorService} methods to the given executor, but 320 // * not any other methods that might otherwise be accessible using 321 // * casts. This provides a way to safely "freeze" configuration and 322 // * disallow tuning of a given concrete implementation. 323 // * @param executor the underlying implementation 324 // * @return a {@code ScheduledExecutorService} instance 325 // * @throws NullPointerException if executor null 326 // */ 327 // static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { 328 // if (executor is null) 329 // throw new NullPointerException(); 330 // return new DelegatedScheduledExecutorService(executor); 331 // } 332 333 /** 334 * Returns a default thread factory used to create new threads. 335 * This factory creates all new threads used by an Executor in the 336 * same {@link ThreadGroupEx}. If there is a {@link 337 * java.lang.SecurityManager}, it uses the group of {@link 338 * System#getSecurityManager}, else the group of the thread 339 * invoking this {@code defaultThreadFactory} method. Each new 340 * thread is created as a non-daemon thread with priority set to 341 * the smaller of {@code Thread.PRIORITY_DEFAULT} and the maximum 342 * priority permitted in the thread group. New threads have names 343 * accessible via {@link Thread#getName} of 344 * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence 345 * number of this factory, and <em>M</em> is the sequence number 346 * of the thread created by this factory. 347 * @return a thread factory 348 */ 349 static ThreadFactory defaultThreadFactory() { 350 return ThreadFactory.defaultThreadFactory(); 351 } 352 353 // /** 354 // * Returns a thread factory used to create new threads that 355 // * have the same permissions as the current thread. 356 // * This factory creates threads with the same settings as {@link 357 // * Executors#defaultThreadFactory}, additionally setting the 358 // * AccessControlContext and contextClassLoader of new threads to 359 // * be the same as the thread invoking this 360 // * {@code privilegedThreadFactory} method. A new 361 // * {@code privilegedThreadFactory} can be created within an 362 // * {@link AccessController#doPrivileged AccessController.doPrivileged} 363 // * action setting the current thread's access control context to 364 // * create threads with the selected permission settings holding 365 // * within that action. 366 // * 367 // * <p>Note that while tasks running within such threads will have 368 // * the same access control and class loader settings as the 369 // * current thread, they need not have the same {@link 370 // * java.lang.ThreadLocal} or {@link 371 // * java.lang.InheritableThreadLocal} values. If necessary, 372 // * particular values of thread locals can be set or reset before 373 // * any task runs in {@link ThreadPoolExecutor} subclasses using 374 // * {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}. 375 // * Also, if it is necessary to initialize worker threads to have 376 // * the same InheritableThreadLocal settings as some other 377 // * designated thread, you can create a custom ThreadFactory in 378 // * which that thread waits for and services requests to create 379 // * others that will inherit its values. 380 // * 381 // * @return a thread factory 382 // * @throws AccessControlException if the current access control 383 // * context does not have permission to both get and set context 384 // * class loader 385 // */ 386 // static ThreadFactory privilegedThreadFactory() { 387 // return new PrivilegedThreadFactory(); 388 // } 389 390 /** 391 * Returns a {@link Callable} object that, when 392 * called, runs the given task and returns the given result. This 393 * can be useful when applying methods requiring a 394 * {@code Callable} to an otherwise resultless action. 395 * @param task the task to run 396 * @param result the result to return 397 * @param (T) the type of the result 398 * @return a callable object 399 * @throws NullPointerException if task null 400 */ 401 static Callable!(void) callable(Runnable task) { 402 if (task is null) 403 throw new NullPointerException(); 404 return new RunnableAdapter!(void)(task); 405 } 406 407 static Callable!(T) callable(T)(Runnable task, T result) if(!is(T == void)) { 408 if (task is null) 409 throw new NullPointerException(); 410 return new RunnableAdapter!(T)(task, result); 411 } 412 413 /** 414 * Returns a {@link Callable} object that, when 415 * called, runs the given task and returns {@code null}. 416 * @param task the task to run 417 * @return a callable object 418 * @throws NullPointerException if task null 419 */ 420 // static Callable!(Object) callable(Runnable task) { 421 // if (task is null) 422 // throw new NullPointerException(); 423 // return new RunnableAdapter!(Object)(task, null); 424 // } 425 426 // /** 427 // * Returns a {@link Callable} object that, when 428 // * called, runs the given privileged action and returns its result. 429 // * @param action the privileged action to run 430 // * @return a callable object 431 // * @throws NullPointerException if action null 432 // */ 433 // static Callable!(Object) callable(PrivilegedAction<?> action) { 434 // if (action is null) 435 // throw new NullPointerException(); 436 // return new Callable!(Object)() { 437 // Object call() { return action.run(); }}; 438 // } 439 440 // /** 441 // * Returns a {@link Callable} object that, when 442 // * called, runs the given privileged exception action and returns 443 // * its result. 444 // * @param action the privileged exception action to run 445 // * @return a callable object 446 // * @throws NullPointerException if action null 447 // */ 448 // static Callable!(Object) callable(PrivilegedExceptionAction<?> action) { 449 // if (action is null) 450 // throw new NullPointerException(); 451 // return new Callable!(Object)() { 452 // Object call() throws Exception { return action.run(); }}; 453 // } 454 455 // /** 456 // * Returns a {@link Callable} object that will, when called, 457 // * execute the given {@code callable} under the current access 458 // * control context. This method should normally be invoked within 459 // * an {@link AccessController#doPrivileged AccessController.doPrivileged} 460 // * action to create callables that will, if possible, execute 461 // * under the selected permission settings holding within that 462 // * action; or if not possible, throw an associated {@link 463 // * AccessControlException}. 464 // * @param callable the underlying task 465 // * @param (T) the type of the callable's result 466 // * @return a callable object 467 // * @throws NullPointerException if callable null 468 // */ 469 // static !(T) Callable!(T) privilegedCallable(Callable!(T) callable) { 470 // if (callable is null) 471 // throw new NullPointerException(); 472 // return new PrivilegedCallable!(T)(callable); 473 // } 474 475 // /** 476 // * Returns a {@link Callable} object that will, when called, 477 // * execute the given {@code callable} under the current access 478 // * control context, with the current context class loader as the 479 // * context class loader. This method should normally be invoked 480 // * within an 481 // * {@link AccessController#doPrivileged AccessController.doPrivileged} 482 // * action to create callables that will, if possible, execute 483 // * under the selected permission settings holding within that 484 // * action; or if not possible, throw an associated {@link 485 // * AccessControlException}. 486 // * 487 // * @param callable the underlying task 488 // * @param (T) the type of the callable's result 489 // * @return a callable object 490 // * @throws NullPointerException if callable null 491 // * @throws AccessControlException if the current access control 492 // * context does not have permission to both set and get context 493 // * class loader 494 // */ 495 // static !(T) Callable!(T) privilegedCallableUsingCurrentClassLoader(Callable!(T) callable) { 496 // if (callable is null) 497 // throw new NullPointerException(); 498 // return new PrivilegedCallableUsingCurrentClassLoader!(T)(callable); 499 // } 500 501 502 // Methods for ExecutorService 503 504 /** 505 * Submits a Runnable task for execution and returns a Future 506 * representing that task. The Future's {@code get} method will 507 * return {@code null} upon <em>successful</em> completion. 508 * 509 * @param task the task to submit 510 * @return a Future representing pending completion of the task 511 * @throws RejectedExecutionException if the task cannot be 512 * scheduled for execution 513 * @throws NullPointerException if the task is null 514 */ 515 static Future!(void) submit(ExecutorService es, Runnable task) { 516 517 AbstractExecutorService aes = cast(AbstractExecutorService)es; 518 if(aes is null) 519 throw new RejectedExecutionException("ExecutorService is null"); 520 else 521 return aes.submit(task); 522 523 // TypeInfo typeInfo = typeid(cast(Object)es); 524 // if(typeInfo == typeid(ThreadPoolExecutor)) { 525 // AbstractExecutorService aes = cast(AbstractExecutorService)es; 526 // return aes.submit(task); 527 // } else { 528 // implementationMissing(false); 529 // } 530 } 531 532 /** 533 * Submits a Runnable task for execution and returns a Future 534 * representing that task. The Future's {@code get} method will 535 * return the given result upon successful completion. 536 * 537 * @param task the task to submit 538 * @param result the result to return 539 * @param (T) the type of the result 540 * @return a Future representing pending completion of the task 541 * @throws RejectedExecutionException if the task cannot be 542 * scheduled for execution 543 * @throws NullPointerException if the task is null 544 */ 545 static Future!(T) submit(T)(ExecutorService es, Runnable task, T result) { 546 AbstractExecutorService aes = cast(AbstractExecutorService)es; 547 if(aes is null) 548 throw new RejectedExecutionException("ExecutorService is null"); 549 else 550 return aes.submit!T(task, result); 551 552 // TypeInfo typeInfo = typeid(cast(Object)es); 553 // if(typeInfo == typeid(ThreadPoolExecutor)) { 554 // AbstractExecutorService aes = cast(AbstractExecutorService)es; 555 // if(aes is null) 556 // throw new RejectedExecutionException("ExecutorService is null"); 557 // else 558 // return aes.submit!T(task, result); 559 // } else { 560 // implementationMissing(false); 561 // } 562 } 563 564 /** 565 * Submits a value-returning task for execution and returns a 566 * Future representing the pending results of the task. The 567 * Future's {@code get} method will return the task's result upon 568 * successful completion. 569 * 570 * <p> 571 * If you would like to immediately block waiting 572 * for a task, you can use constructions of the form 573 * {@code result = exec.submit(aCallable).get();} 574 * 575 * <p>Note: The {@link Executors} class includes a set of methods 576 * that can convert some other common closure-like objects, 577 * for example, {@link java.security.PrivilegedAction} to 578 * {@link Callable} form so they can be submitted. 579 * 580 * @param task the task to submit 581 * @param (T) the type of the task's result 582 * @return a Future representing pending completion of the task 583 * @throws RejectedExecutionException if the task cannot be 584 * scheduled for execution 585 * @throws NullPointerException if the task is null 586 */ 587 static Future!(T) submit(T)(ExecutorService es, Callable!(T) task) { 588 AbstractExecutorService aes = cast(AbstractExecutorService)es; 589 if(aes is null) 590 throw new RejectedExecutionException("ExecutorService is null"); 591 else 592 return aes.submit!(T)(task); 593 594 // TypeInfo typeInfo = typeid(cast(Object)es); 595 // if(typeInfo == typeid(ThreadPoolExecutor)) { 596 // AbstractExecutorService aes = cast(AbstractExecutorService)es; 597 // if(aes is null) 598 // throw new RejectedExecutionException("ExecutorService is null"); 599 // else 600 // return aes.submit!(T)(task); 601 // } else { 602 // implementationMissing(false); 603 // } 604 } 605 606 /** 607 * Executes the given tasks, returning a list of Futures holding 608 * their status and results when all complete. 609 * {@link Future#isDone} is {@code true} for each 610 * element of the returned list. 611 * Note that a <em>completed</em> task could have 612 * terminated either normally or by throwing an exception. 613 * The results of this method are undefined if the given 614 * collection is modified while this operation is in progress. 615 * 616 * @param tasks the collection of tasks 617 * @param (T) the type of the values returned from the tasks 618 * @return a list of Futures representing the tasks, in the same 619 * sequential order as produced by the iterator for the 620 * given task list, each of which has completed 621 * @throws InterruptedException if interrupted while waiting, in 622 * which case unfinished tasks are cancelled 623 * @throws NullPointerException if tasks or any of its elements are {@code null} 624 * @throws RejectedExecutionException if any task cannot be 625 * scheduled for execution 626 */ 627 static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks) { 628 629 AbstractExecutorService aes = cast(AbstractExecutorService)es; 630 if(aes is null) 631 throw new RejectedExecutionException("ExecutorService is null"); 632 else { 633 aes.invokeAll!(T)(tasks); 634 } 635 636 } 637 638 /** 639 * Executes the given tasks, returning a list of Futures holding 640 * their status and results 641 * when all complete or the timeout expires, whichever happens first. 642 * {@link Future#isDone} is {@code true} for each 643 * element of the returned list. 644 * Upon return, tasks that have not completed are cancelled. 645 * Note that a <em>completed</em> task could have 646 * terminated either normally or by throwing an exception. 647 * The results of this method are undefined if the given 648 * collection is modified while this operation is in progress. 649 * 650 * @param tasks the collection of tasks 651 * @param timeout the maximum time to wait 652 * @param unit the time unit of the timeout argument 653 * @param (T) the type of the values returned from the tasks 654 * @return a list of Futures representing the tasks, in the same 655 * sequential order as produced by the iterator for the 656 * given task list. If the operation did not time out, 657 * each task will have completed. If it did time out, some 658 * of these tasks will not have completed. 659 * @throws InterruptedException if interrupted while waiting, in 660 * which case unfinished tasks are cancelled 661 * @throws NullPointerException if tasks, any of its elements, or 662 * unit are {@code null} 663 * @throws RejectedExecutionException if any task cannot be scheduled 664 * for execution 665 */ 666 static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks, 667 Duration timeout) { 668 AbstractExecutorService aes = cast(AbstractExecutorService)es; 669 if(aes is null) 670 throw new RejectedExecutionException("ExecutorService is null"); 671 else { 672 aes.invokeAll!(T)(tasks, timeout); 673 } 674 } 675 676 /** 677 * Executes the given tasks, returning the result 678 * of one that has completed successfully (i.e., without throwing 679 * an exception), if any do. Upon normal or exceptional return, 680 * tasks that have not completed are cancelled. 681 * The results of this method are undefined if the given 682 * collection is modified while this operation is in progress. 683 * 684 * @param tasks the collection of tasks 685 * @param (T) the type of the values returned from the tasks 686 * @return the result returned by one of the tasks 687 * @throws InterruptedException if interrupted while waiting 688 * @throws NullPointerException if tasks or any element task 689 * subject to execution is {@code null} 690 * @throws IllegalArgumentException if tasks is empty 691 * @throws ExecutionException if no task successfully completes 692 * @throws RejectedExecutionException if tasks cannot be scheduled 693 * for execution 694 */ 695 static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks) { 696 AbstractExecutorService aes = cast(AbstractExecutorService)es; 697 if(aes is null) 698 throw new RejectedExecutionException("ExecutorService is null"); 699 else { 700 aes.invokeAny!(T)(tasks); 701 } 702 } 703 704 /** 705 * Executes the given tasks, returning the result 706 * of one that has completed successfully (i.e., without throwing 707 * an exception), if any do before the given timeout elapses. 708 * Upon normal or exceptional return, tasks that have not 709 * completed are cancelled. 710 * The results of this method are undefined if the given 711 * collection is modified while this operation is in progress. 712 * 713 * @param tasks the collection of tasks 714 * @param timeout the maximum time to wait 715 * @param unit the time unit of the timeout argument 716 * @param (T) the type of the values returned from the tasks 717 * @return the result returned by one of the tasks 718 * @throws InterruptedException if interrupted while waiting 719 * @throws NullPointerException if tasks, or unit, or any element 720 * task subject to execution is {@code null} 721 * @throws TimeoutException if the given timeout elapses before 722 * any task successfully completes 723 * @throws ExecutionException if no task successfully completes 724 * @throws RejectedExecutionException if tasks cannot be scheduled 725 * for execution 726 */ 727 static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks, 728 Duration timeout) { 729 AbstractExecutorService aes = cast(AbstractExecutorService)es; 730 if(aes is null) 731 throw new RejectedExecutionException("ExecutorService is null"); 732 else { 733 aes.invokeAny!(T)(tasks, timeout); 734 } 735 } 736 737 /** Cannot instantiate. */ 738 private this() {} 739 } 740 741 // Non-classes supporting the methods 742 743 /** 744 * A callable that runs given task and returns given result. 745 */ 746 private final class RunnableAdapter(T) : Callable!(T) if(is(T == void)) { 747 private Runnable task; 748 this(Runnable task) { 749 this.task = task; 750 } 751 752 T call() { 753 task.run(); 754 } 755 756 override string toString() { 757 return super.toString() ~ "[Wrapped task = " ~ (cast(Object)task).toString() ~ "]"; 758 } 759 } 760 761 private final class RunnableAdapter(T) : Callable!(T) if(!is(T == void)) { 762 private Runnable task; 763 private T result; 764 765 this(Runnable task, T result) { 766 this.task = task; 767 this.result = result; 768 } 769 770 T call() { 771 task.run(); 772 return result; 773 } 774 775 override string toString() { 776 return super.toString() ~ "[Wrapped task = " ~ (cast(Object)task).toString() ~ "]"; 777 } 778 } 779 780 // /** 781 // * A callable that runs under established access control settings. 782 // */ 783 // private final class PrivilegedCallable!(T) : Callable!(T) { 784 // Callable!(T) task; 785 // AccessControlContext acc; 786 787 // PrivilegedCallable(Callable!(T) task) { 788 // this.task = task; 789 // this.acc = AccessController.getContext(); 790 // } 791 792 // T call() throws Exception { 793 // try { 794 // return AccessController.doPrivileged( 795 // new PrivilegedExceptionAction!(T)() { 796 // T run() throws Exception { 797 // return task.call(); 798 // } 799 // }, acc); 800 // } catch (PrivilegedActionException e) { 801 // throw e.getException(); 802 // } 803 // } 804 805 // string toString() { 806 // return super.toString() ~ "[Wrapped task = " ~ task ~ "]"; 807 // } 808 // } 809 810 // /** 811 // * A callable that runs under established access control settings and 812 // * current ClassLoader. 813 // */ 814 // private final class PrivilegedCallableUsingCurrentClassLoader(T) 815 // : Callable!(T) { 816 // Callable!(T) task; 817 // AccessControlContext acc; 818 // ClassLoader ccl; 819 820 // this(Callable!(T) task) { 821 // SecurityManager sm = System.getSecurityManager(); 822 // if (sm !is null) { 823 // // Calls to getContextClassLoader from this class 824 // // never trigger a security check, but we check 825 // // whether our callers have this permission anyways. 826 // sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); 827 828 // // Whether setContextClassLoader turns out to be necessary 829 // // or not, we fail fast if permission is not available. 830 // sm.checkPermission(new RuntimePermission("setContextClassLoader")); 831 // } 832 // this.task = task; 833 // this.acc = AccessController.getContext(); 834 // this.ccl = Thread.getThis().getContextClassLoader(); 835 // } 836 837 // T call() throws Exception { 838 // try { 839 // return AccessController.doPrivileged( 840 // new PrivilegedExceptionAction!(T)() { 841 // T run() throws Exception { 842 // Thread t = Thread.getThis(); 843 // ClassLoader cl = t.getContextClassLoader(); 844 // if (ccl == cl) { 845 // return task.call(); 846 // } else { 847 // t.setContextClassLoader(ccl); 848 // try { 849 // return task.call(); 850 // } finally { 851 // t.setContextClassLoader(cl); 852 // } 853 // } 854 // } 855 // }, acc); 856 // } catch (PrivilegedActionException e) { 857 // throw e.getException(); 858 // } 859 // } 860 861 // string toString() { 862 // return super.toString() ~ "[Wrapped task = " ~ task ~ "]"; 863 // } 864 // } 865 866 867 868 // /** 869 // * A wrapper class that exposes only the ExecutorService methods 870 // * of an ExecutorService implementation. 871 // */ 872 // private class DelegatedExecutorService 873 // implements ExecutorService { 874 // private ExecutorService e; 875 // DelegatedExecutorService(ExecutorService executor) { e = executor; } 876 // void execute(Runnable command) { 877 // try { 878 // e.execute(command); 879 // } finally { reachabilityFence(this); } 880 // } 881 // void shutdown() { e.shutdown(); } 882 // List!(Runnable) shutdownNow() { 883 // try { 884 // return e.shutdownNow(); 885 // } finally { reachabilityFence(this); } 886 // } 887 // bool isShutdown() { 888 // try { 889 // return e.isShutdown(); 890 // } finally { reachabilityFence(this); } 891 // } 892 // bool isTerminated() { 893 // try { 894 // return e.isTerminated(); 895 // } finally { reachabilityFence(this); } 896 // } 897 // bool awaitTermination(Duration timeout) 898 // throws InterruptedException { 899 // try { 900 // return e.awaitTermination(timeout, unit); 901 // } finally { reachabilityFence(this); } 902 // } 903 // Future<?> submit(Runnable task) { 904 // try { 905 // return e.submit(task); 906 // } finally { reachabilityFence(this); } 907 // } 908 // !(T) Future!(T) submit(Callable!(T) task) { 909 // try { 910 // return e.submit(task); 911 // } finally { reachabilityFence(this); } 912 // } 913 // !(T) Future!(T) submit(Runnable task, T result) { 914 // try { 915 // return e.submit(task, result); 916 // } finally { reachabilityFence(this); } 917 // } 918 // !(T) List!(Future!(T)) invokeAll(Collection!(Callable!(T)) tasks) 919 // throws InterruptedException { 920 // try { 921 // return e.invokeAll(tasks); 922 // } finally { reachabilityFence(this); } 923 // } 924 // !(T) List!(Future!(T)) invokeAll(Collection!(Callable!(T)) tasks, 925 // Duration timeout) 926 // throws InterruptedException { 927 // try { 928 // return e.invokeAll(tasks, timeout, unit); 929 // } finally { reachabilityFence(this); } 930 // } 931 // !(T) T invokeAny(Collection!(Callable!(T)) tasks) 932 // throws InterruptedException, ExecutionException { 933 // try { 934 // return e.invokeAny(tasks); 935 // } finally { reachabilityFence(this); } 936 // } 937 // !(T) T invokeAny(Collection!(Callable!(T)) tasks, 938 // Duration timeout) 939 // throws InterruptedException, ExecutionException, TimeoutException { 940 // try { 941 // return e.invokeAny(tasks, timeout, unit); 942 // } finally { reachabilityFence(this); } 943 // } 944 // } 945 946 // private class FinalizableDelegatedExecutorService 947 // extends DelegatedExecutorService { 948 // FinalizableDelegatedExecutorService(ExecutorService executor) { 949 // super(executor); 950 // } 951 952 // protected void finalize() { 953 // super.shutdown(); 954 // } 955 // } 956 957 // /** 958 // * A wrapper class that exposes only the ScheduledExecutorService 959 // * methods of a ScheduledExecutorService implementation. 960 // */ 961 // private class DelegatedScheduledExecutorService 962 // extends DelegatedExecutorService 963 // implements ScheduledExecutorService { 964 // private ScheduledExecutorService e; 965 // DelegatedScheduledExecutorService(ScheduledExecutorService executor) { 966 // super(executor); 967 // e = executor; 968 // } 969 // ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { 970 // return e.schedule(command, delay, unit); 971 // } 972 // !(V) ScheduledFuture!(V) schedule(Callable!(V) callable, long delay, TimeUnit unit) { 973 // return e.schedule(callable, delay, unit); 974 // } 975 // ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { 976 // return e.scheduleAtFixedRate(command, initialDelay, period, unit); 977 // } 978 // ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { 979 // return e.scheduleWithFixedDelay(command, initialDelay, delay, unit); 980 // } 981 // }