/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.BlockingQueue;

import hunt.collection.Collection;
import hunt.collection.Queue;
import core.time;

/**
 * A {@link Queue} that additionally supports operations that wait for
 * the queue to become non-empty when retrieving an element, and wait
 * for space to become available in the queue when storing an element.
 *
 * <p>{@code BlockingQueue} methods come in four forms, with different ways
 * of handling operations that cannot be satisfied immediately, but may be
 * satisfied at some point in the future:
 * one throws an exception, the second returns a special value (either
 * {@code null} or {@code false}, depending on the operation), the third
 * blocks the current thread indefinitely until the operation can succeed,
 * and the fourth blocks for only a given maximum time limit before giving
 * up.
 * These methods are summarized in the following table:
 *
 * <table class="plain">
 * <caption>Summary of BlockingQueue methods</caption>
 *  <tr>
 *    <td></td>
 *    <th scope="col" style="font-weight:normal; font-style:italic">Throws exception</th>
 *    <th scope="col" style="font-weight:normal; font-style:italic">Special value</th>
 *    <th scope="col" style="font-weight:normal; font-style:italic">Blocks</th>
 *    <th scope="col" style="font-weight:normal; font-style:italic">Times out</th>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left">Insert</th>
 *    <td>{@link #add(Object) add(e)}</td>
 *    <td>{@link #offer(Object) offer(e)}</td>
 *    <td>{@link #put(Object) put(e)}</td>
 *    <td>{@link #offer(Object, Duration) offer(e, timeout)}</td>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left">Remove</th>
 *    <td>{@link #remove() remove()}</td>
 *    <td>{@link #poll() poll()}</td>
 *    <td>{@link #take() take()}</td>
 *    <td>{@link #poll(Duration) poll(timeout)}</td>
 *  </tr>
 *  <tr>
 *    <th scope="row" style="text-align:left">Examine</th>
 *    <td>{@link #element() element()}</td>
 *    <td>{@link #peek() peek()}</td>
 *    <td style="font-style: italic">not applicable</td>
 *    <td style="font-style: italic">not applicable</td>
 *  </tr>
 * </table>
 *
 * <p>A {@code BlockingQueue} does not accept {@code null} elements.
 * Implementations throw {@code NullPointerException} on attempts
 * to {@code add}, {@code put} or {@code offer} a {@code null}.  A
 * {@code null} is used as a sentinel value to indicate failure of
 * {@code poll} operations.
 *
 * <p>A {@code BlockingQueue} may be capacity bounded. At any given
 * time it may have a {@code remainingCapacity} beyond which no
 * additional elements can be {@code put} without blocking.
 * A {@code BlockingQueue} without any intrinsic capacity constraints always
 * reports a remaining capacity of {@code int.max}.
 *
 * <p>{@code BlockingQueue} implementations are designed to be used
 * primarily for producer-consumer queues, but additionally support
 * the {@link Collection} interface.  So, for example, it is
 * possible to remove an arbitrary element from a queue using
 * {@code remove(x)}. However, such operations are in general
 * <em>not</em> performed very efficiently, and are intended for only
 * occasional use, such as when a queued message is cancelled.
 *
 * <p>{@code BlockingQueue} implementations are thread-safe.  All
 * queuing methods achieve their effects atomically using internal
 * locks or other forms of concurrency control. However, the
 * <em>bulk</em> Collection operations {@code addAll},
 * {@code containsAll}, {@code retainAll} and {@code removeAll} are
 * <em>not</em> necessarily performed atomically unless specified
 * otherwise in an implementation. So it is possible, for example, for
 * {@code addAll(c)} to fail (throwing an exception) after adding
 * only some of the elements in {@code c}.
 *
 * <p>A {@code BlockingQueue} does <em>not</em> intrinsically support
 * any kind of "close" or "shutdown" operation to
 * indicate that no more items will be added.  The needs and usage of
 * such features tend to be implementation-dependent. For example, a
 * common tactic is for producers to insert special
 * <em>end-of-stream</em> or <em>poison</em> objects, that are
 * interpreted accordingly when taken by consumers.
 *
 * <p>
 * Usage example, based on a typical producer-consumer scenario.
 * Note that a {@code BlockingQueue} can safely be used with multiple
 * producers and multiple consumers.
 * <pre> {@code
 * class Producer implements Runnable {
 *   private final BlockingQueue queue;
 *   Producer(BlockingQueue q) { queue = q; }
 *   void run() {
 *     try {
 *       while (true) { queue.put(produce()); }
 *     } catch (InterruptedException ex) { ...
 *       handle ...}
 *   }
 *   Object produce() { ... }
 * }
 *
 * class Consumer implements Runnable {
 *   private final BlockingQueue queue;
 *   Consumer(BlockingQueue q) { queue = q; }
 *   void run() {
 *     try {
 *       while (true) { consume(queue.take()); }
 *     } catch (InterruptedException ex) { ... handle ...}
 *   }
 *   void consume(Object x) { ... }
 * }
 *
 * class Setup {
 *   void main() {
 *     BlockingQueue q = new SomeQueueImplementation();
 *     Producer p = new Producer(q);
 *     Consumer c1 = new Consumer(q);
 *     Consumer c2 = new Consumer(q);
 *     new Thread(p).start();
 *     new Thread(c1).start();
 *     new Thread(c2).start();
 *   }
 * }}</pre>
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code BlockingQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code BlockingQueue} in another thread.
 *
 * <p>This interface is a member of the
 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param (E) the type of elements held in this queue
 */
interface BlockingQueue(E) : Queue!(E) {
    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and throwing an
     * {@code IllegalStateException} if no space is currently available.
     * When using a capacity-restricted queue, it is generally preferable to
     * use {@link #offer(Object) offer}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    bool add(E e);

    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and {@code false} if no space is currently
     * available.  When using a capacity-restricted queue, this method is
     * generally preferable to {@link #add}, which can fail to insert an
     * element only by throwing an exception.
     *
     * @param e the element to add
     * @return {@code true} if the element was added to this queue, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    bool offer(E e);

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e);

    /**
     * Inserts the specified element into this queue, waiting up to the
     * specified wait time if necessary for space to become available.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, as a
     *        {@code Duration}
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    bool offer(E e, Duration timeout);

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take();

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, as a
     *        {@code Duration}
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    E poll(Duration timeout);

    alias poll = Queue!E.poll;

    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking, or {@code int.max} if there is no intrinsic
     * limit.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the remaining capacity
     */
    int remainingCapacity();

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
     */
    bool remove(E o);

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     * @throws ClassCastException if the class of the specified element
     *         is incompatible with this queue
     * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
     * @throws NullPointerException if the specified element is null
     * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
     */
    bool contains(E o);

    /**
     * Removes all available elements from this queue and adds them
     * to the given collection.  This operation may be more
     * efficient than repeatedly polling this queue.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection!(E) c);

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.  A failure
     * encountered while attempting to add elements to
     * collection {@code c} may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempts to drain a queue to itself result in
     * {@code IllegalArgumentException}. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    int drainTo(Collection!(E) c, int maxElements);
}


// TODO: Tasks pending completion -@zxp at 12/31/2018, 10:15:14 AM
//
// abstract class AbstractBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) {

//     /**
//      * Constructor for use by subclasses.
//      */
//     protected this() {
//     }

//     /**
//      * Inserts the specified element into this queue if it is possible to do so
//      * immediately without violating capacity restrictions, returning
//      * {@code true} upon success and throwing an {@code IllegalStateException}
//      * if no space is currently available.
//      *
//      * <p>This implementation returns {@code true} if {@code offer} succeeds,
//      * else throws an {@code IllegalStateException}.
//      *
//      * @param e the element to add
//      * @return {@code true} (as specified by {@link Collection#add})
//      * @throws IllegalStateException if the element cannot be added at this
//      *         time due to capacity restrictions
//      * @throws ClassCastException if the class of the specified element
//      *         prevents it from being added to this queue
//      * @throws NullPointerException if the specified element is null and
//      *         this queue does not permit null elements
//      * @throws IllegalArgumentException if some property of this element
//      *         prevents it from being added to this queue
//      */
//     override bool add(E e) {
//         if (offer(e))
//             return true;
//         else
//             throw new IllegalStateException("Queue full");
//     }

//     /**
//      * Retrieves and removes the head of this queue.  This method differs
//      * from {@link #poll poll} only in that it throws an exception if this
//      * queue is empty.
//      *
//      * <p>This implementation returns the result of {@code poll}
//      * unless the queue is empty.
//      *
//      * @return the head of this queue
//      * @throws NoSuchElementException if this queue is empty
//      */
//     E remove() {
//         E x = poll();
//         static if(is(E == class) || is(E == string)) {
//             if (x is null) throw new NoSuchElementException();
//         }
//         return x;
//     }

//     /**
//      * Retrieves, but does not remove, the head of this queue.  This method
//      * differs from {@link #peek peek} only in that it throws an exception if
//      * this queue is empty.
//      *
//      * <p>This implementation returns the result of {@code peek}
//      * unless the queue is empty.
//      *
//      * @return the head of this queue
//      * @throws NoSuchElementException if this queue is empty
//      */
//     E element() {
//         E x = peek();

//         static if(is(E == class) || is(E == string)) {
//             if (x is null) throw new NoSuchElementException();
//         }
//         return x;
//     }

//     /**
//      * Removes all of the elements from this queue.
//      * The queue will be empty after this call returns.
//      *
//      * <p>This implementation repeatedly invokes {@link #poll poll} until it
//      * returns {@code null}.
//      */
//     override void clear() {
//         static if(is(E == class) || is(E == string)) {
//             while (poll() !is null) {}
//         } else {
//             while(size()>0) {
//                 poll();
//             }
//         }
//     }

//     /**
//      * Adds all of the elements in the specified collection to this
//      * queue.  Attempts to addAll of a queue to itself result in
//      * {@code IllegalArgumentException}. Further, the behavior of
//      * this operation is undefined if the specified collection is
//      * modified while the operation is in progress.
//      *
//      * <p>This implementation iterates over the specified collection,
//      * and adds each element returned by the iterator to this
//      * queue, in turn.  A runtime exception encountered while
//      * trying to add an element (including, in particular, a
//      * {@code null} element) may result in only some of the elements
//      * having been successfully added when the associated exception is
//      * thrown.
//      *
//      * @param c collection containing elements to be added to this queue
//      * @return {@code true} if this queue changed as a result of the call
//      * @throws ClassCastException if the class of an element of the specified
//      *         collection prevents it from being added to this queue
//      * @throws NullPointerException if the specified collection contains a
//      *         null element and this queue does not permit null elements,
//      *         or if the specified collection is null
//      * @throws IllegalArgumentException if some property of an element of the
//      *         specified collection prevents it from being added to this
//      *         queue, or if the specified collection is this queue
//      * @throws IllegalStateException if not all the elements can be added at
//      *         this time due to insertion restrictions
//      * @see #add(Object)
//      */
//     override bool addAll(Collection!E c) {
//         if (c is null)
//             throw new NullPointerException();
//         if (c is this)
//             throw new IllegalArgumentException();
//         bool modified = false;
//         foreach (E e ; c) {
//             if (add(e)) modified = true;
//         }
//         return modified;
//     }

//     override bool opEquals(IObject o) {
//         return opEquals(cast(Object) o);
//     }

//     override bool opEquals(Object o) {
//         return super.opEquals(o);
//     }

//     override size_t toHash() @trusted nothrow {
//         return super.toHash();
//     }

//     override string toString() {
//         return super.toString();
//     }
// }
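
// Usage sketch for the BlockingQueue interface, written in D. It is an
// illustration only and is not taken from the Hunt sources.
//
// Assumptions, none of which are confirmed by this file:
//   - a concrete implementation named LinkedBlockingQueue lives in the module
//     hunt.concurrency.LinkedBlockingQueue;
//   - that class has a constructor taking the queue capacity as an int.
// Substitute whichever BlockingQueue implementation is actually available.
// POISON is a hypothetical end-of-stream marker, following the "poison
// object" tactic described in the interface documentation.
unittest {
    import core.thread : Thread;
    import hunt.concurrency.LinkedBlockingQueue : LinkedBlockingQueue; // assumed module and class

    enum int POISON = -1; // hypothetical sentinel; real payloads here are positive

    // Assumed capacity constructor: at most two queued elements at a time.
    BlockingQueue!int queue = new LinkedBlockingQueue!int(2);

    int sum = 0;

    // Consumer: take() blocks until an element is available.
    auto consumer = new Thread({
        while (true) {
            int value = queue.take();
            if (value == POISON)
                break;
            sum += value;
        }
    });
    consumer.start();

    // Producer: put() blocks while the bounded queue is full.
    foreach (i; 1 .. 6)
        queue.put(i);
    queue.put(POISON);

    // join() guarantees the consumer has finished before sum is read.
    consumer.join();
    assert(sum == 1 + 2 + 3 + 4 + 5);

    // The timed offer should succeed at once, because the queue is empty again.
    assert(queue.offer(42, 10.msecs));
    assert(queue.take() == 42);
}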