/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.AbstractExecutorService;

import hunt.concurrency.Executor;
import hunt.concurrency.ExecutorService;
import hunt.concurrency.Future;
import hunt.concurrency.FutureTask;

import hunt.collection.ArrayList;
import hunt.collection.Collection;
import hunt.collection.Iterator;
import hunt.collection.List;
import hunt.util.DateTime;
import hunt.Exceptions;
import hunt.util.Common;

import std.datetime;

version(HUNT_DEBUG) import hunt.logging;

/**
 * Provides default implementations of {@link ExecutorService}
 * execution methods. This class implements the {@code submit},
 * {@code invokeAny} and {@code invokeAll} methods using a
 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
 * to the {@link FutureTask} class provided in this package. For example,
 * the implementation of {@code submit(Runnable, T)} creates an
 * associated {@code RunnableFuture} that is executed and
 * returned.
 *
 * <p><b>Extension example</b>. Here is a sketch of a class
 * that extends {@link ThreadPoolExecutor} and supplies
 * a {@code CustomTask} class instead of the default {@code FutureTask}:
 * <pre> {@code
 * class CustomThreadPoolExecutor : ThreadPoolExecutor {
 *
 *   static class CustomTask(V) : RunnableFuture!(V) {...}
 *
 *   static RunnableFuture!(V) newTaskFor(V)(Callable!(V) c) {
 *       return new CustomTask!(V)(c);
 *   }
 *   static RunnableFuture!(V) newTaskFor(V)(Runnable r, V v) {
 *       return new CustomTask!(V)(r, v);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * <p><b>Note</b>: in this module the {@code newTaskFor} functions are
 * static templates, so they are bound statically and are not virtual.
 * A subclass that declares its own {@code newTaskFor} hides these
 * versions but does not change which one the {@code submit} and
 * {@code invokeAll} templates defined here call; such a subclass has to
 * provide its own submission methods as well.
 *
 * @since 1.5
 * @author Doug Lea
 */
abstract class AbstractExecutorService : ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param (T) the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) {
        return new FutureTask!(T)(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param (T) the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    static RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
        return new FutureTask!(T)(callable);
    }

