1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */

24
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */

35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.Objects;
43 import java.util.Spliterator;
44 import java.util.Spliterators;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.concurrent.locks.Condition;
47 import java.util.concurrent.locks.ReentrantLock;
48 import java.util.function.Consumer;
49 import java.util.function.Predicate;
50
51 /**
52  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
53  * linked nodes.
54  * This queue orders elements FIFO (first-in-first-out).
55  * The <em>head</em> of the queue is that element that has been on the
56  * queue the longest time.
57  * The <em>tail</em> of the queue is that element that has been on the
58  * queue the shortest time. New elements
59  * are inserted at the tail of the queue, and the queue retrieval
60  * operations obtain elements at the head of the queue.
61  * Linked queues typically have higher throughput than array-based queues but
62  * less predictable performance in most concurrent applications.
63  *
64  * <p>The optional capacity bound constructor argument serves as a
65  * way to prevent excessive queue expansion. The capacity, if unspecified,
66  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
67  * dynamically created upon each insertion unless this would bring the
68  * queue above capacity.
69  *
70  * <p>This class and its iterator implement all of the <em>optional</em>
71  * methods of the {@link Collection} and {@link Iterator} interfaces.
72  *
73  * <p>This class is a member of the
74  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
75  * Java Collections Framework</a>.
76  *
77  * @since 1.5
78  * @author Doug Lea
79  * @param <E> the type of elements held in this queue
80  */

