/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.AbstractExecutorService;

import hunt.concurrency.ExecutorService;
import hunt.concurrency.Future;
import hunt.concurrency.FutureTask;

import hunt.collection.ArrayList;
import hunt.collection.Collection;
import hunt.collection.Iterator;
import hunt.collection.List;

import hunt.Exceptions;
import hunt.logging.ConsoleLogger;
import hunt.util.Common;
import hunt.util.DateTime;

import std.datetime;


/**
 * Provides default implementations of the `ExecutorService` execution
 * methods. This class implements the `submit`, `invokeAny` and `invokeAll`
 * methods on top of the `RunnableFuture` returned by `newTaskFor`, which
 * wraps the task in a `FutureTask`. For example, `submit(Runnable)` creates
 * an associated `RunnableFuture`, hands it to `execute` and returns it.
 *
 * NOTE(review): the extension example below is inherited from the Java
 * original, where `newTaskFor` is an overridable method. In this module the
 * `newTaskFor` overloads are static templates, so a subclass cannot override
 * them virtually; confirm the intended customization mechanism.
 *
 * Extension example (sketch): a class that customizes `ThreadPoolExecutor`
 * to use a `CustomTask` class instead of the default `FutureTask`:
 * ---
 * class CustomThreadPoolExecutor : ThreadPoolExecutor {
 *
 *     static class CustomTask(V) : RunnableFuture!(V) {...}
 *
 *     protected RunnableFuture!(V) newTaskFor(V)(Callable!(V) c) {
 *         return new CustomTask!(V)(c);
 *     }
 *     protected RunnableFuture!(V) newTaskFor(V)(Runnable r, V v) {
 *         return new CustomTask!(V)(r, v);
 *     }
 *     // ... add constructors, etc.
 * }
 * ---
 *
 * Author: Doug Lea
 */
abstract class AbstractExecutorService : ExecutorService {

    /**
     * Returns a `RunnableFuture` for the given runnable and default value.
     *
     * Params:
     *  runnable = the runnable task being wrapped
     *  value = the default value for the returned future
     *
     * Returns: a new `FutureTask!(T)` constructed from `runnable` and `value`
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable, T value) if(!is(T == void)) {
        return new FutureTask!(T)(runnable, value);
    }

    /**
     * Returns a `RunnableFuture!(void)` for the given runnable; this overload
     * is selected only when `T` is `void`, so there is no result value.
     *
     * Params:
     *  runnable = the runnable task being wrapped
     *
     * Returns: a new `FutureTask!(void)` constructed from `runnable`
     */
    static RunnableFuture!(T) newTaskFor(T)(Runnable runnable) if(is(T == void)) {
        return new FutureTask!(T)(runnable);
    }

    /**
     * Returns a `RunnableFuture` for the given callable task.
     *
     * Params:
     *  callable = the callable task being wrapped
     *
     * Returns: a new `FutureTask!(T)` constructed from `callable`
     */
    static RunnableFuture!(T) newTaskFor(T)(Callable!(T) callable) {
        return new FutureTask!(T)(callable);
    }

