1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.Executors;
13 
14 import hunt.concurrency.AbstractExecutorService;
15 import hunt.concurrency.atomic.AtomicHelper;
16 import hunt.concurrency.Delayed;
17 import hunt.concurrency.Exceptions;
18 import hunt.concurrency.ExecutorService;
19 // import hunt.concurrency.ForkJoinPool;
20 import hunt.concurrency.Future;
21 import hunt.concurrency.LinkedBlockingQueue;
22 import hunt.concurrency.ScheduledExecutorService;
23 import hunt.concurrency.ScheduledThreadPoolExecutor;
24 import hunt.concurrency.ThreadFactory;
25 import hunt.concurrency.ThreadPoolExecutor;
26 
27 import hunt.collection.List;
28 import hunt.Exceptions;
29 import hunt.util.Common;
30 import hunt.util.DateTime;
31 
32 import core.time;
33 import std.conv;
34 
35 // import static java.lang.ref.Reference.reachabilityFence;
36 // import java.security.AccessControlContext;
37 // import java.security.AccessControlException;
38 // import java.security.AccessController;
39 // import java.security.PrivilegedAction;
40 // import java.security.PrivilegedActionException;
41 // import java.security.PrivilegedExceptionAction;
42 // import hunt.collection.Collection;
43 // import java.util.List;
44 // import sun.security.util.SecurityConstants;
45 
46 /**
47  * Factory and utility methods for {@link Executor}, {@link
48  * ExecutorService}, {@link ScheduledExecutorService}, {@link
49  * ThreadFactory}, and {@link Callable} classes defined in this
50  * package. This class supports the following kinds of methods:
51  *
52  * <ul>
53  *   <li>Methods that create and return an {@link ExecutorService}
54  *       set up with commonly useful configuration settings.
55  *   <li>Methods that create and return a {@link ScheduledExecutorService}
56  *       set up with commonly useful configuration settings.
57  *   <li>Methods that create and return a "wrapped" ExecutorService, that
58  *       disables reconfiguration by making implementation-specific methods
59  *       inaccessible.
60  *   <li>Methods that create and return a {@link ThreadFactory}
61  *       that sets newly created threads to a known state.
62  *   <li>Methods that create and return a {@link Callable}
63  *       out of other closure-like forms, so they can be used
64  *       in execution methods requiring {@code Callable}.
65  * </ul>
66  *
67  * @author Doug Lea
68  */
69 class Executors {
70 
71     /**
72      * Creates a thread pool that reuses a fixed number of threads
73      * operating off a shared unbounded queue.  At any point, at most
74      * {@code nThreads} threads will be active processing tasks.
75      * If additional tasks are submitted when all threads are active,
76      * they will wait in the queue until a thread is available.
77      * If any thread terminates due to a failure during execution
78      * prior to shutdown, a new one will take its place if needed to
79      * execute subsequent tasks.  The threads in the pool will exist
80      * until it is explicitly {@link ExecutorService#shutdown shutdown}.
81      *
82      * @param nThreads the number of threads in the pool
83      * @return the newly created thread pool
84      * @throws IllegalArgumentException if {@code nThreads <= 0}
85      */
86     static ThreadPoolExecutor newFixedThreadPool(int nThreads) {
87         return new ThreadPoolExecutor(nThreads, nThreads, 0.hnsecs,
88                                       new LinkedBlockingQueue!(Runnable)());
89     }
90 
91     // /**
92     //  * Creates a thread pool that maintains enough threads to support
93     //  * the given parallelism level, and may use multiple queues to
94     //  * reduce contention. The parallelism level corresponds to the
95     //  * maximum number of threads actively engaged in, or available to
96     //  * engage in, task processing. The actual number of threads may
97     //  * grow and shrink dynamically. A work-stealing pool makes no
98     //  * guarantees about the order in which submitted tasks are
99     //  * executed.
100     //  *
101     //  * @param parallelism the targeted parallelism level
102     //  * @return the newly created thread pool
103     //  * @throws IllegalArgumentException if {@code parallelism <= 0}
104     //  */
105     // static ExecutorService newWorkStealingPool(int parallelism) {
106     //     return new ForkJoinPool
107     //         (parallelism,
108     //          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
109     //          null, true);
110     // }
111 
112     // /**
113     //  * Creates a work-stealing thread pool using the number of
114     //  * {@linkplain Runtime#availableProcessors available processors}
115     //  * as its target parallelism level.
116     //  *
117     //  * @return the newly created thread pool
118     //  * @see #newWorkStealingPool(int)
119     //  */
120     // static ExecutorService newWorkStealingPool() {
121     //     return new ForkJoinPool
122     //         (Runtime.getRuntime().availableProcessors(),
123     //          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
124     //          null, true);
125     // }
126 
127     /**
128      * Creates a thread pool that reuses a fixed number of threads
129      * operating off a shared unbounded queue, using the provided
130      * ThreadFactory to create new threads when needed.  At any point,
131      * at most {@code nThreads} threads will be active processing
132      * tasks.  If additional tasks are submitted when all threads are
133      * active, they will wait in the queue until a thread is
134      * available.  If any thread terminates due to a failure during
135      * execution prior to shutdown, a new one will take its place if
136      * needed to execute subsequent tasks.  The threads in the pool will
137      * exist until it is explicitly {@link ExecutorService#shutdown
138      * shutdown}.
139      *
140      * @param nThreads the number of threads in the pool
141      * @param threadFactory the factory to use when creating new threads
142      * @return the newly created thread pool
143      * @throws NullPointerException if threadFactory is null
144      * @throws IllegalArgumentException if {@code nThreads <= 0}
145      */
146     static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
147         return new ThreadPoolExecutor(nThreads, nThreads, 0.msecs,
148                                       new LinkedBlockingQueue!(Runnable)(),
149                                       threadFactory);
150     }
151 
152     /**
153      * Creates an Executor that uses a single worker thread operating
154      * off an unbounded queue. (Note however that if this single
155      * thread terminates due to a failure during execution prior to
156      * shutdown, a new one will take its place if needed to execute
157      * subsequent tasks.)  Tasks are guaranteed to execute
158      * sequentially, and no more than one task will be active at any
159      * given time. Unlike the otherwise equivalent
160      * {@code newFixedThreadPool(1)} the returned executor is
161      * guaranteed not to be reconfigurable to use additional threads.
162      *
163      * @return the newly created single-threaded Executor
164      */
165     // static ExecutorService newSingleThreadExecutor() {
166     //     return new FinalizableDelegatedExecutorService
167     //         (new ThreadPoolExecutor(1, 1,
168     //                                 0L, TimeUnit.MILLISECONDS,
169     //                                 new LinkedBlockingQueue!(Runnable)()));
170     // }
171 
172     // /**
173     //  * Creates an Executor that uses a single worker thread operating
174     //  * off an unbounded queue, and uses the provided ThreadFactory to
175     //  * create a new thread when needed. Unlike the otherwise
176     //  * equivalent {@code newFixedThreadPool(1, threadFactory)} the
177     //  * returned executor is guaranteed not to be reconfigurable to use
178     //  * additional threads.
179     //  *
180     //  * @param threadFactory the factory to use when creating new threads
181     //  * @return the newly created single-threaded Executor
182     //  * @throws NullPointerException if threadFactory is null
183     //  */
184     // static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
185     //     return new FinalizableDelegatedExecutorService
186     //         (new ThreadPoolExecutor(1, 1,
187     //                                 0L, TimeUnit.MILLISECONDS,
188     //                                 new LinkedBlockingQueue!(Runnable)(),
189     //                                 threadFactory));
190     // }
191 
192     /**
193      * Creates a thread pool that creates new threads as needed, but
194      * will reuse previously constructed threads when they are
195      * available.  These pools will typically improve the performance
196      * of programs that execute many short-lived asynchronous tasks.
197      * Calls to {@code execute} will reuse previously constructed
198      * threads if available. If no existing thread is available, a new
199      * thread will be created and added to the pool. Threads that have
200      * not been used for sixty seconds are terminated and removed from
201      * the cache. Thus, a pool that remains idle for long enough will
202      * not consume any resources. Note that pools with similar
203      * properties but different details (for example, timeout parameters)
204      * may be created using {@link ThreadPoolExecutor} constructors.
205      *
206      * @return the newly created thread pool
207      */
208     // static ExecutorService newCachedThreadPool() {
209     //     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
210     //                                   60L, TimeUnit.SECONDS,
211     //                                   new SynchronousQueue!(Runnable)());
212     // }
213 
214     // /**
215     //  * Creates a thread pool that creates new threads as needed, but
216     //  * will reuse previously constructed threads when they are
217     //  * available, and uses the provided
218     //  * ThreadFactory to create new threads when needed.
219     //  *
220     //  * @param threadFactory the factory to use when creating new threads
221     //  * @return the newly created thread pool
222     //  * @throws NullPointerException if threadFactory is null
223     //  */
224     // static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
225     //     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
226     //                                   60L, TimeUnit.SECONDS,
227     //                                   new SynchronousQueue!(Runnable)(),
228     //                                   threadFactory);
229     // }
230 
231     /**
232      * Creates a single-threaded executor that can schedule commands
233      * to run after a given delay, or to execute periodically.
234      * (Note however that if this single
235      * thread terminates due to a failure during execution prior to
236      * shutdown, a new one will take its place if needed to execute
237      * subsequent tasks.)  Tasks are guaranteed to execute
238      * sequentially, and no more than one task will be active at any
239      * given time. Unlike the otherwise equivalent
240      * {@code newScheduledThreadPool(1)} the returned executor is
241      * guaranteed not to be reconfigurable to use additional threads.
242      *
243      * @return the newly created scheduled executor
244      */
245     static ScheduledExecutorService newSingleThreadScheduledExecutor() {
246         return new DelegatedScheduledExecutorService!ScheduledThreadPoolExecutor
247             (new ScheduledThreadPoolExecutor(1));
248     }
249 
250     /**
251      * Creates a single-threaded executor that can schedule commands
252      * to run after a given delay, or to execute periodically.  (Note
253      * however that if this single thread terminates due to a failure
254      * during execution prior to shutdown, a new one will take its
255      * place if needed to execute subsequent tasks.)  Tasks are
256      * guaranteed to execute sequentially, and no more than one task
257      * will be active at any given time. Unlike the otherwise
258      * equivalent {@code newScheduledThreadPool(1, threadFactory)}
259      * the returned executor is guaranteed not to be reconfigurable to
260      * use additional threads.
261      *
262      * @param threadFactory the factory to use when creating new threads
263      * @return the newly created scheduled executor
264      * @throws NullPointerException if threadFactory is null
265      */
266     static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
267         return new DelegatedScheduledExecutorService!ScheduledThreadPoolExecutor
268             (new ScheduledThreadPoolExecutor(1, threadFactory));
269     }
270 
271     /**
272      * Creates a thread pool that can schedule commands to run after a
273      * given delay, or to execute periodically.
274      * @param corePoolSize the number of threads to keep in the pool,
275      * even if they are idle
276      * @return the newly created scheduled thread pool
277      * @throws IllegalArgumentException if {@code corePoolSize < 0}
278      */
279     static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
280         return new ScheduledThreadPoolExecutor(corePoolSize);
281     }
282 
283     /**
284      * Creates a thread pool that can schedule commands to run after a
285      * given delay, or to execute periodically.
286      * @param corePoolSize the number of threads to keep in the pool,
287      * even if they are idle
288      * @param threadFactory the factory to use when the executor
289      * creates a new thread
290      * @return the newly created scheduled thread pool
291      * @throws IllegalArgumentException if {@code corePoolSize < 0}
292      * @throws NullPointerException if threadFactory is null
293      */
294     static ScheduledExecutorService newScheduledThreadPool(
295             int corePoolSize, ThreadFactory threadFactory) {
296         return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
297     }
298 
299     // /**
300     //  * Returns an object that delegates all defined {@link
301     //  * ExecutorService} methods to the given executor, but not any
302     //  * other methods that might otherwise be accessible using
303     //  * casts. This provides a way to safely "freeze" configuration and
304     //  * disallow tuning of a given concrete implementation.
305     //  * @param executor the underlying implementation
306     //  * @return an {@code ExecutorService} instance
307     //  * @throws NullPointerException if executor null
308     //  */
309     // static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
310     //     if (executor is null)
311     //         throw new NullPointerException();
312     //     return new DelegatedExecutorService(executor);
313     // }
314 
315     // /**
316     //  * Returns an object that delegates all defined {@link
317     //  * ScheduledExecutorService} methods to the given executor, but
318     //  * not any other methods that might otherwise be accessible using
319     //  * casts. This provides a way to safely "freeze" configuration and
320     //  * disallow tuning of a given concrete implementation.
321     //  * @param executor the underlying implementation
322     //  * @return a {@code ScheduledExecutorService} instance
323     //  * @throws NullPointerException if executor null
324     //  */
325     // static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
326     //     if (executor is null)
327     //         throw new NullPointerException();
328     //     return new DelegatedScheduledExecutorService(executor);
329     // }
330 
331     /**
332      * Returns a default thread factory used to create new threads.
333      * This factory creates all new threads used by an Executor in the
334      * same {@link ThreadGroupEx}. If there is a {@link
335      * java.lang.SecurityManager}, it uses the group of {@link
336      * System#getSecurityManager}, else the group of the thread
337      * invoking this {@code defaultThreadFactory} method. Each new
338      * thread is created as a non-daemon thread with priority set to
339      * the smaller of {@code Thread.PRIORITY_DEFAULT} and the maximum
340      * priority permitted in the thread group.  New threads have names
341      * accessible via {@link Thread#getName} of
342      * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
343      * number of this factory, and <em>M</em> is the sequence number
344      * of the thread created by this factory.
345      * @return a thread factory
346      */
347     static ThreadFactory defaultThreadFactory() {
348         return ThreadFactory.defaultThreadFactory();
349     }
350 
351     // /**
352     //  * Returns a thread factory used to create new threads that
353     //  * have the same permissions as the current thread.
354     //  * This factory creates threads with the same settings as {@link
355     //  * Executors#defaultThreadFactory}, additionally setting the
356     //  * AccessControlContext and contextClassLoader of new threads to
357     //  * be the same as the thread invoking this
358     //  * {@code privilegedThreadFactory} method.  A new
359     //  * {@code privilegedThreadFactory} can be created within an
360     //  * {@link AccessController#doPrivileged AccessController.doPrivileged}
361     //  * action setting the current thread's access control context to
362     //  * create threads with the selected permission settings holding
363     //  * within that action.
364     //  *
365     //  * <p>Note that while tasks running within such threads will have
366     //  * the same access control and class loader settings as the
367     //  * current thread, they need not have the same {@link
368     //  * java.lang.ThreadLocal} or {@link
369     //  * java.lang.InheritableThreadLocal} values. If necessary,
370     //  * particular values of thread locals can be set or reset before
371     //  * any task runs in {@link ThreadPoolExecutor} subclasses using
372     //  * {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}.
373     //  * Also, if it is necessary to initialize worker threads to have
374     //  * the same InheritableThreadLocal settings as some other
375     //  * designated thread, you can create a custom ThreadFactory in
376     //  * which that thread waits for and services requests to create
377     //  * others that will inherit its values.
378     //  *
379     //  * @return a thread factory
380     //  * @throws AccessControlException if the current access control
381     //  * context does not have permission to both get and set context
382     //  * class loader
383     //  */
384     // static ThreadFactory privilegedThreadFactory() {
385     //     return new PrivilegedThreadFactory();
386     // }
387 
    /**
     * Returns a {@link Callable} object that, when called, runs the
     * given task. The one-argument overload yields a
     * {@code Callable!(void)}; the two-argument overload yields a
     * {@code Callable!(T)} constructed with the given result.  This
     * can be useful when applying methods requiring a
     * {@code Callable} to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return (two-argument overload only)
     * @param (T) the type of the result (two-argument overload only)
     * @return a callable object
     * @throws NullPointerException if task null
     */
