1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.AbstractQueue; 39 import java.util.Collection; 40 import java.util.Iterator; 41 import java.util.NoSuchElementException; 42 import java.util.Objects; 43 import java.util.Spliterator; 44 import java.util.Spliterators; 45 import java.util.concurrent.atomic.AtomicInteger; 46 import java.util.concurrent.locks.Condition; 47 import java.util.concurrent.locks.ReentrantLock; 48 import java.util.function.Consumer; 49 import java.util.function.Predicate; 50 51 /** 52 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on 53 * linked nodes. 54 * This queue orders elements FIFO (first-in-first-out). 55 * The <em>head</em> of the queue is that element that has been on the 56 * queue the longest time. 57 * The <em>tail</em> of the queue is that element that has been on the 58 * queue the shortest time. New elements 59 * are inserted at the tail of the queue, and the queue retrieval 60 * operations obtain elements at the head of the queue. 61 * Linked queues typically have higher throughput than array-based queues but 62 * less predictable performance in most concurrent applications. 63 * 64 * <p>The optional capacity bound constructor argument serves as a 65 * way to prevent excessive queue expansion. The capacity, if unspecified, 66 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are 67 * dynamically created upon each insertion unless this would bring the 68 * queue above capacity. 69 * 70 * <p>This class and its iterator implement all of the <em>optional</em> 71 * methods of the {@link Collection} and {@link Iterator} interfaces. 72 * 73 * <p>This class is a member of the 74 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> 75 * Java Collections Framework</a>. 76 * 77 * @since 1.5 78 * @author Doug Lea 79 * @param <E> the type of elements held in this queue 80 */ 81 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 82 implements BlockingQueue<E>, java.io.Serializable { 83 private static final long serialVersionUID = -6903933977591709194L; 84 85 /* 86 * A variant of the "two lock queue" algorithm. The putLock gates 87 * entry to put (and offer), and has an associated condition for 88 * waiting puts. Similarly for the takeLock. The "count" field 89 * that they both rely on is maintained as an atomic to avoid 90 * needing to get both locks in most cases. Also, to minimize need 91 * for puts to get takeLock and vice-versa, cascading notifies are 92 * used. When a put notices that it has enabled at least one take, 93 * it signals taker. That taker in turn signals others if more 94 * items have been entered since the signal. And symmetrically for 95 * takes signalling puts. Operations such as remove(Object) and 96 * iterators acquire both locks. 97 * 98 * Visibility between writers and readers is provided as follows: 99 * 100 * Whenever an element is enqueued, the putLock is acquired and 101 * count updated. A subsequent reader guarantees visibility to the 102 * enqueued Node by either acquiring the putLock (via fullyLock) 103 * or by acquiring the takeLock, and then reading n = count.get(); 104 * this gives visibility to the first n items. 105 * 106 * To implement weakly consistent iterators, it appears we need to 107 * keep all Nodes GC-reachable from a predecessor dequeued Node. 108 * That would cause two problems: 109 * - allow a rogue Iterator to cause unbounded memory retention 110 * - cause cross-generational linking of old Nodes to new Nodes if 111 * a Node was tenured while live, which generational GCs have a 112 * hard time dealing with, causing repeated major collections. 113 * However, only non-deleted Nodes need to be reachable from 114 * dequeued Nodes, and reachability does not necessarily have to 115 * be of the kind understood by the GC. We use the trick of 116 * linking a Node that has just been dequeued to itself. Such a 117 * self-link implicitly means to advance to head.next. 118 */ 119 120 /** 121 * Linked list node class. 122 */ 123 static class Node<E> { 124 E item; 125 126 /** 127 * One of: 128 * - the real successor Node 129 * - this Node, meaning the successor is head.next 130 * - null, meaning there is no successor (this is the last node) 131 */ 132 Node<E> next; 133 Node(E x)134 Node(E x) { item = x; } 135 } 136 137 /** The capacity bound, or Integer.MAX_VALUE if none */ 138 private final int capacity; 139 140 /** Current number of elements */ 141 private final AtomicInteger count = new AtomicInteger(); 142 143 /** 144 * Head of linked list. 145 * Invariant: head.item == null 146 */ 147 transient Node<E> head; 148 149 /** 150 * Tail of linked list. 151 * Invariant: last.next == null 152 */ 153 private transient Node<E> last; 154 155 /** Lock held by take, poll, etc */ 156 private final ReentrantLock takeLock = new ReentrantLock(); 157 158 /** Wait queue for waiting takes */ 159 @SuppressWarnings("serial") // Classes implementing Condition may be serializable. 160 private final Condition notEmpty = takeLock.newCondition(); 161 162 /** Lock held by put, offer, etc */ 163 private final ReentrantLock putLock = new ReentrantLock(); 164 165 /** Wait queue for waiting puts */ 166 @SuppressWarnings("serial") // Classes implementing Condition may be serializable. 167 private final Condition notFull = putLock.newCondition(); 168 169 /** 170 * Signals a waiting take. Called only from put/offer (which do not 171 * otherwise ordinarily lock takeLock.) 172 */ signalNotEmpty()173 private void signalNotEmpty() { 174 final ReentrantLock takeLock = this.takeLock; 175 takeLock.lock(); 176 try { 177 notEmpty.signal(); 178 } finally { 179 takeLock.unlock(); 180 } 181 } 182 183 /** 184 * Signals a waiting put. Called only from take/poll. 185 */ signalNotFull()186 private void signalNotFull() { 187 final ReentrantLock putLock = this.putLock; 188 putLock.lock(); 189 try { 190 notFull.signal(); 191 } finally { 192 putLock.unlock(); 193 } 194 } 195 196 /** 197 * Links node at end of queue. 198 * 199 * @param node the node 200 */ enqueue(Node<E> node)201 private void enqueue(Node<E> node) { 202 // assert putLock.isHeldByCurrentThread(); 203 // assert last.next == null; 204 last = last.next = node; 205 } 206 207 /** 208 * Removes a node from head of queue. 209 * 210 * @return the node 211 */ dequeue()212 private E dequeue() { 213 // assert takeLock.isHeldByCurrentThread(); 214 // assert head.item == null; 215 Node<E> h = head; 216 Node<E> first = h.next; 217 h.next = h; // help GC 218 head = first; 219 E x = first.item; 220 first.item = null; 221 return x; 222 } 223 224 /** 225 * Locks to prevent both puts and takes. 226 */ fullyLock()227 void fullyLock() { 228 putLock.lock(); 229 takeLock.lock(); 230 } 231 232 /** 233 * Unlocks to allow both puts and takes. 234 */ fullyUnlock()235 void fullyUnlock() { 236 takeLock.unlock(); 237 putLock.unlock(); 238 } 239 240 /** 241 * Creates a {@code LinkedBlockingQueue} with a capacity of 242 * {@link Integer#MAX_VALUE}. 243 */ LinkedBlockingQueue()244 public LinkedBlockingQueue() { 245 this(Integer.MAX_VALUE); 246 } 247 248 /** 249 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. 250 * 251 * @param capacity the capacity of this queue 252 * @throws IllegalArgumentException if {@code capacity} is not greater 253 * than zero 254 */ LinkedBlockingQueue(int capacity)255 public LinkedBlockingQueue(int capacity) { 256 if (capacity <= 0) throw new IllegalArgumentException(); 257 this.capacity = capacity; 258 last = head = new Node<E>(null); 259 } 260 261 /** 262 * Creates a {@code LinkedBlockingQueue} with a capacity of 263 * {@link Integer#MAX_VALUE}, initially containing the elements of the 264 * given collection, 265 * added in traversal order of the collection's iterator. 266 * 267 * @param c the collection of elements to initially contain 268 * @throws NullPointerException if the specified collection or any 269 * of its elements are null 270 */ LinkedBlockingQueue(Collection<? extends E> c)271 public LinkedBlockingQueue(Collection<? extends E> c) { 272 this(Integer.MAX_VALUE); 273 final ReentrantLock putLock = this.putLock; 274 putLock.lock(); // Never contended, but necessary for visibility 275 try { 276 int n = 0; 277 for (E e : c) { 278 if (e == null) 279 throw new NullPointerException(); 280 if (n == capacity) 281 throw new IllegalStateException("Queue full"); 282 enqueue(new Node<E>(e)); 283 ++n; 284 } 285 count.set(n); 286 } finally { 287 putLock.unlock(); 288 } 289 } 290 291 // this doc comment is overridden to remove the reference to collections 292 // greater in size than Integer.MAX_VALUE 293 /** 294 * Returns the number of elements in this queue. 295 * 296 * @return the number of elements in this queue 297 */ size()298 public int size() { 299 return count.get(); 300 } 301 302 // this doc comment is a modified copy of the inherited doc comment, 303 // without the reference to unlimited queues. 304 /** 305 * Returns the number of additional elements that this queue can ideally 306 * (in the absence of memory or resource constraints) accept without 307 * blocking. This is always equal to the initial capacity of this queue 308 * less the current {@code size} of this queue. 309 * 310 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 311 * an element will succeed by inspecting {@code remainingCapacity} 312 * because it may be the case that another thread is about to 313 * insert or remove an element. 314 */ remainingCapacity()315 public int remainingCapacity() { 316 return capacity - count.get(); 317 } 318 319 /** 320 * Inserts the specified element at the tail of this queue, waiting if 321 * necessary for space to become available. 322 * 323 * @throws InterruptedException {@inheritDoc} 324 * @throws NullPointerException {@inheritDoc} 325 */ put(E e)326 public void put(E e) throws InterruptedException { 327 if (e == null) throw new NullPointerException(); 328 final int c; 329 final Node<E> node = new Node<E>(e); 330 final ReentrantLock putLock = this.putLock; 331 final AtomicInteger count = this.count; 332 putLock.lockInterruptibly(); 333 try { 334 /* 335 * Note that count is used in wait guard even though it is 336 * not protected by lock. This works because count can 337 * only decrease at this point (all other puts are shut 338 * out by lock), and we (or some other waiting put) are 339 * signalled if it ever changes from capacity. Similarly 340 * for all other uses of count in other wait guards. 341 */ 342 while (count.get() == capacity) { 343 notFull.await(); 344 } 345 enqueue(node); 346 c = count.getAndIncrement(); 347 if (c + 1 < capacity) 348 notFull.signal(); 349 } finally { 350 putLock.unlock(); 351 } 352 if (c == 0) 353 signalNotEmpty(); 354 } 355 356 /** 357 * Inserts the specified element at the tail of this queue, waiting if 358 * necessary up to the specified wait time for space to become available. 359 * 360 * @return {@code true} if successful, or {@code false} if 361 * the specified waiting time elapses before space is available 362 * @throws InterruptedException {@inheritDoc} 363 * @throws NullPointerException {@inheritDoc} 364 */ offer(E e, long timeout, TimeUnit unit)365 public boolean offer(E e, long timeout, TimeUnit unit) 366 throws InterruptedException { 367 368 if (e == null) throw new NullPointerException(); 369 long nanos = unit.toNanos(timeout); 370 final int c; 371 final ReentrantLock putLock = this.putLock; 372 final AtomicInteger count = this.count; 373 putLock.lockInterruptibly(); 374 try { 375 while (count.get() == capacity) { 376 if (nanos <= 0L) 377 return false; 378 nanos = notFull.awaitNanos(nanos); 379 } 380 enqueue(new Node<E>(e)); 381 c = count.getAndIncrement(); 382 if (c + 1 < capacity) 383 notFull.signal(); 384 } finally { 385 putLock.unlock(); 386 } 387 if (c == 0) 388 signalNotEmpty(); 389 return true; 390 } 391 392 /** 393 * Inserts the specified element at the tail of this queue if it is 394 * possible to do so immediately without exceeding the queue's capacity, 395 * returning {@code true} upon success and {@code false} if this queue 396 * is full. 397 * When using a capacity-restricted queue, this method is generally 398 * preferable to method {@link BlockingQueue#add add}, which can fail to 399 * insert an element only by throwing an exception. 400 * 401 * @throws NullPointerException if the specified element is null 402 */ offer(E e)403 public boolean offer(E e) { 404 if (e == null) throw new NullPointerException(); 405 final AtomicInteger count = this.count; 406 if (count.get() == capacity) 407 return false; 408 final int c; 409 final Node<E> node = new Node<E>(e); 410 final ReentrantLock putLock = this.putLock; 411 putLock.lock(); 412 try { 413 if (count.get() == capacity) 414 return false; 415 enqueue(node); 416 c = count.getAndIncrement(); 417 if (c + 1 < capacity) 418 notFull.signal(); 419 } finally { 420 putLock.unlock(); 421 } 422 if (c == 0) 423 signalNotEmpty(); 424 return true; 425 } 426 take()427 public E take() throws InterruptedException { 428 final E x; 429 final int c; 430 final AtomicInteger count = this.count; 431 final ReentrantLock takeLock = this.takeLock; 432 takeLock.lockInterruptibly(); 433 try { 434 while (count.get() == 0) { 435 notEmpty.await(); 436 } 437 x = dequeue(); 438 c = count.getAndDecrement(); 439 if (c > 1) 440 notEmpty.signal(); 441 } finally { 442 takeLock.unlock(); 443 } 444 if (c == capacity) 445 signalNotFull(); 446 return x; 447 } 448 poll(long timeout, TimeUnit unit)449 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 450 final E x; 451 final int c; 452 long nanos = unit.toNanos(timeout); 453 final AtomicInteger count = this.count; 454 final ReentrantLock takeLock = this.takeLock; 455 takeLock.lockInterruptibly(); 456 try { 457 while (count.get() == 0) { 458 if (nanos <= 0L) 459 return null; 460 nanos = notEmpty.awaitNanos(nanos); 461 } 462 x = dequeue(); 463 c = count.getAndDecrement(); 464 if (c > 1) 465 notEmpty.signal(); 466 } finally { 467 takeLock.unlock(); 468 } 469 if (c == capacity) 470 signalNotFull(); 471 return x; 472 } 473 poll()474 public E poll() { 475 final AtomicInteger count = this.count; 476 if (count.get() == 0) 477 return null; 478 final E x; 479 final int c; 480 final ReentrantLock takeLock = this.takeLock; 481 takeLock.lock(); 482 try { 483 if (count.get() == 0) 484 return null; 485 x = dequeue(); 486 c = count.getAndDecrement(); 487 if (c > 1) 488 notEmpty.signal(); 489 } finally { 490 takeLock.unlock(); 491 } 492 if (c == capacity) 493 signalNotFull(); 494 return x; 495 } 496 peek()497 public E peek() { 498 final AtomicInteger count = this.count; 499 if (count.get() == 0) 500 return null; 501 final ReentrantLock takeLock = this.takeLock; 502 takeLock.lock(); 503 try { 504 return (count.get() > 0) ? head.next.item : null; 505 } finally { 506 takeLock.unlock(); 507 } 508 } 509 510 /** 511 * Unlinks interior Node p with predecessor pred. 512 */ unlink(Node<E> p, Node<E> pred)513 void unlink(Node<E> p, Node<E> pred) { 514 // assert putLock.isHeldByCurrentThread(); 515 // assert takeLock.isHeldByCurrentThread(); 516 // p.next is not changed, to allow iterators that are 517 // traversing p to maintain their weak-consistency guarantee. 518 p.item = null; 519 pred.next = p.next; 520 if (last == p) 521 last = pred; 522 if (count.getAndDecrement() == capacity) 523 notFull.signal(); 524 } 525 526 /** 527 * Removes a single instance of the specified element from this queue, 528 * if it is present. More formally, removes an element {@code e} such 529 * that {@code o.equals(e)}, if this queue contains one or more such 530 * elements. 531 * Returns {@code true} if this queue contained the specified element 532 * (or equivalently, if this queue changed as a result of the call). 533 * 534 * @param o element to be removed from this queue, if present 535 * @return {@code true} if this queue changed as a result of the call 536 */ remove(Object o)537 public boolean remove(Object o) { 538 if (o == null) return false; 539 fullyLock(); 540 try { 541 for (Node<E> pred = head, p = pred.next; 542 p != null; 543 pred = p, p = p.next) { 544 if (o.equals(p.item)) { 545 unlink(p, pred); 546 return true; 547 } 548 } 549 return false; 550 } finally { 551 fullyUnlock(); 552 } 553 } 554 555 /** 556 * Returns {@code true} if this queue contains the specified element. 557 * More formally, returns {@code true} if and only if this queue contains 558 * at least one element {@code e} such that {@code o.equals(e)}. 559 * 560 * @param o object to be checked for containment in this queue 561 * @return {@code true} if this queue contains the specified element 562 */ contains(Object o)563 public boolean contains(Object o) { 564 if (o == null) return false; 565 fullyLock(); 566 try { 567 for (Node<E> p = head.next; p != null; p = p.next) 568 if (o.equals(p.item)) 569 return true; 570 return false; 571 } finally { 572 fullyUnlock(); 573 } 574 } 575 576 /** 577 * Returns an array containing all of the elements in this queue, in 578 * proper sequence. 579 * 580 * <p>The returned array will be "safe" in that no references to it are 581 * maintained by this queue. (In other words, this method must allocate 582 * a new array). The caller is thus free to modify the returned array. 583 * 584 * <p>This method acts as bridge between array-based and collection-based 585 * APIs. 586 * 587 * @return an array containing all of the elements in this queue 588 */ toArray()589 public Object[] toArray() { 590 fullyLock(); 591 try { 592 int size = count.get(); 593 Object[] a = new Object[size]; 594 int k = 0; 595 for (Node<E> p = head.next; p != null; p = p.next) 596 a[k++] = p.item; 597 return a; 598 } finally { 599 fullyUnlock(); 600 } 601 } 602 603 /** 604 * Returns an array containing all of the elements in this queue, in 605 * proper sequence; the runtime type of the returned array is that of 606 * the specified array. If the queue fits in the specified array, it 607 * is returned therein. Otherwise, a new array is allocated with the 608 * runtime type of the specified array and the size of this queue. 609 * 610 * <p>If this queue fits in the specified array with room to spare 611 * (i.e., the array has more elements than this queue), the element in 612 * the array immediately following the end of the queue is set to 613 * {@code null}. 614 * 615 * <p>Like the {@link #toArray()} method, this method acts as bridge between 616 * array-based and collection-based APIs. Further, this method allows 617 * precise control over the runtime type of the output array, and may, 618 * under certain circumstances, be used to save allocation costs. 619 * 620 * <p>Suppose {@code x} is a queue known to contain only strings. 621 * The following code can be used to dump the queue into a newly 622 * allocated array of {@code String}: 623 * 624 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 625 * 626 * Note that {@code toArray(new Object[0])} is identical in function to 627 * {@code toArray()}. 628 * 629 * @param a the array into which the elements of the queue are to 630 * be stored, if it is big enough; otherwise, a new array of the 631 * same runtime type is allocated for this purpose 632 * @return an array containing all of the elements in this queue 633 * @throws ArrayStoreException if the runtime type of the specified array 634 * is not a supertype of the runtime type of every element in 635 * this queue 636 * @throws NullPointerException if the specified array is null 637 */ 638 @SuppressWarnings("unchecked") toArray(T[] a)639 public <T> T[] toArray(T[] a) { 640 fullyLock(); 641 try { 642 int size = count.get(); 643 if (a.length < size) 644 a = (T[])java.lang.reflect.Array.newInstance 645 (a.getClass().getComponentType(), size); 646 647 int k = 0; 648 for (Node<E> p = head.next; p != null; p = p.next) 649 a[k++] = (T)p.item; 650 if (a.length > k) 651 a[k] = null; 652 return a; 653 } finally { 654 fullyUnlock(); 655 } 656 } 657 toString()658 public String toString() { 659 return Helpers.collectionToString(this); 660 } 661 662 /** 663 * Atomically removes all of the elements from this queue. 664 * The queue will be empty after this call returns. 665 */ clear()666 public void clear() { 667 fullyLock(); 668 try { 669 for (Node<E> p, h = head; (p = h.next) != null; h = p) { 670 h.next = h; 671 p.item = null; 672 } 673 head = last; 674 // assert head.item == null && head.next == null; 675 if (count.getAndSet(0) == capacity) 676 notFull.signal(); 677 } finally { 678 fullyUnlock(); 679 } 680 } 681 682 /** 683 * @throws UnsupportedOperationException {@inheritDoc} 684 * @throws ClassCastException {@inheritDoc} 685 * @throws NullPointerException {@inheritDoc} 686 * @throws IllegalArgumentException {@inheritDoc} 687 */ drainTo(Collection<? super E> c)688 public int drainTo(Collection<? super E> c) { 689 return drainTo(c, Integer.MAX_VALUE); 690 } 691 692 /** 693 * @throws UnsupportedOperationException {@inheritDoc} 694 * @throws ClassCastException {@inheritDoc} 695 * @throws NullPointerException {@inheritDoc} 696 * @throws IllegalArgumentException {@inheritDoc} 697 */ drainTo(Collection<? super E> c, int maxElements)698 public int drainTo(Collection<? super E> c, int maxElements) { 699 Objects.requireNonNull(c); 700 if (c == this) 701 throw new IllegalArgumentException(); 702 if (maxElements <= 0) 703 return 0; 704 boolean signalNotFull = false; 705 final ReentrantLock takeLock = this.takeLock; 706 takeLock.lock(); 707 try { 708 int n = Math.min(maxElements, count.get()); 709 // count.get provides visibility to first n Nodes 710 Node<E> h = head; 711 int i = 0; 712 try { 713 while (i < n) { 714 Node<E> p = h.next; 715 c.add(p.item); 716 p.item = null; 717 h.next = h; 718 h = p; 719 ++i; 720 } 721 return n; 722 } finally { 723 // Restore invariants even if c.add() threw 724 if (i > 0) { 725 // assert h.item == null; 726 head = h; 727 signalNotFull = (count.getAndAdd(-i) == capacity); 728 } 729 } 730 } finally { 731 takeLock.unlock(); 732 if (signalNotFull) 733 signalNotFull(); 734 } 735 } 736 737 /** 738 * Used for any element traversal that is not entirely under lock. 739 * Such traversals must handle both: 740 * - dequeued nodes (p.next == p) 741 * - (possibly multiple) interior removed nodes (p.item == null) 742 */ succ(Node<E> p)743 Node<E> succ(Node<E> p) { 744 if (p == (p = p.next)) 745 p = head.next; 746 return p; 747 } 748 749 /** 750 * Returns an iterator over the elements in this queue in proper sequence. 751 * The elements will be returned in order from first (head) to last (tail). 752 * 753 * <p>The returned iterator is 754 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 755 * 756 * @return an iterator over the elements in this queue in proper sequence 757 */ iterator()758 public Iterator<E> iterator() { 759 return new Itr(); 760 } 761 762 /** 763 * Weakly-consistent iterator. 764 * 765 * Lazily updated ancestor field provides expected O(1) remove(), 766 * but still O(n) in the worst case, whenever the saved ancestor 767 * is concurrently deleted. 768 */ 769 private class Itr implements Iterator<E> { 770 private Node<E> next; // Node holding nextItem 771 private E nextItem; // next item to hand out 772 private Node<E> lastRet; 773 private Node<E> ancestor; // Helps unlink lastRet on remove() 774 Itr()775 Itr() { 776 fullyLock(); 777 try { 778 if ((next = head.next) != null) 779 nextItem = next.item; 780 } finally { 781 fullyUnlock(); 782 } 783 } 784 hasNext()785 public boolean hasNext() { 786 return next != null; 787 } 788 next()789 public E next() { 790 Node<E> p; 791 if ((p = next) == null) 792 throw new NoSuchElementException(); 793 lastRet = p; 794 E x = nextItem; 795 fullyLock(); 796 try { 797 E e = null; 798 for (p = p.next; p != null && (e = p.item) == null; ) 799 p = succ(p); 800 next = p; 801 nextItem = e; 802 } finally { 803 fullyUnlock(); 804 } 805 return x; 806 } 807 forEachRemaining(Consumer<? super E> action)808 public void forEachRemaining(Consumer<? super E> action) { 809 // A variant of forEachFrom 810 Objects.requireNonNull(action); 811 Node<E> p; 812 if ((p = next) == null) return; 813 lastRet = p; 814 next = null; 815 final int batchSize = 64; 816 Object[] es = null; 817 int n, len = 1; 818 do { 819 fullyLock(); 820 try { 821 if (es == null) { 822 p = p.next; 823 for (Node<E> q = p; q != null; q = succ(q)) 824 if (q.item != null && ++len == batchSize) 825 break; 826 es = new Object[len]; 827 es[0] = nextItem; 828 nextItem = null; 829 n = 1; 830 } else 831 n = 0; 832 for (; p != null && n < len; p = succ(p)) 833 if ((es[n] = p.item) != null) { 834 lastRet = p; 835 n++; 836 } 837 } finally { 838 fullyUnlock(); 839 } 840 for (int i = 0; i < n; i++) { 841 @SuppressWarnings("unchecked") E e = (E) es[i]; 842 action.accept(e); 843 } 844 } while (n > 0 && p != null); 845 } 846 remove()847 public void remove() { 848 Node<E> p = lastRet; 849 if (p == null) 850 throw new IllegalStateException(); 851 lastRet = null; 852 fullyLock(); 853 try { 854 if (p.item != null) { 855 if (ancestor == null) 856 ancestor = head; 857 ancestor = findPred(p, ancestor); 858 unlink(p, ancestor); 859 } 860 } finally { 861 fullyUnlock(); 862 } 863 } 864 } 865 866 /** 867 * A customized variant of Spliterators.IteratorSpliterator. 868 * Keep this class in sync with (very similar) LBDSpliterator. 869 */ 870 private final class LBQSpliterator implements Spliterator<E> { 871 static final int MAX_BATCH = 1 << 25; // max batch array size; 872 Node<E> current; // current node; null until initialized 873 int batch; // batch size for splits 874 boolean exhausted; // true when no more nodes 875 long est = size(); // size estimate 876 LBQSpliterator()877 LBQSpliterator() {} 878 estimateSize()879 public long estimateSize() { return est; } 880 trySplit()881 public Spliterator<E> trySplit() { 882 Node<E> h; 883 if (!exhausted && 884 ((h = current) != null || (h = head.next) != null) 885 && h.next != null) { 886 int n = batch = Math.min(batch + 1, MAX_BATCH); 887 Object[] a = new Object[n]; 888 int i = 0; 889 Node<E> p = current; 890 fullyLock(); 891 try { 892 if (p != null || (p = head.next) != null) 893 for (; p != null && i < n; p = succ(p)) 894 if ((a[i] = p.item) != null) 895 i++; 896 } finally { 897 fullyUnlock(); 898 } 899 if ((current = p) == null) { 900 est = 0L; 901 exhausted = true; 902 } 903 else if ((est -= i) < 0L) 904 est = 0L; 905 if (i > 0) 906 return Spliterators.spliterator 907 (a, 0, i, (Spliterator.ORDERED | 908 Spliterator.NONNULL | 909 Spliterator.CONCURRENT)); 910 } 911 return null; 912 } 913 tryAdvance(Consumer<? super E> action)914 public boolean tryAdvance(Consumer<? super E> action) { 915 Objects.requireNonNull(action); 916 if (!exhausted) { 917 E e = null; 918 fullyLock(); 919 try { 920 Node<E> p; 921 if ((p = current) != null || (p = head.next) != null) 922 do { 923 e = p.item; 924 p = succ(p); 925 } while (e == null && p != null); 926 if ((current = p) == null) 927 exhausted = true; 928 } finally { 929 fullyUnlock(); 930 } 931 if (e != null) { 932 action.accept(e); 933 return true; 934 } 935 } 936 return false; 937 } 938 forEachRemaining(Consumer<? super E> action)939 public void forEachRemaining(Consumer<? super E> action) { 940 Objects.requireNonNull(action); 941 if (!exhausted) { 942 exhausted = true; 943 Node<E> p = current; 944 current = null; 945 forEachFrom(action, p); 946 } 947 } 948 characteristics()949 public int characteristics() { 950 return (Spliterator.ORDERED | 951 Spliterator.NONNULL | 952 Spliterator.CONCURRENT); 953 } 954 } 955 956 /** 957 * Returns a {@link Spliterator} over the elements in this queue. 958 * 959 * <p>The returned spliterator is 960 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 961 * 962 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 963 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 964 * 965 * @implNote 966 * The {@code Spliterator} implements {@code trySplit} to permit limited 967 * parallelism. 968 * 969 * @return a {@code Spliterator} over the elements in this queue 970 * @since 1.8 971 */ spliterator()972 public Spliterator<E> spliterator() { 973 return new LBQSpliterator(); 974 } 975 976 /** 977 * @throws NullPointerException {@inheritDoc} 978 */ forEach(Consumer<? super E> action)979 public void forEach(Consumer<? super E> action) { 980 Objects.requireNonNull(action); 981 forEachFrom(action, null); 982 } 983 984 /** 985 * Runs action on each element found during a traversal starting at p. 986 * If p is null, traversal starts at head. 987 */ forEachFrom(Consumer<? super E> action, Node<E> p)988 void forEachFrom(Consumer<? super E> action, Node<E> p) { 989 // Extract batches of elements while holding the lock; then 990 // run the action on the elements while not 991 final int batchSize = 64; // max number of elements per batch 992 Object[] es = null; // container for batch of elements 993 int n, len = 0; 994 do { 995 fullyLock(); 996 try { 997 if (es == null) { 998 if (p == null) p = head.next; 999 for (Node<E> q = p; q != null; q = succ(q)) 1000 if (q.item != null && ++len == batchSize) 1001 break; 1002 es = new Object[len]; 1003 } 1004 for (n = 0; p != null && n < len; p = succ(p)) 1005 if ((es[n] = p.item) != null) 1006 n++; 1007 } finally { 1008 fullyUnlock(); 1009 } 1010 for (int i = 0; i < n; i++) { 1011 @SuppressWarnings("unchecked") E e = (E) es[i]; 1012 action.accept(e); 1013 } 1014 } while (n > 0 && p != null); 1015 } 1016 1017 /** 1018 * @throws NullPointerException {@inheritDoc} 1019 */ removeIf(Predicate<? super E> filter)1020 public boolean removeIf(Predicate<? super E> filter) { 1021 Objects.requireNonNull(filter); 1022 return bulkRemove(filter); 1023 } 1024 1025 /** 1026 * @throws NullPointerException {@inheritDoc} 1027 */ removeAll(Collection<?> c)1028 public boolean removeAll(Collection<?> c) { 1029 Objects.requireNonNull(c); 1030 return bulkRemove(e -> c.contains(e)); 1031 } 1032 1033 /** 1034 * @throws NullPointerException {@inheritDoc} 1035 */ retainAll(Collection<?> c)1036 public boolean retainAll(Collection<?> c) { 1037 Objects.requireNonNull(c); 1038 return bulkRemove(e -> !c.contains(e)); 1039 } 1040 1041 /** 1042 * Returns the predecessor of live node p, given a node that was 1043 * once a live ancestor of p (or head); allows unlinking of p. 1044 */ findPred(Node<E> p, Node<E> ancestor)1045 Node<E> findPred(Node<E> p, Node<E> ancestor) { 1046 // assert p.item != null; 1047 if (ancestor.item == null) 1048 ancestor = head; 1049 // Fails with NPE if precondition not satisfied 1050 for (Node<E> q; (q = ancestor.next) != p; ) 1051 ancestor = q; 1052 return ancestor; 1053 } 1054 1055 /** Implementation of bulk remove methods. */ 1056 @SuppressWarnings("unchecked") bulkRemove(Predicate<? super E> filter)1057 private boolean bulkRemove(Predicate<? super E> filter) { 1058 boolean removed = false; 1059 Node<E> p = null, ancestor = head; 1060 Node<E>[] nodes = null; 1061 int n, len = 0; 1062 do { 1063 // 1. Extract batch of up to 64 elements while holding the lock. 1064 fullyLock(); 1065 try { 1066 if (nodes == null) { // first batch; initialize 1067 p = head.next; 1068 for (Node<E> q = p; q != null; q = succ(q)) 1069 if (q.item != null && ++len == 64) 1070 break; 1071 nodes = (Node<E>[]) new Node<?>[len]; 1072 } 1073 for (n = 0; p != null && n < len; p = succ(p)) 1074 nodes[n++] = p; 1075 } finally { 1076 fullyUnlock(); 1077 } 1078 1079 // 2. Run the filter on the elements while lock is free. 1080 long deathRow = 0L; // "bitset" of size 64 1081 for (int i = 0; i < n; i++) { 1082 final E e; 1083 if ((e = nodes[i].item) != null && filter.test(e)) 1084 deathRow |= 1L << i; 1085 } 1086 1087 // 3. Remove any filtered elements while holding the lock. 1088 if (deathRow != 0) { 1089 fullyLock(); 1090 try { 1091 for (int i = 0; i < n; i++) { 1092 final Node<E> q; 1093 if ((deathRow & (1L << i)) != 0L 1094 && (q = nodes[i]).item != null) { 1095 ancestor = findPred(q, ancestor); 1096 unlink(q, ancestor); 1097 removed = true; 1098 } 1099 nodes[i] = null; // help GC 1100 } 1101 } finally { 1102 fullyUnlock(); 1103 } 1104 } 1105 } while (n > 0 && p != null); 1106 return removed; 1107 } 1108 1109 /** 1110 * Saves this queue to a stream (that is, serializes it). 1111 * 1112 * @param s the stream 1113 * @throws java.io.IOException if an I/O error occurs 1114 * @serialData The capacity is emitted (int), followed by all of 1115 * its elements (each an {@code Object}) in the proper order, 1116 * followed by a null 1117 */ writeObject(java.io.ObjectOutputStream s)1118 private void writeObject(java.io.ObjectOutputStream s) 1119 throws java.io.IOException { 1120 1121 fullyLock(); 1122 try { 1123 // Write out any hidden stuff, plus capacity 1124 s.defaultWriteObject(); 1125 1126 // Write out all elements in the proper order. 1127 for (Node<E> p = head.next; p != null; p = p.next) 1128 s.writeObject(p.item); 1129 1130 // Use trailing null as sentinel 1131 s.writeObject(null); 1132 } finally { 1133 fullyUnlock(); 1134 } 1135 } 1136 1137 /** 1138 * Reconstitutes this queue from a stream (that is, deserializes it). 1139 * @param s the stream 1140 * @throws ClassNotFoundException if the class of a serialized object 1141 * could not be found 1142 * @throws java.io.IOException if an I/O error occurs 1143 */ readObject(java.io.ObjectInputStream s)1144 private void readObject(java.io.ObjectInputStream s) 1145 throws java.io.IOException, ClassNotFoundException { 1146 // Read in capacity, and any hidden stuff 1147 s.defaultReadObject(); 1148 1149 count.set(0); 1150 last = head = new Node<E>(null); 1151 1152 // Read in all elements and place in queue 1153 for (;;) { 1154 @SuppressWarnings("unchecked") 1155 E item = (E)s.readObject(); 1156 if (item == null) 1157 break; 1158 add(item); 1159 } 1160 } 1161 } 1162