1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */

24
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */

35
36 package java.util.concurrent;
37
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.AbstractQueue;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Comparator;
44 import java.util.Iterator;
45 import java.util.NoSuchElementException;
46 import java.util.Objects;
47 import java.util.PriorityQueue;
48 import java.util.Queue;
49 import java.util.SortedSet;
50 import java.util.Spliterator;
51 import java.util.concurrent.locks.Condition;
52 import java.util.concurrent.locks.ReentrantLock;
53 import java.util.function.Consumer;
54 import java.util.function.Predicate;
55 import jdk.internal.misc.SharedSecrets;
56
57 /**
58  * An unbounded {@linkplain BlockingQueue blocking queue} that uses
59  * the same ordering rules as class {@link PriorityQueue} and supplies
60  * blocking retrieval operations.  While this queue is logically
61  * unbounded, attempted additions may fail due to resource exhaustion
62  * (causing {@code OutOfMemoryError}). This class does not permit
63  * {@code null} elements.  A priority queue relying on {@linkplain
64  * Comparable natural ordering} also does not permit insertion of
65  * non-comparable objects (doing so results in
66  * {@code ClassCastException}).
67  *
68  * <p>This class and its iterator implement all of the <em>optional</em>
69  * methods of the {@link Collection} and {@link Iterator} interfaces.
70  * The Iterator provided in method {@link #iterator()} and the
71  * Spliterator provided in method {@link #spliterator()} are <em>not</em>
72  * guaranteed to traverse the elements of the PriorityBlockingQueue in
73  * any particular order. If you need ordered traversal, consider using
74  * {@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo} can
75  * be used to <em>remove</em> some or all elements in priority order and
76  * place them in another collection.
77  *
78  * <p>Operations on this class make no guarantees about the ordering
79  * of elements with equal priority. If you need to enforce an
80  * ordering, you can define custom classes or comparators that use a
81  * secondary key to break ties in primary priority values.  For
82  * example, here is a class that applies first-in-first-out
83  * tie-breaking to comparable elements. To use it, you would insert a
84  * {@code new FIFOEntry(anEntry)} instead of a plain entry object.
85  *
86  * <pre> {@code
87  * class FIFOEntry<E extends Comparable<? super E>>
88  *     implements Comparable<FIFOEntry<E>> {
89  *   static final AtomicLong seq = new AtomicLong(0);
90  *   final long seqNum;
91  *   final E entry;
92  *   public FIFOEntry(E entry) {
93  *     seqNum = seq.getAndIncrement();
94  *     this.entry = entry;
95  *   }
96  *   public E getEntry() { return entry; }
97  *   public int compareTo(FIFOEntry<E> other) {
98  *     int res = entry.compareTo(other.entry);
99  *     if (res == 0 && other.entry != this.entry)
100  *       res = (seqNum < other.seqNum ? -1 : 1);
101  *     return res;
102  *   }
103  * }}</pre>
104  *
105  * <p>This class is a member of the
106  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
107  * Java Collections Framework</a>.
108  *
109  * @since 1.5
110  * @author Doug Lea
111  * @param <E> the type of elements held in this queue
112  */

