/*
 * Hunt - A refined core library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.concurrency.LinkedBlockingQueue;

import hunt.concurrency.atomic.AtomicHelper;
import hunt.concurrency.Helpers;
import hunt.concurrency.BlockingQueue;

import hunt.collection.AbstractQueue;
import hunt.collection.Collection;
import hunt.collection.Iterator;
import hunt.util.DateTime;
import hunt.Exceptions;
import hunt.Functions;
import hunt.Object;

// import core.atomic;
import core.sync.mutex;
import core.sync.condition;
import core.time;

import hunt.logging.ConsoleLogger;

/**
 * An optionally-bounded blocking queue based on linked nodes.
 *
 * This queue orders elements FIFO (first-in-first-out). The head of the
 * queue is the element that has been on the queue the longest time; the
 * tail is the element that has been on the queue the shortest time. New
 * elements are inserted at the tail, and the retrieval operations obtain
 * elements at the head.
 *
 * Linked queues typically have higher throughput than array-based queues
 * but less predictable performance in most concurrent applications.
 *
 * The optional capacity bound constructor argument serves as a way to
 * prevent excessive queue expansion. The capacity, if unspecified, is
 * int.max. Linked nodes are dynamically created upon each insertion
 * unless this would bring the queue above capacity.
 *
 * Ported from java.util.concurrent.LinkedBlockingQueue.
 *
 * Authors: Doug Lea (original Java implementation)
 * Params:
 *  E = the type of elements held in this queue
 */
class LinkedBlockingQueue(E) : AbstractQueue!(E), BlockingQueue!(E) {

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(E) and
     * traversals acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count;
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent traversals, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue traversal to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node(E) {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node!(E) next;

        this(E x) { item = x; }
    }

    /**
     * True when E can be null (classes and strings). Such queues reject
     * null elements, and a null item marks a node as already removed.
     */
    private enum bool isNullableElement = is(E == class) || is(E == string);

    /** The capacity bound, or int.max if none */
    private int capacity;

    /** Current number of elements */
    private shared(int) count;

    /**
     * Head of linked list.
     * Invariant: head.item is E.init
     */
    private Node!(E) head;

    /**
     * Tail of linked list.
     * Invariant: last.next is null
     */
    private Node!(E) last;

    /** Lock held by take, poll, etc */
    private Mutex takeLock;

    /** Wait queue for waiting takes */
    private Condition notEmpty;

    /** Lock held by put, offer, etc */
    private Mutex putLock;

    /** Wait queue for waiting puts */
    private Condition notFull;

    /** Creates both locks and the condition variable bound to each. */
    private void initialize() {
        takeLock = new Mutex();
        putLock = new Mutex();
        notEmpty = new Condition(takeLock);
        notFull = new Condition(putLock);
    }