399     static Callable!(void) callable(Runnable task) {
400         if (task is null)
401             throw new NullPointerException();
402         return new RunnableAdapter!(void)(task);
403     }
404 
405     static Callable!(T) callable(T)(Runnable task, T result) if(!is(T == void)) {
406         if (task is null)
407             throw new NullPointerException();
408         return new RunnableAdapter!(T)(task, result);
409     }
410 
411     /**
412      * Returns a {@link Callable} object that, when
413      * called, runs the given task and returns {@code null}.
414      * @param task the task to run
415      * @return a callable object
416      * @throws NullPointerException if task null
417      */
418     // static Callable!(Object) callable(Runnable task) {
419     //     if (task is null)
420     //         throw new NullPointerException();
421     //     return new RunnableAdapter!(Object)(task, null);
422     // }
423 
424     // /**
425     //  * Returns a {@link Callable} object that, when
426     //  * called, runs the given privileged action and returns its result.
427     //  * @param action the privileged action to run
428     //  * @return a callable object
429     //  * @throws NullPointerException if action null
430     //  */
431     // static Callable!(Object) callable(PrivilegedAction<?> action) {
432     //     if (action is null)
433     //         throw new NullPointerException();
434     //     return new Callable!(Object)() {
435     //         Object call() { return action.run(); }};
436     // }
437 
438     // /**
439     //  * Returns a {@link Callable} object that, when
440     //  * called, runs the given privileged exception action and returns
441     //  * its result.
442     //  * @param action the privileged exception action to run
443     //  * @return a callable object
444     //  * @throws NullPointerException if action null
445     //  */
446     // static Callable!(Object) callable(PrivilegedExceptionAction<?> action) {
447     //     if (action is null)
448     //         throw new NullPointerException();
449     //     return new Callable!(Object)() {
450     //         Object call() throws Exception { return action.run(); }};
451     // }
452 
453     // /**
454     //  * Returns a {@link Callable} object that will, when called,
455     //  * execute the given {@code callable} under the current access
456     //  * control context. This method should normally be invoked within
457     //  * an {@link AccessController#doPrivileged AccessController.doPrivileged}
458     //  * action to create callables that will, if possible, execute
459     //  * under the selected permission settings holding within that
460     //  * action; or if not possible, throw an associated {@link
461     //  * AccessControlException}.
462     //  * @param callable the underlying task
463     //  * @param (T) the type of the callable's result
464     //  * @return a callable object
465     //  * @throws NullPointerException if callable null
466     //  */
467     // static !(T) Callable!(T) privilegedCallable(Callable!(T) callable) {
468     //     if (callable is null)
469     //         throw new NullPointerException();
470     //     return new PrivilegedCallable!(T)(callable);
471     // }
472 
473     // /**
474     //  * Returns a {@link Callable} object that will, when called,
475     //  * execute the given {@code callable} under the current access
476     //  * control context, with the current context class loader as the
477     //  * context class loader. This method should normally be invoked
478     //  * within an
479     //  * {@link AccessController#doPrivileged AccessController.doPrivileged}
480     //  * action to create callables that will, if possible, execute
481     //  * under the selected permission settings holding within that
482     //  * action; or if not possible, throw an associated {@link
483     //  * AccessControlException}.
484     //  *
485     //  * @param callable the underlying task
486     //  * @param (T) the type of the callable's result
487     //  * @return a callable object
488     //  * @throws NullPointerException if callable null
489     //  * @throws AccessControlException if the current access control
490     //  * context does not have permission to both set and get context
491     //  * class loader
492     //  */
493     // static !(T) Callable!(T) privilegedCallableUsingCurrentClassLoader(Callable!(T) callable) {
494     //     if (callable is null)
495     //         throw new NullPointerException();
496     //     return new PrivilegedCallableUsingCurrentClassLoader!(T)(callable);
497     // }
498 
499 
500     // Methods for ExecutorService
501 
502     /**
503      * Submits a Runnable task for execution and returns a Future
504      * representing that task. The Future's {@code get} method will
505      * return {@code null} upon <em>successful</em> completion.
506      *
507      * @param task the task to submit
508      * @return a Future representing pending completion of the task
509      * @throws RejectedExecutionException if the task cannot be
510      *         scheduled for execution
511      * @throws NullPointerException if the task is null
512      */
513     static Future!(void) submit(ExecutorService es, Runnable task) {
514 
515         AbstractExecutorService aes = cast(AbstractExecutorService)es;
516         if(aes is null) 
517             throw new RejectedExecutionException("ExecutorService is null");
518         else
519             return aes.submit(task);
520 
521         // TypeInfo typeInfo = typeid(cast(Object)es);
522         // if(typeInfo == typeid(ThreadPoolExecutor)) {
523         //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
524         //     return aes.submit(task);
525         // } else {
526         //     implementationMissing(false);
527         // }
528     }
529 
530     /**
531      * Submits a Runnable task for execution and returns a Future
532      * representing that task. The Future's {@code get} method will
533      * return the given result upon successful completion.
534      *
535      * @param task the task to submit
536      * @param result the result to return
537      * @param (T) the type of the result
538      * @return a Future representing pending completion of the task
539      * @throws RejectedExecutionException if the task cannot be
540      *         scheduled for execution
541      * @throws NullPointerException if the task is null
542      */
543     static Future!(T) submit(T)(ExecutorService es, Runnable task, T result) {
544         AbstractExecutorService aes = cast(AbstractExecutorService)es;
545         if(aes is null) 
546             throw new RejectedExecutionException("ExecutorService is null");
547         else
548             return aes.submit!T(task, result);
549                     
550         // TypeInfo typeInfo = typeid(cast(Object)es);
551         // if(typeInfo == typeid(ThreadPoolExecutor)) {
552         //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
553         //     if(aes is null) 
554         //         throw new RejectedExecutionException("ExecutorService is null");
555         //     else
556         //         return aes.submit!T(task, result);
557         // } else {
558         //     implementationMissing(false);
559         // }
560     }
561 
562     /**
563      * Submits a value-returning task for execution and returns a
564      * Future representing the pending results of the task. The
565      * Future's {@code get} method will return the task's result upon
566      * successful completion.
567      *
568      * <p>
569      * If you would like to immediately block waiting
570      * for a task, you can use constructions of the form
571      * {@code result = exec.submit(aCallable).get();}
572      *
573      * <p>Note: The {@link Executors} class includes a set of methods
574      * that can convert some other common closure-like objects,
575      * for example, {@link java.security.PrivilegedAction} to
576      * {@link Callable} form so they can be submitted.
577      *
578      * @param task the task to submit
579      * @param (T) the type of the task's result
580      * @return a Future representing pending completion of the task
581      * @throws RejectedExecutionException if the task cannot be
582      *         scheduled for execution
583      * @throws NullPointerException if the task is null
584      */
585     static Future!(T) submit(T)(ExecutorService es, Callable!(T) task) {
586         AbstractExecutorService aes = cast(AbstractExecutorService)es;
587         if(aes is null) 
588             throw new RejectedExecutionException("ExecutorService is null");
589         else
590             return aes.submit!(T)(task);
591             
592         // TypeInfo typeInfo = typeid(cast(Object)es);
593         // if(typeInfo == typeid(ThreadPoolExecutor)) {
594         //     AbstractExecutorService aes = cast(AbstractExecutorService)es;
595         //     if(aes is null) 
596         //         throw new RejectedExecutionException("ExecutorService is null");
597         //     else
598         //         return aes.submit!(T)(task);
599         // } else {
600         //     implementationMissing(false);
601         // }
602     }
603 
604     /**
605      * Executes the given tasks, returning a list of Futures holding
606      * their status and results when all complete.
607      * {@link Future#isDone} is {@code true} for each
608      * element of the returned list.
609      * Note that a <em>completed</em> task could have
610      * terminated either normally or by throwing an exception.
611      * The results of this method are undefined if the given
612      * collection is modified while this operation is in progress.
613      *
614      * @param tasks the collection of tasks
615      * @param (T) the type of the values returned from the tasks
616      * @return a list of Futures representing the tasks, in the same
617      *         sequential order as produced by the iterator for the
618      *         given task list, each of which has completed
619      * @throws InterruptedException if interrupted while waiting, in
620      *         which case unfinished tasks are cancelled
621      * @throws NullPointerException if tasks or any of its elements are {@code null}
622      * @throws RejectedExecutionException if any task cannot be
623      *         scheduled for execution
624      */
625     static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks) {
626 
627         AbstractExecutorService aes = cast(AbstractExecutorService)es;
628         if(aes is null) 
629             throw new RejectedExecutionException("ExecutorService is null");
630         else {
631             aes.invokeAll!(T)(tasks);
632         }
633 
634     }
635 
636     /**
637      * Executes the given tasks, returning a list of Futures holding
638      * their status and results
639      * when all complete or the timeout expires, whichever happens first.
640      * {@link Future#isDone} is {@code true} for each
641      * element of the returned list.
642      * Upon return, tasks that have not completed are cancelled.
643      * Note that a <em>completed</em> task could have
644      * terminated either normally or by throwing an exception.
645      * The results of this method are undefined if the given
646      * collection is modified while this operation is in progress.
647      *
648      * @param tasks the collection of tasks
649      * @param timeout the maximum time to wait
650      * @param unit the time unit of the timeout argument
651      * @param (T) the type of the values returned from the tasks
652      * @return a list of Futures representing the tasks, in the same
653      *         sequential order as produced by the iterator for the
654      *         given task list. If the operation did not time out,
655      *         each task will have completed. If it did time out, some
656      *         of these tasks will not have completed.
657      * @throws InterruptedException if interrupted while waiting, in
658      *         which case unfinished tasks are cancelled
659      * @throws NullPointerException if tasks, any of its elements, or
660      *         unit are {@code null}
661      * @throws RejectedExecutionException if any task cannot be scheduled
662      *         for execution
663      */
664     static List!(Future!(T)) invokeAll(T)(ExecutorService es, Collection!(Callable!(T)) tasks,
665                                   Duration timeout) {
666         AbstractExecutorService aes = cast(AbstractExecutorService)es;
667         if(aes is null) 
668             throw new RejectedExecutionException("ExecutorService is null");
669         else {
670             aes.invokeAll!(T)(tasks, timeout);
671         }
672     }
673 
674     /**
675      * Executes the given tasks, returning the result
676      * of one that has completed successfully (i.e., without throwing
677      * an exception), if any do. Upon normal or exceptional return,
678      * tasks that have not completed are cancelled.
679      * The results of this method are undefined if the given
680      * collection is modified while this operation is in progress.
681      *
682      * @param tasks the collection of tasks
683      * @param (T) the type of the values returned from the tasks
684      * @return the result returned by one of the tasks
685      * @throws InterruptedException if interrupted while waiting
686      * @throws NullPointerException if tasks or any element task
687      *         subject to execution is {@code null}
688      * @throws IllegalArgumentException if tasks is empty
689      * @throws ExecutionException if no task successfully completes
690      * @throws RejectedExecutionException if tasks cannot be scheduled
691      *         for execution
692      */
693     static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks) {
694         AbstractExecutorService aes = cast(AbstractExecutorService)es;
695         if(aes is null) 
696             throw new RejectedExecutionException("ExecutorService is null");
697         else {
698             aes.invokeAny!(T)(tasks);
699         }
700     }
701 
702     /**
703      * Executes the given tasks, returning the result
704      * of one that has completed successfully (i.e., without throwing
705      * an exception), if any do before the given timeout elapses.
706      * Upon normal or exceptional return, tasks that have not
707      * completed are cancelled.
708      * The results of this method are undefined if the given
709      * collection is modified while this operation is in progress.
710      *
711      * @param tasks the collection of tasks
     * @param es the executor service that runs the tasks
     * @param timeout the maximum time to wait, given as a {@code Duration}
714      * @param (T) the type of the values returned from the tasks
715      * @return the result returned by one of the tasks
716      * @throws InterruptedException if interrupted while waiting
717      * @throws NullPointerException if tasks, or unit, or any element
718      *         task subject to execution is {@code null}
719      * @throws TimeoutException if the given timeout elapses before
720      *         any task successfully completes
721      * @throws ExecutionException if no task successfully completes
722      * @throws RejectedExecutionException if tasks cannot be scheduled
723      *         for execution
724      */
725     static T invokeAny(T)(ExecutorService es, Collection!(Callable!(T)) tasks,
726                     Duration timeout)  {
727         AbstractExecutorService aes = cast(AbstractExecutorService)es;
728         if(aes is null) 
729             throw new RejectedExecutionException("ExecutorService is null");
730         else {
731             aes.invokeAny!(T)(tasks, timeout);
732         }
733     }
734 
    /**
     * Cannot instantiate: the constructor is private, so code outside this
     * module cannot create an instance of this class.
     */
    private this() {}