113 @SuppressWarnings("unchecked")
114 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
115     implements BlockingQueue<E>, java.io.Serializable {
116     private static final long serialVersionUID = 5595510919245408276L;
117
118     /*
119      * The implementation uses an array-based binary heap, with public
120      * operations protected with a single lock. However, allocation
121      * during resizing uses a simple spinlock (used only while not
122      * holding main lock) in order to allow takes to operate
123      * concurrently with allocation.  This avoids repeated
124      * postponement of waiting consumers and consequent element
125      * build-up. The need to back away from lock during allocation
126      * makes it impossible to simply wrap delegated
127      * java.util.PriorityQueue operations within a lock, as was done
128      * in a previous version of this class. To maintain
129      * interoperability, a plain PriorityQueue is still used during
130      * serialization, which maintains compatibility at the expense of
131      * transiently doubling overhead.
132      */

133
134     /**
135      * Default array capacity.
136      */

137     private static final int DEFAULT_INITIAL_CAPACITY = 11;
138
139     /**
140      * The maximum size of array to allocate.
141      * Some VMs reserve some header words in an array.
142      * Attempts to allocate larger arrays may result in
143      * OutOfMemoryError: Requested array size exceeds VM limit
144      */

145     private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
146
147     /**
148      * Priority queue represented as a balanced binary heap: the two
149      * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
150      * priority queue is ordered by comparator, or by the elements'
151      * natural ordering, if comparator is null: For each node n in the
152      * heap and each descendant d of n, n <= d.  The element with the
153      * lowest value is in queue[0], assuming the queue is nonempty.
154      */

155     private transient Object[] queue;
156
157     /**
158      * The number of elements in the priority queue.
159      */

160     private transient int size;
161
162     /**
163      * The comparator, or null if priority queue uses elements'
164      * natural ordering.
165      */

166     private transient Comparator<? super E> comparator;
167
168     /**
169      * Lock used for all public operations.
170      */

171     private final ReentrantLock lock = new ReentrantLock();
172
173     /**
174      * Condition for blocking when empty.
175      */

176     private final Condition notEmpty = lock.newCondition();
177
178     /**
179      * Spinlock for allocation, acquired via CAS.
180      */

181     private transient volatile int allocationSpinLock;
182
183     /**
184      * A plain PriorityQueue used only for serialization,
185      * to maintain compatibility with previous versions
186      * of this class. Non-null only during serialization/deserialization.
187      */

188     private PriorityQueue<E> q;
189
190     /**
191      * Creates a {@code PriorityBlockingQueue} with the default
192      * initial capacity (11) that orders its elements according to
193      * their {@linkplain Comparable natural ordering}.
194      */

195     public PriorityBlockingQueue() {
196         this(DEFAULT_INITIAL_CAPACITY, null);
197     }
198
199     /**
200      * Creates a {@code PriorityBlockingQueue} with the specified
201      * initial capacity that orders its elements according to their
202      * {@linkplain Comparable natural ordering}.
203      *
204      * @param initialCapacity the initial capacity for this priority queue
205      * @throws IllegalArgumentException if {@code initialCapacity} is less
206      *         than 1
207      */

208     public PriorityBlockingQueue(int initialCapacity) {
209         this(initialCapacity, null);
210     }
211
212     /**
213      * Creates a {@code PriorityBlockingQueue} with the specified initial
214      * capacity that orders its elements according to the specified
215      * comparator.
216      *
217      * @param initialCapacity the initial capacity for this priority queue
218      * @param  comparator the comparator that will be used to order this
219      *         priority queue.  If {@code null}, the {@linkplain Comparable
220      *         natural ordering} of the elements will be used.
221      * @throws IllegalArgumentException if {@code initialCapacity} is less
222      *         than 1
223      */

224     public PriorityBlockingQueue(int initialCapacity,
225                                  Comparator<? super E> comparator) {
226         if (initialCapacity < 1)
227             throw new IllegalArgumentException();
228         this.comparator = comparator;
229         this.queue = new Object[Math.max(1, initialCapacity)];
230     }
231
232     /**
233      * Creates a {@code PriorityBlockingQueue} containing the elements
234      * in the specified collection.  If the specified collection is a
235      * {@link SortedSet} or a {@link PriorityQueue}, this
236      * priority queue will be ordered according to the same ordering.
237      * Otherwise, this priority queue will be ordered according to the
238      * {@linkplain Comparable natural ordering} of its elements.
239      *
240      * @param  c the collection whose elements are to be placed
241      *         into this priority queue
242      * @throws ClassCastException if elements of the specified collection
243      *         cannot be compared to one another according to the priority
244      *         queue's ordering
245      * @throws NullPointerException if the specified collection or any
246      *         of its elements are null
247      */

248     public PriorityBlockingQueue(Collection<? extends E> c) {
249         boolean heapify = true// true if not known to be in heap order
250         boolean screen = true;  // true if must screen for nulls
251         if (c instanceof SortedSet<?>) {
252             SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
253             this.comparator = (Comparator<? super E>) ss.comparator();
254             heapify = false;
255         }
256         else if (c instanceof PriorityBlockingQueue<?>) {
257             PriorityBlockingQueue<? extends E> pq =
258                 (PriorityBlockingQueue<? extends E>) c;
259             this.comparator = (Comparator<? super E>) pq.comparator();
260             screen = false;
261             if (pq.getClass() == PriorityBlockingQueue.class// exact match
262                 heapify = false;
263         }
264         Object[] es = c.toArray();
265         int n = es.length;
266         // If c.toArray incorrectly doesn't return Object[], copy it.
267         if (es.getClass() != Object[].class)
268             es = Arrays.copyOf(es, n, Object[].class);
269         if (screen && (n == 1 || this.comparator != null)) {
270             for (Object e : es)
271                 if (e == null)
272                     throw new NullPointerException();
273         }
274         this.queue = ensureNonEmpty(es);
275         this.size = n;
276         if (heapify)
277             heapify();
278     }
279
280     /** Ensures that queue[0] exists, helping peek() and poll(). */
281     private static Object[] ensureNonEmpty(Object[] es) {
282         return (es.length > 0) ? es : new Object[1];
283     }
284
285     /**
286      * Tries to grow array to accommodate at least one more element
287      * (but normally expand by about 50%), giving up (allowing retry)
288      * on contention (which we expect to be rare). Call only while
289      * holding lock.
290      *
291      * @param array the heap array
292      * @param oldCap the length of the array
293      */

294     private void tryGrow(Object[] array, int oldCap) {
295         lock.unlock(); // must release and then re-acquire main lock
296         Object[] newArray = null;
297         if (allocationSpinLock == 0 &&
298             ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
299             try {
300                 int newCap = oldCap + ((oldCap < 64) ?
301                                        (oldCap + 2) : // grow faster if small
302                                        (oldCap >> 1));
303                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
304                     int minCap = oldCap + 1;
305                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
306                         throw new OutOfMemoryError();
307                     newCap = MAX_ARRAY_SIZE;
308                 }
309                 if (newCap > oldCap && queue == array)
310                     newArray = new Object[newCap];
311             } finally {
312                 allocationSpinLock = 0;
313             }
314         }
315         if (newArray == null// back off if another thread is allocating
316             Thread.yield();
317         lock.lock();
318         if (newArray != null && queue == array) {
319             queue = newArray;
320             System.arraycopy(array, 0, newArray, 0, oldCap);
321         }
322     }
323
324     /**
325      * Mechanics for poll().  Call only while holding lock.
326      */

327     private E dequeue() {
328         // assert lock.isHeldByCurrentThread();
329         final Object[] es;
330         final E result;
331
332         if ((result = (E) ((es = queue)[0])) != null) {
333             final int n;
334             final E x = (E) es[(n = --size)];
335             es[n] = null;
336             if (n > 0) {
337                 final Comparator<? super E> cmp;
338                 if ((cmp = comparator) == null)
339                     siftDownComparable(0, x, es, n);
340                 else
341                     siftDownUsingComparator(0, x, es, n, cmp);
342             }
343         }
344         return result;
345     }
346
347     /**
348      * Inserts item x at position k, maintaining heap invariant by
349      * promoting x up the tree until it is greater than or equal to
350      * its parent, or is the root.
351      *
352      * To simplify and speed up coercions and comparisons, the
353      * Comparable and Comparator versions are separated into different
354      * methods that are otherwise identical. (Similarly for siftDown.)
355      *
356      * @param k the position to fill
357      * @param x the item to insert
358      * @param es the heap array
359      */

360     private static <T> void siftUpComparable(int k, T x, Object[] es) {
361         Comparable<? super T> key = (Comparable<? super T>) x;
362         while (k > 0) {
363             int parent = (k - 1) >>> 1;
364             Object e = es[parent];
365             if (key.compareTo((T) e) >= 0)
366                 break;
367             es[k] = e;
368             k = parent;
369         }
370         es[k] = key;
371     }
372
373     private static <T> void siftUpUsingComparator(
374         int k, T x, Object[] es, Comparator<? super T> cmp) {
375         while (k > 0) {
376             int parent = (k - 1) >>> 1;
377             Object e = es[parent];
378             if (cmp.compare(x, (T) e) >= 0)
379                 break;
380             es[k] = e;
381             k = parent;
382         }
383         es[k] = x;
384     }
385
386     /**
387      * Inserts item x at position k, maintaining heap invariant by
388      * demoting x down the tree repeatedly until it is less than or
389      * equal to its children or is a leaf.
390      *
391      * @param k the position to fill
392      * @param x the item to insert
393      * @param es the heap array
394      * @param n heap size
395      */

396     private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
397         // assert n > 0;
398         Comparable<? super T> key = (Comparable<? super T>)x;
399         int half = n >>> 1;           // loop while a non-leaf
400         while (k < half) {
401             int child = (k << 1) + 1; // assume left child is least
402             Object c = es[child];
403             int right = child + 1;
404             if (right < n &&
405                 ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
406                 c = es[child = right];
407             if (key.compareTo((T) c) <= 0)
408                 break;
409             es[k] = c;
410             k = child;
411         }
412         es[k] = key;
413     }
414
415     private static <T> void siftDownUsingComparator(
416         int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
417         // assert n > 0;
418         int half = n >>> 1;
419         while (k < half) {
420             int child = (k << 1) + 1;
421             Object c = es[child];
422             int right = child + 1;
423             if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
424                 c = es[child = right];
425             if (cmp.compare(x, (T) c) <= 0)
426                 break;
427             es[k] = c;
428             k = child;
429         }
430         es[k] = x;
431     }
432
433     /**
434      * Establishes the heap invariant (described above) in the entire tree,
435      * assuming nothing about the order of the elements prior to the call.
436      * This classic algorithm due to Floyd (1964) is known to be O(size).
437      */

438     private void heapify() {
439         final Object[] es = queue;
440         int n = size, i = (n >>> 1) - 1;
441         final Comparator<? super E> cmp;
442         if ((cmp = comparator) == null)
443             for (; i >= 0; i--)
444                 siftDownComparable(i, (E) es[i], es, n);
445         else
446             for (; i >= 0; i--)
447                 siftDownUsingComparator(i, (E) es[i], es, n, cmp);
448     }
449
450     /**
451      * Inserts the specified element into this priority queue.
452      *
453      * @param e the element to add
454      * @return {@code true} (as specified by {@link Collection#add})
455      * @throws ClassCastException if the specified element cannot be compared
456      *         with elements currently in the priority queue according to the
457      *         priority queue's ordering
458      * @throws NullPointerException if the specified element is null
459      */

460     public boolean add(E e) {
461         return offer(e);
462     }
463
464     /**
465      * Inserts the specified element into this priority queue.
466      * As the queue is unbounded, this method will never return {@code false}.
467      *
468      * @param e the element to add
469      * @return {@code true} (as specified by {@link Queue#offer})
470      * @throws ClassCastException if the specified element cannot be compared
471      *         with elements currently in the priority queue according to the
472      *         priority queue's ordering
473      * @throws NullPointerException if the specified element is null
474      */

475     public boolean offer(E e) {
476         if (e == null)
477             throw new NullPointerException();
478         final ReentrantLock lock = this.lock;
479         lock.lock();
480         int n, cap;
481         Object[] es;
482         while ((n = size) >= (cap = (es = queue).length))
483             tryGrow(es, cap);
484         try {
485             final Comparator<? super E> cmp;
486             if ((cmp = comparator) == null)
487                 siftUpComparable(n, e, es);
488             else
489                 siftUpUsingComparator(n, e, es, cmp);
490             size = n + 1;
491             notEmpty.signal();
492         } finally {
493             lock.unlock();
494         }
495         return true;
496     }
497
498     /**
499      * Inserts the specified element into this priority queue.
500      * As the queue is unbounded, this method will never block.
501      *
502      * @param e the element to add
503      * @throws ClassCastException if the specified element cannot be compared
504      *         with elements currently in the priority queue according to the
505      *         priority queue's ordering
506      * @throws NullPointerException if the specified element is null
507      */

508     public void put(E e) {
509         offer(e); // never need to block
510     }
511
512     /**
513      * Inserts the specified element into this priority queue.
514      * As the queue is unbounded, this method will never block or
515      * return {@code false}.
516      *
517      * @param e the element to add
518      * @param timeout This parameter is ignored as the method never blocks
519      * @param unit This parameter is ignored as the method never blocks
520      * @return {@code true} (as specified by
521      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
522      * @throws ClassCastException if the specified element cannot be compared
523      *         with elements currently in the priority queue according to the
524      *         priority queue's ordering
525      * @throws NullPointerException if the specified element is null
526      */

527     public boolean offer(E e, long timeout, TimeUnit unit) {
528         return offer(e); // never need to block
529     }
530
531     public E poll() {
532         final ReentrantLock lock = this.lock;
533         lock.lock();
534         try {
535             return dequeue();
536         } finally {
537             lock.unlock();
538         }
539     }
540
541     public E take() throws InterruptedException {
542         final ReentrantLock lock = this.lock;
543         lock.lockInterruptibly();
544         E result;
545         try {
546             while ( (result = dequeue()) == null)
547                 notEmpty.await();
548         } finally {
549             lock.unlock();
550         }
551         return result;
552     }
553
554     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
555         long nanos = unit.toNanos(timeout);
556         final ReentrantLock lock = this.lock;
557         lock.lockInterruptibly();
558         E result;
559         try {
560             while ( (result = dequeue()) == null && nanos > 0)
561                 nanos = notEmpty.awaitNanos(nanos);
562         } finally {
563             lock.unlock();
564         }
565         return result;
566     }
567
568     public E peek() {
569         final ReentrantLock lock = this.lock;
570         lock.lock();
571         try {
572             return (E) queue[0];
573         } finally {
574             lock.unlock();
575         }
576     }
577
578     /**
579      * Returns the comparator used to order the elements in this queue,
580      * or {@code nullif this queue uses the {@linkplain Comparable
581      * natural ordering} of its elements.
582      *
583      * @return the comparator used to order the elements in this queue,
584      *         or {@code nullif this queue uses the natural
585      *         ordering of its elements
586      */

587     public Comparator<? super E> comparator() {
588         return comparator;
589     }
590
591     public int size() {
592         final ReentrantLock lock = this.lock;
593         lock.lock();
594         try {
595             return size;
596         } finally {
597             lock.unlock();
598         }
599     }
600
601     /**
602      * Always returns {@code Integer.MAX_VALUE} because
603      * a {@code PriorityBlockingQueue} is not capacity constrained.
604      * @return {@code Integer.MAX_VALUE} always
605      */

606     public int remainingCapacity() {
607         return Integer.MAX_VALUE;
608     }
609
610     private int indexOf(Object o) {
611         if (o != null) {
612             final Object[] es = queue;
613             for (int i = 0, n = size; i < n; i++)
614                 if (o.equals(es[i]))
615                     return i;
616         }
617         return -1;
618     }
619
620     /**
621      * Removes the ith element from queue.
622      */

623     private void removeAt(int i) {
624         final Object[] es = queue;
625         final int n = size - 1;
626         if (n == i) // removed last element
627             es[i] = null;
628         else {
629             E moved = (E) es[n];
630             es[n] = null;
631             final Comparator<? super E> cmp;
632             if ((cmp = comparator) == null)
633                 siftDownComparable(i, moved, es, n);
634             else
635                 siftDownUsingComparator(i, moved, es, n, cmp);
636             if (es[i] == moved) {
637                 if (cmp == null)
638                     siftUpComparable(i, moved, es);
639                 else
640                     siftUpUsingComparator(i, moved, es, cmp);
641             }
642         }
643         size = n;
644     }
645
646     /**
647      * Removes a single instance of the specified element from this queue,
648      * if it is present.  More formally, removes an element {@code e} such
649      * that {@code o.equals(e)}, if this queue contains one or more such
650      * elements.  Returns {@code trueif and only if this queue contained
651      * the specified element (or equivalently, if this queue changed as a
652      * result of the call).
653      *
654      * @param o element to be removed from this queue, if present
655      * @return {@code trueif this queue changed as a result of the call
656      */

657     public boolean remove(Object o) {
658         final ReentrantLock lock = this.lock;
659         lock.lock();
660         try {
661             int i = indexOf(o);
662             if (i == -1)
663                 return false;
664             removeAt(i);
665             return true;
666         } finally {
667             lock.unlock();
668         }
669     }
670
671     /**
672      * Identity-based version for use in Itr.remove.
673      *
674      * @param o element to be removed from this queue, if present
675      */

676     void removeEq(Object o) {
677         final ReentrantLock lock = this.lock;
678         lock.lock();
679         try {
680             final Object[] es = queue;
681             for (int i = 0, n = size; i < n; i++) {
682                 if (o == es[i]) {
683                     removeAt(i);
684                     break;
685                 }
686             }
687         } finally {
688             lock.unlock();
689         }
690     }
691
692     /**
693      * Returns {@code trueif this queue contains the specified element.
694      * More formally, returns {@code trueif and only if this queue contains
695      * at least one element {@code e} such that {@code o.equals(e)}.
696      *
697      * @param o object to be checked for containment in this queue
698      * @return {@code trueif this queue contains the specified element
699      */

700     public boolean contains(Object o) {
701         final ReentrantLock lock = this.lock;
702         lock.lock();
703         try {
704             return indexOf(o) != -1;
705         } finally {
706             lock.unlock();
707         }
708     }
709
710     public String toString() {
711         return Helpers.collectionToString(this);
712     }
713
714     /**
715      * @throws UnsupportedOperationException {@inheritDoc}
716      * @throws ClassCastException            {@inheritDoc}
717      * @throws NullPointerException          {@inheritDoc}
718      * @throws IllegalArgumentException      {@inheritDoc}
719      */

720     public int drainTo(Collection<? super E> c) {
721         return drainTo(c, Integer.MAX_VALUE);
722     }
723
724     /**
725      * @throws UnsupportedOperationException {@inheritDoc}
726      * @throws ClassCastException            {@inheritDoc}
727      * @throws NullPointerException          {@inheritDoc}
728      * @throws IllegalArgumentException      {@inheritDoc}
729      */

730     public int drainTo(Collection<? super E> c, int maxElements) {
731         Objects.requireNonNull(c);
732         if (c == this)
733             throw new IllegalArgumentException();
734         if (maxElements <= 0)
735             return 0;
736         final ReentrantLock lock = this.lock;
737         lock.lock();
738         try {
739             int n = Math.min(size, maxElements);
740             for (int i = 0; i < n; i++) {
741                 c.add((E) queue[0]); // In this order, in case add() throws.
742                 dequeue();
743             }
744             return n;
745         } finally {
746             lock.unlock();
747         }
748     }
749
750     /**
751      * Atomically removes all of the elements from this queue.
752      * The queue will be empty after this call returns.
753      */

754     public void clear() {
755         final ReentrantLock lock = this.lock;
756         lock.lock();
757         try {
758             final Object[] es = queue;
759             for (int i = 0, n = size; i < n; i++)
760                 es[i] = null;
761             size = 0;
762         } finally {
763             lock.unlock();
764         }
765     }
766
767     /**
768      * Returns an array containing all of the elements in this queue.
769      * The returned array elements are in no particular order.
770      *
771      * <p>The returned array will be "safe" in that no references to it are
772      * maintained by this queue.  (In other words, this method must allocate
773      * a new array).  The caller is thus free to modify the returned array.
774      *
775      * <p>This method acts as bridge between array-based and collection-based
776      * APIs.
777      *
778      * @return an array containing all of the elements in this queue
779      */

780     public Object[] toArray() {
781         final ReentrantLock lock = this.lock;
782         lock.lock();
783         try {
784             return Arrays.copyOf(queue, size);
785         } finally {
786             lock.unlock();
787         }
788     }
789
790     /**
791      * Returns an array containing all of the elements in this queue; the
792      * runtime type of the returned array is that of the specified array.
793      * The returned array elements are in no particular order.
794      * If the queue fits in the specified array, it is returned therein.
795      * Otherwise, a new array is allocated with the runtime type of the
796      * specified array and the size of this queue.
797      *
798      * <p>If this queue fits in the specified array with room to spare
799      * (i.e., the array has more elements than this queue), the element in
800      * the array immediately following the end of the queue is set to
801      * {@code null}.
802      *
803      * <p>Like the {@link #toArray()} method, this method acts as bridge between
804      * array-based and collection-based APIs.  Further, this method allows
805      * precise control over the runtime type of the output array, and may,
806      * under certain circumstances, be used to save allocation costs.
807      *
808      * <p>Suppose {@code x} is a queue known to contain only strings.
809      * The following code can be used to dump the queue into a newly
810      * allocated array of {@code String}:
811      *
812      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
813      *
814      * Note that {@code toArray(new Object[0])} is identical in function to
815      * {@code toArray()}.
816      *
817      * @param a the array into which the elements of the queue are to
818      *          be stored, if it is big enough; otherwise, a new array of the
819      *          same runtime type is allocated for this purpose
820      * @return an array containing all of the elements in this queue
821      * @throws ArrayStoreException if the runtime type of the specified array
822      *         is not a supertype of the runtime type of every element in
823      *         this queue
824      * @throws NullPointerException if the specified array is null
825      */

826     public <T> T[] toArray(T[] a) {
827         final ReentrantLock lock = this.lock;
828         lock.lock();
829         try {
830             int n = size;
831             if (a.length < n)
832                 // Make a new array of a's runtime type, but my contents:
833                 return (T[]) Arrays.copyOf(queue, size, a.getClass());
834             System.arraycopy(queue, 0, a, 0, n);
835             if (a.length > n)
836                 a[n] = null;
837             return a;
838         } finally {
839             lock.unlock();
840         }
841     }
842
843     /**
844      * Returns an iterator over the elements in this queue. The
845      * iterator does not return the elements in any particular order.
846      *
847      * <p>The returned iterator is
848      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
849      *
850      * @return an iterator over the elements in this queue
851      */

852     public Iterator<E> iterator() {
853         return new Itr(toArray());
854     }
855
856     /**
857      * Snapshot iterator that works off copy of underlying q array.
858      */

859     final class Itr implements Iterator<E> {
860         final Object[] array; // Array of all elements
861         int cursor;           // index of next element to return
862         int lastRet = -1;     // index of last element, or -1 if no such
863
864         Itr(Object[] array) {
865             this.array = array;
866         }
867
868         public boolean hasNext() {
869             return cursor < array.length;
870         }
871
872         public E next() {
873             if (cursor >= array.length)
874                 throw new NoSuchElementException();
875             return (E)array[lastRet = cursor++];
876         }
877
878         public void remove() {
879             if (lastRet < 0)
880                 throw new IllegalStateException();
881             removeEq(array[lastRet]);
882             lastRet = -1;
883         }
884
885         public void forEachRemaining(Consumer<? super E> action) {
886             Objects.requireNonNull(action);
887             final Object[] es = array;
888             int i;
889             if ((i = cursor) < es.length) {
890                 lastRet = -1;
891                 cursor = es.length;
892                 for (; i < es.length; i++)
893                     action.accept((E) es[i]);
894                 lastRet = es.length - 1;
895             }
896         }
897     }
898
899     /**
900      * Saves this queue to a stream (that is, serializes it).
901      *
902      * For compatibility with previous version of this class, elements
903      * are first copied to a java.util.PriorityQueue, which is then
904      * serialized.
905      *
906      * @param s the stream
907      * @throws java.io.IOException if an I/O error occurs
908      */

909     private void writeObject(java.io.ObjectOutputStream s)
910         throws java.io.IOException {
911         lock.lock();
912         try {
913             // avoid zero capacity argument
914             q = new PriorityQueue<E>(Math.max(size, 1), comparator);
915             q.addAll(this);
916             s.defaultWriteObject();
917         } finally {
918             q = null;
919             lock.unlock();
920         }
921     }
922
923     /**
924      * Reconstitutes this queue from a stream (that is, deserializes it).
925      * @param s the stream
926      * @throws ClassNotFoundException if the class of a serialized object
927      *         could not be found
928      * @throws java.io.IOException if an I/O error occurs
929      */

930     private void readObject(java.io.ObjectInputStream s)
931         throws java.io.IOException, ClassNotFoundException {
932         try {
933             s.defaultReadObject();
934             int sz = q.size();
935             SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, sz);
936             this.queue = new Object[Math.max(1, sz)];
937             comparator = q.comparator();
938             addAll(q);
939         } finally {
940             q = null;
941         }
942     }
943
944     /**
945      * Immutable snapshot spliterator that binds to elements "late".
946      */

