1 /*
2  * Hunt - A refined core library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.concurrency.LinkedBlockingQueue;
13 
14 import hunt.concurrency.atomic.AtomicHelper;
15 import hunt.concurrency.Helpers;
16 import hunt.concurrency.BlockingQueue;
17 
18 import hunt.collection.AbstractQueue;
19 import hunt.collection.Collection;
20 import hunt.collection.Iterator;
21 import hunt.util.DateTime;
22 import hunt.Exceptions;
23 import hunt.Functions;
24 import hunt.Object;
25 
26 // import core.atomic;
27 import core.sync.mutex;
28 import core.sync.condition;
29 import core.time;
30 
31 import hunt.logging.ConsoleLogger;
32 
33 /**
34  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
35  * linked nodes.
36  * This queue orders elements FIFO (first-in-first-out).
37  * The <em>head</em> of the queue is that element that has been on the
38  * queue the longest time.
39  * The <em>tail</em> of the queue is that element that has been on the
40  * queue the shortest time. New elements
41  * are inserted at the tail of the queue, and the queue retrieval
42  * operations obtain elements at the head of the queue.
43  * Linked queues typically have higher throughput than array-based queues but
44  * less predictable performance in most concurrent applications.
45  *
46  * <p>The optional capacity bound constructor argument serves as a
47  * way to prevent excessive queue expansion. The capacity, if unspecified,
48  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
49  * dynamically created upon each insertion unless this would bring the
50  * queue above capacity.
51  *
52  * <p>This class and its iterator implement all of the <em>optional</em>
53  * methods of the {@link Collection} and {@link Iterator} interfaces.
54  *
55  * <p>This class is a member of the
56  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
57  * Java Collections Framework</a>.
58  *
59  * @since 1.5
60  * @author Doug Lea
61  * @param (E) the type of elements held in this queue
62  */
63 class LinkedBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) {
64 
65     /*
66      * A variant of the "two lock queue" algorithm.  The putLock gates
67      * entry to put (and offer), and has an associated condition for
68      * waiting puts.  Similarly for the takeLock.  The "count" field
69      * that they both rely on is maintained as an atomic to avoid
70      * needing to get both locks in most cases. Also, to minimize need
71      * for puts to get takeLock and vice-versa, cascading notifies are
72      * used. When a put notices that it has enabled at least one take,
73      * it signals taker. That taker in turn signals others if more
74      * items have been entered since the signal. And symmetrically for
75      * takes signalling puts. Operations such as remove(Object) and
76      * iterators acquire both locks.
77      *
78      * Visibility between writers and readers is provided as follows:
79      *
80      * Whenever an element is enqueued, the putLock is acquired and
81      * count updated.  A subsequent reader guarantees visibility to the
82      * enqueued Node by either acquiring the putLock (via fullyLock)
83      * or by acquiring the takeLock, and then reading n = atomicLoad(count);
84      * this gives visibility to the first n items.
85      *
86      * To implement weakly consistent iterators, it appears we need to
87      * keep all Nodes GC-reachable from a predecessor dequeued Node.
88      * That would cause two problems:
89      * - allow a rogue Iterator to cause unbounded memory retention
90      * - cause cross-generational linking of old Nodes to new Nodes if
91      *   a Node was tenured while live, which generational GCs have a
92      *   hard time dealing with, causing repeated major collections.
93      * However, only non-deleted Nodes need to be reachable from
94      * dequeued Nodes, and reachability does not necessarily have to
95      * be of the kind understood by the GC.  We use the trick of
96      * linking a Node that has just been dequeued to itself.  Such a
97      * self-link implicitly means to advance to head.next.
98      */
99 
100     /**
101      * Linked list node class.
102      */
103     static class Node(E) {
104         E item;
105 
106         /**
107          * One of:
108          * - the real successor Node
109          * - this Node, meaning the successor is head.next
110          * - null, meaning there is no successor (this is the last node)
111          */
112         Node!(E) next;
113 
114         this(E x) { item = x; }
115     }
116 
117     /** The capacity bound, or int.max if none */
118     private int capacity;
119 
120     /** Current number of elements */
121     private shared(int) count;
122 
123     /**
124      * Head of linked list.
125      * Invariant: head.item is null
126      */
127     private Node!(E) head;
128 
129     /**
130      * Tail of linked list.
131      * Invariant: last.next is null
132      */
133     private Node!(E) last;
134 
135     /** Lock held by take, poll, etc */
136     private Mutex takeLock;
137 
138     /** Wait queue for waiting takes */
139     private Condition notEmpty;
140 
141     /** Lock held by put, offer, etc */
142     private Mutex putLock;
143 
144     /** Wait queue for waiting puts */
145     private Condition notFull;
146 
147     private void initilize() {
148         takeLock = new Mutex();
149         putLock = new Mutex();
150         notEmpty = new Condition(takeLock);
151         notFull = new Condition(putLock);
152     }
153 
154     /**
155      * Signals a waiting take. Called only from put/offer (which do not
156      * otherwise ordinarily lock takeLock.)
157      */
158     private void signalNotEmpty() {
159         Mutex takeLock = this.takeLock;
160         takeLock.lock();
161         // scope(exit) takeLock.unlock();
162         try {
163             notEmpty.notify();
164         } finally {
165             takeLock.unlock();
166         }
167     }
168 
169     /**
170      * Signals a waiting put. Called only from take/poll.
171      */
172     private void signalNotFull() {
173         Mutex putLock = this.putLock;
174         putLock.lock();
175         try {
176             notFull.notify();
177         } finally {
178             putLock.unlock();
179         }
180     }
181 
182     /**
183      * Links node at end of queue.
184      *
185      * @param node the node
186      */
187     private void enqueue(Node!(E) node) {
188         // assert putLock.isHeldByCurrentThread();
189         // assert last.next is null;
190         last = last.next = node;
191     }
192 
193     /**
194      * Removes a node from head of queue.
195      *
196      * @return the node
197      */
198     private E dequeue() {
199         // assert takeLock.isHeldByCurrentThread();
200         // assert head.item is null;
201         Node!(E) h = head;
202         Node!(E) first = h.next;
203         h.next = h; // help GC
204         head = first;
205         E x = first.item;
206         first.item = E.init;
207         return x;
208     }
209 
210     /**
211      * Locks to prevent both puts and takes.
212      */
213     void fullyLock() {
214         putLock.lock();
215         takeLock.lock();
216     }
217 
218     /**
219      * Unlocks to allow both puts and takes.
220      */
221     void fullyUnlock() {
222         takeLock.unlock();
223         putLock.unlock();
224     }
225 
226     /**
227      * Creates a {@code LinkedBlockingQueue} with a capacity of
228      * {@link Integer#MAX_VALUE}.
229      */
230     this() {
231         this(int.max);
232     }
233 
234     /**
235      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
236      *
237      * @param capacity the capacity of this queue
238      * @throws IllegalArgumentException if {@code capacity} is not greater
239      *         than zero
240      */
241     this(int capacity) {
242         if (capacity <= 0) throw new IllegalArgumentException();
243         this.capacity = capacity;
244         last = head = new Node!(E)(E.init);
245         initilize();
246     }
247 
248     /**
249      * Creates a {@code LinkedBlockingQueue} with a capacity of
250      * {@link Integer#MAX_VALUE}, initially containing the elements of the
251      * given collection,
252      * added in traversal order of the collection's iterator.
253      *
254      * @param c the collection of elements to initially contain
255      * @throws NullPointerException if the specified collection or any
256      *         of its elements are null
257      */
258     this(Collection!(E) c) {
259         this(int.max);
260         Mutex putLock = this.putLock;
261         putLock.lock(); // Never contended, but necessary for visibility
262         try {
263             int n = 0;
264             foreach (E e ; c) {
265                 static if(is(E == class) || is(E == string)) {
266                     if (e is null) throw new NullPointerException();
267                 }
268                 if (n == capacity)
269                     throw new IllegalStateException("Queue full");
270                 enqueue(new Node!(E)(e));
271                 ++n;
272             }
273             count = n;
274         } finally {
275             putLock.unlock();
276         }
277     }
278 
279     // this doc comment is overridden to remove the reference to collections
280     // greater in size than int.max
281     /**
282      * Returns the number of elements in this queue.
283      *
284      * @return the number of elements in this queue
285      */
286     override int size() {
287         return count;
288     }
289 
290     // this doc comment is a modified copy of the inherited doc comment,
291     // without the reference to unlimited queues.
292     /**
293      * Returns the number of additional elements that this queue can ideally
294      * (in the absence of memory or resource constraints) accept without
295      * blocking. This is always equal to the initial capacity of this queue
296      * less the current {@code size} of this queue.
297      *
298      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
299      * an element will succeed by inspecting {@code remainingCapacity}
300      * because it may be the case that another thread is about to
301      * insert or remove an element.
302      */
303     int remainingCapacity() {
304         return capacity - count;
305     }
306 
307     override bool add(E e) {
308         return super.add(e);
309     }
310 
311     /**
312      * Inserts the specified element at the tail of this queue, waiting if
313      * necessary for space to become available.
314      *
315      * @throws InterruptedException {@inheritDoc}
316      * @throws NullPointerException {@inheritDoc}
317      */
318     void put(E e) {
319         static if(is(E == class) || is(E == string)) {
320             if (e is null) throw new NullPointerException();
321         }
322         int c;
323         Node!(E) node = new Node!(E)(e);
324         Mutex putLock = this.putLock;
325         putLock.lock();
326         try {
327             /*
328              * Note that count is used in wait guard even though it is
329              * not protected by lock. This works because count can
330              * only decrease at this point (all other puts are shut
331              * out by lock), and we (or some other waiting put) are
332              * signalled if it ever changes from capacity. Similarly
333              * for all other uses of count in other wait guards.
334              */
335             while (count == capacity) {
336                 notFull.wait();
337             }
338             enqueue(node);
339             c = AtomicHelper.getAndIncrement(count);
340             if (c + 1 < capacity)
341                 notFull.notify();
342         } finally {
343             putLock.unlock();
344         }
345         if (c == 0)
346             signalNotEmpty();
347     }
348 
349     /**
350      * Inserts the specified element at the tail of this queue, waiting if
351      * necessary up to the specified wait time for space to become available.
352      *
353      * @return {@code true} if successful, or {@code false} if
354      *         the specified waiting time elapses before space is available
355      * @throws InterruptedException {@inheritDoc}
356      * @throws NullPointerException {@inheritDoc}
357      */
358     bool offer(E e, Duration timeout) {
359         static if(is(E == class) || is(E == string)) {
360             if (e is null) throw new NullPointerException();
361         }
362 
363         int c;
364         Mutex putLock = this.putLock;
365         putLock.lock();
366         try {
367             while (count == capacity) {
368                 // if (nanos <= 0L)
369                 //     return false;
370                 // nanos = notFull.wait(nanos);
371                 if(!notFull.wait(timeout)) return false;
372             }
373             enqueue(new Node!(E)(e));
374             c = AtomicHelper.getAndIncrement(count);
375             if (c + 1 < capacity)
376                 notFull.notify();
377         } finally {
378             putLock.unlock();
379         }
380         if (c == 0)
381             signalNotEmpty();
382         return true;
383     }
384 
385     /**
386      * Inserts the specified element at the tail of this queue if it is
387      * possible to do so immediately without exceeding the queue's capacity,
388      * returning {@code true} upon success and {@code false} if this queue
389      * is full.
390      * When using a capacity-restricted queue, this method is generally
391      * preferable to method {@link BlockingQueue#add add}, which can fail to
392      * insert an element only by throwing an exception.
393      *
394      * @throws NullPointerException if the specified element is null
395      */
396     bool offer(E e) {
397         static if(is(E == class) || is(E == string)) {
398             if (e is null) throw new NullPointerException();
399         }
400         // int count = this.count;
401         if (count  == capacity)
402             return false;
403         int c;
404         Node!(E) node = new Node!(E)(e);
405         Mutex putLock = this.putLock;
406         putLock.lock();
407         try {
408             if (count == capacity)
409                 return false;
410             enqueue(node);
411             c = AtomicHelper.getAndIncrement(count);
412             if (c + 1 < capacity)
413                 notFull.notify();
414         } finally {
415             putLock.unlock();
416         }
417         
418         if (c == 0)
419             signalNotEmpty();
420         return true;
421     }
422 
423     E take() {
424         E x;
425         int c;
426         Mutex takeLock = this.takeLock;
427         takeLock.lock();
428         try {
429             while (count == 0) {
430                 notEmpty.wait();
431             }
432             x = dequeue();
433             c = AtomicHelper.getAndDecrement(count);
434             if (c > 1)
435                 notEmpty.notify();
436         } finally {
437             takeLock.unlock();
438         }
439         if (c == capacity)
440             signalNotFull();
441         return x;
442     }
443 
444     E poll(Duration timeout) {
445         E x;
446         int c;
447         // int count = this.count;
448         Mutex takeLock = this.takeLock;
449         takeLock.lock();
450         try {
451             while (count == 0) {
452                 if(!notFull.wait(timeout)) return E.init;
453             }
454             x = dequeue();
455             c = AtomicHelper.getAndDecrement(count);
456             if (c > 1)
457                 notEmpty.notify();
458         } finally {
459             takeLock.unlock();
460         }
461         if (c == capacity)
462             signalNotFull();
463         return x;
464     }
465 
466     E poll() {
467         // int count = this.count;
468         if (count == 0)
469             throw new NoSuchElementException();
470 
471         E x;
472         int c;
473         Mutex takeLock = this.takeLock;
474         takeLock.lock();
475         try {
476             if (count == 0)
477                 return E.init;
478             x = dequeue();
479             c = AtomicHelper.getAndDecrement(count);
480             if (c > 1)
481                 notEmpty.notify();
482         } finally {
483             takeLock.unlock();
484         }
485         if (c == capacity)
486             signalNotFull();
487         return x;
488     }
489 
490     E peek() {
491         // if (atomicLoad(count) == 0)
492         //     return E.init;
493         
494         if (count == 0)
495             throw new NoSuchElementException();
496 
497         Mutex takeLock = this.takeLock;
498         takeLock.lock();
499         try {
500             return (count > 0) ? head.next.item : E.init;
501         } finally {
502             takeLock.unlock();
503         }
504     }
505 
506     /**
507      * Unlinks interior Node p with predecessor pred.
508      */
509     private void unlink(Node!(E) p, Node!(E) pred) {
510         // assert putLock.isHeldByCurrentThread();
511         // assert takeLock.isHeldByCurrentThread();
512         // p.next is not changed, to allow iterators that are
513         // traversing p to maintain their weak-consistency guarantee.
514         p.item = E.init;
515         pred.next = p.next;
516         if (last == p)
517             last = pred;
518         if (AtomicHelper.getAndDecrement(count) == capacity)
519             notFull.notify();
520     }
521 
522     /**
523      * Removes a single instance of the specified element from this queue,
524      * if it is present.  More formally, removes an element {@code e} such
525      * that {@code o.equals(e)}, if this queue contains one or more such
526      * elements.
527      * Returns {@code true} if this queue contained the specified element
528      * (or equivalently, if this queue changed as a result of the call).
529      *
530      * @param o element to be removed from this queue, if present
531      * @return {@code true} if this queue changed as a result of the call
532      */
533     override bool remove(E o) {
534         static if(is(E == class) || is(E == string)) {
535             if (o is null) return false;
536         }
537 
538         fullyLock();
539         try {
540             for (Node!(E) pred = head, p = pred.next;
541                  p !is null;
542                  pred = p, p = p.next) {
543                 if (o == p.item) {
544                     unlink(p, pred);
545                     return true;
546                 }
547             }
548             return false;
549         } finally {
550             fullyUnlock();
551         }
552     }
553 
554     /**
555      * Returns {@code true} if this queue contains the specified element.
556      * More formally, returns {@code true} if and only if this queue contains
557      * at least one element {@code e} such that {@code o.equals(e)}.
558      *
559      * @param o object to be checked for containment in this queue
560      * @return {@code true} if this queue contains the specified element
561      */
562     override bool contains(E o) {
563         static if(is(E == class) || is(E == string)) {
564             if (o is null) return false;
565         }
566         fullyLock();
567         try {
568             for (Node!(E) p = head.next; p !is null; p = p.next)
569                 if (o == p.item)
570                     return true;
571             return false;
572         } finally {
573             fullyUnlock();
574         }
575     }
576 
577     /**
578      * Returns an array containing all of the elements in this queue, in
579      * proper sequence.
580      *
581      * <p>The returned array will be "safe" in that no references to it are
582      * maintained by this queue.  (In other words, this method must allocate
583      * a new array).  The caller is thus free to modify the returned array.
584      *
585      * <p>This method acts as bridge between array-based and collection-based
586      * APIs.
587      *
588      * @return an array containing all of the elements in this queue
589      */
590     override E[] toArray() {
591         fullyLock();
592         try {
593             int size = count;
594             E[] a = new E[size];
595             int k = 0;
596             for (Node!(E) p = head.next; p !is null; p = p.next)
597                 a[k++] = p.item;
598             return a;
599         } finally {
600             fullyUnlock();
601         }
602     }
603 
604     /**
605      * Returns an array containing all of the elements in this queue, in
606      * proper sequence; the runtime type of the returned array is that of
607      * the specified array.  If the queue fits in the specified array, it
608      * is returned therein.  Otherwise, a new array is allocated with the
609      * runtime type of the specified array and the size of this queue.
610      *
611      * <p>If this queue fits in the specified array with room to spare
612      * (i.e., the array has more elements than this queue), the element in
613      * the array immediately following the end of the queue is set to
614      * {@code null}.
615      *
616      * <p>Like the {@link #toArray()} method, this method acts as bridge between
617      * array-based and collection-based APIs.  Further, this method allows
618      * precise control over the runtime type of the output array, and may,
619      * under certain circumstances, be used to save allocation costs.
620      *
621      * <p>Suppose {@code x} is a queue known to contain only strings.
622      * The following code can be used to dump the queue into a newly
623      * allocated array of {@code string}:
624      *
625      * <pre> {@code string[] y = x.toArray(new string[0]);}</pre>
626      *
627      * Note that {@code toArray(new Object[0])} is identical in function to
628      * {@code toArray()}.
629      *
630      * @param a the array into which the elements of the queue are to
631      *          be stored, if it is big enough; otherwise, a new array of the
632      *          same runtime type is allocated for this purpose
633      * @return an array containing all of the elements in this queue
634      * @throws ArrayStoreException if the runtime type of the specified array
635      *         is not a supertype of the runtime type of every element in
636      *         this queue
637      * @throws NullPointerException if the specified array is null
638      */
639 
640     // !(T) T[] toArray(T[] a) {
641     //     fullyLock();
642     //     try {
643     //         int size = atomicLoad(count);
644     //         if (a.length < size)
645     //             a = (T[])java.lang.reflect.Array.newInstance
646     //                 (a.getClass().getComponentType(), size);
647 
648     //         int k = 0;
649     //         for (Node!(E) p = head.next; p !is null; p = p.next)
650     //             a[k++] = (T)p.item;
651     //         if (a.length > k)
652     //             a[k] = null;
653     //         return a;
654     //     } finally {
655     //         fullyUnlock();
656     //     }
657     // }
658 
659     override string toString() {
660         return Helpers.collectionToString(this);
661     }
662 
663     /**
664      * Atomically removes all of the elements from this queue.
665      * The queue will be empty after this call returns.
666      */
667     override void clear() {
668         fullyLock();
669         try {
670             for (Node!(E) p, h = head; (p = h.next) !is null; h = p) {
671                 h.next = h;
672                 p.item = E.init;
673             }
674             head = last;
675             // assert head.item is null && head.next is null;
676             int c = count;
677             AtomicHelper.store(count, 0);
678             if (c == capacity)
679                 notFull.notify();
680         } finally {
681             fullyUnlock();
682         }
683     }
684 
685     /**
686      * @throws UnsupportedOperationException {@inheritDoc}
687      * @throws ClassCastException            {@inheritDoc}
688      * @throws NullPointerException          {@inheritDoc}
689      * @throws IllegalArgumentException      {@inheritDoc}
690      */
691     int drainTo(Collection!(E) c) {
692         return drainTo(c, int.max);
693     }
694 
695     /**
696      * @throws UnsupportedOperationException {@inheritDoc}
697      * @throws ClassCastException            {@inheritDoc}
698      * @throws NullPointerException          {@inheritDoc}
699      * @throws IllegalArgumentException      {@inheritDoc}
700      */
701     int drainTo(Collection!(E) c, int maxElements) {
702         // Objects.requireNonNull(c);
703         if (c == this)
704             throw new IllegalArgumentException();
705         if (maxElements <= 0)
706             return 0;
707         bool canSignalNotFull = false;
708         Mutex takeLock = this.takeLock;
709         takeLock.lock();
710         try {
711             import std.algorithm : min;
712             int n = min(maxElements, count);
713             // count.get provides visibility to first n Nodes
714             Node!(E) h = head;
715             int i = 0;
716             try {
717                 while (i < n) {
718                     Node!(E) p = h.next;
719                     c.add(p.item);
720                     p.item = E.init;
721                     h.next = h;
722                     h = p;
723                     ++i;
724                 }
725                 return n;
726             } finally {
727                 // Restore invariants even if c.add() threw
728                 if (i > 0) {
729                     // assert h.item is null;
730                     head = h;
731                     int ct = AtomicHelper.getAndAdd(count, i);
732                     canSignalNotFull = ((ct - i) == capacity);
733                 }
734             }
735         } finally {
736             takeLock.unlock();
737             if (canSignalNotFull)
738                 signalNotFull();
739         }
740     }
741 
742     /**
743      * Used for any element traversal that is not entirely under lock.
744      * Such traversals must handle both:
745      * - dequeued nodes (p.next == p)
746      * - (possibly multiple) interior removed nodes (p.item is null)
747      */
748     Node!(E) succ(Node!(E) p) {
749         if (p == (p = p.next))
750             p = head.next;
751         return p;
752     }
753 
754     /**
755      * Returns an iterator over the elements in this queue in proper sequence.
756      * The elements will be returned in order from first (head) to last (tail).
757      *
758      * <p>The returned iterator is
759      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
760      *
761      * @return an iterator over the elements in this queue in proper sequence
762      */
763     // Iterator!(E) iterator() {
764     //     return new Itr();
765     // }
766 
767     /**
768      * Weakly-consistent iterator.
769      *
770      * Lazily updated ancestor field provides expected O(1) remove(),
771      * but still O(n) in the worst case, whenever the saved ancestor
772      * is concurrently deleted.
773      */
774     // private class Itr : Iterator!(E) {
775     //     private Node!(E) next;           // Node holding nextItem
776     //     private E nextItem;             // next item to hand out
777     //     private Node!(E) lastRet;
778     //     private Node!(E) ancestor;       // Helps unlink lastRet on remove()
779 
780     //     Itr() {
781     //         fullyLock();
782     //         try {
783     //             if ((next = head.next) !is null)
784     //                 nextItem = next.item;
785     //         } finally {
786     //             fullyUnlock();
787     //         }
788     //     }
789 
790     //     bool hasNext() {
791     //         return next !is null;
792     //     }
793 
794     //     E next() {
795     //         Node!(E) p;
796     //         if ((p = next) is null)
797     //             throw new NoSuchElementException();
798     //         lastRet = p;
799     //         E x = nextItem;
800     //         fullyLock();
801     //         try {
802     //             E e = null;
803     //             for (p = p.next; p !is null && (e = p.item) is null; )
804     //                 p = succ(p);
805     //             next = p;
806     //             nextItem = e;
807     //         } finally {
808     //             fullyUnlock();
809     //         }
810     //         return x;
811     //     }
812 
813     //     void forEachRemaining(Consumer!(E) action) {
814     //         // A variant of forEachFrom
815     //         Objects.requireNonNull(action);
816     //         Node!(E) p;
817     //         if ((p = next) is null) return;
818     //         lastRet = p;
819     //         next = null;
820     //         int batchSize = 64;
821     //         Object[] es = null;
822     //         int n, len = 1;
823     //         do {
824     //             fullyLock();
825     //             try {
826     //                 if (es is null) {
827     //                     p = p.next;
828     //                     for (Node!(E) q = p; q !is null; q = succ(q))
829     //                         if (q.item !is null && ++len == batchSize)
830     //                             break;
831     //                     es = new Object[len];
832     //                     es[0] = nextItem;
833     //                     nextItem = null;
834     //                     n = 1;
835     //                 } else
836     //                     n = 0;
837     //                 for (; p !is null && n < len; p = succ(p))
838     //                     if ((es[n] = p.item) !is null) {
839     //                         lastRet = p;
840     //                         n++;
841     //                     }
842     //             } finally {
843     //                 fullyUnlock();
844     //             }
845     //             for (int i = 0; i < n; i++) {
846     //             E e = (E) es[i];
847     //                 action.accept(e);
848     //             }
849     //         } while (n > 0 && p !is null);
850     //     }
851 
852     //     void remove() {
853     //         Node!(E) p = lastRet;
854     //         if (p is null)
855     //             throw new IllegalStateException();
856     //         lastRet = null;
857     //         fullyLock();
858     //         try {
859     //             if (p.item !is null) {
860     //                 if (ancestor is null)
861     //                     ancestor = head;
862     //                 ancestor = findPred(p, ancestor);
863     //                 unlink(p, ancestor);
864     //             }
865     //         } finally {
866     //             fullyUnlock();
867     //         }
868     //     }
869     // }
870 
871     /**
872      * A customized variant of Spliterators.IteratorSpliterator.
873      * Keep this class in sync with (very similar) LBDSpliterator.
874      */
875     // private final class LBQSpliterator : Spliterator!(E) {
876     //     static int MAX_BATCH = 1 << 25;  // max batch array size;
877     //     Node!(E) current;    // current node; null until initialized
878     //     int batch;          // batch size for splits
879     //     bool exhausted;  // true when no more nodes
880     //     long est = size();  // size estimate
881 
882     //     this() {}
883 
884     //     long estimateSize() { return est; }
885 
886     //     Spliterator!(E) trySplit() {
887     //         Node!(E) h;
888     //         if (!exhausted &&
889     //             ((h = current) !is null || (h = head.next) !is null)
890     //             && h.next !is null) {
891     //             int n = batch = Math.min(batch + 1, MAX_BATCH);
892     //             Object[] a = new Object[n];
893     //             int i = 0;
894     //             Node!(E) p = current;
895     //             fullyLock();
896     //             try {
897     //                 if (p !is null || (p = head.next) !is null)
898     //                     for (; p !is null && i < n; p = succ(p))
899     //                         if ((a[i] = p.item) !is null)
900     //                             i++;
901     //             } finally {
902     //                 fullyUnlock();
903     //             }
904     //             if ((current = p) is null) {
905     //                 est = 0L;
906     //                 exhausted = true;
907     //             }
908     //             else if ((est -= i) < 0L)
909     //                 est = 0L;
910     //             if (i > 0)
911     //                 return Spliterators.spliterator
912     //                     (a, 0, i, (Spliterator.ORDERED |
913     //                                Spliterator.NONNULL |
914     //                                Spliterator.CONCURRENT));
915     //         }
916     //         return null;
917     //     }
918 
919     //     bool tryAdvance(Consumer!(E) action) {
920     //         Objects.requireNonNull(action);
921     //         if (!exhausted) {
922     //             E e = null;
923     //             fullyLock();
924     //             try {
925     //                 Node!(E) p;
926     //                 if ((p = current) !is null || (p = head.next) !is null)
927     //                     do {
928     //                         e = p.item;
929     //                         p = succ(p);
930     //                     } while (e is null && p !is null);
931     //                 if ((current = p) is null)
932     //                     exhausted = true;
933     //             } finally {
934     //                 fullyUnlock();
935     //             }
936     //             if (e !is null) {
937     //                 action.accept(e);
938     //                 return true;
939     //             }
940     //         }
941     //         return false;
942     //     }
943 
944     //     void forEachRemaining(Consumer!(E) action) {
945     //         Objects.requireNonNull(action);
946     //         if (!exhausted) {
947     //             exhausted = true;
948     //             Node!(E) p = current;
949     //             current = null;
950     //             forEachFrom(action, p);
951     //         }
952     //     }
953 
954     //     int characteristics() {
955     //         return (Spliterator.ORDERED |
956     //                 Spliterator.NONNULL |
957     //                 Spliterator.CONCURRENT);
958     //     }
959     // }
960 
961     /**
962      * Returns a {@link Spliterator} over the elements in this queue.
963      *
964      * <p>The returned spliterator is
965      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
966      *
967      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
968      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
969      *
970      * @implNote
971      * The {@code Spliterator} implements {@code trySplit} to permit limited
972      * parallelism.
973      *
974      * @return a {@code Spliterator} over the elements in this queue
975      * @since 1.8
976      */
977     // Spliterator!(E) spliterator() {
978     //     return new LBQSpliterator();
979     // }
980 
981     /**
982      * @throws NullPointerException {@inheritDoc}
983      */
984     // void forEach(Consumer!(E) action) {
985     //     Objects.requireNonNull(action);
986     //     forEachFrom(action, null);
987     // }
988 
989     override int opApply(scope int delegate(ref E) dg) {
990         if(dg is null)
991             throw new NullPointerException();
992 
993         return forEachFrom(dg, null);
994     }
995 
996     /**
997      * Runs action on each element found during a traversal starting at p.
998      * If p is null, traversal starts at head.
999      */
1000     private int forEachFrom(scope int delegate(ref E) action, Node!(E) p) {
1001         // Extract batches of elements while holding the lock; then
1002         // run the action on the elements while not
1003         const int batchSize = 64;       // max number of elements per batch
1004         E[] es = null;             // container for batch of elements
1005         int n, len = 0;
1006         int result = 0;
1007         do {
1008             fullyLock();
1009             try {
1010                 if (es is null) {
1011                     if (p is null) p = head.next;
1012                     for (Node!(E) q = p; q !is null; q = succ(q))
1013                         static if(is(E == class) || is(E == string)) {
1014                             if (q.item !is null && ++len == batchSize)
1015                                 break;
1016                         } else {
1017                             if (++len == batchSize)
1018                                 break;
1019                         }
1020                     es = new E[len];
1021                 }
1022                 for (n = 0; p !is null && n < len; p = succ(p)) {
1023                     es[n] = p.item;
1024                     static if(is(E == class) || is(E == string)) {
1025                         if (es[n] !is null)
1026                             n++;
1027                     } else {
1028                         n++;
1029                     }
1030                 }
1031             } finally {
1032                 fullyUnlock();
1033             }
1034 
1035             for (int i = 0; i < n; i++) {
1036                 E e = es[i];
1037                 result = action(e);
1038                 if(result != 0) return result;
1039             }
1040         } while (n > 0 && p !is null);
1041 
1042         return result;
1043     }
1044 
1045     /**
1046      * @throws NullPointerException {@inheritDoc}
1047      */
1048     override bool removeIf(Predicate!(E) filter) {
1049         // Objects.requireNonNull(filter);
1050         return bulkRemove(filter);
1051     }
1052 
1053     /**
1054      * @throws NullPointerException {@inheritDoc}
1055      */
1056     override bool removeAll(Collection!E c) {
1057         // Objects.requireNonNull(c);
1058         return bulkRemove(e => c.contains(e));
1059     }
1060 
1061     /**
1062      * @throws NullPointerException {@inheritDoc}
1063      */
1064     override bool retainAll(Collection!E c) {
1065         // Objects.requireNonNull(c);
1066         return bulkRemove(e => !c.contains(e));
1067     }
1068 
1069     /**
1070      * Returns the predecessor of live node p, given a node that was
1071      * once a live ancestor of p (or head); allows unlinking of p.
1072      */
1073     Node!(E) findPred(Node!(E) p, Node!(E) ancestor) {
1074         // assert p.item !is null;
1075         static if(is(E == class) || is(E == string)) {
1076             if (ancestor.item is null)
1077                 ancestor = head;
1078         }
1079         // Fails with NPE if precondition not satisfied
1080         for (Node!(E) q; (q = ancestor.next) != p; )
1081             ancestor = q;
1082         return ancestor;
1083     }
1084 
1085     /** Implementation of bulk remove methods. */
1086 
1087     private bool bulkRemove(Predicate!(E) filter) {
1088         bool removed = false;
1089         Node!(E) p = null, ancestor = head;
1090         Node!(E)[] nodes = null;
1091         int n, len = 0;
1092         do {
1093             // 1. Extract batch of up to 64 elements while holding the lock.
1094             fullyLock();
1095             try {
1096                 if (nodes is null) {  // first batch; initialize
1097                     p = head.next;
1098                     for (Node!(E) q = p; q !is null; q = succ(q)) {
1099                         static if(is(E == class) || is(E == string)) {
1100                             if (q.item !is null && ++len == 64)
1101                                 break;
1102                         } else {
1103                             if (++len == 64)
1104                                 break;
1105                         }
1106                     }
1107                     nodes = new Node!(E)[len];
1108                 }
1109                 for (n = 0; p !is null && n < len; p = succ(p))
1110                     nodes[n++] = p;
1111             } finally {
1112                 fullyUnlock();
1113             }
1114 
1115             // 2. Run the filter on the elements while lock is free.
1116             long deathRow = 0L;       // "bitset" of size 64
1117             for (int i = 0; i < n; i++) {
1118                 E e = nodes[i].item;
1119                 static if(is(E == class) || is(E == string)) {
1120                     if (e !is null && filter(e)) deathRow |= 1L << i;
1121                 } else {
1122                     if (filter(e)) deathRow |= 1L << i;
1123                 }
1124             }
1125 
1126             // 3. Remove any filtered elements while holding the lock.
1127             if (deathRow != 0) {
1128                 fullyLock();
1129                 try {
1130                     for (int i = 0; i < n; i++) {
1131                         Node!(E) q;
1132                         static if(is(E == class) || is(E == string)) {
1133                             if ((deathRow & (1L << i)) != 0L
1134                                 && (q = nodes[i]).item !is null) {
1135                                 ancestor = findPred(q, ancestor);
1136                                 unlink(q, ancestor);
1137                                 removed = true;
1138                             }
1139                         } else {
1140                             if ((deathRow & (1L << i)) != 0L) {
1141                                 q = nodes[i];
1142                                 ancestor = findPred(q, ancestor);
1143                                 unlink(q, ancestor);
1144                                 removed = true;
1145                             }
1146                         }
1147                         nodes[i] = null; // help GC
1148                     }
1149                 } finally {
1150                     fullyUnlock();
1151                 }
1152             }
1153         } while (n > 0 && p !is null);
1154         return removed;
1155     }
1156 
1157     override bool opEquals(IObject o) {
1158         return opEquals(cast(Object) o);
1159     }
1160     
1161     override bool opEquals(Object o) {
1162         return super.opEquals(o);
1163     }
1164 
1165     override size_t toHash() @trusted nothrow {
1166         return super.toHash();
1167     }
1168 
1169 }