737 }
738 
// Non-public helper classes supporting the public methods
740 
741 /**
742  * A callable that runs given task and returns given result.
743  */
744 private final class RunnableAdapter(T) : Callable!(T) if(is(T == void)) {
745     private Runnable task;
746     this(Runnable task) {
747         this.task = task;
748     }
749 
750     T call() {
751         task.run();
752     }
753 
754     override string toString() {
755         return super.toString() ~ "[Wrapped task = " ~ (cast(Object)task).toString() ~ "]";
756     }
757 }
758 
/**
 * Adapts a {@code Runnable} to the {@code Callable} interface for a
 * non-{@code void} result type: calling it runs the wrapped task and
 * then returns the value supplied at construction.
 */
private final class RunnableAdapter(T) : Callable!(T) if(!is(T == void)) {
    // The runnable executed by call().
    private Runnable task;
    // The value handed back once the task has run.
    private T result;

    /**
     * Params:
     *   task   = the runnable to execute when call() is invoked
     *   result = the value call() returns after the task has run
     */
    this(Runnable task, T result) {
        this.result = result;
        this.task = task;
    }

    /**
     * Runs the wrapped task.
     *
     * Returns: the result given to the constructor
     */
    T call() {
        task.run();
        return result;
    }

    /**
     * Returns: the inherited description followed by the wrapped task's
     *          own description.
     */
    override string toString() {
        // Pieces are appended in the same order they were concatenated before.
        string description = super.toString();
        description ~= "[Wrapped task = ";
        description ~= (cast(Object)task).toString();
        description ~= "]";
        return description;
    }
}
777 
778 // /**
779 //  * A callable that runs under established access control settings.
780 //  */
781 // private final class PrivilegedCallable!(T) : Callable!(T) {
782 //     Callable!(T) task;
783 //     AccessControlContext acc;
784 
785 //     PrivilegedCallable(Callable!(T) task) {
786 //         this.task = task;
787 //         this.acc = AccessController.getContext();
788 //     }
789 
790 //     T call() throws Exception {
791 //         try {
792 //             return AccessController.doPrivileged(
793 //                 new PrivilegedExceptionAction!(T)() {
794 //                     T run() throws Exception {
795 //                         return task.call();
796 //                     }
797 //                 }, acc);
798 //         } catch (PrivilegedActionException e) {
799 //             throw e.getException();
800 //         }
801 //     }
802 
803 //     string toString() {
804 //         return super.toString() ~ "[Wrapped task = " ~ task ~ "]";
805 //     }
806 // }
807 
808 // /**
809 //  * A callable that runs under established access control settings and
810 //  * current ClassLoader.
811 //  */
812 // private final class PrivilegedCallableUsingCurrentClassLoader(T)
813 //         : Callable!(T) {
814 //     Callable!(T) task;
815 //     AccessControlContext acc;
816 //     ClassLoader ccl;
817 
818 //     this(Callable!(T) task) {
819 //         SecurityManager sm = System.getSecurityManager();
820 //         if (sm !is null) {
821 //             // Calls to getContextClassLoader from this class
822 //             // never trigger a security check, but we check
823 //             // whether our callers have this permission anyways.
824 //             sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
825 
826 //             // Whether setContextClassLoader turns out to be necessary
827 //             // or not, we fail fast if permission is not available.
828 //             sm.checkPermission(new RuntimePermission("setContextClassLoader"));
829 //         }
830 //         this.task = task;
831 //         this.acc = AccessController.getContext();
832 //         this.ccl = Thread.getThis().getContextClassLoader();
833 //     }
834 
835 //     T call() throws Exception {
836 //         try {
837 //             return AccessController.doPrivileged(
838 //                 new PrivilegedExceptionAction!(T)() {
839 //                     T run() throws Exception {
840 //                         Thread t = Thread.getThis();
841 //                         ClassLoader cl = t.getContextClassLoader();
842 //                         if (ccl == cl) {
843 //                             return task.call();
844 //                         } else {
845 //                             t.setContextClassLoader(ccl);
846 //                             try {
847 //                                 return task.call();
848 //                             } finally {
849 //                                 t.setContextClassLoader(cl);
850 //                             }
851 //                         }
852 //                     }
853 //                 }, acc);
854 //         } catch (PrivilegedActionException e) {
855 //             throw e.getException();
856 //         }
857 //     }
858 
859 //     string toString() {
860 //         return super.toString() ~ "[Wrapped task = " ~ task ~ "]";
861 //     }
862 // }
863 
/**
 * Stand-in for Java's {@code Reference.reachabilityFence}.
 *
 * The parameter is unnamed and the body is empty, so calling this function
 * has no effect. The delegating executor wrappers in this module call it
 * from their {@code finally} blocks.
 */
