1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.AbstractExecutorService;
13 
14 import hunt.concurrency.ExecutorService;
15 import hunt.concurrency.Future;
16 import hunt.concurrency.FutureTask;
17 
18 import hunt.collection.ArrayList;
19 import hunt.collection.Collection;
20 import hunt.collection.Iterator;
21 import hunt.collection.List;
22 
23 import hunt.Exceptions;
24 import hunt.logging.ConsoleLogger;
25 import hunt.util.Common;
26 import hunt.util.DateTime;
27 
28 import std.datetime;
29 
30 
31 /**
32  * Provides default implementations of {@link ExecutorService}
33  * execution methods. This class implements the {@code submit},
34  * {@code invokeAny} and {@code invokeAll} methods using a
35  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
36  * to the {@link FutureTask} class provided in this package.  For example,
37  * the implementation of {@code submit(Runnable)} creates an
38  * associated {@code RunnableFuture} that is executed and
39  * returned. Subclasses may override the {@code newTaskFor} methods
40  * to return {@code RunnableFuture} implementations other than
41  * {@code FutureTask}.
42  *
43  * <p><b>Extension example</b>. Here is a sketch of a class
44  * that customizes {@link ThreadPoolExecutor} to use
45  * a {@code CustomTask} class instead of the default {@code FutureTask}:
46  * <pre> {@code
47  * class CustomThreadPoolExecutor extends ThreadPoolExecutor {
48  *
49  *   static class CustomTask!(V) : RunnableFuture!(V) {...}
50  *
51  *   protected !(V) RunnableFuture!(V) newTaskFor(Callable!(V) c) {
52  *       return new CustomTask!(V)(c);
53  *   }
54  *   protected !(V) RunnableFuture!(V) newTaskFor(Runnable r, V v) {
55  *       return new CustomTask!(V)(r, v);
56  *   }
57  *   // ... add constructors, etc.
58  * }}</pre>
59  *
60  * @author Doug Lea
61  */
abstract class AbstractExecutorService : ExecutorService {

