1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.AbstractQueue;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Comparator;
44 import java.util.Iterator;
45 import java.util.NoSuchElementException;
46 import java.util.Objects;
47 import java.util.PriorityQueue;
48 import java.util.Queue;
49 import java.util.SortedSet;
50 import java.util.Spliterator;
51 import java.util.concurrent.locks.Condition;
52 import java.util.concurrent.locks.ReentrantLock;
53 import java.util.function.Consumer;
54 import java.util.function.Predicate;
55 import jdk.internal.access.SharedSecrets;
56 import jdk.internal.util.ArraysSupport;
57 
58 /**
59  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
60  * the same ordering rules as class {@link PriorityQueue} and supplies
61  * blocking retrieval operations.  While this queue is logically
62  * unbounded, attempted additions may fail due to resource exhaustion
63  * (causing {@code OutOfMemoryError}). This class does not permit
64  * {@code null} elements.  A priority queue relying on {@linkplain
65  * Comparable natural ordering} also does not permit insertion of
66  * non-comparable objects (doing so results in
67  * {@code ClassCastException}).
68  *
69  * <p>This class and its iterator implement all of the <em>optional</em>
70  * methods of the {@link Collection} and {@link Iterator} interfaces.
71  * The Iterator provided in method {@link #iterator()} and the
72  * Spliterator provided in method {@link #spliterator()} are <em>not</em>
73  * guaranteed to traverse the elements of the PriorityBlockingQueue in
74  * any particular order. If you need ordered traversal, consider using
75  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo} can
76  * be used to <em>remove</em> some or all elements in priority order and
77  * place them in another collection.
78  *
79  * <p>Operations on this class make no guarantees about the ordering
80  * of elements with equal priority. If you need to enforce an
81  * ordering, you can define custom classes or comparators that use a
82  * secondary key to break ties in primary priority values.  For
83  * example, here is a class that applies first-in-first-out
84  * tie-breaking to comparable elements. To use it, you would insert a
85  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
86  *
87  * <pre> {@code
88  * class FIFOEntry<E extends Comparable<? super E>>
89  *     implements Comparable<FIFOEntry<E>> {
90  *   static final AtomicLong seq = new AtomicLong();
91  *   final long seqNum;
92  *   final E entry;
93  *   public FIFOEntry(E entry) {
94  *     seqNum = seq.getAndIncrement();
95  *     this.entry = entry;
96  *   }
97  *   public E getEntry() { return entry; }
98  *   public int compareTo(FIFOEntry<E> other) {
99  *     int res = entry.compareTo(other.entry);
100  *     if (res == 0 && other.entry != this.entry)
101  *       res = (seqNum < other.seqNum ? -1 : 1);
102  *     return res;
103  *   }
104  * }}</pre>
105  *
106  * <p>This class is a member of the
107  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
108  * Java Collections Framework</a>.
109  *
110  * @since 1.5
111  * @author Doug Lea
112  * @param <E> the type of elements held in this queue
113  */
114 @SuppressWarnings("unchecked")
115 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
116     implements BlockingQueue<E>, java.io.Serializable {
117     private static final long serialVersionUID = 5595510919245408276L;
118 
119     /*
120      * The implementation uses an array-based binary heap, with public
121      * operations protected with a single lock. However, allocation
122      * during resizing uses a simple spinlock (used only while not
123      * holding main lock) in order to allow takes to operate
124      * concurrently with allocation.  This avoids repeated
125      * postponement of waiting consumers and consequent element
126      * build-up. The need to back away from lock during allocation
127      * makes it impossible to simply wrap delegated
128      * java.util.PriorityQueue operations within a lock, as was done
129      * in a previous version of this class. To maintain
130      * interoperability, a plain PriorityQueue is still used during
131      * serialization, which maintains compatibility at the expense of
132      * transiently doubling overhead.
133      */
134 
135     /**
136      * Default array capacity.
137      */
138     private static final int DEFAULT_INITIAL_CAPACITY = 11;
139 
140     /**
141      * Priority queue represented as a balanced binary heap: the two
142      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
143      * priority queue is ordered by comparator, or by the elements'
144      * natural ordering, if comparator is null: For each node n in the
145      * heap and each descendant d of n, n <= d.  The element with the
146      * lowest value is in queue[0], assuming the queue is nonempty.
147      */
148     private transient Object[] queue;
149 
150     /**
151      * The number of elements in the priority queue.
152      */
153     private transient int size;
154 
155     /**
156      * The comparator, or null if priority queue uses elements'
157      * natural ordering.
158      */
159     private transient Comparator<? super E> comparator;
160 
161     /**
162      * Lock used for all public operations.
163      */
164     private final ReentrantLock lock = new ReentrantLock();
165 
166     /**
167      * Condition for blocking when empty.
168      */
169     @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
170     private final Condition notEmpty = lock.newCondition();
171 
172     /**
173      * Spinlock for allocation, acquired via CAS.
174      */
175     private transient volatile int allocationSpinLock;
176 
177     /**
178      * A plain PriorityQueue used only for serialization,
179      * to maintain compatibility with previous versions
180      * of this class. Non-null only during serialization/deserialization.
181      */
182     private PriorityQueue<E> q;
183 
184     /**
185      * Creates a {@code PriorityBlockingQueue} with the default
186      * initial capacity (11) that orders its elements according to
187      * their {@linkplain Comparable natural ordering}.
188      */
PriorityBlockingQueue()189     public PriorityBlockingQueue() {
190         this(DEFAULT_INITIAL_CAPACITY, null);
191     }
192 
193     /**
194      * Creates a {@code PriorityBlockingQueue} with the specified
195      * initial capacity that orders its elements according to their
196      * {@linkplain Comparable natural ordering}.
197      *
198      * @param initialCapacity the initial capacity for this priority queue
199      * @throws IllegalArgumentException if {@code initialCapacity} is less
200      *         than 1
201      */
PriorityBlockingQueue(int initialCapacity)202     public PriorityBlockingQueue(int initialCapacity) {
203         this(initialCapacity, null);
204     }
205 
206     /**
207      * Creates a {@code PriorityBlockingQueue} with the specified initial
208      * capacity that orders its elements according to the specified
209      * comparator.
210      *
211      * @param initialCapacity the initial capacity for this priority queue
212      * @param  comparator the comparator that will be used to order this
213      *         priority queue.  If {@code null}, the {@linkplain Comparable
214      *         natural ordering} of the elements will be used.
215      * @throws IllegalArgumentException if {@code initialCapacity} is less
216      *         than 1
217      */
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)218     public PriorityBlockingQueue(int initialCapacity,
219                                  Comparator<? super E> comparator) {
220         if (initialCapacity < 1)
221             throw new IllegalArgumentException();
222         this.comparator = comparator;
223         this.queue = new Object[Math.max(1, initialCapacity)];
224     }
225 
226     /**
227      * Creates a {@code PriorityBlockingQueue} containing the elements
228      * in the specified collection.  If the specified collection is a
229      * {@link SortedSet} or a {@link PriorityBlockingQueue}, this
230      * priority queue will be ordered according to the same ordering.
231      * Otherwise, this priority queue will be ordered according to the
232      * {@linkplain Comparable natural ordering} of its elements.
233      *
234      * @param  c the collection whose elements are to be placed
235      *         into this priority queue
236      * @throws ClassCastException if elements of the specified collection
237      *         cannot be compared to one another according to the priority
238      *         queue's ordering
239      * @throws NullPointerException if the specified collection or any
240      *         of its elements are null
241      */
PriorityBlockingQueue(Collection<? extends E> c)242     public PriorityBlockingQueue(Collection<? extends E> c) {
243         boolean heapify = true; // true if not known to be in heap order
244         boolean screen = true;  // true if must screen for nulls
245         if (c instanceof SortedSet<?>) {
246             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
247             this.comparator = (Comparator<? super E>) ss.comparator();
248             heapify = false;
249         }
250         else if (c instanceof PriorityBlockingQueue<?>) {
251             PriorityBlockingQueue<? extends E> pq =
252                 (PriorityBlockingQueue<? extends E>) c;
253             this.comparator = (Comparator<? super E>) pq.comparator();
254             screen = false;
255             if (pq.getClass() == PriorityBlockingQueue.class) // exact match
256                 heapify = false;
257         }
258         Object[] es = c.toArray();
259         int n = es.length;
260         // Android-changed: Defend against c.toArray (incorrectly) not returning Object[]
261         //                  (see b/204397945)
262         // if (c.getClass() != java.util.ArrayList.class)
263         if (es.getClass() != Object[].class)
264             es = Arrays.copyOf(es, n, Object[].class);
265         if (screen && (n == 1 || this.comparator != null)) {
266             for (Object e : es)
267                 if (e == null)
268                     throw new NullPointerException();
269         }
270         this.queue = ensureNonEmpty(es);
271         this.size = n;
272         if (heapify)
273             heapify();
274     }
275 
276     /** Ensures that queue[0] exists, helping peek() and poll(). */
ensureNonEmpty(Object[] es)277     private static Object[] ensureNonEmpty(Object[] es) {
278         return (es.length > 0) ? es : new Object[1];
279     }
280 
281     /**
282      * Tries to grow array to accommodate at least one more element
283      * (but normally expand by about 50%), giving up (allowing retry)
284      * on contention (which we expect to be rare). Call only while
285      * holding lock.
286      *
287      * @param array the heap array
288      * @param oldCap the length of the array
289      */
tryGrow(Object[] array, int oldCap)290     private void tryGrow(Object[] array, int oldCap) {
291         lock.unlock(); // must release and then re-acquire main lock
292         Object[] newArray = null;
293         if (allocationSpinLock == 0 &&
294             ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
295             try {
296                 int growth = (oldCap < 64)
297                     ? (oldCap + 2) // grow faster if small
298                     : (oldCap >> 1);
299                 int newCap = ArraysSupport.newLength(oldCap, 1, growth);
300                 if (queue == array)
301                     newArray = new Object[newCap];
302             } finally {
303                 allocationSpinLock = 0;
304             }
305         }
306         if (newArray == null) // back off if another thread is allocating
307             Thread.yield();
308         lock.lock();
309         if (newArray != null && queue == array) {
310             queue = newArray;
311             System.arraycopy(array, 0, newArray, 0, oldCap);
312         }
313     }
314 
315     /**
316      * Mechanics for poll().  Call only while holding lock.
317      */
dequeue()318     private E dequeue() {
319         // assert lock.isHeldByCurrentThread();
320         final Object[] es;
321         final E result;
322 
323         if ((result = (E) ((es = queue)[0])) != null) {
324             final int n;
325             final E x = (E) es[(n = --size)];
326             es[n] = null;
327             if (n > 0) {
328                 final Comparator<? super E> cmp;
329                 if ((cmp = comparator) == null)
330                     siftDownComparable(0, x, es, n);
331                 else
332                     siftDownUsingComparator(0, x, es, n, cmp);
333             }
334         }
335         return result;
336     }
337 
338     /**
339      * Inserts item x at position k, maintaining heap invariant by
340      * promoting x up the tree until it is greater than or equal to
341      * its parent, or is the root.
342      *
343      * To simplify and speed up coercions and comparisons, the
344      * Comparable and Comparator versions are separated into different
345      * methods that are otherwise identical. (Similarly for siftDown.)
346      *
347      * @param k the position to fill
348      * @param x the item to insert
349      * @param es the heap array
350      */
siftUpComparable(int k, T x, Object[] es)351     private static <T> void siftUpComparable(int k, T x, Object[] es) {
352         Comparable<? super T> key = (Comparable<? super T>) x;
353         while (k > 0) {
354             int parent = (k - 1) >>> 1;
355             Object e = es[parent];
356             if (key.compareTo((T) e) >= 0)
357                 break;
358             es[k] = e;
359             k = parent;
360         }
361         es[k] = key;
362     }
363 
siftUpUsingComparator( int k, T x, Object[] es, Comparator<? super T> cmp)364     private static <T> void siftUpUsingComparator(
365         int k, T x, Object[] es, Comparator<? super T> cmp) {
366         while (k > 0) {
367             int parent = (k - 1) >>> 1;
368             Object e = es[parent];
369             if (cmp.compare(x, (T) e) >= 0)
370                 break;
371             es[k] = e;
372             k = parent;
373         }
374         es[k] = x;
375     }
376 
377     /**
378      * Inserts item x at position k, maintaining heap invariant by
379      * demoting x down the tree repeatedly until it is less than or
380      * equal to its children or is a leaf.
381      *
382      * @param k the position to fill
383      * @param x the item to insert
384      * @param es the heap array
385      * @param n heap size
386      */
siftDownComparable(int k, T x, Object[] es, int n)387     private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
388         // assert n > 0;
389         Comparable<? super T> key = (Comparable<? super T>)x;
390         int half = n >>> 1;           // loop while a non-leaf
391         while (k < half) {
392             int child = (k << 1) + 1; // assume left child is least
393             Object c = es[child];
394             int right = child + 1;
395             if (right < n &&
396                 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
397                 c = es[child = right];
398             if (key.compareTo((T) c) <= 0)
399                 break;
400             es[k] = c;
401             k = child;
402         }
403         es[k] = key;
404     }
405 
siftDownUsingComparator( int k, T x, Object[] es, int n, Comparator<? super T> cmp)406     private static <T> void siftDownUsingComparator(
407         int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
408         // assert n > 0;
409         int half = n >>> 1;
410         while (k < half) {
411             int child = (k << 1) + 1;
412             Object c = es[child];
413             int right = child + 1;
414             if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
415                 c = es[child = right];
416             if (cmp.compare(x, (T) c) <= 0)
417                 break;
418             es[k] = c;
419             k = child;
420         }
421         es[k] = x;
422     }
423 
424     /**
425      * Establishes the heap invariant (described above) in the entire tree,
426      * assuming nothing about the order of the elements prior to the call.
427      * This classic algorithm due to Floyd (1964) is known to be O(size).
428      */
heapify()429     private void heapify() {
430         final Object[] es = queue;
431         int n = size, i = (n >>> 1) - 1;
432         final Comparator<? super E> cmp;
433         if ((cmp = comparator) == null)
434             for (; i >= 0; i--)
435                 siftDownComparable(i, (E) es[i], es, n);
436         else
437             for (; i >= 0; i--)
438                 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
439     }
440 
441     /**
442      * Inserts the specified element into this priority queue.
443      *
444      * @param e the element to add
445      * @return {@code true} (as specified by {@link Collection#add})
446      * @throws ClassCastException if the specified element cannot be compared
447      *         with elements currently in the priority queue according to the
448      *         priority queue's ordering
449      * @throws NullPointerException if the specified element is null
450      */
add(E e)451     public boolean add(E e) {
452         return offer(e);
453     }
454 
455     /**
456      * Inserts the specified element into this priority queue.
457      * As the queue is unbounded, this method will never return {@code false}.
458      *
459      * @param e the element to add
460      * @return {@code true} (as specified by {@link Queue#offer})
461      * @throws ClassCastException if the specified element cannot be compared
462      *         with elements currently in the priority queue according to the
463      *         priority queue's ordering
464      * @throws NullPointerException if the specified element is null
465      */
offer(E e)466     public boolean offer(E e) {
467         if (e == null)
468             throw new NullPointerException();
469         final ReentrantLock lock = this.lock;
470         lock.lock();
471         int n, cap;
472         Object[] es;
473         while ((n = size) >= (cap = (es = queue).length))
474             tryGrow(es, cap);
475         try {
476             final Comparator<? super E> cmp;
477             if ((cmp = comparator) == null)
478                 siftUpComparable(n, e, es);
479             else
480                 siftUpUsingComparator(n, e, es, cmp);
481             size = n + 1;
482             notEmpty.signal();
483         } finally {
484             lock.unlock();
485         }
486         return true;
487     }
488 
489     /**
490      * Inserts the specified element into this priority queue.
491      * As the queue is unbounded, this method will never block.
492      *
493      * @param e the element to add
494      * @throws ClassCastException if the specified element cannot be compared
495      *         with elements currently in the priority queue according to the
496      *         priority queue's ordering
497      * @throws NullPointerException if the specified element is null
498      */
put(E e)499     public void put(E e) {
500         offer(e); // never need to block
501     }
502 
503     /**
504      * Inserts the specified element into this priority queue.
505      * As the queue is unbounded, this method will never block or
506      * return {@code false}.
507      *
508      * @param e the element to add
509      * @param timeout This parameter is ignored as the method never blocks
510      * @param unit This parameter is ignored as the method never blocks
511      * @return {@code true} (as specified by
512      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
513      * @throws ClassCastException if the specified element cannot be compared
514      *         with elements currently in the priority queue according to the
515      *         priority queue's ordering
516      * @throws NullPointerException if the specified element is null
517      */
offer(E e, long timeout, TimeUnit unit)518     public boolean offer(E e, long timeout, TimeUnit unit) {
519         return offer(e); // never need to block
520     }
521 
poll()522     public E poll() {
523         final ReentrantLock lock = this.lock;
524         lock.lock();
525         try {
526             return dequeue();
527         } finally {
528             lock.unlock();
529         }
530     }
531 
take()532     public E take() throws InterruptedException {
533         final ReentrantLock lock = this.lock;
534         lock.lockInterruptibly();
535         E result;
536         try {
537             while ( (result = dequeue()) == null)
538                 notEmpty.await();
539         } finally {
540             lock.unlock();
541         }
542         return result;
543     }
544 
poll(long timeout, TimeUnit unit)545     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
546         long nanos = unit.toNanos(timeout);
547         final ReentrantLock lock = this.lock;
548         lock.lockInterruptibly();
549         E result;
550         try {
551             while ( (result = dequeue()) == null && nanos > 0)
552                 nanos = notEmpty.awaitNanos(nanos);
553         } finally {
554             lock.unlock();
555         }
556         return result;
557     }
558 
peek()559     public E peek() {
560         final ReentrantLock lock = this.lock;
561         lock.lock();
562         try {
563             return (E) queue[0];
564         } finally {
565             lock.unlock();
566         }
567     }
568 
569     /**
570      * Returns the comparator used to order the elements in this queue,
571      * or {@code null} if this queue uses the {@linkplain Comparable
572      * natural ordering} of its elements.
573      *
574      * @return the comparator used to order the elements in this queue,
575      *         or {@code null} if this queue uses the natural
576      *         ordering of its elements
577      */
comparator()578     public Comparator<? super E> comparator() {
579         return comparator;
580     }
581 
size()582     public int size() {
583         final ReentrantLock lock = this.lock;
584         lock.lock();
585         try {
586             return size;
587         } finally {
588             lock.unlock();
589         }
590     }
591 
592     /**
593      * Always returns {@code Integer.MAX_VALUE} because
594      * a {@code PriorityBlockingQueue} is not capacity constrained.
595      * @return {@code Integer.MAX_VALUE} always
596      */
remainingCapacity()597     public int remainingCapacity() {
598         return Integer.MAX_VALUE;
599     }
600 
indexOf(Object o)601     private int indexOf(Object o) {
602         if (o != null) {
603             final Object[] es = queue;
604             for (int i = 0, n = size; i < n; i++)
605                 if (o.equals(es[i]))
606                     return i;
607         }
608         return -1;
609     }
610 
611     /**
612      * Removes the ith element from queue.
613      */
removeAt(int i)614     private void removeAt(int i) {
615         final Object[] es = queue;
616         final int n = size - 1;
617         if (n == i) // removed last element
618             es[i] = null;
619         else {
620             E moved = (E) es[n];
621             es[n] = null;
622             final Comparator<? super E> cmp;
623             if ((cmp = comparator) == null)
624                 siftDownComparable(i, moved, es, n);
625             else
626                 siftDownUsingComparator(i, moved, es, n, cmp);
627             if (es[i] == moved) {
628                 if (cmp == null)
629                     siftUpComparable(i, moved, es);
630                 else
631                     siftUpUsingComparator(i, moved, es, cmp);
632             }
633         }
634         size = n;
635     }
636 
637     /**
638      * Removes a single instance of the specified element from this queue,
639      * if it is present.  More formally, removes an element {@code e} such
640      * that {@code o.equals(e)}, if this queue contains one or more such
641      * elements.  Returns {@code true} if and only if this queue contained
642      * the specified element (or equivalently, if this queue changed as a
643      * result of the call).
644      *
645      * @param o element to be removed from this queue, if present
646      * @return {@code true} if this queue changed as a result of the call
647      */
remove(Object o)648     public boolean remove(Object o) {
649         final ReentrantLock lock = this.lock;
650         lock.lock();
651         try {
652             int i = indexOf(o);
653             if (i == -1)
654                 return false;
655             removeAt(i);
656             return true;
657         } finally {
658             lock.unlock();
659         }
660     }
661 
662     /**
663      * Identity-based version for use in Itr.remove.
664      *
665      * @param o element to be removed from this queue, if present
666      */
removeEq(Object o)667     void removeEq(Object o) {
668         final ReentrantLock lock = this.lock;
669         lock.lock();
670         try {
671             final Object[] es = queue;
672             for (int i = 0, n = size; i < n; i++) {
673                 if (o == es[i]) {
674                     removeAt(i);
675                     break;
676                 }
677             }
678         } finally {
679             lock.unlock();
680         }
681     }
682 
683     /**
684      * Returns {@code true} if this queue contains the specified element.
685      * More formally, returns {@code true} if and only if this queue contains
686      * at least one element {@code e} such that {@code o.equals(e)}.
687      *
688      * @param o object to be checked for containment in this queue
689      * @return {@code true} if this queue contains the specified element
690      */
contains(Object o)691     public boolean contains(Object o) {
692         final ReentrantLock lock = this.lock;
693         lock.lock();
694         try {
695             return indexOf(o) != -1;
696         } finally {
697             lock.unlock();
698         }
699     }
700 
toString()701     public String toString() {
702         return Helpers.collectionToString(this);
703     }
704 
705     /**
706      * @throws UnsupportedOperationException {@inheritDoc}
707      * @throws ClassCastException            {@inheritDoc}
708      * @throws NullPointerException          {@inheritDoc}
709      * @throws IllegalArgumentException      {@inheritDoc}
710      */
drainTo(Collection<? super E> c)711     public int drainTo(Collection<? super E> c) {
712         return drainTo(c, Integer.MAX_VALUE);
713     }
714 
715     /**
716      * @throws UnsupportedOperationException {@inheritDoc}
717      * @throws ClassCastException            {@inheritDoc}
718      * @throws NullPointerException          {@inheritDoc}
719      * @throws IllegalArgumentException      {@inheritDoc}
720      */
drainTo(Collection<? super E> c, int maxElements)721     public int drainTo(Collection<? super E> c, int maxElements) {
722         Objects.requireNonNull(c);
723         if (c == this)
724             throw new IllegalArgumentException();
725         if (maxElements <= 0)
726             return 0;
727         final ReentrantLock lock = this.lock;
728         lock.lock();
729         try {
730             int n = Math.min(size, maxElements);
731             for (int i = 0; i < n; i++) {
732                 c.add((E) queue[0]); // In this order, in case add() throws.
733                 dequeue();
734             }
735             return n;
736         } finally {
737             lock.unlock();
738         }
739     }
740 
741     /**
742      * Atomically removes all of the elements from this queue.
743      * The queue will be empty after this call returns.
744      */
clear()745     public void clear() {
746         final ReentrantLock lock = this.lock;
747         lock.lock();
748         try {
749             final Object[] es = queue;
750             for (int i = 0, n = size; i < n; i++)
751                 es[i] = null;
752             size = 0;
753         } finally {
754             lock.unlock();
755         }
756     }
757 
758     /**
759      * Returns an array containing all of the elements in this queue.
760      * The returned array elements are in no particular order.
761      *
762      * <p>The returned array will be "safe" in that no references to it are
763      * maintained by this queue.  (In other words, this method must allocate
764      * a new array).  The caller is thus free to modify the returned array.
765      *
766      * <p>This method acts as bridge between array-based and collection-based
767      * APIs.
768      *
769      * @return an array containing all of the elements in this queue
770      */
toArray()771     public Object[] toArray() {
772         final ReentrantLock lock = this.lock;
773         lock.lock();
774         try {
775             return Arrays.copyOf(queue, size);
776         } finally {
777             lock.unlock();
778         }
779     }
780 
781     /**
782      * Returns an array containing all of the elements in this queue; the
783      * runtime type of the returned array is that of the specified array.
784      * The returned array elements are in no particular order.
785      * If the queue fits in the specified array, it is returned therein.
786      * Otherwise, a new array is allocated with the runtime type of the
787      * specified array and the size of this queue.
788      *
789      * <p>If this queue fits in the specified array with room to spare
790      * (i.e., the array has more elements than this queue), the element in
791      * the array immediately following the end of the queue is set to
792      * {@code null}.
793      *
794      * <p>Like the {@link #toArray()} method, this method acts as bridge between
795      * array-based and collection-based APIs.  Further, this method allows
796      * precise control over the runtime type of the output array, and may,
797      * under certain circumstances, be used to save allocation costs.
798      *
799      * <p>Suppose {@code x} is a queue known to contain only strings.
800      * The following code can be used to dump the queue into a newly
801      * allocated array of {@code String}:
802      *
803      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
804      *
805      * Note that {@code toArray(new Object[0])} is identical in function to
806      * {@code toArray()}.
807      *
808      * @param a the array into which the elements of the queue are to
809      *          be stored, if it is big enough; otherwise, a new array of the
810      *          same runtime type is allocated for this purpose
811      * @return an array containing all of the elements in this queue
812      * @throws ArrayStoreException if the runtime type of the specified array
813      *         is not a supertype of the runtime type of every element in
814      *         this queue
815      * @throws NullPointerException if the specified array is null
816      */
toArray(T[] a)817     public <T> T[] toArray(T[] a) {
818         final ReentrantLock lock = this.lock;
819         lock.lock();
820         try {
821             int n = size;
822             if (a.length < n)
823                 // Make a new array of a's runtime type, but my contents:
824                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
825             System.arraycopy(queue, 0, a, 0, n);
826             if (a.length > n)
827                 a[n] = null;
828             return a;
829         } finally {
830             lock.unlock();
831         }
832     }
833 
834     /**
835      * Returns an iterator over the elements in this queue. The
836      * iterator does not return the elements in any particular order.
837      *
838      * <p>The returned iterator is
839      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
840      *
841      * @return an iterator over the elements in this queue
842      */
iterator()843     public Iterator<E> iterator() {
844         return new Itr(toArray());
845     }
846 
847     /**
848      * Snapshot iterator that works off copy of underlying q array.
849      */
850     final class Itr implements Iterator<E> {
851         final Object[] array; // Array of all elements
852         int cursor;           // index of next element to return
853         int lastRet = -1;     // index of last element, or -1 if no such
854 
Itr(Object[] array)855         Itr(Object[] array) {
856             this.array = array;
857         }
858 
hasNext()859         public boolean hasNext() {
860             return cursor < array.length;
861         }
862 
next()863         public E next() {
864             if (cursor >= array.length)
865                 throw new NoSuchElementException();
866             return (E)array[lastRet = cursor++];
867         }
868 
remove()869         public void remove() {
870             if (lastRet < 0)
871                 throw new IllegalStateException();
872             removeEq(array[lastRet]);
873             lastRet = -1;
874         }
875 
forEachRemaining(Consumer<? super E> action)876         public void forEachRemaining(Consumer<? super E> action) {
877             Objects.requireNonNull(action);
878             final Object[] es = array;
879             int i;
880             if ((i = cursor) < es.length) {
881                 lastRet = -1;
882                 cursor = es.length;
883                 for (; i < es.length; i++)
884                     action.accept((E) es[i]);
885                 lastRet = es.length - 1;
886             }
887         }
888     }
889 
890     /**
891      * Saves this queue to a stream (that is, serializes it).
892      *
893      * For compatibility with previous version of this class, elements
894      * are first copied to a java.util.PriorityQueue, which is then
895      * serialized.
896      *
897      * @param s the stream
898      * @throws java.io.IOException if an I/O error occurs
899      */
writeObject(java.io.ObjectOutputStream s)900     private void writeObject(java.io.ObjectOutputStream s)
901         throws java.io.IOException {
902         lock.lock();
903         try {
904             // avoid zero capacity argument
905             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
906             q.addAll(this);
907             s.defaultWriteObject();
908         } finally {
909             q = null;
910             lock.unlock();
911         }
912     }
913 
914     /**
915      * Reconstitutes this queue from a stream (that is, deserializes it).
916      * @param s the stream
917      * @throws ClassNotFoundException if the class of a serialized object
918      *         could not be found
919      * @throws java.io.IOException if an I/O error occurs
920      */
readObject(java.io.ObjectInputStream s)921     private void readObject(java.io.ObjectInputStream s)
922         throws java.io.IOException, ClassNotFoundException {
923         try {
924             s.defaultReadObject();
925             int sz = q.size();
926             SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, sz);
927             this.queue = new Object[Math.max(1, sz)];
928             comparator = q.comparator();
929             addAll(q);
930         } finally {
931             q = null;
932         }
933     }
934 
935     /**
936      * Immutable snapshot spliterator that binds to elements "late".
937      */
938     final class PBQSpliterator implements Spliterator<E> {
939         Object[] array;        // null until late-bound-initialized
940         int index;
941         int fence;
942 
PBQSpliterator()943         PBQSpliterator() {}
944 
PBQSpliterator(Object[] array, int index, int fence)945         PBQSpliterator(Object[] array, int index, int fence) {
946             this.array = array;
947             this.index = index;
948             this.fence = fence;
949         }
950 
getFence()951         private int getFence() {
952             if (array == null)
953                 fence = (array = toArray()).length;
954             return fence;
955         }
956 
trySplit()957         public PBQSpliterator trySplit() {
958             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
959             return (lo >= mid) ? null :
960                 new PBQSpliterator(array, lo, index = mid);
961         }
962 
forEachRemaining(Consumer<? super E> action)963         public void forEachRemaining(Consumer<? super E> action) {
964             Objects.requireNonNull(action);
965             final int hi = getFence(), lo = index;
966             final Object[] es = array;
967             index = hi;                 // ensure exhaustion
968             for (int i = lo; i < hi; i++)
969                 action.accept((E) es[i]);
970         }
971 
tryAdvance(Consumer<? super E> action)972         public boolean tryAdvance(Consumer<? super E> action) {
973             Objects.requireNonNull(action);
974             if (getFence() > index && index >= 0) {
975                 action.accept((E) array[index++]);
976                 return true;
977             }
978             return false;
979         }
980 
estimateSize()981         public long estimateSize() { return getFence() - index; }
982 
characteristics()983         public int characteristics() {
984             return (Spliterator.NONNULL |
985                     Spliterator.SIZED |
986                     Spliterator.SUBSIZED);
987         }
988     }
989 
990     /**
991      * Returns a {@link Spliterator} over the elements in this queue.
992      * The spliterator does not traverse elements in any particular order
993      * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
994      *
995      * <p>The returned spliterator is
996      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
997      *
998      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
999      * {@link Spliterator#NONNULL}.
1000      *
1001      * @implNote
1002      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1003      *
1004      * @return a {@code Spliterator} over the elements in this queue
1005      * @since 1.8
1006      */
spliterator()1007     public Spliterator<E> spliterator() {
1008         return new PBQSpliterator();
1009     }
1010 
1011     /**
1012      * @throws NullPointerException {@inheritDoc}
1013      */
removeIf(Predicate<? super E> filter)1014     public boolean removeIf(Predicate<? super E> filter) {
1015         Objects.requireNonNull(filter);
1016         return bulkRemove(filter);
1017     }
1018 
1019     /**
1020      * @throws NullPointerException {@inheritDoc}
1021      */
removeAll(Collection<?> c)1022     public boolean removeAll(Collection<?> c) {
1023         Objects.requireNonNull(c);
1024         return bulkRemove(e -> c.contains(e));
1025     }
1026 
1027     /**
1028      * @throws NullPointerException {@inheritDoc}
1029      */
retainAll(Collection<?> c)1030     public boolean retainAll(Collection<?> c) {
1031         Objects.requireNonNull(c);
1032         return bulkRemove(e -> !c.contains(e));
1033     }
1034 
1035     // A tiny bit set implementation
1036 
nBits(int n)1037     private static long[] nBits(int n) {
1038         return new long[((n - 1) >> 6) + 1];
1039     }
setBit(long[] bits, int i)1040     private static void setBit(long[] bits, int i) {
1041         bits[i >> 6] |= 1L << i;
1042     }
isClear(long[] bits, int i)1043     private static boolean isClear(long[] bits, int i) {
1044         return (bits[i >> 6] & (1L << i)) == 0;
1045     }
1046 
1047     /** Implementation of bulk remove methods. */
bulkRemove(Predicate<? super E> filter)1048     private boolean bulkRemove(Predicate<? super E> filter) {
1049         final ReentrantLock lock = this.lock;
1050         lock.lock();
1051         try {
1052             final Object[] es = queue;
1053             final int end = size;
1054             int i;
1055             // Optimize for initial run of survivors
1056             for (i = 0; i < end && !filter.test((E) es[i]); i++)
1057                 ;
1058             if (i >= end)
1059                 return false;
1060             // Tolerate predicates that reentrantly access the
1061             // collection for read, so traverse once to find elements
1062             // to delete, a second pass to physically expunge.
1063             final int beg = i;
1064             final long[] deathRow = nBits(end - beg);
1065             deathRow[0] = 1L;   // set bit 0
1066             for (i = beg + 1; i < end; i++)
1067                 if (filter.test((E) es[i]))
1068                     setBit(deathRow, i - beg);
1069             int w = beg;
1070             for (i = beg; i < end; i++)
1071                 if (isClear(deathRow, i - beg))
1072                     es[w++] = es[i];
1073             for (i = size = w; i < end; i++)
1074                 es[i] = null;
1075             heapify();
1076             return true;
1077         } finally {
1078             lock.unlock();
1079         }
1080     }
1081 
1082     /**
1083      * @throws NullPointerException {@inheritDoc}
1084      */
forEach(Consumer<? super E> action)1085     public void forEach(Consumer<? super E> action) {
1086         Objects.requireNonNull(action);
1087         final ReentrantLock lock = this.lock;
1088         lock.lock();
1089         try {
1090             final Object[] es = queue;
1091             for (int i = 0, n = size; i < n; i++)
1092                 action.accept((E) es[i]);
1093         } finally {
1094             lock.unlock();
1095         }
1096     }
1097 
1098     // VarHandle mechanics
1099     private static final VarHandle ALLOCATIONSPINLOCK;
1100     static {
1101         try {
1102             MethodHandles.Lookup l = MethodHandles.lookup();
1103             ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class,
1104                                                  "allocationSpinLock",
1105                                                  int.class);
1106         } catch (ReflectiveOperationException e) {
1107             throw new ExceptionInInitializerError(e);
1108         }
1109     }
1110 }
1111