81 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
82         implements BlockingQueue<E>, java.io.Serializable {
83     private static final long serialVersionUID = -6903933977591709194L;
84
85     /*
86      * A variant of the "two lock queue" algorithm.  The putLock gates
87      * entry to put (and offer), and has an associated condition for
88      * waiting puts.  Similarly for the takeLock.  The "count" field
89      * that they both rely on is maintained as an atomic to avoid
90      * needing to get both locks in most cases. Also, to minimize need
91      * for puts to get takeLock and vice-versa, cascading notifies are
92      * used. When a put notices that it has enabled at least one take,
93      * it signals taker. That taker in turn signals others if more
94      * items have been entered since the signal. And symmetrically for
95      * takes signalling puts. Operations such as remove(Object) and
96      * iterators acquire both locks.
97      *
98      * Visibility between writers and readers is provided as follows:
99      *
100      * Whenever an element is enqueued, the putLock is acquired and
101      * count updated.  A subsequent reader guarantees visibility to the
102      * enqueued Node by either acquiring the putLock (via fullyLock)
103      * or by acquiring the takeLock, and then reading n = count.get();
104      * this gives visibility to the first n items.
105      *
106      * To implement weakly consistent iterators, it appears we need to
107      * keep all Nodes GC-reachable from a predecessor dequeued Node.
108      * That would cause two problems:
109      * - allow a rogue Iterator to cause unbounded memory retention
110      * - cause cross-generational linking of old Nodes to new Nodes if
111      *   a Node was tenured while live, which generational GCs have a
112      *   hard time dealing with, causing repeated major collections.
113      * However, only non-deleted Nodes need to be reachable from
114      * dequeued Nodes, and reachability does not necessarily have to
115      * be of the kind understood by the GC.  We use the trick of
116      * linking a Node that has just been dequeued to itself.  Such a
117      * self-link implicitly means to advance to head.next.
118      */

119
120     /**
121      * Linked list node class.
122      */

123     static class Node<E> {
124         E item;
125
126         /**
127          * One of:
128          * - the real successor Node
129          * - this Node, meaning the successor is head.next
130          * - null, meaning there is no successor (this is the last node)
131          */

132         Node<E> next;
133
134         Node(E x) { item = x; }
135     }
136
137     /** The capacity bound, or Integer.MAX_VALUE if none */
138     private final int capacity;
139
140     /** Current number of elements */
141     private final AtomicInteger count = new AtomicInteger();
142
143     /**
144      * Head of linked list.
145      * Invariant: head.item == null
146      */

147     transient Node<E> head;
148
149     /**
150      * Tail of linked list.
151      * Invariant: last.next == null
152      */

153     private transient Node<E> last;
154
155     /** Lock held by take, poll, etc */
156     private final ReentrantLock takeLock = new ReentrantLock();
157
158     /** Wait queue for waiting takes */
159     private final Condition notEmpty = takeLock.newCondition();
160
161     /** Lock held by put, offer, etc */
162     private final ReentrantLock putLock = new ReentrantLock();
163
164     /** Wait queue for waiting puts */
165     private final Condition notFull = putLock.newCondition();
166
167     /**
168      * Signals a waiting take. Called only from put/offer (which do not
169      * otherwise ordinarily lock takeLock.)
170      */

171     private void signalNotEmpty() {
172         final ReentrantLock takeLock = this.takeLock;
173         takeLock.lock();
174         try {
175             notEmpty.signal();
176         } finally {
177             takeLock.unlock();
178         }
179     }
180
181     /**
182      * Signals a waiting put. Called only from take/poll.
183      */

184     private void signalNotFull() {
185         final ReentrantLock putLock = this.putLock;
186         putLock.lock();
187         try {
188             notFull.signal();
189         } finally {
190             putLock.unlock();
191         }
192     }
193
194     /**
195      * Links node at end of queue.
196      *
197      * @param node the node
198      */

199     private void enqueue(Node<E> node) {
200         // assert putLock.isHeldByCurrentThread();
201         // assert last.next == null;
202         last = last.next = node;
203     }
204
205     /**
206      * Removes a node from head of queue.
207      *
208      * @return the node
209      */

210     private E dequeue() {
211         // assert takeLock.isHeldByCurrentThread();
212         // assert head.item == null;
213         Node<E> h = head;
214         Node<E> first = h.next;
215         h.next = h; // help GC
216         head = first;
217         E x = first.item;
218         first.item = null;
219         return x;
220     }
221
222     /**
223      * Locks to prevent both puts and takes.
224      */

225     void fullyLock() {
226         putLock.lock();
227         takeLock.lock();
228     }
229
230     /**
231      * Unlocks to allow both puts and takes.
232      */

233     void fullyUnlock() {
234         takeLock.unlock();
235         putLock.unlock();
236     }
237
238     /**
239      * Creates a {@code LinkedBlockingQueue} with a capacity of
240      * {@link Integer#MAX_VALUE}.
241      */

242     public LinkedBlockingQueue() {
243         this(Integer.MAX_VALUE);
244     }
245
246     /**
247      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
248      *
249      * @param capacity the capacity of this queue
250      * @throws IllegalArgumentException if {@code capacity} is not greater
251      *         than zero
252      */

253     public LinkedBlockingQueue(int capacity) {
254         if (capacity <= 0) throw new IllegalArgumentException();
255         this.capacity = capacity;
256         last = head = new Node<E>(null);
257     }
258
259     /**
260      * Creates a {@code LinkedBlockingQueue} with a capacity of
261      * {@link Integer#MAX_VALUE}, initially containing the elements of the
262      * given collection,
263      * added in traversal order of the collection's iterator.
264      *
265      * @param c the collection of elements to initially contain
266      * @throws NullPointerException if the specified collection or any
267      *         of its elements are null
268      */

269     public LinkedBlockingQueue(Collection<? extends E> c) {
270         this(Integer.MAX_VALUE);
271         final ReentrantLock putLock = this.putLock;
272         putLock.lock(); // Never contended, but necessary for visibility
273         try {
274             int n = 0;
275             for (E e : c) {
276                 if (e == null)
277                     throw new NullPointerException();
278                 if (n == capacity)
279                     throw new IllegalStateException("Queue full");
280                 enqueue(new Node<E>(e));
281                 ++n;
282             }
283             count.set(n);
284         } finally {
285             putLock.unlock();
286         }
287     }
288
289     // this doc comment is overridden to remove the reference to collections
290     // greater in size than Integer.MAX_VALUE
291     /**
292      * Returns the number of elements in this queue.
293      *
294      * @return the number of elements in this queue
295      */

296     public int size() {
297         return count.get();
298     }
299
300     // this doc comment is a modified copy of the inherited doc comment,
301     // without the reference to unlimited queues.
302     /**
303      * Returns the number of additional elements that this queue can ideally
304      * (in the absence of memory or resource constraints) accept without
305      * blocking. This is always equal to the initial capacity of this queue
306      * less the current {@code size} of this queue.
307      *
308      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
309      * an element will succeed by inspecting {@code remainingCapacity}
310      * because it may be the case that another thread is about to
311      * insert or remove an element.
312      */

313     public int remainingCapacity() {
314         return capacity - count.get();
315     }
316
317     /**
318      * Inserts the specified element at the tail of this queue, waiting if
319      * necessary for space to become available.
320      *
321      * @throws InterruptedException {@inheritDoc}
322      * @throws NullPointerException {@inheritDoc}
323      */

324     public void put(E e) throws InterruptedException {
325         if (e == nullthrow new NullPointerException();
326         final int c;
327         final Node<E> node = new Node<E>(e);
328         final ReentrantLock putLock = this.putLock;
329         final AtomicInteger count = this.count;
330         putLock.lockInterruptibly();
331         try {
332             /*
333              * Note that count is used in wait guard even though it is
334              * not protected by lock. This works because count can
335              * only decrease at this point (all other puts are shut
336              * out by lock), and we (or some other waiting put) are
337              * signalled if it ever changes from capacity. Similarly
338              * for all other uses of count in other wait guards.
339              */

340             while (count.get() == capacity) {
341                 notFull.await();
342             }
343             enqueue(node);
344             c = count.getAndIncrement();
345             if (c + 1 < capacity)
346                 notFull.signal();
347         } finally {
348             putLock.unlock();
349         }
350         if (c == 0)
351             signalNotEmpty();
352     }
353
354     /**
355      * Inserts the specified element at the tail of this queue, waiting if
356      * necessary up to the specified wait time for space to become available.
357      *
358      * @return {@code trueif successful, or {@code falseif
359      *         the specified waiting time elapses before space is available
360      * @throws InterruptedException {@inheritDoc}
361      * @throws NullPointerException {@inheritDoc}
362      */

363     public boolean offer(E e, long timeout, TimeUnit unit)
364         throws InterruptedException {
365
366         if (e == nullthrow new NullPointerException();
367         long nanos = unit.toNanos(timeout);
368         final int c;
369         final ReentrantLock putLock = this.putLock;
370         final AtomicInteger count = this.count;
371         putLock.lockInterruptibly();
372         try {
373             while (count.get() == capacity) {
374                 if (nanos <= 0L)
375                     return false;
376                 nanos = notFull.awaitNanos(nanos);
377             }
378             enqueue(new Node<E>(e));
379             c = count.getAndIncrement();
380             if (c + 1 < capacity)
381                 notFull.signal();
382         } finally {
383             putLock.unlock();
384         }
385         if (c == 0)
386             signalNotEmpty();
387         return true;
388     }
389
390     /**
391      * Inserts the specified element at the tail of this queue if it is
392      * possible to do so immediately without exceeding the queue's capacity,
393      * returning {@code true} upon success and {@code falseif this queue
394      * is full.
395      * When using a capacity-restricted queue, this method is generally
396      * preferable to method {@link BlockingQueue#add add}, which can fail to
397      * insert an element only by throwing an exception.
398      *
399      * @throws NullPointerException if the specified element is null
400      */

401     public boolean offer(E e) {
402         if (e == nullthrow new NullPointerException();
403         final AtomicInteger count = this.count;
404         if (count.get() == capacity)
405             return false;
406         final int c;
407         final Node<E> node = new Node<E>(e);
408         final ReentrantLock putLock = this.putLock;
409         putLock.lock();
410         try {
411             if (count.get() == capacity)
412                 return false;
413             enqueue(node);
414             c = count.getAndIncrement();
415             if (c + 1 < capacity)
416                 notFull.signal();
417         } finally {
418             putLock.unlock();
419         }
420         if (c == 0)
421             signalNotEmpty();
422         return true;
423     }
424
425     public E take() throws InterruptedException {
426         final E x;
427         final int c;
428         final AtomicInteger count = this.count;
429         final ReentrantLock takeLock = this.takeLock;
430         takeLock.lockInterruptibly();
431         try {
432             while (count.get() == 0) {
433                 notEmpty.await();
434             }
435             x = dequeue();
436             c = count.getAndDecrement();
437             if (c > 1)
438                 notEmpty.signal();
439         } finally {
440             takeLock.unlock();
441         }
442         if (c == capacity)
443             signalNotFull();
444         return x;
445     }
446
447     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
448         final E x;
449         final int c;
450         long nanos = unit.toNanos(timeout);
451         final AtomicInteger count = this.count;
452         final ReentrantLock takeLock = this.takeLock;
453         takeLock.lockInterruptibly();
454         try {
455             while (count.get() == 0) {
456                 if (nanos <= 0L)
457                     return null;
458                 nanos = notEmpty.awaitNanos(nanos);
459             }
460             x = dequeue();
461             c = count.getAndDecrement();
462             if (c > 1)
463                 notEmpty.signal();
464         } finally {
465             takeLock.unlock();
466         }
467         if (c == capacity)
468             signalNotFull();
469         return x;
470     }
471
472     public E poll() {
473         final AtomicInteger count = this.count;
474         if (count.get() == 0)
475             return null;
476         final E x;
477         final int c;
478         final ReentrantLock takeLock = this.takeLock;
479         takeLock.lock();
480         try {
481             if (count.get() == 0)
482                 return null;
483             x = dequeue();
484             c = count.getAndDecrement();
485             if (c > 1)
486                 notEmpty.signal();
487         } finally {
488             takeLock.unlock();
489         }
490         if (c == capacity)
491             signalNotFull();
492         return x;
493     }
494
495     public E peek() {
496         final AtomicInteger count = this.count;
497         if (count.get() == 0)
498             return null;
499         final ReentrantLock takeLock = this.takeLock;
500         takeLock.lock();
501         try {
502             return (count.get() > 0) ? head.next.item : null;
503         } finally {
504             takeLock.unlock();
505         }
506     }
507
508     /**
509      * Unlinks interior Node p with predecessor pred.
510      */

511     void unlink(Node<E> p, Node<E> pred) {
512         // assert putLock.isHeldByCurrentThread();
513         // assert takeLock.isHeldByCurrentThread();
514         // p.next is not changed, to allow iterators that are
515         // traversing p to maintain their weak-consistency guarantee.
516         p.item = null;
517         pred.next = p.next;
518         if (last == p)
519             last = pred;
520         if (count.getAndDecrement() == capacity)
521             notFull.signal();
522     }
523
524     /**
525      * Removes a single instance of the specified element from this queue,
526      * if it is present.  More formally, removes an element {@code e} such
527      * that {@code o.equals(e)}, if this queue contains one or more such
528      * elements.
529      * Returns {@code trueif this queue contained the specified element
530      * (or equivalently, if this queue changed as a result of the call).
531      *
532      * @param o element to be removed from this queue, if present
533      * @return {@code trueif this queue changed as a result of the call
534      */

535     public boolean remove(Object o) {
536         if (o == nullreturn false;
537         fullyLock();
538         try {
539             for (Node<E> pred = head, p = pred.next;
540                  p != null;
541                  pred = p, p = p.next) {
542                 if (o.equals(p.item)) {
543                     unlink(p, pred);
544                     return true;
545                 }
546             }
547             return false;
548         } finally {
549             fullyUnlock();
550         }
551     }
552
553     /**
554      * Returns {@code trueif this queue contains the specified element.
555      * More formally, returns {@code trueif and only if this queue contains
556      * at least one element {@code e} such that {@code o.equals(e)}.
557      *
558      * @param o object to be checked for containment in this queue
559      * @return {@code trueif this queue contains the specified element
560      */

561     public boolean contains(Object o) {
562         if (o == nullreturn false;
563         fullyLock();
564         try {
565             for (Node<E> p = head.next; p != null; p = p.next)
566                 if (o.equals(p.item))
567                     return true;
568             return false;
569         } finally {
570             fullyUnlock();
571         }
572     }
573
574     /**
575      * Returns an array containing all of the elements in this queue, in
576      * proper sequence.
577      *
578      * <p>The returned array will be "safe" in that no references to it are
579      * maintained by this queue.  (In other words, this method must allocate
580      * a new array).  The caller is thus free to modify the returned array.
581      *
582      * <p>This method acts as bridge between array-based and collection-based
583      * APIs.
584      *
585      * @return an array containing all of the elements in this queue
586      */

587     public Object[] toArray() {
588         fullyLock();
589         try {
590             int size = count.get();
591             Object[] a = new Object[size];
592             int k = 0;
593             for (Node<E> p = head.next; p != null; p = p.next)
594                 a[k++] = p.item;
595             return a;
596         } finally {
597             fullyUnlock();
598         }
599     }
600
601     /**
602      * Returns an array containing all of the elements in this queue, in
603      * proper sequence; the runtime type of the returned array is that of
604      * the specified array.  If the queue fits in the specified array, it
605      * is returned therein.  Otherwise, a new array is allocated with the
606      * runtime type of the specified array and the size of this queue.
607      *
608      * <p>If this queue fits in the specified array with room to spare
609      * (i.e., the array has more elements than this queue), the element in
610      * the array immediately following the end of the queue is set to
611      * {@code null}.
612      *
613      * <p>Like the {@link #toArray()} method, this method acts as bridge between
614      * array-based and collection-based APIs.  Further, this method allows
615      * precise control over the runtime type of the output array, and may,
616      * under certain circumstances, be used to save allocation costs.
617      *
618      * <p>Suppose {@code x} is a queue known to contain only strings.
619      * The following code can be used to dump the queue into a newly
620      * allocated array of {@code String}:
621      *
622      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
623      *
624      * Note that {@code toArray(new Object[0])} is identical in function to
625      * {@code toArray()}.
626      *
627      * @param a the array into which the elements of the queue are to
628      *          be stored, if it is big enough; otherwise, a new array of the
629      *          same runtime type is allocated for this purpose
630      * @return an array containing all of the elements in this queue
631      * @throws ArrayStoreException if the runtime type of the specified array
632      *         is not a supertype of the runtime type of every element in
633      *         this queue
634      * @throws NullPointerException if the specified array is null
635      */

636     @SuppressWarnings("unchecked")
637     public <T> T[] toArray(T[] a) {
638         fullyLock();
639         try {
640             int size = count.get();
641             if (a.length < size)
642                 a = (T[])java.lang.reflect.Array.newInstance
643                     (a.getClass().getComponentType(), size);
644
645             int k = 0;
646             for (Node<E> p = head.next; p != null; p = p.next)
647                 a[k++] = (T)p.item;
648             if (a.length > k)
649                 a[k] = null;
650             return a;
651         } finally {
652             fullyUnlock();
653         }
654     }
655
656     public String toString() {
657         return Helpers.collectionToString(this);
658     }
659
660     /**
661      * Atomically removes all of the elements from this queue.
662      * The queue will be empty after this call returns.
663      */

664     public void clear() {
665         fullyLock();
666         try {
667             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
668                 h.next = h;
669                 p.item = null;
670             }
671             head = last;
672             // assert head.item == null && head.next == null;
673             if (count.getAndSet(0) == capacity)
674                 notFull.signal();
675         } finally {
676             fullyUnlock();
677         }
678     }
679
680     /**
681      * @throws UnsupportedOperationException {@inheritDoc}
682      * @throws ClassCastException            {@inheritDoc}
683      * @throws NullPointerException          {@inheritDoc}
684      * @throws IllegalArgumentException      {@inheritDoc}
685      */

686     public int drainTo(Collection<? super E> c) {
687         return drainTo(c, Integer.MAX_VALUE);
688     }
689
690     /**
691      * @throws UnsupportedOperationException {@inheritDoc}
692      * @throws ClassCastException            {@inheritDoc}
693      * @throws NullPointerException          {@inheritDoc}
694      * @throws IllegalArgumentException      {@inheritDoc}
695      */

696     public int drainTo(Collection<? super E> c, int maxElements) {
697         Objects.requireNonNull(c);
698         if (c == this)
699             throw new IllegalArgumentException();
700         if (maxElements <= 0)
701             return 0;
702         boolean signalNotFull = false;
703         final ReentrantLock takeLock = this.takeLock;
704         takeLock.lock();
705         try {
706             int n = Math.min(maxElements, count.get());
707             // count.get provides visibility to first n Nodes
708             Node<E> h = head;
709             int i = 0;
710             try {
711                 while (i < n) {
712                     Node<E> p = h.next;
713                     c.add(p.item);
714                     p.item = null;
715                     h.next = h;
716                     h = p;
717                     ++i;
718                 }
719                 return n;
720             } finally {
721                 // Restore invariants even if c.add() threw
722                 if (i > 0) {
723                     // assert h.item == null;
724                     head = h;
725                     signalNotFull = (count.getAndAdd(-i) == capacity);
726                 }
727             }
728         } finally {
729             takeLock.unlock();
730             if (signalNotFull)
731                 signalNotFull();
732         }
733     }
734
735     /**
736      * Used for any element traversal that is not entirely under lock.
737      * Such traversals must handle both:
738      * - dequeued nodes (p.next == p)
739      * - (possibly multiple) interior removed nodes (p.item == null)
740      */

741     Node<E> succ(Node<E> p) {
742         if (p == (p = p.next))
743             p = head.next;
744         return p;
745     }
746
747     /**
748      * Returns an iterator over the elements in this queue in proper sequence.
749      * The elements will be returned in order from first (head) to last (tail).
750      *
751      * <p>The returned iterator is
752      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
753      *
754      * @return an iterator over the elements in this queue in proper sequence
755      */

756     public Iterator<E> iterator() {
757         return new Itr();
758     }
759
760     /**
761      * Weakly-consistent iterator.
762      *
763      * Lazily updated ancestor field provides expected O(1) remove(),
764      * but still O(n) in the worst case, whenever the saved ancestor
765      * is concurrently deleted.
766      */

767     private class Itr implements Iterator<E> {
768         private Node<E> next;           // Node holding nextItem
769         private E nextItem;             // next item to hand out
770         private Node<E> lastRet;
771         private Node<E> ancestor;       // Helps unlink lastRet on remove()
772
773         Itr() {
774             fullyLock();
775             try {
776                 if ((next = head.next) != null)
777                     nextItem = next.item;
778             } finally {
779                 fullyUnlock();
780             }
781         }
782
783         public boolean hasNext() {
784             return next != null;
785         }
786
787         public E next() {
788             Node<E> p;
789             if ((p = next) == null)
790                 throw new NoSuchElementException();
791             lastRet = p;
792             E x = nextItem;
793             fullyLock();
794             try {
795                 E e = null;
796                 for (p = p.next; p != null && (e = p.item) == null; )
797                     p = succ(p);
798                 next = p;
799                 nextItem = e;
800             } finally {
801                 fullyUnlock();
802             }
803             return x;
804         }
805
806         public void forEachRemaining(Consumer<? super E> action) {
807             // A variant of forEachFrom
808             Objects.requireNonNull(action);
809             Node<E> p;
810             if ((p = next) == nullreturn;
811             lastRet = p;
812             next = null;
813             final int batchSize = 64;
814             Object[] es = null;
815             int n, len = 1;
816             do {
817                 fullyLock();
818                 try {
819                     if (es == null) {
820                         p = p.next;
821                         for (Node<E> q = p; q != null; q = succ(q))
822                             if (q.item != null && ++len == batchSize)
823                                 break;
824                         es = new Object[len];
825                         es[0] = nextItem;
826                         nextItem = null;
827                         n = 1;
828                     } else
829                         n = 0;
830                     for (; p != null && n < len; p = succ(p))
831                         if ((es[n] = p.item) != null) {
832                             lastRet = p;
833                             n++;
834                         }
835                 } finally {
836                     fullyUnlock();
837                 }
838                 for (int i = 0; i < n; i++) {
839                     @SuppressWarnings("unchecked") E e = (E) es[i];
840                     action.accept(e);
841                 }
842             } while (n > 0 && p != null);
843         }
844
845         public void remove() {
846             Node<E> p = lastRet;
847             if (p == null)
848                 throw new IllegalStateException();
849             lastRet = null;
850             fullyLock();
851             try {
852                 if (p.item != null) {
853                     if (ancestor == null)
854                         ancestor = head;
855                     ancestor = findPred(p, ancestor);
856                     unlink(p, ancestor);
857                 }
858             } finally {
859                 fullyUnlock();
860             }
861         }
862     }
863
864     /**
865      * A customized variant of Spliterators.IteratorSpliterator.
866      * Keep this class in sync with (very similar) LBDSpliterator.
867      */

868     private final class LBQSpliterator implements Spliterator<E> {
869         static final int MAX_BATCH = 1 << 25;  // max batch array size;
870         Node<E> current;    // current node; null until initialized
871         int batch;          // batch size for splits
872         boolean exhausted;  // true when no more nodes
873         long est = size();  // size estimate
874
875         LBQSpliterator() {}
876
877         public long estimateSize() { return est; }
878
879         public Spliterator<E> trySplit() {
880             Node<E> h;
881             if (!exhausted &&
882                 ((h = current) != null || (h = head.next) != null)
883                 && h.next != null) {
884                 int n = batch = Math.min(batch + 1, MAX_BATCH);
885                 Object[] a = new Object[n];
886                 int i = 0;
887                 Node<E> p = current;
888                 fullyLock();
889                 try {
890                     if (p != null || (p = head.next) != null)
891                         for (; p != null && i < n; p = succ(p))
892                             if ((a[i] = p.item) != null)
893                                 i++;
894                 } finally {
895                     fullyUnlock();
896                 }
897                 if ((current = p) == null) {
898                     est = 0L;
899                     exhausted = true;
900                 }
901                 else if ((est -= i) < 0L)
902                     est = 0L;
903                 if (i > 0)
904                     return Spliterators.spliterator
905                         (a, 0, i, (Spliterator.ORDERED |
906                                    Spliterator.NONNULL |
907                                    Spliterator.CONCURRENT));
908             }
909             return null;
910         }
911
912         public boolean tryAdvance(Consumer<? super E> action) {
913             Objects.requireNonNull(action);
914             if (!exhausted) {
915                 E e = null;
916                 fullyLock();
917                 try {
918                     Node<E> p;
919                     if ((p = current) != null || (p = head.next) != null)
920                         do {
921                             e = p.item;
922                             p = succ(p);
923                         } while (e == null && p != null);
924                     if ((current = p) == null)
925                         exhausted = true;
926                 } finally {
927                     fullyUnlock();
928                 }
929                 if (e != null) {
930                     action.accept(e);
931                     return true;
932                 }
933             }
934             return false;
935         }
936
937         public void forEachRemaining(Consumer<? super E> action) {
938             Objects.requireNonNull(action);
939             if (!exhausted) {
940                 exhausted = true;
941                 Node<E> p = current;
942                 current = null;
943                 forEachFrom(action, p);
944             }
945         }
946
947         public int characteristics() {
948             return (Spliterator.ORDERED |
949                     Spliterator.NONNULL |
950                     Spliterator.CONCURRENT);
951         }
952     }
953
954     /**
955      * Returns a {@link Spliterator} over the elements in this queue.
956      *
957      * <p>The returned spliterator is
958      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
959      *
960      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
961      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
962      *
963      * @implNote
964      * The {@code Spliterator} implements {@code trySplit} to permit limited
965      * parallelism.
966      *
967      * @return a {@code Spliterator} over the elements in this queue
968      * @since 1.8
969      */

970     public Spliterator<E> spliterator() {
971         return new LBQSpliterator();
972     }
973
974     /**
975      * @throws NullPointerException {@inheritDoc}
976      */

977     public void forEach(Consumer<? super E> action) {
978         Objects.requireNonNull(action);
979         forEachFrom(action, null);
980     }
981
982     /**
983      * Runs action on each element found during a traversal starting at p.
984      * If p is null, traversal starts at head.
985      */

986     void forEachFrom(Consumer<? super E> action, Node<E> p) {
987         // Extract batches of elements while holding the lock; then
988         // run the action on the elements while not
989         final int batchSize = 64;       // max number of elements per batch
990         Object[] es = null;             // container for batch of elements
991         int n, len = 0;
992         do {
993             fullyLock();
994             try {
995                 if (es == null) {
996                     if (p == null) p = head.next;
997                     for (Node<E> q = p; q != null; q = succ(q))
998                         if (q.item != null && ++len == batchSize)
999                             break;
1000                     es = new Object[len];
1001                 }
1002                 for (n = 0; p != null && n < len; p = succ(p))
1003                     if ((es[n] = p.item) != null)
1004                         n++;
1005             } finally {
1006                 fullyUnlock();
1007             }
1008             for (int i = 0; i < n; i++) {
1009                 @SuppressWarnings("unchecked") E e = (E) es[i];
1010                 action.accept(e);
1011             }
1012         } while (n > 0 && p != null);
1013     }
1014
1015     /**
1016      * @throws NullPointerException {@inheritDoc}
1017      */

1018     public boolean removeIf(Predicate<? super E> filter) {
1019         Objects.requireNonNull(filter);
1020         return bulkRemove(filter);
1021     }
1022
1023     /**
1024      * @throws NullPointerException {@inheritDoc}
1025      */

1026     public boolean removeAll(Collection<?> c) {
1027         Objects.requireNonNull(c);
1028         return bulkRemove(e -> c.contains(e));
1029     }
1030
1031     /**
1032      * @throws NullPointerException {@inheritDoc}
1033      */

1034     public boolean retainAll(Collection<?> c) {
1035         Objects.requireNonNull(c);
1036         return bulkRemove(e -> !c.contains(e));
1037     }
1038
1039     /**
1040      * Returns the predecessor of live node p, given a node that was
1041      * once a live ancestor of p (or head); allows unlinking of p.
1042      */

1043     Node<E> findPred(Node<E> p, Node<E> ancestor) {
1044         // assert p.item != null;
1045         if (ancestor.item == null)
1046             ancestor = head;
1047         // Fails with NPE if precondition not satisfied
1048         for (Node<E> q; (q = ancestor.next) != p; )
1049             ancestor = q;
1050         return ancestor;
1051     }
1052
1053     /** Implementation of bulk remove methods. */
1054     @SuppressWarnings("unchecked")
1055     private boolean bulkRemove(Predicate<? super E> filter) {
1056         boolean removed = false;
1057         Node<E> p = null, ancestor = head;
1058         Node<E>[] nodes = null;
1059         int n, len = 0;
1060         do {
1061             // 1. Extract batch of up to 64 elements while holding the lock.
1062             fullyLock();
1063             try {
1064                 if (nodes == null) {  // first batch; initialize
1065                     p = head.next;
1066                     for (Node<E> q = p; q != null; q = succ(q))
1067                         if (q.item != null && ++len == 64)
1068                             break;
1069                     nodes = (Node<E>[]) new Node<?>[len];
1070                 }
1071                 for (n = 0; p != null && n < len; p = succ(p))
1072                     nodes[n++] = p;
1073             } finally {
1074                 fullyUnlock();
1075             }
1076
1077             // 2. Run the filter on the elements while lock is free.
1078             long deathRow = 0L;       // "bitset" of size 64
1079             for (int i = 0; i < n; i++) {
1080                 final E e;
1081                 if ((e = nodes[i].item) != null && filter.test(e))
1082                     deathRow |= 1L << i;
1083             }
1084
1085             // 3. Remove any filtered elements while holding the lock.
1086             if (deathRow != 0) {
1087                 fullyLock();
1088                 try {
1089                     for (int i = 0; i < n; i++) {
1090                         final Node<E> q;
1091                         if ((deathRow & (1L << i)) != 0L
1092                             && (q = nodes[i]).item != null) {
1093                             ancestor = findPred(q, ancestor);
1094                             unlink(q, ancestor);
1095                             removed = true;
1096                         }
1097                         nodes[i] = null// help GC
1098                     }
1099                 } finally {
1100                     fullyUnlock();
1101                 }
1102             }
1103         } while (n > 0 && p != null);
1104         return removed;
1105     }
1106
1107     /**
1108      * Saves this queue to a stream (that is, serializes it).
1109      *
1110      * @param s the stream
1111      * @throws java.io.IOException if an I/O error occurs
1112      * @serialData The capacity is emitted (int), followed by all of
1113      * its elements (each an {@code Object}) in the proper order,
1114      * followed by a null
1115      */

1116     private void writeObject(java.io.ObjectOutputStream s)
1117         throws java.io.IOException {
1118
1119         fullyLock();
1120         try {
1121             // Write out any hidden stuff, plus capacity
1122             s.defaultWriteObject();
1123
1124             // Write out all elements in the proper order.
1125             for (Node<E> p = head.next; p != null; p = p.next)
1126                 s.writeObject(p.item);
1127
1128             // Use trailing null as sentinel
1129             s.writeObject(null);
1130         } finally {
1131             fullyUnlock();
1132         }
1133     }
1134
1135     /**
1136      * Reconstitutes this queue from a stream (that is, deserializes it).
1137      * @param s the stream
1138      * @throws ClassNotFoundException if the class of a serialized object
1139      *         could not be found
1140      * @throws java.io.IOException if an I/O error occurs
1141      */

1142     private void readObject(java.io.ObjectInputStream s)
1143         throws java.io.IOException, ClassNotFoundException {
1144         // Read in capacity, and any hidden stuff
1145         s.defaultReadObject();
1146
1147         count.set(0);
1148         last = head = new Node<E>(null);
1149
1150         // Read in all elements and place in queue
1151         for (;;) {
1152             @SuppressWarnings("unchecked")
1153             E item = (E)s.readObject();
1154             if (item == null)
1155                 break;
1156             add(item);
1157         }
1158     }
1159 }
1160