void reachabilityFence(ExecutorService) {
    // do nothing;
    // TODO: Tasks pending completion -@zxp at 5/10/2019, 10:50:31 AM    
    // remove this
}
869 
870 /**
871  * A wrapper class that exposes only the ExecutorService methods
872  * of an ExecutorService implementation.
873  */
874 private class DelegatedExecutorService(U) : ExecutorService 
875     if(is(U : ExecutorService)) {
876         
877     private U e;
878 
879     this(U executor) { e = executor; }
880 
881     void execute(Runnable command) {
882         try {
883             e.execute(command);
884         } finally { reachabilityFence(this); }
885     }
886 
887     void shutdown() { e.shutdown(); }
888 
889     List!(Runnable) shutdownNow() {
890         try {
891             return e.shutdownNow();
892         } finally { reachabilityFence(this); }
893     }
894 
895     bool isShutdown() {
896         try {
897             return e.isShutdown();
898         } finally { reachabilityFence(this); }
899     }
900 
901     bool isTerminated() {
902         try {
903             return e.isTerminated();
904         } finally { reachabilityFence(this); }
905     }
906 
907     bool awaitTermination(Duration timeout) {
908         try {
909             return e.awaitTermination(timeout);
910         } finally { reachabilityFence(this); }
911     }
912 
913     Future!void submit(Runnable task) {
914         try {
915             return e.submit(task);
916         } finally { reachabilityFence(this); }
917     }
918 
919     Future!(T) submit(T)(Callable!(T) task) {
920         try {
921             return e.submit(task);
922         } finally { reachabilityFence(this); }
923     }
924 
925     Future!(T) submit(T)(Runnable task, T result) {
926         try {
927             return e.submit(task, result);
928         } finally { reachabilityFence(this); }
929     }
930 
931     List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
932         try {
933             return e.invokeAll(tasks);
934         } finally { reachabilityFence(this); }
935     }
936 
937     List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
938                                          Duration timeout) {
939         try {
940             return e.invokeAll(tasks, timeout, unit);
941         } finally { reachabilityFence(this); }
942     }
943 
944     T invokeAny(T)(Collection!(Callable!(T)) tasks) {
945         try {
946             return e.invokeAny(tasks);
947         } finally { reachabilityFence(this); }
948     }
949     
950     T invokeAny(T)(Collection!(Callable!(T)) tasks,
951                            Duration timeout) {
952         try {
953             return e.invokeAny(tasks, timeout, unit);
954         } finally { reachabilityFence(this); }
955     }
956 }
957 
/**
 * A DelegatedExecutorService that additionally offers a finalize() hook
 * which shuts the wrapped executor down.
 */
