1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.Objects;
43 import java.util.Spliterator;
44 import java.util.Spliterators;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.concurrent.locks.Condition;
47 import java.util.concurrent.locks.ReentrantLock;
48 import java.util.function.Consumer;
49 import java.util.function.Predicate;
50 
51 /**
52  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
53  * linked nodes.
54  * This queue orders elements FIFO (first-in-first-out).
55  * The <em>head</em> of the queue is that element that has been on the
56  * queue the longest time.
57  * The <em>tail</em> of the queue is that element that has been on the
58  * queue the shortest time. New elements
59  * are inserted at the tail of the queue, and the queue retrieval
60  * operations obtain elements at the head of the queue.
61  * Linked queues typically have higher throughput than array-based queues but
62  * less predictable performance in most concurrent applications.
63  *
64  * <p>The optional capacity bound constructor argument serves as a
65  * way to prevent excessive queue expansion. The capacity, if unspecified,
66  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
67  * dynamically created upon each insertion unless this would bring the
68  * queue above capacity.
69  *
70  * <p>This class and its iterator implement all of the <em>optional</em>
71  * methods of the {@link Collection} and {@link Iterator} interfaces.
72  *
73  * <p>This class is a member of the
74  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
75  * Java Collections Framework</a>.
76  *
77  * @since 1.5
78  * @author Doug Lea
79  * @param <E> the type of elements held in this queue
80  */
81 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
82         implements BlockingQueue<E>, java.io.Serializable {
83     private static final long serialVersionUID = -6903933977591709194L;
84 
85     /*
86      * A variant of the "two lock queue" algorithm.  The putLock gates
87      * entry to put (and offer), and has an associated condition for
88      * waiting puts.  Similarly for the takeLock.  The "count" field
89      * that they both rely on is maintained as an atomic to avoid
90      * needing to get both locks in most cases. Also, to minimize need
91      * for puts to get takeLock and vice-versa, cascading notifies are
92      * used. When a put notices that it has enabled at least one take,
93      * it signals taker. That taker in turn signals others if more
94      * items have been entered since the signal. And symmetrically for
95      * takes signalling puts. Operations such as remove(Object) and
96      * iterators acquire both locks.
97      *
98      * Visibility between writers and readers is provided as follows:
99      *
100      * Whenever an element is enqueued, the putLock is acquired and
101      * count updated.  A subsequent reader guarantees visibility to the
102      * enqueued Node by either acquiring the putLock (via fullyLock)
103      * or by acquiring the takeLock, and then reading n = count.get();
104      * this gives visibility to the first n items.
105      *
106      * To implement weakly consistent iterators, it appears we need to
107      * keep all Nodes GC-reachable from a predecessor dequeued Node.
108      * That would cause two problems:
109      * - allow a rogue Iterator to cause unbounded memory retention
110      * - cause cross-generational linking of old Nodes to new Nodes if
111      *   a Node was tenured while live, which generational GCs have a
112      *   hard time dealing with, causing repeated major collections.
113      * However, only non-deleted Nodes need to be reachable from
114      * dequeued Nodes, and reachability does not necessarily have to
115      * be of the kind understood by the GC.  We use the trick of
116      * linking a Node that has just been dequeued to itself.  Such a
117      * self-link implicitly means to advance to head.next.
118      */
119 
120     /**
121      * Linked list node class.
122      */
123     static class Node<E> {
124         E item;
125 
126         /**
127          * One of:
128          * - the real successor Node
129          * - this Node, meaning the successor is head.next
130          * - null, meaning there is no successor (this is the last node)
131          */
132         Node<E> next;
133 
Node(E x)134         Node(E x) { item = x; }
135     }
136 
137     /** The capacity bound, or Integer.MAX_VALUE if none */
138     private final int capacity;
139 
140     /** Current number of elements */
141     private final AtomicInteger count = new AtomicInteger();
142 
143     /**
144      * Head of linked list.
145      * Invariant: head.item == null
146      */
147     transient Node<E> head;
148 
149     /**
150      * Tail of linked list.
151      * Invariant: last.next == null
152      */
153     private transient Node<E> last;
154 
155     /** Lock held by take, poll, etc */
156     private final ReentrantLock takeLock = new ReentrantLock();
157 
158     /** Wait queue for waiting takes */
159     @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
160     private final Condition notEmpty = takeLock.newCondition();
161 
162     /** Lock held by put, offer, etc */
163     private final ReentrantLock putLock = new ReentrantLock();
164 
165     /** Wait queue for waiting puts */
166     @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
167     private final Condition notFull = putLock.newCondition();
168 
169     /**
170      * Signals a waiting take. Called only from put/offer (which do not
171      * otherwise ordinarily lock takeLock.)
172      */
signalNotEmpty()173     private void signalNotEmpty() {
174         final ReentrantLock takeLock = this.takeLock;
175         takeLock.lock();
176         try {
177             notEmpty.signal();
178         } finally {
179             takeLock.unlock();
180         }
181     }
182 
183     /**
184      * Signals a waiting put. Called only from take/poll.
185      */
signalNotFull()186     private void signalNotFull() {
187         final ReentrantLock putLock = this.putLock;
188         putLock.lock();
189         try {
190             notFull.signal();
191         } finally {
192             putLock.unlock();
193         }
194     }
195 
196     /**
197      * Links node at end of queue.
198      *
199      * @param node the node
200      */
enqueue(Node<E> node)201     private void enqueue(Node<E> node) {
202         // assert putLock.isHeldByCurrentThread();
203         // assert last.next == null;
204         last = last.next = node;
205     }
206 
207     /**
208      * Removes a node from head of queue.
209      *
210      * @return the node
211      */
dequeue()212     private E dequeue() {
213         // assert takeLock.isHeldByCurrentThread();
214         // assert head.item == null;
215         Node<E> h = head;
216         Node<E> first = h.next;
217         h.next = h; // help GC
218         head = first;
219         E x = first.item;
220         first.item = null;
221         return x;
222     }
223 
224     /**
225      * Locks to prevent both puts and takes.
226      */
fullyLock()227     void fullyLock() {
228         putLock.lock();
229         takeLock.lock();
230     }
231 
232     /**
233      * Unlocks to allow both puts and takes.
234      */
fullyUnlock()235     void fullyUnlock() {
236         takeLock.unlock();
237         putLock.unlock();
238     }
239 
240     /**
241      * Creates a {@code LinkedBlockingQueue} with a capacity of
242      * {@link Integer#MAX_VALUE}.
243      */
LinkedBlockingQueue()244     public LinkedBlockingQueue() {
245         this(Integer.MAX_VALUE);
246     }
247 
248     /**
249      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
250      *
251      * @param capacity the capacity of this queue
252      * @throws IllegalArgumentException if {@code capacity} is not greater
253      *         than zero
254      */
LinkedBlockingQueue(int capacity)255     public LinkedBlockingQueue(int capacity) {
256         if (capacity <= 0) throw new IllegalArgumentException();
257         this.capacity = capacity;
258         last = head = new Node<E>(null);
259     }
260 
261     /**
262      * Creates a {@code LinkedBlockingQueue} with a capacity of
263      * {@link Integer#MAX_VALUE}, initially containing the elements of the
264      * given collection,
265      * added in traversal order of the collection's iterator.
266      *
267      * @param c the collection of elements to initially contain
268      * @throws NullPointerException if the specified collection or any
269      *         of its elements are null
270      */
LinkedBlockingQueue(Collection<? extends E> c)271     public LinkedBlockingQueue(Collection<? extends E> c) {
272         this(Integer.MAX_VALUE);
273         final ReentrantLock putLock = this.putLock;
274         putLock.lock(); // Never contended, but necessary for visibility
275         try {
276             int n = 0;
277             for (E e : c) {
278                 if (e == null)
279                     throw new NullPointerException();
280                 if (n == capacity)
281                     throw new IllegalStateException("Queue full");
282                 enqueue(new Node<E>(e));
283                 ++n;
284             }
285             count.set(n);
286         } finally {
287             putLock.unlock();
288         }
289     }
290 
291     // this doc comment is overridden to remove the reference to collections
292     // greater in size than Integer.MAX_VALUE
293     /**
294      * Returns the number of elements in this queue.
295      *
296      * @return the number of elements in this queue
297      */
size()298     public int size() {
299         return count.get();
300     }
301 
302     // this doc comment is a modified copy of the inherited doc comment,
303     // without the reference to unlimited queues.
304     /**
305      * Returns the number of additional elements that this queue can ideally
306      * (in the absence of memory or resource constraints) accept without
307      * blocking. This is always equal to the initial capacity of this queue
308      * less the current {@code size} of this queue.
309      *
310      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
311      * an element will succeed by inspecting {@code remainingCapacity}
312      * because it may be the case that another thread is about to
313      * insert or remove an element.
314      */
remainingCapacity()315     public int remainingCapacity() {
316         return capacity - count.get();
317     }
318 
319     /**
320      * Inserts the specified element at the tail of this queue, waiting if
321      * necessary for space to become available.
322      *
323      * @throws InterruptedException {@inheritDoc}
324      * @throws NullPointerException {@inheritDoc}
325      */
put(E e)326     public void put(E e) throws InterruptedException {
327         if (e == null) throw new NullPointerException();
328         final int c;
329         final Node<E> node = new Node<E>(e);
330         final ReentrantLock putLock = this.putLock;
331         final AtomicInteger count = this.count;
332         putLock.lockInterruptibly();
333         try {
334             /*
335              * Note that count is used in wait guard even though it is
336              * not protected by lock. This works because count can
337              * only decrease at this point (all other puts are shut
338              * out by lock), and we (or some other waiting put) are
339              * signalled if it ever changes from capacity. Similarly
340              * for all other uses of count in other wait guards.
341              */
342             while (count.get() == capacity) {
343                 notFull.await();
344             }
345             enqueue(node);
346             c = count.getAndIncrement();
347             if (c + 1 < capacity)
348                 notFull.signal();
349         } finally {
350             putLock.unlock();
351         }
352         if (c == 0)
353             signalNotEmpty();
354     }
355 
356     /**
357      * Inserts the specified element at the tail of this queue, waiting if
358      * necessary up to the specified wait time for space to become available.
359      *
360      * @return {@code true} if successful, or {@code false} if
361      *         the specified waiting time elapses before space is available
362      * @throws InterruptedException {@inheritDoc}
363      * @throws NullPointerException {@inheritDoc}
364      */
offer(E e, long timeout, TimeUnit unit)365     public boolean offer(E e, long timeout, TimeUnit unit)
366         throws InterruptedException {
367 
368         if (e == null) throw new NullPointerException();
369         long nanos = unit.toNanos(timeout);
370         final int c;
371         final ReentrantLock putLock = this.putLock;
372         final AtomicInteger count = this.count;
373         putLock.lockInterruptibly();
374         try {
375             while (count.get() == capacity) {
376                 if (nanos <= 0L)
377                     return false;
378                 nanos = notFull.awaitNanos(nanos);
379             }
380             enqueue(new Node<E>(e));
381             c = count.getAndIncrement();
382             if (c + 1 < capacity)
383                 notFull.signal();
384         } finally {
385             putLock.unlock();
386         }
387         if (c == 0)
388             signalNotEmpty();
389         return true;
390     }
391 
392     /**
393      * Inserts the specified element at the tail of this queue if it is
394      * possible to do so immediately without exceeding the queue's capacity,
395      * returning {@code true} upon success and {@code false} if this queue
396      * is full.
397      * When using a capacity-restricted queue, this method is generally
398      * preferable to method {@link BlockingQueue#add add}, which can fail to
399      * insert an element only by throwing an exception.
400      *
401      * @throws NullPointerException if the specified element is null
402      */
offer(E e)403     public boolean offer(E e) {
404         if (e == null) throw new NullPointerException();
405         final AtomicInteger count = this.count;
406         if (count.get() == capacity)
407             return false;
408         final int c;
409         final Node<E> node = new Node<E>(e);
410         final ReentrantLock putLock = this.putLock;
411         putLock.lock();
412         try {
413             if (count.get() == capacity)
414                 return false;
415             enqueue(node);
416             c = count.getAndIncrement();
417             if (c + 1 < capacity)
418                 notFull.signal();
419         } finally {
420             putLock.unlock();
421         }
422         if (c == 0)
423             signalNotEmpty();
424         return true;
425     }
426 
take()427     public E take() throws InterruptedException {
428         final E x;
429         final int c;
430         final AtomicInteger count = this.count;
431         final ReentrantLock takeLock = this.takeLock;
432         takeLock.lockInterruptibly();
433         try {
434             while (count.get() == 0) {
435                 notEmpty.await();
436             }
437             x = dequeue();
438             c = count.getAndDecrement();
439             if (c > 1)
440                 notEmpty.signal();
441         } finally {
442             takeLock.unlock();
443         }
444         if (c == capacity)
445             signalNotFull();
446         return x;
447     }
448 
poll(long timeout, TimeUnit unit)449     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
450         final E x;
451         final int c;
452         long nanos = unit.toNanos(timeout);
453         final AtomicInteger count = this.count;
454         final ReentrantLock takeLock = this.takeLock;
455         takeLock.lockInterruptibly();
456         try {
457             while (count.get() == 0) {
458                 if (nanos <= 0L)
459                     return null;
460                 nanos = notEmpty.awaitNanos(nanos);
461             }
462             x = dequeue();
463             c = count.getAndDecrement();
464             if (c > 1)
465                 notEmpty.signal();
466         } finally {
467             takeLock.unlock();
468         }
469         if (c == capacity)
470             signalNotFull();
471         return x;
472     }
473 
poll()474     public E poll() {
475         final AtomicInteger count = this.count;
476         if (count.get() == 0)
477             return null;
478         final E x;
479         final int c;
480         final ReentrantLock takeLock = this.takeLock;
481         takeLock.lock();
482         try {
483             if (count.get() == 0)
484                 return null;
485             x = dequeue();
486             c = count.getAndDecrement();
487             if (c > 1)
488                 notEmpty.signal();
489         } finally {
490             takeLock.unlock();
491         }
492         if (c == capacity)
493             signalNotFull();
494         return x;
495     }
496 
peek()497     public E peek() {
498         final AtomicInteger count = this.count;
499         if (count.get() == 0)
500             return null;
501         final ReentrantLock takeLock = this.takeLock;
502         takeLock.lock();
503         try {
504             return (count.get() > 0) ? head.next.item : null;
505         } finally {
506             takeLock.unlock();
507         }
508     }
509 
510     /**
511      * Unlinks interior Node p with predecessor pred.
512      */
unlink(Node<E> p, Node<E> pred)513     void unlink(Node<E> p, Node<E> pred) {
514         // assert putLock.isHeldByCurrentThread();
515         // assert takeLock.isHeldByCurrentThread();
516         // p.next is not changed, to allow iterators that are
517         // traversing p to maintain their weak-consistency guarantee.
518         p.item = null;
519         pred.next = p.next;
520         if (last == p)
521             last = pred;
522         if (count.getAndDecrement() == capacity)
523             notFull.signal();
524     }
525 
526     /**
527      * Removes a single instance of the specified element from this queue,
528      * if it is present.  More formally, removes an element {@code e} such
529      * that {@code o.equals(e)}, if this queue contains one or more such
530      * elements.
531      * Returns {@code true} if this queue contained the specified element
532      * (or equivalently, if this queue changed as a result of the call).
533      *
534      * @param o element to be removed from this queue, if present
535      * @return {@code true} if this queue changed as a result of the call
536      */
remove(Object o)537     public boolean remove(Object o) {
538         if (o == null) return false;
539         fullyLock();
540         try {
541             for (Node<E> pred = head, p = pred.next;
542                  p != null;
543                  pred = p, p = p.next) {
544                 if (o.equals(p.item)) {
545                     unlink(p, pred);
546                     return true;
547                 }
548             }
549             return false;
550         } finally {
551             fullyUnlock();
552         }
553     }
554 
555     /**
556      * Returns {@code true} if this queue contains the specified element.
557      * More formally, returns {@code true} if and only if this queue contains
558      * at least one element {@code e} such that {@code o.equals(e)}.
559      *
560      * @param o object to be checked for containment in this queue
561      * @return {@code true} if this queue contains the specified element
562      */
contains(Object o)563     public boolean contains(Object o) {
564         if (o == null) return false;
565         fullyLock();
566         try {
567             for (Node<E> p = head.next; p != null; p = p.next)
568                 if (o.equals(p.item))
569                     return true;
570             return false;
571         } finally {
572             fullyUnlock();
573         }
574     }
575 
576     /**
577      * Returns an array containing all of the elements in this queue, in
578      * proper sequence.
579      *
580      * <p>The returned array will be "safe" in that no references to it are
581      * maintained by this queue.  (In other words, this method must allocate
582      * a new array).  The caller is thus free to modify the returned array.
583      *
584      * <p>This method acts as bridge between array-based and collection-based
585      * APIs.
586      *
587      * @return an array containing all of the elements in this queue
588      */
toArray()589     public Object[] toArray() {
590         fullyLock();
591         try {
592             int size = count.get();
593             Object[] a = new Object[size];
594             int k = 0;
595             for (Node<E> p = head.next; p != null; p = p.next)
596                 a[k++] = p.item;
597             return a;
598         } finally {
599             fullyUnlock();
600         }
601     }
602 
603     /**
604      * Returns an array containing all of the elements in this queue, in
605      * proper sequence; the runtime type of the returned array is that of
606      * the specified array.  If the queue fits in the specified array, it
607      * is returned therein.  Otherwise, a new array is allocated with the
608      * runtime type of the specified array and the size of this queue.
609      *
610      * <p>If this queue fits in the specified array with room to spare
611      * (i.e., the array has more elements than this queue), the element in
612      * the array immediately following the end of the queue is set to
613      * {@code null}.
614      *
615      * <p>Like the {@link #toArray()} method, this method acts as bridge between
616      * array-based and collection-based APIs.  Further, this method allows
617      * precise control over the runtime type of the output array, and may,
618      * under certain circumstances, be used to save allocation costs.
619      *
620      * <p>Suppose {@code x} is a queue known to contain only strings.
621      * The following code can be used to dump the queue into a newly
622      * allocated array of {@code String}:
623      *
624      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
625      *
626      * Note that {@code toArray(new Object[0])} is identical in function to
627      * {@code toArray()}.
628      *
629      * @param a the array into which the elements of the queue are to
630      *          be stored, if it is big enough; otherwise, a new array of the
631      *          same runtime type is allocated for this purpose
632      * @return an array containing all of the elements in this queue
633      * @throws ArrayStoreException if the runtime type of the specified array
634      *         is not a supertype of the runtime type of every element in
635      *         this queue
636      * @throws NullPointerException if the specified array is null
637      */
638     @SuppressWarnings("unchecked")
toArray(T[] a)639     public <T> T[] toArray(T[] a) {
640         fullyLock();
641         try {
642             int size = count.get();
643             if (a.length < size)
644                 a = (T[])java.lang.reflect.Array.newInstance
645                     (a.getClass().getComponentType(), size);
646 
647             int k = 0;
648             for (Node<E> p = head.next; p != null; p = p.next)
649                 a[k++] = (T)p.item;
650             if (a.length > k)
651                 a[k] = null;
652             return a;
653         } finally {
654             fullyUnlock();
655         }
656     }
657 
toString()658     public String toString() {
659         return Helpers.collectionToString(this);
660     }
661 
662     /**
663      * Atomically removes all of the elements from this queue.
664      * The queue will be empty after this call returns.
665      */
clear()666     public void clear() {
667         fullyLock();
668         try {
669             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
670                 h.next = h;
671                 p.item = null;
672             }
673             head = last;
674             // assert head.item == null && head.next == null;
675             if (count.getAndSet(0) == capacity)
676                 notFull.signal();
677         } finally {
678             fullyUnlock();
679         }
680     }
681 
682     /**
683      * @throws UnsupportedOperationException {@inheritDoc}
684      * @throws ClassCastException            {@inheritDoc}
685      * @throws NullPointerException          {@inheritDoc}
686      * @throws IllegalArgumentException      {@inheritDoc}
687      */
drainTo(Collection<? super E> c)688     public int drainTo(Collection<? super E> c) {
689         return drainTo(c, Integer.MAX_VALUE);
690     }
691 
692     /**
693      * @throws UnsupportedOperationException {@inheritDoc}
694      * @throws ClassCastException            {@inheritDoc}
695      * @throws NullPointerException          {@inheritDoc}
696      * @throws IllegalArgumentException      {@inheritDoc}
697      */
drainTo(Collection<? super E> c, int maxElements)698     public int drainTo(Collection<? super E> c, int maxElements) {
699         Objects.requireNonNull(c);
700         if (c == this)
701             throw new IllegalArgumentException();
702         if (maxElements <= 0)
703             return 0;
704         boolean signalNotFull = false;
705         final ReentrantLock takeLock = this.takeLock;
706         takeLock.lock();
707         try {
708             int n = Math.min(maxElements, count.get());
709             // count.get provides visibility to first n Nodes
710             Node<E> h = head;
711             int i = 0;
712             try {
713                 while (i < n) {
714                     Node<E> p = h.next;
715                     c.add(p.item);
716                     p.item = null;
717                     h.next = h;
718                     h = p;
719                     ++i;
720                 }
721                 return n;
722             } finally {
723                 // Restore invariants even if c.add() threw
724                 if (i > 0) {
725                     // assert h.item == null;
726                     head = h;
727                     signalNotFull = (count.getAndAdd(-i) == capacity);
728                 }
729             }
730         } finally {
731             takeLock.unlock();
732             if (signalNotFull)
733                 signalNotFull();
734         }
735     }
736 
737     /**
738      * Used for any element traversal that is not entirely under lock.
739      * Such traversals must handle both:
740      * - dequeued nodes (p.next == p)
741      * - (possibly multiple) interior removed nodes (p.item == null)
742      */
succ(Node<E> p)743     Node<E> succ(Node<E> p) {
744         if (p == (p = p.next))
745             p = head.next;
746         return p;
747     }
748 
749     /**
750      * Returns an iterator over the elements in this queue in proper sequence.
751      * The elements will be returned in order from first (head) to last (tail).
752      *
753      * <p>The returned iterator is
754      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
755      *
756      * @return an iterator over the elements in this queue in proper sequence
757      */
iterator()758     public Iterator<E> iterator() {
759         return new Itr();
760     }
761 
762     /**
763      * Weakly-consistent iterator.
764      *
765      * Lazily updated ancestor field provides expected O(1) remove(),
766      * but still O(n) in the worst case, whenever the saved ancestor
767      * is concurrently deleted.
768      */
769     private class Itr implements Iterator<E> {
770         private Node<E> next;           // Node holding nextItem
771         private E nextItem;             // next item to hand out
772         private Node<E> lastRet;
773         private Node<E> ancestor;       // Helps unlink lastRet on remove()
774 
Itr()775         Itr() {
776             fullyLock();
777             try {
778                 if ((next = head.next) != null)
779                     nextItem = next.item;
780             } finally {
781                 fullyUnlock();
782             }
783         }
784 
hasNext()785         public boolean hasNext() {
786             return next != null;
787         }
788 
next()789         public E next() {
790             Node<E> p;
791             if ((p = next) == null)
792                 throw new NoSuchElementException();
793             lastRet = p;
794             E x = nextItem;
795             fullyLock();
796             try {
797                 E e = null;
798                 for (p = p.next; p != null && (e = p.item) == null; )
799                     p = succ(p);
800                 next = p;
801                 nextItem = e;
802             } finally {
803                 fullyUnlock();
804             }
805             return x;
806         }
807 
forEachRemaining(Consumer<? super E> action)808         public void forEachRemaining(Consumer<? super E> action) {
809             // A variant of forEachFrom
810             Objects.requireNonNull(action);
811             Node<E> p;
812             if ((p = next) == null) return;
813             lastRet = p;
814             next = null;
815             final int batchSize = 64;
816             Object[] es = null;
817             int n, len = 1;
818             do {
819                 fullyLock();
820                 try {
821                     if (es == null) {
822                         p = p.next;
823                         for (Node<E> q = p; q != null; q = succ(q))
824                             if (q.item != null && ++len == batchSize)
825                                 break;
826                         es = new Object[len];
827                         es[0] = nextItem;
828                         nextItem = null;
829                         n = 1;
830                     } else
831                         n = 0;
832                     for (; p != null && n < len; p = succ(p))
833                         if ((es[n] = p.item) != null) {
834                             lastRet = p;
835                             n++;
836                         }
837                 } finally {
838                     fullyUnlock();
839                 }
840                 for (int i = 0; i < n; i++) {
841                     @SuppressWarnings("unchecked") E e = (E) es[i];
842                     action.accept(e);
843                 }
844             } while (n > 0 && p != null);
845         }
846 
remove()847         public void remove() {
848             Node<E> p = lastRet;
849             if (p == null)
850                 throw new IllegalStateException();
851             lastRet = null;
852             fullyLock();
853             try {
854                 if (p.item != null) {
855                     if (ancestor == null)
856                         ancestor = head;
857                     ancestor = findPred(p, ancestor);
858                     unlink(p, ancestor);
859                 }
860             } finally {
861                 fullyUnlock();
862             }
863         }
864     }
865 
866     /**
867      * A customized variant of Spliterators.IteratorSpliterator.
868      * Keep this class in sync with (very similar) LBDSpliterator.
869      */
870     private final class LBQSpliterator implements Spliterator<E> {
871         static final int MAX_BATCH = 1 << 25;  // max batch array size;
872         Node<E> current;    // current node; null until initialized
873         int batch;          // batch size for splits
874         boolean exhausted;  // true when no more nodes
875         long est = size();  // size estimate
876 
LBQSpliterator()877         LBQSpliterator() {}
878 
estimateSize()879         public long estimateSize() { return est; }
880 
trySplit()881         public Spliterator<E> trySplit() {
882             Node<E> h;
883             if (!exhausted &&
884                 ((h = current) != null || (h = head.next) != null)
885                 && h.next != null) {
886                 int n = batch = Math.min(batch + 1, MAX_BATCH);
887                 Object[] a = new Object[n];
888                 int i = 0;
889                 Node<E> p = current;
890                 fullyLock();
891                 try {
892                     if (p != null || (p = head.next) != null)
893                         for (; p != null && i < n; p = succ(p))
894                             if ((a[i] = p.item) != null)
895                                 i++;
896                 } finally {
897                     fullyUnlock();
898                 }
899                 if ((current = p) == null) {
900                     est = 0L;
901                     exhausted = true;
902                 }
903                 else if ((est -= i) < 0L)
904                     est = 0L;
905                 if (i > 0)
906                     return Spliterators.spliterator
907                         (a, 0, i, (Spliterator.ORDERED |
908                                    Spliterator.NONNULL |
909                                    Spliterator.CONCURRENT));
910             }
911             return null;
912         }
913 
tryAdvance(Consumer<? super E> action)914         public boolean tryAdvance(Consumer<? super E> action) {
915             Objects.requireNonNull(action);
916             if (!exhausted) {
917                 E e = null;
918                 fullyLock();
919                 try {
920                     Node<E> p;
921                     if ((p = current) != null || (p = head.next) != null)
922                         do {
923                             e = p.item;
924                             p = succ(p);
925                         } while (e == null && p != null);
926                     if ((current = p) == null)
927                         exhausted = true;
928                 } finally {
929                     fullyUnlock();
930                 }
931                 if (e != null) {
932                     action.accept(e);
933                     return true;
934                 }
935             }
936             return false;
937         }
938 
forEachRemaining(Consumer<? super E> action)939         public void forEachRemaining(Consumer<? super E> action) {
940             Objects.requireNonNull(action);
941             if (!exhausted) {
942                 exhausted = true;
943                 Node<E> p = current;
944                 current = null;
945                 forEachFrom(action, p);
946             }
947         }
948 
characteristics()949         public int characteristics() {
950             return (Spliterator.ORDERED |
951                     Spliterator.NONNULL |
952                     Spliterator.CONCURRENT);
953         }
954     }
955 
956     /**
957      * Returns a {@link Spliterator} over the elements in this queue.
958      *
959      * <p>The returned spliterator is
960      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
961      *
962      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
963      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
964      *
965      * @implNote
966      * The {@code Spliterator} implements {@code trySplit} to permit limited
967      * parallelism.
968      *
969      * @return a {@code Spliterator} over the elements in this queue
970      * @since 1.8
971      */
spliterator()972     public Spliterator<E> spliterator() {
973         return new LBQSpliterator();
974     }
975 
976     /**
977      * @throws NullPointerException {@inheritDoc}
978      */
forEach(Consumer<? super E> action)979     public void forEach(Consumer<? super E> action) {
980         Objects.requireNonNull(action);
981         forEachFrom(action, null);
982     }
983 
984     /**
985      * Runs action on each element found during a traversal starting at p.
986      * If p is null, traversal starts at head.
987      */
forEachFrom(Consumer<? super E> action, Node<E> p)988     void forEachFrom(Consumer<? super E> action, Node<E> p) {
989         // Extract batches of elements while holding the lock; then
990         // run the action on the elements while not
991         final int batchSize = 64;       // max number of elements per batch
992         Object[] es = null;             // container for batch of elements
993         int n, len = 0;
994         do {
995             fullyLock();
996             try {
997                 if (es == null) {
998                     if (p == null) p = head.next;
999                     for (Node<E> q = p; q != null; q = succ(q))
1000                         if (q.item != null && ++len == batchSize)
1001                             break;
1002                     es = new Object[len];
1003                 }
1004                 for (n = 0; p != null && n < len; p = succ(p))
1005                     if ((es[n] = p.item) != null)
1006                         n++;
1007             } finally {
1008                 fullyUnlock();
1009             }
1010             for (int i = 0; i < n; i++) {
1011                 @SuppressWarnings("unchecked") E e = (E) es[i];
1012                 action.accept(e);
1013             }
1014         } while (n > 0 && p != null);
1015     }
1016 
1017     /**
1018      * @throws NullPointerException {@inheritDoc}
1019      */
removeIf(Predicate<? super E> filter)1020     public boolean removeIf(Predicate<? super E> filter) {
1021         Objects.requireNonNull(filter);
1022         return bulkRemove(filter);
1023     }
1024 
1025     /**
1026      * @throws NullPointerException {@inheritDoc}
1027      */
removeAll(Collection<?> c)1028     public boolean removeAll(Collection<?> c) {
1029         Objects.requireNonNull(c);
1030         return bulkRemove(e -> c.contains(e));
1031     }
1032 
1033     /**
1034      * @throws NullPointerException {@inheritDoc}
1035      */
retainAll(Collection<?> c)1036     public boolean retainAll(Collection<?> c) {
1037         Objects.requireNonNull(c);
1038         return bulkRemove(e -> !c.contains(e));
1039     }
1040 
1041     /**
1042      * Returns the predecessor of live node p, given a node that was
1043      * once a live ancestor of p (or head); allows unlinking of p.
1044      */
findPred(Node<E> p, Node<E> ancestor)1045     Node<E> findPred(Node<E> p, Node<E> ancestor) {
1046         // assert p.item != null;
1047         if (ancestor.item == null)
1048             ancestor = head;
1049         // Fails with NPE if precondition not satisfied
1050         for (Node<E> q; (q = ancestor.next) != p; )
1051             ancestor = q;
1052         return ancestor;
1053     }
1054 
1055     /** Implementation of bulk remove methods. */
1056     @SuppressWarnings("unchecked")
bulkRemove(Predicate<? super E> filter)1057     private boolean bulkRemove(Predicate<? super E> filter) {
1058         boolean removed = false;
1059         Node<E> p = null, ancestor = head;
1060         Node<E>[] nodes = null;
1061         int n, len = 0;
1062         do {
1063             // 1. Extract batch of up to 64 elements while holding the lock.
1064             fullyLock();
1065             try {
1066                 if (nodes == null) {  // first batch; initialize
1067                     p = head.next;
1068                     for (Node<E> q = p; q != null; q = succ(q))
1069                         if (q.item != null && ++len == 64)
1070                             break;
1071                     nodes = (Node<E>[]) new Node<?>[len];
1072                 }
1073                 for (n = 0; p != null && n < len; p = succ(p))
1074                     nodes[n++] = p;
1075             } finally {
1076                 fullyUnlock();
1077             }
1078 
1079             // 2. Run the filter on the elements while lock is free.
1080             long deathRow = 0L;       // "bitset" of size 64
1081             for (int i = 0; i < n; i++) {
1082                 final E e;
1083                 if ((e = nodes[i].item) != null && filter.test(e))
1084                     deathRow |= 1L << i;
1085             }
1086 
1087             // 3. Remove any filtered elements while holding the lock.
1088             if (deathRow != 0) {
1089                 fullyLock();
1090                 try {
1091                     for (int i = 0; i < n; i++) {
1092                         final Node<E> q;
1093                         if ((deathRow & (1L << i)) != 0L
1094                             && (q = nodes[i]).item != null) {
1095                             ancestor = findPred(q, ancestor);
1096                             unlink(q, ancestor);
1097                             removed = true;
1098                         }
1099                         nodes[i] = null; // help GC
1100                     }
1101                 } finally {
1102                     fullyUnlock();
1103                 }
1104             }
1105         } while (n > 0 && p != null);
1106         return removed;
1107     }
1108 
1109     /**
1110      * Saves this queue to a stream (that is, serializes it).
1111      *
1112      * @param s the stream
1113      * @throws java.io.IOException if an I/O error occurs
1114      * @serialData The capacity is emitted (int), followed by all of
1115      * its elements (each an {@code Object}) in the proper order,
1116      * followed by a null
1117      */
writeObject(java.io.ObjectOutputStream s)1118     private void writeObject(java.io.ObjectOutputStream s)
1119         throws java.io.IOException {
1120 
1121         fullyLock();
1122         try {
1123             // Write out any hidden stuff, plus capacity
1124             s.defaultWriteObject();
1125 
1126             // Write out all elements in the proper order.
1127             for (Node<E> p = head.next; p != null; p = p.next)
1128                 s.writeObject(p.item);
1129 
1130             // Use trailing null as sentinel
1131             s.writeObject(null);
1132         } finally {
1133             fullyUnlock();
1134         }
1135     }
1136 
1137     /**
1138      * Reconstitutes this queue from a stream (that is, deserializes it).
1139      * @param s the stream
1140      * @throws ClassNotFoundException if the class of a serialized object
1141      *         could not be found
1142      * @throws java.io.IOException if an I/O error occurs
1143      */
readObject(java.io.ObjectInputStream s)1144     private void readObject(java.io.ObjectInputStream s)
1145         throws java.io.IOException, ClassNotFoundException {
1146         // Read in capacity, and any hidden stuff
1147         s.defaultReadObject();
1148 
1149         count.set(0);
1150         last = head = new Node<E>(null);
1151 
1152         // Read in all elements and place in queue
1153         for (;;) {
1154             @SuppressWarnings("unchecked")
1155             E item = (E)s.readObject();
1156             if (item == null)
1157                 break;
1158             add(item);
1159         }
1160     }
1161 }
1162