    /**
     * Wraps {@code task} in a {@code FutureTask!(void)}, hands it to
     * {@code execute} and returns the future.
     *
     * @param task the task to run; must not be null
     * @return a future tracking the submitted task
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException if {@code task} is null
     */
    Future!(void) submit(Runnable task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(void) ftask = new FutureTask!(void)(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps {@code task} and {@code result} with {@code newTaskFor},
     * hands the wrapper to {@code execute} and returns it.
     *
     * @param task the task to run; must not be null
     * @param result the value passed to {@code newTaskFor} as the future's result
     * @return a future tracking the submitted task
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException if {@code task} is null
     */
    Future!(T) submit(T)(Runnable task, T result) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor!(T)(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps {@code task} with {@code newTaskFor}, hands the wrapper to
     * {@code execute} and returns it.
     *
     * @param task the callable to run; must not be null
     * @return a future tracking the submitted task
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException if {@code task} is null
     */
    Future!(T) submit(T)(Callable!(T) task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of invokeAny: submits the tasks one at a time,
     * returns the first result obtained without an exception and cancels
     * every submitted future on the way out.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @param timed whether the {@code nanos} budget applies
     * @param nanos remaining time budget in nanoseconds (ignored unless {@code timed})
     * @return the result of the first task whose {@code get()} succeeds
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws TimeoutException if timed and a timed poll returns no future
     * @throws ExecutionException if no task produced a result
     */
    private T doInvokeAny(T)(Collection!(Callable!(T)) tasks,
                             bool timed, long nanos) {
        if (tasks is null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();

        // Clock.currStdTime() counts hecto-nanoseconds (100 ns units) while
        // the budget is in nanoseconds; convert wherever the two meet.
        enum long NANOS_PER_HNSEC = 100;

        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(ntasks);
        // NOTE(review): ExecutorCompletionService (and NANOSECONDS below) are
        // not imported by this module as far as this file shows; confirm they
        // are reachable before instantiating this template.
        ExecutorCompletionService!(T) ecs =
            new ExecutorCompletionService!(T)(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            // Deadline expressed in the clock's own unit (hnsecs).
            long deadline = timed
                ? Clock.currStdTime() + nanos / NANOS_PER_HNSEC
                : 0L;
            Iterator!(Callable!(T)) it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future!(T) f = ecs.poll();
                if (f is null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f is null)
                            throw new TimeoutException();
                        // Back to nanoseconds for the next timed poll.
                        nanos = (deadline - Clock.currStdTime()) * NANOS_PER_HNSEC;
                    }
                    else
                        f = ecs.take();
                }
                if (f !is null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee is null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }

    /**
     * Runs the given tasks and returns the result of one that completed
     * without throwing; see {@code doInvokeAny}.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @return the result of the first task whose {@code get()} succeeds
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The untimed path never takes the timed-poll branch. assert(false)
            // always halts, so no return statement is needed (or reachable) here.
            assert(false, "untimed invokeAny must not time out");
        }
    }

    /**
     * Timed variant of {@code invokeAny}.
     *
     * @param tasks the candidate tasks; must be non-null and non-empty
     * @param timeout the maximum time to wait
     * @return the result of the first task whose {@code get()} succeeds
     * @throws TimeoutException if a timed poll returns no future
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks,
                   Duration timeout) {
        // doInvokeAny takes its budget in nanoseconds.
        return doInvokeAny(tasks, true, timeout.total!"nsecs"());
    }

    /**
     * Executes every task and waits until each future is done.
     * {@code CancellationException} and {@code ExecutionException} raised
     * while waiting are logged (HUNT_DEBUG builds only) and otherwise
     * ignored; any other throwable cancels all futures and is rethrown.
     *
     * @param tasks the tasks to run; must not be null
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        if (tasks is null)
            throw new NullPointerException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        try {
            foreach (Callable!(T) t ; tasks) {
                RunnableFuture!(T) f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future!(T) f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                }
            }
            return futures;
        } catch (Throwable th) {
            cancelAll(futures);
            throw th;
        }
    }

    /**
     * Timed variant of {@code invokeAll}: starts the tasks while the
     * deadline has not passed, then waits for each future with the time
     * remaining. On timeout, the futures from the one being waited on
     * onwards are cancelled and the full list is still returned.
     *
     * @param tasks the tasks to run; must not be null
     * @param timeout the maximum total time to wait
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
                                   Duration timeout) {
        if (tasks is null)
            throw new NullPointerException();
        // Budget and deadline are both in hnsecs, the unit of Clock.currStdTime.
        const long nanos = timeout.total!(TimeUnit.HectoNanosecond)();
        const long deadline = Clock.currStdTime + nanos;
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        int j = 0;

        try {
            foreach (Callable!(T) t ; tasks)
                futures.add(newTaskFor(t));

            const int size = futures.size();
            // D only allows a labelled break on loops and switches, so the
            // timeout is carried out of the loops with a flag.
            bool timedOut = false;

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                if (((i == 0) ? nanos : deadline - Clock.currStdTime) <= 0L) {
                    timedOut = true;
                    break;
                }
                execute(cast(Runnable) futures.get(i));
            }

            for (; !timedOut && j < size; j++) {
                Future!(T) f = futures.get(j);
                if (!f.isDone()) {
                    try { f.get(dur!"hnsecs"(deadline - Clock.currStdTime)); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (TimeoutException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                        timedOut = true;
                        // Leave j at the future that timed out so that it
                        // is cancelled along with the rest below.
                        break;
                    }
                }
            }

            if (!timedOut)
                return futures;
        } catch (Throwable th) {
            cancelAll(futures);
            throw th;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    /** Cancels every future in the list. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}