1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.AbstractExecutorService;
13 
14 import hunt.concurrency.Executor;
15 import hunt.concurrency.ExecutorService;
16 import hunt.concurrency.Future;
17 import hunt.concurrency.FutureTask;
18 
19 import hunt.collection.ArrayList;
20 import hunt.collection.Collection;
21 import hunt.collection.Iterator;
22 import hunt.collection.List;
23 import hunt.util.DateTime;
24 import hunt.Exceptions;
25 import hunt.util.Common;
26 
27 import std.datetime;
28 
29 version(HUNT_DEBUG) import hunt.logging;
30 
/**
 * Provides default implementations of {@link ExecutorService}
 * execution methods. This class implements the {@code submit},
 * {@code invokeAny} and {@code invokeAll} methods using a
 * {@link RunnableFuture} returned by {@code newTaskFor}, which
 * creates instances of the {@link FutureTask} class provided in this
 * package. For example, the implementation of
 * {@code submit(Runnable, T)} creates an associated
 * {@code RunnableFuture} that is executed and returned.
 *
 * <p><b>Note on extension</b>. The Java class this was ported from
 * lets subclasses override {@code newTaskFor} to return
 * {@code RunnableFuture} implementations other than {@code FutureTask}.
 * In this module {@code newTaskFor} is a {@code static} function
 * template, so it cannot be overridden; the same is true of the other
 * templated methods ({@code submit(T)}, {@code invokeAny},
 * {@code invokeAll}). Only the non-templated members take part in
 * virtual dispatch.
 *
 * <p>Timed operations measure elapsed time with the monotonic clock
 * ({@code MonoTime}), so they are not affected by wall-clock changes.
 *
 * @since 1.5
 * @author Doug Lea
 */