    /**
     * Throws NullPointerException if e is null. Compiles to a no-op for
     * element types that cannot be null.
     */
    private static void checkNotNull(E e) {
        static if (isNullableElement) {
            if (e is null) throw new NullPointerException();
        }
    }

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        scope (exit) takeLock.unlock();
        notEmpty.notify();
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        scope (exit) putLock.unlock();
        notFull.notify();
    }

    /**
     * Links node at end of queue. The caller must hold putLock.
     *
     * Params:
     *  node = the node to append
     */
    private void enqueue(Node!(E) node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next is null;
        last.next = node;
        last = node;
    }

    /**
     * Removes a node from head of queue. The caller must hold takeLock
     * and must have checked that the queue is not empty.
     *
     * Returns: the item of the removed node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item is E.init;
        Node!(E) oldHead = head;
        Node!(E) first = oldHead.next;
        oldHead.next = oldHead; // self-link: tells traversals to restart at head.next
        head = first;           // the first real node becomes the new dummy head
        E x = first.item;
        first.item = E.init;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     * Always acquires putLock before takeLock.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes (reverse order of fullyLock).
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a LinkedBlockingQueue with a capacity of int.max.
     */
    this() {
        this(int.max);
    }

    /**
     * Creates a LinkedBlockingQueue with the given (fixed) capacity.
     *
     * Params:
     *  capacity = the capacity of this queue
     * Throws: IllegalArgumentException if capacity is not greater than zero
     */
    this(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node!(E)(E.init);
        initialize();
    }

    /**
     * Creates a LinkedBlockingQueue with a capacity of int.max, initially
     * containing the elements of the given collection, added in traversal
     * order of the collection.
     *
     * Params:
     *  c = the collection of elements to initially contain
     * Throws: NullPointerException if any element of the collection is null
     *         (nullable element types only)
     */
    this(Collection!(E) c) {
        this(int.max);
        putLock.lock(); // Never contended, but necessary for visibility
        scope (exit) putLock.unlock();

        int n = 0;
        foreach (E e; c) {
            checkNotNull(e);
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node!(E)(e));
            ++n;
        }
        // Publish the element count the same way clear() does.
        AtomicHelper.store(count, n);
    }

    /**
     * Returns the number of elements in this queue.
     *
     * Returns: the number of elements in this queue
     */
    override int size() {
        return count;
    }

    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current size of this queue.
     *
     * Note that you cannot always tell if an attempt to insert an element
     * will succeed by inspecting remainingCapacity because it may be the
     * case that another thread is about to insert or remove an element.
     */
    int remainingCapacity() {
        return capacity - count;
    }

    /** Forwards to the inherited add(e). */
    override bool add(E e) {
        return super.add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * Throws: NullPointerException if the element is null
     */
    void put(E e) {
        checkNotNull(e);
        int c;
        Node!(E) node = new Node!(E)(e);
        putLock.lock();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count == capacity) {
                notFull.wait();
            }
            enqueue(node);
            c = AtomicHelper.getAndIncrement(count);
            if (c + 1 < capacity)
                notFull.notify(); // still room: cascade to the next waiting put
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // queue was empty: a take may be waiting
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * The timeout bounds the total time spent waiting, including time spent
     * acquiring the lock and any number of wake-ups. A zero or negative
     * timeout never waits.
     *
     * Returns: true if successful, or false if the specified waiting time
     *          elapses before space is available
     * Throws: NullPointerException if the element is null
     */
    bool offer(E e, Duration timeout) {
        checkNotNull(e);

        int c;
        const MonoTime start = MonoTime.currTime;
        putLock.lock();
        try {
            while (count == capacity) {
                // Wait only for what is left of the budget, so repeated
                // wake-ups cannot extend the overall timeout.
                Duration elapsed = MonoTime.currTime - start;
                if (elapsed >= timeout)
                    return false;
                if (!notFull.wait(timeout - elapsed))
                    return false;
            }
            enqueue(new Node!(E)(e));
            c = AtomicHelper.getAndIncrement(count);
            if (c + 1 < capacity)
                notFull.notify();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning true upon success and false if this queue is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to add, which can fail to insert an element only by
     * throwing an exception.
     *
     * Throws: NullPointerException if the specified element is null
     */
    bool offer(E e) {
        checkNotNull(e);
        // Cheap unlocked check; repeated under the lock below.
        if (count == capacity)
            return false;

        int c;
        Node!(E) node = new Node!(E)(e);
        putLock.lock();
        try {
            if (count == capacity)
                return false;
            enqueue(node);
            c = AtomicHelper.getAndIncrement(count);
            if (c + 1 < capacity)
                notFull.notify();
        } finally {
            putLock.unlock();
        }

        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     */
    E take() {
        E x;
        int c;
        takeLock.lock();
        try {
            while (count == 0) {
                notEmpty.wait();
            }
            x = dequeue();
            c = AtomicHelper.getAndDecrement(count);
            if (c > 1)
                notEmpty.notify(); // more elements remain: cascade to the next take
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull(); // queue was full: a put may be waiting
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * The timeout bounds the total time spent waiting, including time spent
     * acquiring the lock and any number of wake-ups. A zero or negative
     * timeout never waits.
     *
     * Returns: the head of this queue, or E.init if the specified waiting
     *          time elapses before an element is available
     */
    E poll(Duration timeout) {
        E x;
        int c;
        const MonoTime start = MonoTime.currTime;
        takeLock.lock();
        try {
            while (count == 0) {
                Duration elapsed = MonoTime.currTime - start;
                if (elapsed >= timeout)
                    return E.init;
                // Must wait on notEmpty: it is the condition bound to
                // takeLock, and the one that puts signal.
                if (!notEmpty.wait(timeout - elapsed))
                    return E.init;
            }
            x = dequeue();
            c = AtomicHelper.getAndDecrement(count);
            if (c > 1)
                notEmpty.notify();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * Returns: the head of this queue
     * Throws: NoSuchElementException if this queue is empty
     */
    E poll() {
        // Cheap unlocked check; repeated under the lock below.
        if (count == 0)
            throw new NoSuchElementException();

        E x;
        int c;
        takeLock.lock();
        try {
            // Same outcome as the fast path, so the result on an empty
            // queue does not depend on a race with another consumer.
            if (count == 0)
                throw new NoSuchElementException();
            x = dequeue();
            c = AtomicHelper.getAndDecrement(count);
            if (c > 1)
                notEmpty.notify();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * Returns: the head of this queue
     * Throws: NoSuchElementException if this queue is empty
     */
    E peek() {
        if (count == 0)
            throw new NoSuchElementException();

        takeLock.lock();
        scope (exit) takeLock.unlock();
        // Re-check under the lock; behave exactly like the fast path.
        if (count == 0)
            throw new NoSuchElementException();
        return head.next.item;
    }

    /**
     * Unlinks interior Node p with predecessor pred.
     * The caller must hold both locks.
     */
    private void unlink(Node!(E) p, Node!(E) pred) {
        // assert putLock.isHeldByCurrentThread();
        // assert takeLock.isHeldByCurrentThread();
        // p.next is not changed, to allow traversals that are
        // positioned on p to maintain their weak-consistency guarantee.
        p.item = E.init;
        pred.next = p.next;
        if (last is p)
            last = pred;
        if (AtomicHelper.getAndDecrement(count) == capacity)
            notFull.notify();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element e such that
     * o == e, if this queue contains one or more such elements.
     *
     * Params:
     *  o = element to be removed from this queue, if present
     * Returns: true if this queue changed as a result of the call
     */
    override bool remove(E o) {
        static if (isNullableElement) {
            if (o is null) return false;
        }

        fullyLock();
        try {
            Node!(E) pred = head;
            for (Node!(E) p = pred.next; p !is null; pred = p, p = p.next) {
                if (o == p.item) {
                    unlink(p, pred);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns true if this queue contains the specified element.
     * More formally, returns true if and only if this queue contains
     * at least one element e such that o == e.
     *
     * Params:
     *  o = object to be checked for containment in this queue
     * Returns: true if this queue contains the specified element
     */
    override bool contains(E o) {
        static if (isNullableElement) {
            if (o is null) return false;
        }

        fullyLock();
        try {
            for (Node!(E) p = head.next; p !is null; p = p.next) {
                if (o == p.item)
                    return true;
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * Returns: an array containing all of the elements in this queue
     */
    override E[] toArray() {
        fullyLock();
        try {
            int size = count;
            E[] a = new E[size];
            int k = 0;
            for (Node!(E) p = head.next; p !is null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /*
     * Not yet ported from Java: toArray(T[] a), which stores the elements
     * into the given array when it is big enough and otherwise allocates a
     * new array of the same runtime type.
     */
    // !(T) T[] toArray(T[] a) {
    //     fullyLock();
    //     try {
    //         int size = atomicLoad(count);
    //         if (a.length < size)
    //             a = (T[])java.lang.reflect.Array.newInstance
    //                 (a.getClass().getComponentType(), size);

    //         int k = 0;
    //         for (Node!(E) p = head.next; p !is null; p = p.next)
    //             a[k++] = (T)p.item;
    //         if (a.length > k)
    //             a[k] = null;
    //         return a;
    //     } finally {
    //         fullyUnlock();
    //     }
    // }

    /** Returns the string built by Helpers.collectionToString for this queue. */
    override string toString() {
        return Helpers.collectionToString(this);
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    override void clear() {
        fullyLock();
        try {
            Node!(E) h = head;
            for (Node!(E) p; (p = h.next) !is null; h = p) {
                h.next = h;       // self-link every node that is dropped
                p.item = E.init;
            }
            head = last;
            // assert head.item is E.init && head.next is null;
            int previous = count;
            AtomicHelper.store(count, 0);
            if (previous == capacity)
                notFull.notify();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to c.
     *
     * Returns: the number of elements transferred
     * Throws: IllegalArgumentException if c is this queue
     */
    int drainTo(Collection!(E) c) {
        return drainTo(c, int.max);
    }

    /**
     * Removes at most maxElements available elements from this queue and
     * adds them to c.
     *
     * Returns: the number of elements transferred
     * Throws: IllegalArgumentException if c is this queue
     */
    int drainTo(Collection!(E) c, int maxElements) {
        // Identity check: draining a queue into itself is not allowed.
        if ((cast(Object) c) is this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;

        bool canSignalNotFull = false;
        takeLock.lock();
        try {
            import std.algorithm : min;

            int available = count;
            int n = min(maxElements, available);
            // reading count provides visibility to first n Nodes
            Node!(E) h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node!(E) p = h.next;
                    c.add(p.item);
                    p.item = E.init;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item is E.init;
                    head = h;
                    // i elements were removed: subtract them from count.
                    int previous = AtomicHelper.getAndAdd(count, -i);
                    // A put can only be blocked if the queue was full.
                    canSignalNotFull = (previous == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (canSignalNotFull)
                signalNotFull();
        }
    }

    /**
     * Used for any element traversal that is not entirely under lock.
     * Such traversals must handle both:
     * - dequeued nodes (p.next is p)
     * - (possibly multiple) interior removed nodes (p.item is null)
     *
     * Returns: the successor of p, or head.next when p is self-linked
     */
    Node!(E) succ(Node!(E) p) {
        Node!(E) next = p.next;
        // A self-link marks a dequeued node; restart from the current head.
        return (next is p) ? head.next : next;
    }

    /*
     * Not yet ported from Java: iterator() and the weakly-consistent Itr.
     *
     * Lazily updated ancestor field provides expected O(1) remove(),
     * but still O(n) in the worst case, whenever the saved ancestor
     * is concurrently deleted.
     */
    // Iterator!(E) iterator() {
    //     return new Itr();
    // }

    // private class Itr : Iterator!(E) {
    //     private Node!(E) next;           // Node holding nextItem
    //     private E nextItem;              // next item to hand out
    //     private Node!(E) lastRet;
    //     private Node!(E) ancestor;       // Helps unlink lastRet on remove()

    //     Itr() {
    //         fullyLock();
    //         try {
    //             if ((next = head.next) !is null)
    //                 nextItem = next.item;
    //         } finally {
    //             fullyUnlock();
    //         }
    //     }

    //     bool hasNext() {
    //         return next !is null;
    //     }

    //     E next() {
    //         Node!(E) p;
    //         if ((p = next) is null)
    //             throw new NoSuchElementException();
    //         lastRet = p;
    //         E x = nextItem;
    //         fullyLock();
    //         try {
    //             E e = null;
    //             for (p = p.next; p !is null && (e = p.item) is null; )
    //                 p = succ(p);
    //             next = p;
    //             nextItem = e;
    //         } finally {
    //             fullyUnlock();
    //         }
    //         return x;
    //     }

    //     void forEachRemaining(Consumer!(E) action) {
    //         // A variant of forEachFrom
    //         Objects.requireNonNull(action);
    //         Node!(E) p;
    //         if ((p = next) is null) return;
    //         lastRet = p;
    //         next = null;
    //         int batchSize = 64;
    //         Object[] es = null;
    //         int n, len = 1;
    //         do {
    //             fullyLock();
    //             try {
    //                 if (es is null) {
    //                     p = p.next;
    //                     for (Node!(E) q = p; q !is null; q = succ(q))
    //                         if (q.item !is null && ++len == batchSize)
    //                             break;
    //                     es = new Object[len];
    //                     es[0] = nextItem;
    //                     nextItem = null;
    //                     n = 1;
    //                 } else
    //                     n = 0;
    //                 for (; p !is null && n < len; p = succ(p))
    //                     if ((es[n] = p.item) !is null) {
    //                         lastRet = p;
    //                         n++;
    //                     }
    //             } finally {
    //                 fullyUnlock();
    //             }
    //             for (int i = 0; i < n; i++) {
    //                 E e = (E) es[i];
    //                 action.accept(e);
    //             }
    //         } while (n > 0 && p !is null);
    //     }

    //     void remove() {
    //         Node!(E) p = lastRet;
    //         if (p is null)
    //             throw new IllegalStateException();
    //         lastRet = null;
    //         fullyLock();
    //         try {
    //             if (p.item !is null) {
    //                 if (ancestor is null)
    //                     ancestor = head;
    //                 ancestor = findPred(p, ancestor);
    //                 unlink(p, ancestor);
    //             }
    //         } finally {
    //             fullyUnlock();
    //         }
    //     }
    // }

    /*
     * Not yet ported from Java: LBQSpliterator, a customized variant of
     * Spliterators.IteratorSpliterator, plus spliterator() and forEach().
     */
    // private final class LBQSpliterator : Spliterator!(E) {
    //     static int MAX_BATCH = 1 << 25;  // max batch array size;
    //     Node!(E) current;    // current node; null until initialized
    //     int batch;          // batch size for splits
    //     bool exhausted;  // true when no more nodes
    //     long est = size();  // size estimate

    //     this() {}

    //     long estimateSize() { return est; }

    //     Spliterator!(E) trySplit() {
    //         Node!(E) h;
    //         if (!exhausted &&
    //             ((h = current) !is null || (h = head.next) !is null)
    //             && h.next !is null) {
    //             int n = batch = Math.min(batch + 1, MAX_BATCH);
    //             Object[] a = new Object[n];
    //             int i = 0;
    //             Node!(E) p = current;
    //             fullyLock();
    //             try {
    //                 if (p !is null || (p = head.next) !is null)
    //                     for (; p !is null && i < n; p = succ(p))
    //                         if ((a[i] = p.item) !is null)
    //                             i++;
    //             } finally {
    //                 fullyUnlock();
    //             }
    //             if ((current = p) is null) {
    //                 est = 0L;
    //                 exhausted = true;
    //             }
    //             else if ((est -= i) < 0L)
    //                 est = 0L;
    //             if (i > 0)
    //                 return Spliterators.spliterator
    //                     (a, 0, i, (Spliterator.ORDERED |
    //                                Spliterator.NONNULL |
    //                                Spliterator.CONCURRENT));
    //         }
    //         return null;
    //     }

    //     bool tryAdvance(Consumer!(E) action) {
    //         Objects.requireNonNull(action);
    //         if (!exhausted) {
    //             E e = null;
    //             fullyLock();
    //             try {
    //                 Node!(E) p;
    //                 if ((p = current) !is null || (p = head.next) !is null)
    //                     do {
    //                         e = p.item;
    //                         p = succ(p);
    //                     } while (e is null && p !is null);
    //                 if ((current = p) is null)
    //                     exhausted = true;
    //             } finally {
    //                 fullyUnlock();
    //             }
    //             if (e !is null) {
    //                 action.accept(e);
    //                 return true;
    //             }
    //         }
    //         return false;
    //     }

    //     void forEachRemaining(Consumer!(E) action) {
    //         Objects.requireNonNull(action);
    //         if (!exhausted) {
    //             exhausted = true;
    //             Node!(E) p = current;
    //             current = null;
    //             forEachFrom(action, p);
    //         }
    //     }

    //     int characteristics() {
    //         return (Spliterator.ORDERED |
    //                 Spliterator.NONNULL |
    //                 Spliterator.CONCURRENT);
    //     }
    // }

    // Spliterator!(E) spliterator() {
    //     return new LBQSpliterator();
    // }

    // void forEach(Consumer!(E) action) {
    //     Objects.requireNonNull(action);
    //     forEachFrom(action, null);
    // }

    /**
     * Supports foreach over the queue. Elements are copied out in batches
     * under lock and the delegate is invoked with the locks released.
     *
     * Returns: the first non-zero value returned by dg, or 0
     * Throws: NullPointerException if dg is null
     */
    override int opApply(scope int delegate(ref E) dg) {
        if (dg is null)
            throw new NullPointerException();

        return forEachFrom(dg, null);
    }

    /**
     * Runs action on each element found during a traversal starting at p.
     * If p is null, traversal starts at head.
     *
     * Returns: the first non-zero value returned by action, or 0
     */
    private int forEachFrom(scope int delegate(ref E) action, Node!(E) p) {
        // Extract batches of elements while holding the lock; then
        // run the action on the elements while not
        enum int batchSize = 64; // max number of elements per batch
        E[] es = null;           // container for batch of elements
        int n, len = 0;
        int result = 0;
        do {
            fullyLock();
            try {
                if (es is null) {
                    // First pass: size the batch buffer (at most batchSize).
                    if (p is null) p = head.next;
                    for (Node!(E) q = p; q !is null; q = succ(q)) {
                        static if (isNullableElement) {
                            if (q.item !is null && ++len == batchSize)
                                break;
                        } else {
                            if (++len == batchSize)
                                break;
                        }
                    }
                    es = new E[len];
                }
                for (n = 0; p !is null && n < len; p = succ(p)) {
                    es[n] = p.item;
                    static if (isNullableElement) {
                        // A null item marks a removed node: skip it.
                        if (es[n] !is null)
                            n++;
                    } else {
                        n++;
                    }
                }
            } finally {
                fullyUnlock();
            }

            for (int i = 0; i < n; i++) {
                E e = es[i];
                result = action(e);
                if (result != 0) return result;
            }
        } while (n > 0 && p !is null);

        return result;
    }

    /**
     * Removes every element for which filter returns true.
     *
     * Returns: true if any element was removed
     */
    override bool removeIf(Predicate!(E) filter) {
        // Objects.requireNonNull(filter);
        return bulkRemove(filter);
    }

    /**
     * Removes every element that is contained in c.
     *
     * Returns: true if any element was removed
     */
    override bool removeAll(Collection!E c) {
        // Objects.requireNonNull(c);
        return bulkRemove(e => c.contains(e));
    }

    /**
     * Removes every element that is not contained in c.
     *
     * Returns: true if any element was removed
     */
    override bool retainAll(Collection!E c) {
        // Objects.requireNonNull(c);
        return bulkRemove(e => !c.contains(e));
    }

    /**
     * Returns the predecessor of live node p, given a node that was
     * once a live ancestor of p (or head); allows unlinking of p.
     * The caller must hold both locks.
     */
    Node!(E) findPred(Node!(E) p, Node!(E) ancestor) {
        // assert p.item !is null;
        static if (isNullableElement) {
            // A null item means the ancestor was removed: restart at head.
            if (ancestor.item is null)
                ancestor = head;
        }
        // Fails with a null dereference if precondition not satisfied
        for (Node!(E) q; (q = ancestor.next) !is p; )
            ancestor = q;
        return ancestor;
    }

    /**
     * Implementation of bulk remove methods. Works in batches of up to 64
     * nodes: collect under lock, run the filter with the locks released,
     * then unlink the selected nodes under lock.
     *
     * NOTE(review): for element types that cannot be null there is no
     * marker for a node removed by another thread between the filter step
     * and the unlink step, so findPred's precondition cannot be verified
     * in that case - confirm whether concurrent removal must be supported
     * for such queues.
     */
    private bool bulkRemove(Predicate!(E) filter) {
        bool removed = false;
        Node!(E) p = null, ancestor = head;
        Node!(E)[] nodes = null;
        int n, len = 0;
        do {
            // 1. Extract batch of up to 64 elements while holding the lock.
            fullyLock();
            try {
                if (nodes is null) { // first batch; initialize
                    p = head.next;
                    for (Node!(E) q = p; q !is null; q = succ(q)) {
                        static if (isNullableElement) {
                            if (q.item !is null && ++len == 64)
                                break;
                        } else {
                            if (++len == 64)
                                break;
                        }
                    }
                    nodes = new Node!(E)[len];
                }
                for (n = 0; p !is null && n < len; p = succ(p))
                    nodes[n++] = p;
            } finally {
                fullyUnlock();
            }

            // 2. Run the filter on the elements while lock is free.
            long deathRow = 0L; // "bitset" of size 64
            for (int i = 0; i < n; i++) {
                E e = nodes[i].item;
                static if (isNullableElement) {
                    if (e !is null && filter(e)) deathRow |= 1L << i;
                } else {
                    if (filter(e)) deathRow |= 1L << i;
                }
            }

            // 3. Remove any filtered elements while holding the lock.
            if (deathRow != 0) {
                fullyLock();
                try {
                    for (int i = 0; i < n; i++) {
                        if ((deathRow & (1L << i)) != 0L) {
                            Node!(E) q = nodes[i];
                            static if (isNullableElement) {
                                // Skip nodes removed while the lock was free.
                                bool stillLinked = q.item !is null;
                            } else {
                                bool stillLinked = true;
                            }
                            if (stillLinked) {
                                ancestor = findPred(q, ancestor);
                                unlink(q, ancestor);
                                removed = true;
                            }
                        }
                        nodes[i] = null; // help GC
                    }
                } finally {
                    fullyUnlock();
                }
            }
        } while (n > 0 && p !is null);
        return removed;
    }

    /** Forwards to opEquals(Object). */
    override bool opEquals(IObject o) {
        return opEquals(cast(Object) o);
    }

    /** Forwards to the inherited opEquals. */
    override bool opEquals(Object o) {
        return super.opEquals(o);
    }

    /** Forwards to the inherited toHash. */
    override size_t toHash() @trusted nothrow {
        return super.toHash();
    }

}