/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea and Martin Buchholz with assistance from members of
 * JCP JSR-166 Expert Group and released to the public domain, as explained
 * at http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A {@code ConcurrentLinkedQueue} is an appropriate choice when
 * many threads will share access to a common collection.
 * Like most other concurrent collection implementations, this class
 * does not permit the use of {@code null} elements.
 *
 * <p>This implementation employs an efficient <em>non-blocking</em>
 * algorithm based on one described in
 * <a href="http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf">
 * Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 *
 * <p>Iterators are <i>weakly consistent</i>, returning elements
 * reflecting the state of the queue at some point at or since the
 * creation of the iterator. They do <em>not</em> throw {@link
 * java.util.ConcurrentModificationException}, and may proceed concurrently
 * with other operations. Elements contained in the queue since the creation
 * of the iterator will be returned exactly once.
 *
 * <p>Beware that, unlike in most collections, the {@code size} method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements, and so may report
 * inaccurate results if this collection is modified during traversal.
 *
 * <p>Bulk operations that add, remove, or examine multiple elements,
 * such as {@link #addAll}, {@link #removeIf} or {@link #forEach},
 * are <em>not</em> guaranteed to be performed atomically.
 * For example, a {@code forEach} traversal concurrent with an {@code
 * addAll} operation might observe only some of the added elements.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Queue} and {@link Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code ConcurrentLinkedQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code ConcurrentLinkedQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
     * This is a modification of the Michael & Scott algorithm,
     * adapted for a garbage-collected environment, with support for
     * interior node deletion (to support e.g. remove(Object)). For
     * explanation, read the paper.
     *
     * Note that like most non-blocking algorithms in this package,
     * this implementation relies on the fact that in garbage
     * collected systems, there is no possibility of ABA problems due
     * to recycled nodes, so there is no need to use "counted
     * pointers" or related techniques seen in versions used in
     * non-GC'ed settings.
     *
     * The fundamental invariants are:
     * - There is exactly one (last) Node with a null next reference,
     *   which is CASed when enqueueing. This last Node can be
     *   reached in O(1) time from tail, but tail is merely an
     *   optimization - it can always be reached in O(N) time from
     *   head as well.
     * - The elements contained in the queue are the non-null items in
     *   Nodes that are reachable from head. CASing the item
     *   reference of a Node to null atomically removes it from the
     *   queue. Reachability of all elements from head must remain
     *   true even in the case of concurrent modifications that cause
     *   head to advance. A dequeued Node may remain in use
     *   indefinitely due to creation of an Iterator or simply a
     *   poll() that has lost its time slice.
     *
     * The above might appear to imply that all Nodes are GC-reachable
     * from a predecessor dequeued Node. That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC. We use the trick of
     * linking a Node that has just been dequeued to itself. Such a
     * self-link implicitly means to advance to head.
     *
     * Both head and tail are permitted to lag. In fact, failing to
     * update them every time one could is a significant optimization
     * (fewer CASes). As with LinkedTransferQueue (see the internal
     * documentation for that class), we use a slack threshold of two;
     * that is, we update head/tail when the current pointer appears
     * to be two or more steps away from the first/last node.
     *
     * Since head and tail are updated concurrently and independently,
     * it is possible for tail to lag behind head (why not)?
     *
     * CASing a Node's item reference to null atomically removes the
     * element from the queue, leaving a "dead" node that should later
     * be unlinked (but unlinking is merely an optimization).
     * Interior element removal methods (other than Iterator.remove())
     * keep track of the predecessor node during traversal so that the
     * node can be CAS-unlinked. Some traversal methods try to unlink
     * any deleted nodes encountered during traversal. See comments
     * in bulkRemove.
     *
     * When constructing a Node (before enqueuing it) we avoid paying
     * for a volatile write to item. This allows the cost of enqueue
     * to be "one-and-a-half" CASes.
     *
     * Both head and tail may or may not point to a Node with a
     * non-null item. If the queue is empty, all items must of course
     * be null. Upon creation, both head and tail refer to a dummy
     * Node with null item. Both head and tail are only updated using
     * CAS, so they never regress, although again this is merely an
     * optimization.
     */

    /**
     * A singly linked list cell. All accesses other than plain volatile
     * reads go through the {@code ITEM} and {@code NEXT} VarHandles.
     */
    static final class Node<E> {
        /** The element, or null for a dummy node or once the element is removed. */
        volatile E item;
        /** The successor, null for the last node, or this node itself once dequeued. */
        volatile Node<E> next;

        /**
         * Constructs a node holding item. Uses relaxed write because
         * item can only be seen after piggy-backing publication via CAS.
         */
        Node(E item) {
            ITEM.set(this, item);
        }

        /** Constructs a dead dummy node. */
        Node() {}

        /**
         * Links {@code next} after this node with a plain (non-volatile)
         * write. The callers in this file use it only while building a
         * chain that has not yet been published to other threads.
         */
        void appendRelaxed(Node<E> next) {
            // assert next != null;
            // assert this.next == null;
            NEXT.set(this, next);
        }

        /**
         * Atomically replaces item with {@code val} if it is currently
         * {@code cmp}.
         *
         * @return {@code true} if the CAS succeeded
         */
        boolean casItem(E cmp, E val) {
            // assert item == cmp || item == null;
            // assert cmp != null;
            // assert val == null;
            return ITEM.compareAndSet(this, cmp, val);
        }
    }

    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-linked.
     */
    private transient volatile Node<E> tail;

    /**
     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
     */
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>();
    }

    /**
     * Creates a {@code ConcurrentLinkedQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
            if (h == null)
                h = t = newNode;
            else
                t.appendRelaxed(t = newNode);
        }
        if (h == null)
            h = t = new Node<E>();
        head = h;
        tail = t;
    }

    // Have to override just to update the javadoc

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Tries to CAS head to p. If successful, repoint old head to itself
     * as sentinel for succ(), below.
     */
    final void updateHead(Node<E> h, Node<E> p) {
        // assert h != null && p != null && (h == p || h.item == null);
        if (h != p && HEAD.compareAndSet(this, h, p))
            NEXT.setRelease(h, h);
    }

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> succ(Node<E> p) {
        if (p == (p = p.next))
            p = head;
        return p;
    }

    /**
     * Tries to CAS pred.next (or head, if pred is null) from c to p.
     * Caller must ensure that we're not unlinking the trailing node.
     */
    private boolean tryCasSuccessor(Node<E> pred, Node<E> c, Node<E> p) {
        // assert p != null;
        // assert c.item == null;
        // assert c != p;
        if (pred != null)
            return NEXT.compareAndSet(pred, c, p);
        if (HEAD.compareAndSet(this, c, p)) {
            // Old head is now off-list; self-link it as in updateHead.
            NEXT.setRelease(c, c);
            return true;
        }
        return false;
    }

    /**
     * Collapse dead nodes between pred and q.
     * @param pred the last known live node, or null if none
     * @param c the first dead node
     * @param p the last dead node
     * @param q p.next: the next live node, or null if at end
     * @return either old pred or p if pred dead or CAS failed
     */
    private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) {
        // assert pred != c;
        // assert p != q;
        // assert c.item == null;
        // assert p.item == null;
        if (q == null) {
            // Never unlink trailing node.
            if (c == p) return pred;
            q = p;
        }
        return (tryCasSuccessor(pred, c, q)
                && (pred == null || ITEM.get(pred) != null))
            ? pred : p;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (NEXT.compareAndSet(p, null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time; failure is OK
                        TAIL.weakCompareAndSet(this, t, newNode);
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list. If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable. Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    /**
     * Retrieves and removes the first element of this queue by CASing the
     * item of the first live node to null.
     *
     * @return the removed element, or {@code null} if no live node was found
     */
    public E poll() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                final E item;
                if ((item = p.item) != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    // Reached the last node without finding a live item.
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    // p is self-linked (off-list); start over from head.
                    continue restartFromHead;
            }
        }
    }

    /**
     * Retrieves, but does not remove, the first element of this queue.
     *
     * @return the item of the first live node, or {@code null} if the
     *         last node was reached without finding one
     */
    public E peek() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                final E item;
                if ((item = p.item) != null
                    || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    // p is self-linked (off-list); start over from head.
                    continue restartFromHead;
            }
        }
    }

    /**
     * Returns the first live (non-deleted) node on list, or null if none.
     * This is yet another variant of poll/peek; here returning the
     * first node, not element. We could make peek() a wrapper around
     * first(), but that would cost an extra volatile read of item,
     * and the need to add a retry loop to deal with the possibility
     * of losing a race to a concurrent poll().
     */
    Node<E> first() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
            }
        }
    }

    /**
     * Returns {@code true} if this queue contains no elements.
     *
     * @return {@code true} if this queue contains no elements
     */
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue. If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     * Additionally, if elements are added or removed during execution
     * of this method, the returned result may be inaccurate. Thus,
     * this method is typically not very useful in concurrent
     * applications.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        restartFromHead: for (;;) {
            int count = 0;
            for (Node<E> p = first(); p != null;) {
                if (p.item != null)
                    if (++count == Integer.MAX_VALUE)
                        break;  // @see Collection.size()
                if (p == (p = p.next))
                    // Fell off list; the partial count is discarded.
                    continue restartFromHead;
            }
            return count;
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        restartFromHead: for (;;) {
            for (Node<E> p = head, pred = null; p != null; ) {
                Node<E> q = p.next;
                final E item;
                if ((item = p.item) != null) {
                    if (o.equals(item))
                        return true;
                    pred = p; p = q; continue;
                }
                // p is dead: scan to the end of the run of dead nodes
                // starting at c, then try to unlink the whole run.
                for (Node<E> c = p;; q = p.next) {
                    if (q == null || q.item != null) {
                        pred = skipDeadNodes(pred, c, p, q); p = q; break;
                    }
                    if (p == (p = q)) continue restartFromHead;
                }
            }
            return false;
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        restartFromHead: for (;;) {
            for (Node<E> p = head, pred = null; p != null; ) {
                Node<E> q = p.next;
                final E item;
                if ((item = p.item) != null) {
                    if (o.equals(item) && p.casItem(item, null)) {
                        // p is now dead; best-effort unlink of just p.
                        skipDeadNodes(pred, p, p, q);
                        return true;
                    }
                    pred = p; p = q; continue;
                }
                // p is dead: scan to the end of the run of dead nodes
                // starting at c, then try to unlink the whole run.
                for (Node<E> c = p;; q = p.next) {
                    if (q == null || q.item != null) {
                        pred = skipDeadNodes(pred, c, p, q); p = q; break;
                    }
                    if (p == (p = q)) continue restartFromHead;
                }
            }
            return false;
        }
    }

    /**
     * Appends all of the elements in the specified collection to the end of
     * this queue, in the order that they are returned by the specified
     * collection's iterator. Attempts to {@code addAll} of a queue to
     * itself result in {@code IllegalArgumentException}.
     *
     * @param c the elements to be inserted into this queue
     * @return {@code true} if this queue changed as a result of the call
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     * @throws IllegalArgumentException if the collection is this queue
     */
    public boolean addAll(Collection<? extends E> c) {
        if (c == this)
            // As historically specified in AbstractQueue#addAll
            throw new IllegalArgumentException();

        // Copy c into a private chain of Nodes
        Node<E> beginningOfTheEnd = null, last = null;
        for (E e : c) {
            Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
            if (beginningOfTheEnd == null)
                beginningOfTheEnd = last = newNode;
            else
                last.appendRelaxed(last = newNode);
        }
        if (beginningOfTheEnd == null)
            return false;

        // Atomically append the chain at the tail of this collection
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (NEXT.compareAndSet(p, null, beginningOfTheEnd)) {
                    // Successful CAS is the linearization point
                    // for all elements to be added to this queue.
                    if (!TAIL.weakCompareAndSet(this, t, last)) {
                        // Try a little harder to update tail,
                        // since we may be adding many elements.
                        t = tail;
                        if (last.next == null)
                            TAIL.weakCompareAndSet(this, t, last);
                    }
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list. If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable. Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    /**
     * Returns a string built from the {@code toString()} of each element
     * found in one traversal from the first live node. Returns
     * {@code "[]"} when no element is found; otherwise formatting is
     * delegated to {@code Helpers.toString}.
     */
    public String toString() {
        String[] a = null;
        restartFromHead: for (;;) {
            int charLength = 0;
            int size = 0;
            for (Node<E> p = first(); p != null;) {
                final E item;
                if ((item = p.item) != null) {
                    if (a == null)
                        a = new String[4];
                    else if (size == a.length)
                        a = Arrays.copyOf(a, 2 * size);
                    String s = item.toString();
                    a[size++] = s;
                    charLength += s.length();
                }
                if (p == (p = p.next))
                    // Fell off list; counters are reset, the array is reused.
                    continue restartFromHead;
            }

            if (size == 0)
                return "[]";

            return Helpers.toString(a, size, charLength);
        }
    }

    /**
     * Shared implementation of both {@code toArray} methods: collects the
     * items found in one traversal from the first live node.
     *
     * @param a the caller-supplied array to fill if it is large enough,
     *        or null to collect into a fresh {@code Object[]}
     * @return {@code a} (null-terminated if there is room) when the
     *         elements fit in it, otherwise an array whose length is
     *         exactly the number of elements collected
     */
    private Object[] toArrayInternal(Object[] a) {
        Object[] x = a;
        restartFromHead: for (;;) {
            int size = 0;
            for (Node<E> p = first(); p != null;) {
                final E item;
                if ((item = p.item) != null) {
                    if (x == null)
                        x = new Object[4];
                    else if (size == x.length)
                        x = Arrays.copyOf(x, 2 * (size + 4));
                    x[size++] = item;
                }
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            if (x == null)
                return new Object[0];
            else if (a != null && size <= a.length) {
                // x may have been grown during an earlier, abandoned
                // traversal; copy back into the caller's array.
                if (a != x)
                    System.arraycopy(x, 0, a, 0, size);
                if (size < a.length)
                    a[size] = null;
                return a;
            }
            return (size == x.length) ? x : Arrays.copyOf(x, size);
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        return toArrayInternal(null);
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array. If the queue fits in the specified array, it
     * is returned therein. Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs. Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        Objects.requireNonNull(a);
        return (T[]) toArrayInternal(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    /** Iterator that keeps one element of look-ahead. */
    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        /**
         * Positions the iterator on the first live node, if any, and
         * advances head past the dead nodes that were skipped.
         */
        Itr() {
            restartFromHead: for (;;) {
                Node<E> h, p, q;
                for (p = h = head;; p = q) {
                    final E item;
                    if ((item = p.item) != null) {
                        nextNode = p;
                        nextItem = item;
                        break;
                    }
                    else if ((q = p.next) == null)
                        break;
                    else if (p == q)
                        continue restartFromHead;
                }
                updateHead(h, p);
                return;
            }
        }

        /** Returns {@code true} if an element has been cached for next(). */
        public boolean hasNext() {
            return nextItem != null;
        }

        /**
         * Returns the cached element and advances to the next live node,
         * trying to unlink dead nodes passed over on the way.
         *
         * @throws NoSuchElementException if there is no next node
         */
        public E next() {
            final Node<E> pred = nextNode;
            if (pred == null) throw new NoSuchElementException();
            // assert nextItem != null;
            lastRet = pred;
            E item = null;

            for (Node<E> p = succ(pred), q;; p = q) {
                if (p == null || (item = p.item) != null) {
                    nextNode = p;
                    E x = nextItem;
                    nextItem = item;
                    return x;
                }
                // unlink deleted nodes
                if ((q = succ(p)) != null)
                    NEXT.compareAndSet(pred, p, q);
            }
        }

        // Default implementation of forEachRemaining is "good enough".

        /**
         * Removes the element last returned by next() by nulling the
         * item of its node; the node itself is not unlinked here.
         *
         * @throws IllegalStateException if there is no last returned
         *         node (next() not yet called, or remove() already called)
         */
        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.item = null;
            lastRet = null;
        }
    }

    /**
     * Saves this queue to a stream (that is, serializes it).
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     * @serialData All of the elements (each an {@code E}) in
     * the proper order, followed by a null
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        // Write out any hidden stuff
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = first(); p != null; p = succ(p)) {
            final E item;
            if ((item = p.item) != null)
                s.writeObject(item);
        }

        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();

        // Read in elements until trailing null sentinel found
        Node<E> h = null, t = null;
        for (Object item; (item = s.readObject()) != null; ) {
            @SuppressWarnings("unchecked")
            Node<E> newNode = new Node<E>((E) item);
            if (h == null)
                h = t = newNode;
            else
                t.appendRelaxed(t = newNode);
        }
        if (h == null)
            h = t = new Node<E>();
        head = h;
        tail = t;
    }

    /** A customized variant of Spliterators.IteratorSpliterator */
    final class CLQSpliterator implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        Node<E> current;    // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes

        /**
         * Copies up to {@code batch + 1} elements (capped at MAX_BATCH)
         * into an array and returns an array-based spliterator over them.
         * The loop stops before consuming a node whose next is null, so
         * this spliterator always keeps at least the last node.
         *
         * @return the split-off prefix, or null if the current node is
         *         null or last, or if no element was copied
         */
        public Spliterator<E> trySplit() {
            Node<E> p, q;
            if ((p = current()) == null || (q = p.next) == null)
                return null;
            int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH);
            Object[] a = null;
            do {
                final E e;
                if ((e = p.item) != null) {
                    if (a == null)
                        a = new Object[n];
                    a[i++] = e;
                }
                if (p == (p = q))
                    // Fell off list; resume at the first live node.
                    p = first();
            } while (p != null && (q = p.next) != null && i < n);
            setCurrent(p);
            return (i == 0) ? null :
                Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED |
                                                   Spliterator.NONNULL |
                                                   Spliterator.CONCURRENT));
        }

        /**
         * Marks this spliterator exhausted, then runs {@code action} on
         * the elements found from the current node onward.
         *
         * @throws NullPointerException if action is null
         */
        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            final Node<E> p;
            if ((p = current()) != null) {
                current = null;
                exhausted = true;
                forEachFrom(action, p);
            }
        }

        /**
         * Advances past dead nodes to the next element, if any, and
         * passes it to {@code action}.
         *
         * @return {@code true} if an element was passed to action
         * @throws NullPointerException if action is null
         */
        public boolean tryAdvance(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            Node<E> p;
            if ((p = current()) != null) {
                E e;
                do {
                    e = p.item;
                    if (p == (p = p.next))
                        // Fell off list; resume at the first live node.
                        p = first();
                } while (e == null && p != null);
                setCurrent(p);
                if (e != null) {
                    action.accept(e);
                    return true;
                }
            }
            return false;
        }

        /** Records p as the current node; a null p marks exhaustion. */
        private void setCurrent(Node<E> p) {
            if ((current = p) == null)
                exhausted = true;
        }

        /**
         * Returns the current node, lazily initializing it from first()
         * on first use; returns null once exhausted.
         */
        private Node<E> current() {
            Node<E> p;
            if ((p = current) == null && !exhausted)
                setCurrent(p = first());
            return p;
        }

        /** Always reports {@code Long.MAX_VALUE}; no size is computed. */
        public long estimateSize() { return Long.MAX_VALUE; }

        /** Reports ORDERED, NONNULL and CONCURRENT. */
        public int characteristics() {
            return (Spliterator.ORDERED |
                    Spliterator.NONNULL |
                    Spliterator.CONCURRENT);
        }
    }

    /**
     * Returns a {@link Spliterator} over the elements in this queue.
     *
     * <p>The returned spliterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
     *
     * @implNote
     * The {@code Spliterator} implements {@code trySplit} to permit limited
     * parallelism.
     *
     * @return a {@code Spliterator} over the elements in this queue
     * @since 1.8
     */
    @Override
    public Spliterator<E> spliterator() {
        return new CLQSpliterator();
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        return bulkRemove(filter);
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean removeAll(Collection<?> c) {
        Objects.requireNonNull(c);
        return bulkRemove(e -> c.contains(e));
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean retainAll(Collection<?> c) {
        Objects.requireNonNull(c);
        return bulkRemove(e -> !c.contains(e));
    }

    /**
     * Removes the elements encountered in one bulkRemove traversal using
     * an always-true predicate. Like other bulk operations, this is not
     * atomic with respect to concurrent insertions.
     */
    public void clear() {
        bulkRemove(e -> true);
    }

    /**
     * Tolerate this many consecutive dead nodes before CAS-collapsing.
     * Amortized cost of clear() is (1 + 1/MAX_HOPS) CASes per element.
     */
    private static final int MAX_HOPS = 8;

    /**
     * Implementation of bulk remove methods.
     *
     * @param filter tested against each live item; a match is removed
     *        by CASing the node's item to null
     * @return {@code true} if this call removed at least one element
     */
    private boolean bulkRemove(Predicate<? super E> filter) {
        boolean removed = false;
        restartFromHead: for (;;) {
            int hops = MAX_HOPS;
            // c will be CASed to collapse intervening dead nodes between
            // pred (or head if null) and p.
            for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
                q = p.next;
                final E item; boolean pAlive;
                if (pAlive = ((item = p.item) != null)) {
                    if (filter.test(item)) {
                        if (p.casItem(item, null))
                            removed = true;
                        pAlive = false;
                    }
                }
                if (pAlive || q == null || --hops == 0) {
                    // p might already be self-linked here, but if so:
                    // - CASing head will surely fail
                    // - CASing pred's next will be useless but harmless.
                    if ((c != p && !tryCasSuccessor(pred, c, c = p))
                        || pAlive) {
                        // if CAS failed or alive, abandon old pred
                        hops = MAX_HOPS;
                        pred = p;
                        c = q;
                    }
                } else if (p == q)
                    continue restartFromHead;
            }
            return removed;
        }
    }

    /**
     * Runs action on each element found during a traversal starting at p.
     * If p is null, the action is not run.
     */
    void forEachFrom(Consumer<? super E> action, Node<E> p) {
        for (Node<E> pred = null; p != null; ) {
            Node<E> q = p.next;
            final E item;
            if ((item = p.item) != null) {
                action.accept(item);
                pred = p; p = q; continue;
            }
            // p is dead: scan to the end of the run of dead nodes
            // starting at c, then try to unlink the whole run.
            for (Node<E> c = p;; q = p.next) {
                if (q == null || q.item != null) {
                    pred = skipDeadNodes(pred, c, p, q); p = q; break;
                }
                // Fell off list: continue from the current head.
                if (p == (p = q)) { pred = null; p = head; break; }
            }
        }
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public void forEach(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        forEachFrom(action, head);
    }

    // VarHandle mechanics
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;
    static final VarHandle ITEM;
    static final VarHandle NEXT;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            HEAD = l.findVarHandle(ConcurrentLinkedQueue.class, "head",
                                   Node.class);
            TAIL = l.findVarHandle(ConcurrentLinkedQueue.class, "tail",
                                   Node.class);
            ITEM = l.findVarHandle(Node.class, "item", Object.class);
            NEXT = l.findVarHandle(Node.class, "next", Node.class);
        } catch (ReflectiveOperationException e) {
            // A missing field is a build error, not a runtime condition.
            throw new ExceptionInInitializerError(e);
        }
    }
}