abstract class AbstractExecutorService : ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param (T) the type of the given value
     * @return a new {@code FutureTask!(T)} constructed from
     * {@code runnable} and {@code value}
     * @since 1.6
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) {
        return new FutureTask!(T)(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param (T) the type of the callable's result
     * @return a new {@code FutureTask!(T)} constructed from
     * {@code callable}
     * @since 1.6
     */
    static RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
        return new FutureTask!(T)(callable);
    }

    /**
     * Wraps {@code task} in a {@code FutureTask!(void)}, hands it to
     * {@code execute} and returns the future.
     *
     * @param task the task to run
     * @return the future tracking {@code task}
     * @throws NullPointerException if {@code task} is null; anything
     * thrown by {@code execute} propagates to the caller
     */
    Future!(void) submit(Runnable task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(void) ftask = new FutureTask!(void)(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps {@code task} and {@code result} via {@code newTaskFor},
     * hands the future to {@code execute} and returns it.
     *
     * @param task the task to run
     * @param result the value passed to {@code newTaskFor} as the
     * future's default value
     * @return the future tracking {@code task}
     * @throws NullPointerException if {@code task} is null; anything
     * thrown by {@code execute} propagates to the caller
     */
    Future!(T) submit(T)(Runnable task, T result) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor!(T)(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps {@code task} via {@code newTaskFor}, hands the future to
     * {@code execute} and returns it.
     *
     * @param task the callable to run
     * @return the future tracking {@code task}
     * @throws NullPointerException if {@code task} is null; anything
     * thrown by {@code execute} propagates to the caller
     */
    Future!(T) submit(T)(Callable!(T) task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of invokeAny.
     *
     * Submits the tasks one at a time, checking between submissions
     * whether an earlier task has already completed, and returns the
     * result of the first future whose {@code get()} succeeds. Every
     * future created here is cancelled on the way out, whether the
     * method returns or throws.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @param timed whether {@code timeout} applies
     * @param timeout the maximum time to wait; ignored when
     * {@code timed} is false
     * @return the result of the first task found to have completed
     * successfully
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws TimeoutException if {@code timed} is true and the timed
     * poll returns no completed future
     * @throws ExecutionException if no task completed successfully; the
     * last recorded failure is thrown
     */
    private T doInvokeAny(T)(Collection!(Callable!(T)) tasks,
                              bool timed, Duration timeout) {
        if (tasks is null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(ntasks);
        // NOTE(review): ExecutorCompletionService is not among the imports
        // visible at the top of this module; confirm it is in scope and
        // that it offers poll(), poll(Duration) and take() as used below.
        ExecutorCompletionService!(T) ecs =
            new ExecutorCompletionService!(T)(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;

            // The deadline is only meaningful when timed; it is tracked on
            // the monotonic clock so wall-clock jumps cannot stretch or
            // shrink the wait.
            MonoTime deadline;
            if (timed)
                deadline = MonoTime.currTime + timeout;
            Duration remaining = timeout;

            Iterator!(Callable!(T)) it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future!(T) f = ecs.poll();
                if (f is null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: start another task.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break; // everything was submitted and has failed
                    else if (timed) {
                        f = ecs.poll(remaining);
                        if (f is null)
                            throw new TimeoutException();
                        remaining = deadline - MonoTime.currTime;
                    }
                    else
                        f = ecs.take();
                }
                if (f !is null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee is null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }

    /**
     * Runs the given tasks and returns the result of one that completed
     * successfully, without a time limit. Delegates to
     * {@code doInvokeAny}.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @return the result returned by one of the tasks
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return doInvokeAny(tasks, false, Duration.zero);
        } catch (TimeoutException cannotHappen) {
            // doInvokeAny only throws TimeoutException on its timed path.
            assert(false, "untimed invokeAny cannot time out");
        }
    }

    /**
     * Runs the given tasks and returns the result of one that completed
     * successfully, waiting at most {@code timeout}. Delegates to
     * {@code doInvokeAny}.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @param timeout the maximum time to wait
     * @return the result returned by one of the tasks
     * @throws TimeoutException if the wait times out
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks,
                           Duration timeout) {
        return doInvokeAny(tasks, true, timeout);
    }

    /**
     * Executes every task and waits until each resulting future is done.
     *
     * Cancellation or failure of an individual task is not reported
     * here (it is only logged when HUNT_DEBUG is set); callers inspect
     * the returned futures. If anything else is thrown while submitting
     * or waiting, all futures created so far are cancelled and the
     * throwable is rethrown.
     *
     * @param tasks the tasks to run
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        if (tasks is null)
            throw new NullPointerException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        try {
            foreach (Callable!(T) t ; tasks) {
                RunnableFuture!(T) f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future!(T) f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

    /**
     * Executes every task and waits until each resulting future is done
     * or {@code timeout} elapses, whichever comes first.
     *
     * On timeout, the futures from the one being waited on (or from the
     * first one, if the deadline passed before all tasks were handed to
     * {@code execute}) through the last are cancelled, and the full list
     * is still returned. If anything other than the handled exceptions is
     * thrown, all futures are cancelled and the throwable is rethrown.
     *
     * @param tasks the tasks to run
     * @param timeout the maximum time to wait
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
                                         Duration timeout)  {
        if (tasks is null)
            throw new NullPointerException();
        const MonoTime deadline = MonoTime.currTime + timeout;
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        int j = 0;

        try {
            foreach (Callable!(T) t ; tasks)
                futures.add(newTaskFor(t));

            const int size = futures.size();
            // D's labelled break cannot target a try statement, so the
            // "timed out" exit is tracked with a flag instead.
            bool timedOut = false;

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                // The first iteration checks the caller's timeout directly,
                // so a non-positive timeout never starts any task.
                if (((i == 0) ? timeout : deadline - MonoTime.currTime) <= Duration.zero) {
                    timedOut = true;
                    break;
                }
                execute(cast(Runnable)futures.get(i));
            }

            for (; !timedOut && j < size; j++) {
                Future!(T) f = futures.get(j);
                if (!f.isDone()) {
                    try { f.get(deadline - MonoTime.currTime); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (TimeoutException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                        // Leave j at the future that timed out so it and
                        // everything after it get cancelled below.
                        timedOut = true;
                        break;
                    }
                }
            }

            if (!timedOut)
                return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    /** Cancels every future in the list. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j, calling {@code cancel(true)} on each. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}