    /**
     * Wraps `task` in a `RunnableFuture!(void)`, passes it to `execute` and
     * returns the future.
     *
     * Throws: NullPointerException if `task` is null. Anything thrown by
     *         `execute` propagates to the caller.
     */
    Future!(void) submit(Runnable task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(void) ftask = newTaskFor!(void)(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps `task` and the preset `result` in a `RunnableFuture!(T)`, passes
     * it to `execute` and returns the future.
     *
     * Throws: NullPointerException if `task` is null. Anything thrown by
     *         `execute` propagates to the caller.
     */
    Future!(T) submit(T)(Runnable task, T result) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor!(T)(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the callable `task` in a `RunnableFuture!(T)`, passes it to
     * `execute` and returns the future.
     *
     * Throws: NullPointerException if `task` is null. Anything thrown by
     *         `execute` propagates to the caller.
     */
    Future!(T) submit(T)(Callable!(T) task) {
        if (task is null) throw new NullPointerException();
        RunnableFuture!(T) ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of invokeAny.
     *
     * Params:
     *  tasks = the candidate tasks; must be non-null and non-empty
     *  timed = whether a time limit applies
     *  nanos = the time limit in nanoseconds (only read when `timed` is true)
     *
     * Returns: the value of the first future whose `get()` returns normally
     *
     * Throws: NullPointerException if `tasks` is null,
     *         IllegalArgumentException if `tasks` is empty,
     *         TimeoutException if a timed poll yields no completed future,
     *         ExecutionException if every task failed (the last recorded
     *         failure, or a fresh one if none was recorded).
     *
     * NOTE(review): `ExecutorCompletionService` and `NANOSECONDS` are not
     * imported or defined in the visible part of this module; confirm they
     * are reachable before instantiating this template.
     */
    private T doInvokeAny(T)(Collection!(Callable!(T)) tasks,
                             bool timed, long nanos) {
        // Clock.currStdTime() counts hecto-nanoseconds (100 ns units), so the
        // nanosecond budget has to be converted before it is mixed with it.
        enum long NANOS_PER_HNSEC = 100;

        if (tasks is null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(ntasks);
        ExecutorCompletionService!(T) ecs =
            new ExecutorCompletionService!(T)(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            // Deadline expressed in hnsecs, the unit of Clock.currStdTime().
            long deadline = timed ? Clock.currStdTime() + nanos / NANOS_PER_HNSEC : 0L;
            Iterator!(Callable!(T)) it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future!(T) f = ecs.poll();
                if (f is null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f is null)
                            throw new TimeoutException();
                        // Remaining budget, converted back to nanoseconds.
                        nanos = (deadline - Clock.currStdTime()) * NANOS_PER_HNSEC;
                    }
                    else
                        f = ecs.take();
                }
                if (f !is null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee is null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // Runs on every exit path (result, timeout or failure).
            cancelAll(futures);
        }
    }

    /**
     * Runs the given tasks and returns the result of one that completed
     * without throwing; delegates to `doInvokeAny` without a time limit.
     *
     * Throws: whatever the untimed `doInvokeAny` throws
     *         (NullPointerException, IllegalArgumentException,
     *         ExecutionException).
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks) {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The untimed path never throws TimeoutException. assert(false)
            // does not return, so no dummy return value is needed (a
            // `return null` here would not compile for non-nullable T).
            assert(false);
        }
    }

    /**
     * Timed variant of `invokeAny`: delegates to `doInvokeAny` with the
     * timeout converted to nanoseconds.
     *
     * Throws: whatever the timed `doInvokeAny` throws, including
     *         TimeoutException.
     */
    T invokeAny(T)(Collection!(Callable!(T)) tasks,
                   Duration timeout) {
        return doInvokeAny(tasks, true, timeout.total!"nsecs"());
    }

    /**
     * Wraps every task in a future, passes each to `execute`, then waits for
     * each future that is not yet done.
     *
     * Cancellation and execution failures of individual tasks are swallowed
     * (logged only under `version(HUNT_DEBUG)`); callers inspect the returned
     * futures. Any other throwable cancels all futures and is rethrown.
     *
     * Returns: the futures, in the iteration order of `tasks`
     *
     * Throws: NullPointerException if `tasks` is null
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks) {
        if (tasks is null)
            throw new NullPointerException();
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        try {
            foreach (Callable!(T) t ; tasks) {
                RunnableFuture!(T) f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future!(T) f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

    /**
     * Timed variant of `invokeAll`. Tasks are handed to `execute` one by one
     * with a deadline check before each; afterwards each unfinished future is
     * awaited with the remaining time. When the deadline passes, the futures
     * from the first one not yet awaited onwards are cancelled.
     *
     * Returns: the futures, in the iteration order of `tasks`
     *
     * Throws: NullPointerException if `tasks` is null
     *
     * NOTE(review): the timeout path relies on a labeled `break` that targets
     * a `try` statement; confirm the compiler in use accepts this form.
     */
    List!(Future!(T)) invokeAll(T)(Collection!(Callable!(T)) tasks,
                                   Duration timeout) {
        if (tasks is null)
            throw new NullPointerException();
        // Budget and deadline are both in hnsecs, the unit of Clock.currStdTime.
        long timeoutHnsecs = timeout.total!(TimeUnit.HectoNanosecond)();
        long deadline = Clock.currStdTime + timeoutHnsecs;
        ArrayList!(Future!(T)) futures = new ArrayList!(Future!(T))(tasks.size());
        int j = 0;

        timedOut:
        try {
            foreach (Callable!(T) t ; tasks)
                futures.add(newTaskFor(t));

            const int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                if (((i == 0) ? timeoutHnsecs : deadline - Clock.currStdTime) <= 0L)
                    break timedOut;
                execute(cast(Runnable)futures.get(i));
            }

            for (; j < size; j++) {
                Future!(T) f = futures.get(j);
                if (!f.isDone()) {
                    // dur!"hnsecs" is the public way to build a Duration
                    // from a raw hnsecs count.
                    try { f.get(dur!"hnsecs"(deadline - Clock.currStdTime)); }
                    catch (CancellationException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (ExecutionException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                    }
                    catch (TimeoutException ex) {
                        version(HUNT_DEBUG) warning(ex.message());
                        break timedOut;
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    /** Cancels every future in the list. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static void cancelAll(T)(ArrayList!(Future!(T)) futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}