    /// Number of nanoseconds in one hnsec, the 100 ns unit used by Clock.currStdTime.
    private enum long NANOS_PER_HNSEC = 100;

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param (T) the type of the given value (must not be {@code void})
     * @return a {@code FutureTask!(T)} constructed from {@code runnable}
     * and {@code value}
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) if(!is(T == void)) {
        return new FutureTask!(T)(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given runnable when no
     * result value is wanted.
     *
     * @param runnable the runnable task being wrapped
     * @param (T) must be {@code void}
     * @return a {@code FutureTask!(void)} constructed from {@code runnable}
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable) if(is(T == void)) {
        return new FutureTask!(T)(runnable);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param (T) the type of the callable's result
     * @return a {@code FutureTask!(T)} constructed from {@code callable}
     */
    static RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
        return new FutureTask!(T)(callable);
    }

    /**
     * Wraps {@code task} in a future, hands it to {@code execute} and
     * returns the future.
     *
     * @param task the task to run
     * @return the future tracking {@code task}
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       if {@code task} is null
     */
    Future!(void) submit(Runnable task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(void) ftask = newTaskFor!(void)(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps {@code task} and {@code result} in a future, hands it to
     * {@code execute} and returns the future.
     *
     * @param task the task to run
     * @param result the value passed to {@code newTaskFor} as the default
     * value of the future
     * @return the future tracking {@code task}
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       if {@code task} is null
     */
    Future!(T) submit(T)(Runnable task, T result) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor!(T)(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps {@code task} in a future, hands it to {@code execute} and
     * returns the future.
     *
     * @param task the callable to run
     * @return the future tracking {@code task}
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       if {@code task} is null
     */
    Future!(T) submit(T)(Callable!(T) task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Returns the Clock.currStdTime value (in hnsecs) that lies
     * {@code hnsecs} after now, saturating at {@code long.max} instead of
     * wrapping around for very large timeouts such as Duration.max.
     */
    private static long deadlineAfter(long hnsecs) {
        immutable long now = Clock.currStdTime();
        return (hnsecs > long.max - now) ? long.max : now + hnsecs;
    }

    /**
     * Converts a count of hnsecs to nanoseconds, saturating at the limits
     * of {@code long} instead of overflowing.
     */
    private static long hnsecsToNanos(long hnsecs) {
        if (hnsecs > long.max / NANOS_PER_HNSEC)
            return long.max;
        if (hnsecs < long.min / NANOS_PER_HNSEC)
            return long.min;
        return hnsecs * NANOS_PER_HNSEC;
    }

    /**
     * The main mechanics of invokeAny.
     *
     * Submits the tasks one at a time through a completion service and
     * returns the result of the first future whose {@code get} succeeds.
     * Every future created here is cancelled on the way out, whether the
     * method returns or throws.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @param timed whether {@code nanos} limits the total waiting time
     * @param nanos the time budget in nanoseconds (ignored unless timed)
     * @return the result of the first task whose {@code get} succeeds
     * @throws NullPointerException     if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws TimeoutException         if timed and a timed poll returns
     * nothing
     * @throws ExecutionException       if no task produced a result; the
     * last recorded failure is thrown
     */
    private T doInvokeAny(T)(Collection!(Callable!(T)) tasks,
                              bool timed, long nanos) {
        if (tasks is null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(ntasks);
        // NOTE(review): ExecutorCompletionService and NANOSECONDS (used
        // below) are not among the imports visible in this module; confirm
        // they exist and that poll accepts (long, unit) before
        // instantiating this template.
        ExecutorCompletionService!(T) ecs =
            new ExecutorCompletionService!(T)(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            // Clock.currStdTime counts hnsecs, so the nanosecond budget is
            // converted before it is added to the clock reading.
            long deadline = timed ? deadlineAfter(nanos / NANOS_PER_HNSEC) : 0L;
            Iterator!(Callable!(T)) it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future!(T) f = ecs.poll();
                if (f is null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f is null)
                            throw new TimeoutException();
                        // Remaining budget, converted back to nanoseconds.
                        nanos = hnsecsToNanos(deadline - Clock.currStdTime());
                    }
                    else
                        f = ecs.take();
                }
                if (f !is null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee is null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }

    /**
     * Runs the given tasks and returns the result of one whose {@code get}
     * succeeds; all futures created for the call are cancelled before
     * returning.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @return the result of the first task whose {@code get} succeeds
     * @throws NullPointerException     if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException       if no task produced a result
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The untimed path never performs a timed poll, so this is a
            // programming error rather than a recoverable condition.
            assert(false, "TimeoutException from untimed invokeAny");
        }
    }

    /**
     * Timed variant of {@code invokeAny}.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @param timeout the maximum time to wait
     * @return the result of the first task whose {@code get} succeeds
     * @throws NullPointerException     if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws TimeoutException         if the timeout elapses first
     * @throws ExecutionException       if no task produced a result
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks,
                           Duration timeout) {
        // Convert through hnsecs with saturation: total!"nsecs" would
        // overflow for very large durations.
        immutable long hnsecs = timeout.total!(TimeUnit.HectoNanosecond)();
        return doInvokeAny(tasks, true, hnsecsToNanos(hnsecs));
    }

    /**
     * Executes every task and waits until each future is done.
     *
     * CancellationException and ExecutionException raised by an individual
     * {@code get} are swallowed (logged under HUNT_DEBUG). Any other
     * Throwable cancels all futures and is rethrown.
     *
     * @param tasks the tasks to run
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        if (tasks is null)
            throw new NullPointerException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        try {
            foreach (Callable!(T) t ; tasks) {
                RunnableFuture!(T) f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future!(T) f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

    /**
     * Executes every task and waits for each future until it is done or
     * the timeout elapses, whichever comes first.
     *
     * On timeout, the futures from the first one not yet waited for
     * onwards are cancelled and the full list is returned. Any Throwable
     * other than the per-future exceptions handled below cancels all
     * futures and is rethrown.
     *
     * @param tasks the tasks to run
     * @param timeout the maximum total time to wait
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
                                         Duration timeout)  {
        if (tasks is null)
            throw new NullPointerException();
        // Both the budget and the deadline are in hnsecs, matching
        // Clock.currStdTime.
        immutable long hnsecs = timeout.total!(TimeUnit.HectoNanosecond)();
        immutable long deadline = deadlineAfter(hnsecs);
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        int j = 0;
        // D only permits labelled breaks out of loops and switches, so the
        // timeout is tracked with a flag.
        bool timedOut = false;

        try {
            foreach (Callable!(T) t ; tasks)
                futures.add(newTaskFor(t));

            const int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                if (((i == 0) ? hnsecs : deadline - Clock.currStdTime()) <= 0L) {
                    timedOut = true;
                    break;
                }
                execute(cast(Runnable)futures.get(i));
            }

            for (; !timedOut && j < size; j++) {
                Future!(T) f = futures.get(j);
                if (!f.isDone()) {
                    try {
                        f.get(dur!"hnsecs"(deadline - Clock.currStdTime()));
                    }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (TimeoutException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                        timedOut = true;
                        // break skips the j++ step, so j still indexes the
                        // future that timed out.
                        break;
                    }
                }
            }

            if (!timedOut)
                return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    /** Cancels every future in the list. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}