947     final class PBQSpliterator implements Spliterator<E> {
948         Object[] array;        // null until late-bound-initialized
949         int index;
950         int fence;
951
952         PBQSpliterator() {}
953
954         PBQSpliterator(Object[] array, int index, int fence) {
955             this.array = array;
956             this.index = index;
957             this.fence = fence;
958         }
959
960         private int getFence() {
961             if (array == null)
962                 fence = (array = toArray()).length;
963             return fence;
964         }
965
966         public PBQSpliterator trySplit() {
967             int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
968             return (lo >= mid) ? null :
969                 new PBQSpliterator(array, lo, index = mid);
970         }
971
972         public void forEachRemaining(Consumer<? super E> action) {
973             Objects.requireNonNull(action);
974             final int hi = getFence(), lo = index;
975             final Object[] es = array;
976             index = hi;                 // ensure exhaustion
977             for (int i = lo; i < hi; i++)
978                 action.accept((E) es[i]);
979         }
980
981         public boolean tryAdvance(Consumer<? super E> action) {
982             Objects.requireNonNull(action);
983             if (getFence() > index && index >= 0) {
984                 action.accept((E) array[index++]);
985                 return true;
986             }
987             return false;
988         }
989
990         public long estimateSize() { return getFence() - index; }
991
992         public int characteristics() {
993             return (Spliterator.NONNULL |
994                     Spliterator.SIZED |
995                     Spliterator.SUBSIZED);
996         }
997     }
998
999     /**
1000      * Returns a {@link Spliterator} over the elements in this queue.
1001      * The spliterator does not traverse elements in any particular order
1002      * (the {@link Spliterator#ORDERED ORDERED} characteristic is not reported).
1003      *
1004      * <p>The returned spliterator is
1005      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1006      *
1007      * <p>The {@code Spliterator} reports {@link Spliterator#SIZED} and
1008      * {@link Spliterator#NONNULL}.
1009      *
1010      * @implNote
1011      * The {@code Spliterator} additionally reports {@link Spliterator#SUBSIZED}.
1012      *
1013      * @return a {@code Spliterator} over the elements in this queue
1014      * @since 1.8
1015      */