private class FinalizableDelegatedExecutorService(T) : DelegatedExecutorService!T {
    /**
     * Params:
     *   executor = the executor to wrap; handed to the base class unchanged
     */
    this(T executor) {
        super(executor);
    }

    // Shuts down through the base class's shutdown().
    // NOTE(review): nothing visible in this chunk calls finalize(); it is an
    // ordinary method here, not a destructor (~this) — confirm whether it is
    // ever invoked.
    protected void finalize() {
        super.shutdown();
    }
}
967 
968 /**
969  * A wrapper class that exposes only the ScheduledExecutorService
970  * methods of a ScheduledExecutorService implementation.
971  */
972 private class DelegatedScheduledExecutorService(T) : DelegatedExecutorService!T,
973         ScheduledExecutorService if(is(T : ScheduledExecutorService)){
974 
975     private T e;
976 
977     this(T executor) {
978         super(executor);
979         e = executor;
980     }
981 
982     ScheduledFuture!void schedule(Runnable command, Duration delay) {
983         return e.schedule(command, delay);
984     }
985 
986     ScheduledFuture!(V) schedule(V)(Callable!(V) callable, Duration delay) {
987         return e.schedule!V(callable, delay);
988     }
989 
990     ScheduledFuture!void scheduleAtFixedRate(Runnable command, Duration initialDelay, Duration period) {
991         return e.scheduleAtFixedRate(command, initialDelay, period);
992     }
993 
994     ScheduledFuture!void scheduleWithFixedDelay(Runnable command, Duration initialDelay, Duration delay) {
995         return e.scheduleWithFixedDelay(command, initialDelay, delay);
996     }
997 }