1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.BlockingQueue;
13 
14 import hunt.collection.Collection;
15 import hunt.collection.Queue;
16 import core.time;
17 
18 /**
19  * A {@link Queue} that additionally supports operations that wait for
20  * the queue to become non-empty when retrieving an element, and wait
21  * for space to become available in the queue when storing an element.
22  *
23  * <p>{@code BlockingQueue} methods come in four forms, with different ways
24  * of handling operations that cannot be satisfied immediately, but may be
25  * satisfied at some point in the future:
26  * one throws an exception, the second returns a special value (either
27  * {@code null} or {@code false}, depending on the operation), the third
28  * blocks the current thread indefinitely until the operation can succeed,
29  * and the fourth blocks for only a given maximum time limit before giving
30  * up.  These methods are summarized in the following table:
31  *
32  * <table class="plain">
33  * <caption>Summary of BlockingQueue methods</caption>
34  *  <tr>
35  *    <td></td>
36  *    <th scope="col" style="font-weight:normal; font-style:italic">Throws exception</th>
37  *    <th scope="col" style="font-weight:normal; font-style:italic">Special value</th>
38  *    <th scope="col" style="font-weight:normal; font-style:italic">Blocks</th>
39  *    <th scope="col" style="font-weight:normal; font-style:italic">Times out</th>
40  *  </tr>
41  *  <tr>
42  *    <th scope="row" style="text-align:left">Insert</th>
43  *    <td>{@link #add(Object) add(e)}</td>
44  *    <td>{@link #offer(Object) offer(e)}</td>
45  *    <td>{@link #put(Object) put(e)}</td>
46  *    <td>{@link #offer(E, Duration) offer(e, timeout)}</td>
47  *  </tr>
48  *  <tr>
49  *    <th scope="row" style="text-align:left">Remove</th>
50  *    <td>{@link #remove() remove()}</td>
51  *    <td>{@link #poll() poll()}</td>
52  *    <td>{@link #take() take()}</td>
53  *    <td>{@link #poll(Duration) poll(timeout)}</td>
54  *  </tr>
55  *  <tr>
56  *    <th scope="row" style="text-align:left">Examine</th>
57  *    <td>{@link #element() element()}</td>
58  *    <td>{@link #peek() peek()}</td>
59  *    <td style="font-style: italic">not applicable</td>
60  *    <td style="font-style: italic">not applicable</td>
61  *  </tr>
62  * </table>
63  *
64  * <p>A {@code BlockingQueue} does not accept {@code null} elements.
65  * Implementations throw {@code NullPointerException} on attempts
66  * to {@code add}, {@code put} or {@code offer} a {@code null}.  A
67  * {@code null} is used as a sentinel value to indicate failure of
68  * {@code poll} operations.
69  *
70  * <p>A {@code BlockingQueue} may be capacity bounded. At any given
71  * time it may have a {@code remainingCapacity} beyond which no
72  * additional elements can be {@code put} without blocking.
73  * A {@code BlockingQueue} without any intrinsic capacity constraints always
74  * reports a remaining capacity of {@code Integer.MAX_VALUE}.
75  *
76  * <p>{@code BlockingQueue} implementations are designed to be used
77  * primarily for producer-consumer queues, but additionally support
78  * the {@link Collection} interface.  So, for example, it is
79  * possible to remove an arbitrary element from a queue using
80  * {@code remove(x)}. However, such operations are in general
81  * <em>not</em> performed very efficiently, and are intended for only
82  * occasional use, such as when a queued message is cancelled.
83  *
84  * <p>{@code BlockingQueue} implementations are thread-safe.  All
85  * queuing methods achieve their effects atomically using internal
86  * locks or other forms of concurrency control. However, the
87  * <em>bulk</em> Collection operations {@code addAll},
88  * {@code containsAll}, {@code retainAll} and {@code removeAll} are
89  * <em>not</em> necessarily performed atomically unless specified
90  * otherwise in an implementation. So it is possible, for example, for
91  * {@code addAll(c)} to fail (throwing an exception) after adding
92  * only some of the elements in {@code c}.
93  *
94  * <p>A {@code BlockingQueue} does <em>not</em> intrinsically support
95  * any kind of &quot;close&quot; or &quot;shutdown&quot; operation to
96  * indicate that no more items will be added.  The needs and usage of
97  * such features tend to be implementation-dependent. For example, a
98  * common tactic is for producers to insert special
99  * <em>end-of-stream</em> or <em>poison</em> objects, that are
100  * interpreted accordingly when taken by consumers.
101  *
102  * <p>
103  * Usage example, based on a typical producer-consumer scenario.
104  * Note that a {@code BlockingQueue} can safely be used with multiple
105  * producers and multiple consumers.
106  * <pre> {@code
107  * class Producer implements Runnable {
108  *   private final BlockingQueue queue;
109  *   Producer(BlockingQueue q) { queue = q; }
110  *   void run() {
111  *     try {
112  *       while (true) { queue.put(produce()); }
113  *     } catch (InterruptedException ex) { ... handle ...}
114  *   }
115  *   Object produce() { ... }
116  * }
117  *
118  * class Consumer implements Runnable {
119  *   private final BlockingQueue queue;
120  *   Consumer(BlockingQueue q) { queue = q; }
121  *   void run() {
122  *     try {
123  *       while (true) { consume(queue.take()); }
124  *     } catch (InterruptedException ex) { ... handle ...}
125  *   }
126  *   void consume(Object x) { ... }
127  * }
128  *
129  * class Setup {
130  *   void main() {
131  *     BlockingQueue q = new SomeQueueImplementation();
132  *     Producer p = new Producer(q);
133  *     Consumer c1 = new Consumer(q);
134  *     Consumer c2 = new Consumer(q);
135  *     new Thread(p).start();
136  *     new Thread(c1).start();
137  *     new Thread(c2).start();
138  *   }
139  * }}</pre>
140  *
141  * <p>Memory consistency effects: As with other concurrent
142  * collections, actions in a thread prior to placing an object into a
143  * {@code BlockingQueue}
144  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
145  * actions subsequent to the access or removal of that element from
146  * the {@code BlockingQueue} in another thread.
147  *
148  * <p>This interface is a member of the
149  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
150  * Java Collections Framework</a>.
151  *
152  * @since 1.5
153  * @author Doug Lea
154  * @param E the type of elements held in this queue
155  */
156 interface BlockingQueue(E) : Queue!(E) {
157     /**
158      * Inserts the specified element into this queue if it is possible to do
159      * so immediately without violating capacity restrictions, returning
160      * {@code true} upon success and throwing an
161      * {@code IllegalStateException} if no space is currently available.
162      * When using a capacity-restricted queue, it is generally preferable to
163      * use {@link #offer(E) offer}.
164      *
165      * @param e the element to add
166      * @return {@code true} (as specified by {@link Collection#add})
167      * @throws IllegalStateException if the element cannot be added at this
168      *         time due to capacity restrictions
169      * @throws ClassCastException if the class of the specified element
170      *         prevents it from being added to this queue
171      * @throws NullPointerException if the specified element is null
172      * @throws IllegalArgumentException if some property of the specified
173      *         element prevents it from being added to this queue
174      */
175     bool add(E e);
176 
177     /**
178      * Inserts the specified element into this queue if it is possible to do
179      * so immediately without violating capacity restrictions, returning
180      * {@code true} upon success and {@code false} if no space is currently
181      * available.  When using a capacity-restricted queue, this method is
182      * generally preferable to {@link #add}, which can fail to insert an
183      * element only by throwing an exception.
184      *
185      * @param e the element to add
186      * @return {@code true} if the element was added to this queue, else
187      *         {@code false}
188      * @throws ClassCastException if the class of the specified element
189      *         prevents it from being added to this queue
190      * @throws NullPointerException if the specified element is null
191      * @throws IllegalArgumentException if some property of the specified
192      *         element prevents it from being added to this queue
193      */
194     bool offer(E e);
195 
196     /**
197      * Inserts the specified element into this queue, waiting if necessary
198      * for space to become available.
199      *
200      * @param e the element to add
201      * @throws InterruptedException if interrupted while waiting
202      * @throws ClassCastException if the class of the specified element
203      *         prevents it from being added to this queue
204      * @throws NullPointerException if the specified element is null
205      * @throws IllegalArgumentException if some property of the specified
206      *         element prevents it from being added to this queue
207      */
208     void put(E e);
209 
210     /**
211      * Inserts the specified element into this queue, waiting up to the
212      * specified wait time if necessary for space to become available.
213      *
214      * @param e the element to add
215      * @param timeout the maximum time to wait before giving up, given as
216      *        a {@code Duration}; the value carries its own time unit,
217      *        so this signature has no separate {@code unit}
218      *        parameter
219      * @return {@code true} if successful, or {@code false} if
220      *         the specified waiting time elapses before space is available
221      * @throws InterruptedException if interrupted while waiting
222      * @throws ClassCastException if the class of the specified element
223      *         prevents it from being added to this queue
224      * @throws NullPointerException if the specified element is null
225      * @throws IllegalArgumentException if some property of the specified
226      *         element prevents it from being added to this queue
227      */
228     bool offer(E e, Duration timeout);
229 
230     /**
231      * Retrieves and removes the head of this queue, waiting if necessary
232      * until an element becomes available.
233      *
234      * @return the head of this queue
235      * @throws InterruptedException if interrupted while waiting
236      */
237     E take();
238 
239     /**
240      * Retrieves and removes the head of this queue, waiting up to the
241      * specified wait time if necessary for an element to become available.
242      *
243      * @param timeout the maximum time to wait before giving up, given as
244      *        a {@code Duration}; the value carries its own time unit,
245      *        so this signature has no separate {@code unit}
246      *        parameter
247      * @return the head of this queue, or {@code null} if the
248      *         specified waiting time elapses before an element is available
249      * @throws InterruptedException if interrupted while waiting
250      */
251     E poll(Duration timeout);
252 
253     alias poll = Queue!E.poll; // keeps the poll overloads inherited from Queue!E callable next to poll(Duration), which would otherwise hide them
254 
255     /**
256      * Returns the number of additional elements that this queue can ideally
257      * (in the absence of memory or resource constraints) accept without
258      * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
259      * limit.
260      *
261      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
262      * an element will succeed by inspecting {@code remainingCapacity}
263      * because it may be the case that another thread is about to
264      * insert or remove an element.
265      *
266      * @return the remaining capacity
267      */
268     int remainingCapacity();
269 
270     /**
271      * Removes a single instance of the specified element from this queue,
272      * if it is present.  More formally, removes an element {@code e} such
273      * that {@code o.equals(e)}, if this queue contains one or more such
274      * elements.
275      * Returns {@code true} if this queue contained the specified element
276      * (or equivalently, if this queue changed as a result of the call).
277      *
278      * @param o element to be removed from this queue, if present
279      * @return {@code true} if this queue changed as a result of the call
280      * @throws ClassCastException if the class of the specified element
281      *         is incompatible with this queue
282      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
283      * @throws NullPointerException if the specified element is null
284      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
285      */
286     bool remove(E o);
287 
288     /**
289      * Returns {@code true} if this queue contains the specified element.
290      * More formally, returns {@code true} if and only if this queue contains
291      * at least one element {@code e} such that {@code o.equals(e)}.
292      *
293      * @param o object to be checked for containment in this queue
294      * @return {@code true} if this queue contains the specified element
295      * @throws ClassCastException if the class of the specified element
296      *         is incompatible with this queue
297      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
298      * @throws NullPointerException if the specified element is null
299      * (<a href="{@docRoot}/java.base/java/util/Collection.html#optional-restrictions">optional</a>)
300      */
301     bool contains(E o);
302 
303     /**
304      * Removes all available elements from this queue and adds them
305      * to the given collection.  This operation may be more
306      * efficient than repeatedly polling this queue.  A failure
307      * encountered while attempting to add elements to
308      * collection {@code c} may result in elements being in neither,
309      * either or both collections when the associated exception is
310      * thrown.  Attempts to drain a queue to itself result in
311      * {@code IllegalArgumentException}. Further, the behavior of
312      * this operation is undefined if the specified collection is
313      * modified while the operation is in progress.
314      *
315      * @param c the collection to transfer elements into
316      * @return the number of elements transferred
317      * @throws UnsupportedOperationException if addition of elements
318      *         is not supported by the specified collection
319      * @throws ClassCastException if the class of an element of this queue
320      *         prevents it from being added to the specified collection
321      * @throws NullPointerException if the specified collection is null
322      * @throws IllegalArgumentException if the specified collection is this
323      *         queue, or some property of an element of this queue prevents
324      *         it from being added to the specified collection
325      */
326     int drainTo(Collection!(E) c);
327 
328     /**
329      * Removes at most the given number of available elements from
330      * this queue and adds them to the given collection.  A failure
331      * encountered while attempting to add elements to
332      * collection {@code c} may result in elements being in neither,
333      * either or both collections when the associated exception is
334      * thrown.  Attempts to drain a queue to itself result in
335      * {@code IllegalArgumentException}. Further, the behavior of
336      * this operation is undefined if the specified collection is
337      * modified while the operation is in progress.
338      *
339      * @param c the collection to transfer elements into
340      * @param maxElements the maximum number of elements to transfer
341      * @return the number of elements transferred
342      * @throws UnsupportedOperationException if addition of elements
343      *         is not supported by the specified collection
344      * @throws ClassCastException if the class of an element of this queue
345      *         prevents it from being added to the specified collection
346      * @throws NullPointerException if the specified collection is null
347      * @throws IllegalArgumentException if the specified collection is this
348      *         queue, or some property of an element of this queue prevents
349      *         it from being added to the specified collection
350      */
351     int drainTo(Collection!(E) c, int maxElements);
352 }
353 
354 
355 // TODO: Tasks pending completion -@zxp at 12/31/2018, 10:15:14 AM
356 // 
357 // abstract class AbstractBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) {
358 
359 //     /**
360 //      * Constructor for use by subclasses.
361 //      */
362 //     protected this() {
363 //     }
364 
365 //     /**
366 //      * Inserts the specified element into this queue if it is possible to do so
367 //      * immediately without violating capacity restrictions, returning
368 //      * {@code true} upon success and throwing an {@code IllegalStateException}
369 //      * if no space is currently available.
370 //      *
371 //      * <p>This implementation returns {@code true} if {@code offer} succeeds,
372 //      * else throws an {@code IllegalStateException}.
373 //      *
374 //      * @param e the element to add
375 //      * @return {@code true} (as specified by {@link Collection#add})
376 //      * @throws IllegalStateException if the element cannot be added at this
377 //      *         time due to capacity restrictions
378 //      * @throws ClassCastException if the class of the specified element
379 //      *         prevents it from being added to this queue
380 //      * @throws NullPointerException if the specified element is null and
381 //      *         this queue does not permit null elements
382 //      * @throws IllegalArgumentException if some property of this element
383 //      *         prevents it from being added to this queue
384 //      */
385 //     override bool add(E e) {
386 //         if (offer(e))
387 //             return true;
388 //         else
389 //             throw new IllegalStateException("Queue full");
390 //     }
391 
392 //     /**
393 //      * Retrieves and removes the head of this queue.  This method differs
394 //      * from {@link #poll poll} only in that it throws an exception if this
395 //      * queue is empty.
396 //      *
397 //      * <p>This implementation returns the result of {@code poll}
398 //      * unless the queue is empty.
399 //      *
400 //      * @return the head of this queue
401 //      * @throws NoSuchElementException if this queue is empty
402 //      */
403 //     E remove() {
404 //         E x = poll();
405 //         static if(is(E == class) || is(E == string)) {
406 //             if (x is null) throw new NoSuchElementException();
407 //         }
408 //         return x;
409 //     }
410 
411 //     /**
412 //      * Retrieves, but does not remove, the head of this queue.  This method
413 //      * differs from {@link #peek peek} only in that it throws an exception if
414 //      * this queue is empty.
415 //      *
416 //      * <p>This implementation returns the result of {@code peek}
417 //      * unless the queue is empty.
418 //      *
419 //      * @return the head of this queue
420 //      * @throws NoSuchElementException if this queue is empty
421 //      */
422 //     E element() {
423 //         E x = peek();
424         
425 //         static if(is(E == class) || is(E == string)) {
426 //             if (x is null) throw new NoSuchElementException();
427 //         }
428 //         return x;
429 //     }
430 
431 //     /**
432 //      * Removes all of the elements from this queue.
433 //      * The queue will be empty after this call returns.
434 //      *
435 //      * <p>This implementation repeatedly invokes {@link #poll poll} until it
436 //      * returns {@code null}.
437 //      */
438 //     override void clear() {
439 //         static if(is(E == class) || is(E == string)) {
440 //             while (poll() !is null) {}
441 //         } else {
442 //             while(size()>0) {
443 //                 poll();
444 //             }
445 //         }
446 //     }
447 
448 //     /**
449 //      * Adds all of the elements in the specified collection to this
450 //      * queue.  Attempts to addAll of a queue to itself result in
451 //      * {@code IllegalArgumentException}. Further, the behavior of
452 //      * this operation is undefined if the specified collection is
453 //      * modified while the operation is in progress.
454 //      *
455 //      * <p>This implementation iterates over the specified collection,
456 //      * and adds each element returned by the iterator to this
457 //      * queue, in turn.  A runtime exception encountered while
458 //      * trying to add an element (including, in particular, a
459 //      * {@code null} element) may result in only some of the elements
460 //      * having been successfully added when the associated exception is
461 //      * thrown.
462 //      *
463 //      * @param c collection containing elements to be added to this queue
464 //      * @return {@code true} if this queue changed as a result of the call
465 //      * @throws ClassCastException if the class of an element of the specified
466 //      *         collection prevents it from being added to this queue
467 //      * @throws NullPointerException if the specified collection contains a
468 //      *         null element and this queue does not permit null elements,
469 //      *         or if the specified collection is null
470 //      * @throws IllegalArgumentException if some property of an element of the
471 //      *         specified collection prevents it from being added to this
472 //      *         queue, or if the specified collection is this queue
473 //      * @throws IllegalStateException if not all the elements can be added at
474 //      *         this time due to insertion restrictions
475 //      * @see #add(Object)
476 //      */
477 //     override bool addAll(Collection!E c) {
478 //         if (c is null)
479 //             throw new NullPointerException();
480 //         if (c is this)
481 //             throw new IllegalArgumentException();
482 //         bool modified = false;
483 //         foreach (E e ; c) {
484 //             if (add(e)) modified = true;
485 //         }
486 //         return modified;
487 //     }
488 
489 //     override bool opEquals(IObject o) {
490 //         return opEquals(cast(Object) o);
491 //     }
492     
493 //     override bool opEquals(Object o) {
494 //         return super.opEquals(o);
495 //     }
496 
497 //     override size_t toHash() @trusted nothrow {
498 //         return super.toHash();
499 //     }
500 
501 //     override string toString() {
502 //         return super.toString();
503 //     }
504 // }