1016     public Spliterator<E> spliterator() {
1017         return new PBQSpliterator();
1018     }
1019
1020     /**
1021      * @throws NullPointerException {@inheritDoc}
1022      */

1023     public boolean removeIf(Predicate<? super E> filter) {
1024         Objects.requireNonNull(filter);
1025         return bulkRemove(filter);
1026     }
1027
1028     /**
1029      * @throws NullPointerException {@inheritDoc}
1030      */

1031     public boolean removeAll(Collection<?> c) {
1032         Objects.requireNonNull(c);
1033         return bulkRemove(e -> c.contains(e));
1034     }
1035
1036     /**
1037      * @throws NullPointerException {@inheritDoc}
1038      */

1039     public boolean retainAll(Collection<?> c) {
1040         Objects.requireNonNull(c);
1041         return bulkRemove(e -> !c.contains(e));
1042     }
1043
1044     // A tiny bit set implementation
1045
1046     private static long[] nBits(int n) {
1047         return new long[((n - 1) >> 6) + 1];
1048     }
1049     private static void setBit(long[] bits, int i) {
1050         bits[i >> 6] |= 1L << i;
1051     }
1052     private static boolean isClear(long[] bits, int i) {
1053         return (bits[i >> 6] & (1L << i)) == 0;
1054     }
1055
1056     /** Implementation of bulk remove methods. */
1057     private boolean bulkRemove(Predicate<? super E> filter) {
1058         final ReentrantLock lock = this.lock;
1059         lock.lock();
1060         try {
1061             final Object[] es = queue;
1062             final int end = size;
1063             int i;
1064             // Optimize for initial run of survivors
1065             for (i = 0; i < end && !filter.test((E) es[i]); i++)
1066                 ;
1067             if (i >= end)
1068                 return false;
1069             // Tolerate predicates that reentrantly access the
1070             // collection for read, so traverse once to find elements
1071             // to delete, a second pass to physically expunge.
1072             final int beg = i;
1073             final long[] deathRow = nBits(end - beg);
1074             deathRow[0] = 1L;   // set bit 0
1075             for (i = beg + 1; i < end; i++)
1076                 if (filter.test((E) es[i]))
1077                     setBit(deathRow, i - beg);
1078             int w = beg;
1079             for (i = beg; i < end; i++)
1080                 if (isClear(deathRow, i - beg))
1081                     es[w++] = es[i];
1082             for (i = size = w; i < end; i++)
1083                 es[i] = null;
1084             heapify();
1085             return true;
1086         } finally {
1087             lock.unlock();
1088         }
1089     }
1090
1091     /**
1092      * @throws NullPointerException {@inheritDoc}
1093      */

1094     public void forEach(Consumer<? super E> action) {
1095         Objects.requireNonNull(action);
1096         final ReentrantLock lock = this.lock;
1097         lock.lock();
1098         try {
1099             final Object[] es = queue;
1100             for (int i = 0, n = size; i < n; i++)
1101                 action.accept((E) es[i]);
1102         } finally {
1103             lock.unlock();
1104         }
1105     }
1106
1107     // VarHandle mechanics
1108     private static final VarHandle ALLOCATIONSPINLOCK;
1109     static {
1110         try {
1111             MethodHandles.Lookup l = MethodHandles.lookup();
1112             ALLOCATIONSPINLOCK = l.findVarHandle(PriorityBlockingQueue.class,
1113                                                  "allocationSpinLock",
1114                                                  int.class);
1115         } catch (ReflectiveOperationException e) {
1116             throw new ExceptionInInitializerError(e);
1117         }
1118     }
1119 }
1120