1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.Objects;
43 import java.util.Spliterator;
44 import java.util.Spliterators;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.concurrent.locks.Condition;
47 import java.util.concurrent.locks.ReentrantLock;
48 import java.util.function.Consumer;
49 import java.util.function.Predicate;
50
51 /**
52 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
53 * linked nodes.
54 * This queue orders elements FIFO (first-in-first-out).
55 * The <em>head</em> of the queue is that element that has been on the
56 * queue the longest time.
57 * The <em>tail</em> of the queue is that element that has been on the
58 * queue the shortest time. New elements
59 * are inserted at the tail of the queue, and the queue retrieval
60 * operations obtain elements at the head of the queue.
61 * Linked queues typically have higher throughput than array-based queues but
62 * less predictable performance in most concurrent applications.
63 *
64 * <p>The optional capacity bound constructor argument serves as a
65 * way to prevent excessive queue expansion. The capacity, if unspecified,
66 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
67 * dynamically created upon each insertion unless this would bring the
68 * queue above capacity.
69 *
70 * <p>This class and its iterator implement all of the <em>optional</em>
71 * methods of the {@link Collection} and {@link Iterator} interfaces.
72 *
73 * <p>This class is a member of the
74 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
75 * Java Collections Framework</a>.
76 *
77 * @since 1.5
78 * @author Doug Lea
79 * @param <E> the type of elements held in this queue
80 */
81 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
82 implements BlockingQueue<E>, java.io.Serializable {
83 private static final long serialVersionUID = -6903933977591709194L;
84
85 /*
86 * A variant of the "two lock queue" algorithm. The putLock gates
87 * entry to put (and offer), and has an associated condition for
88 * waiting puts. Similarly for the takeLock. The "count" field
89 * that they both rely on is maintained as an atomic to avoid
90 * needing to get both locks in most cases. Also, to minimize need
91 * for puts to get takeLock and vice-versa, cascading notifies are
92 * used. When a put notices that it has enabled at least one take,
93 * it signals taker. That taker in turn signals others if more
94 * items have been entered since the signal. And symmetrically for
95 * takes signalling puts. Operations such as remove(Object) and
96 * iterators acquire both locks.
97 *
98 * Visibility between writers and readers is provided as follows:
99 *
100 * Whenever an element is enqueued, the putLock is acquired and
101 * count updated. A subsequent reader guarantees visibility to the
102 * enqueued Node by either acquiring the putLock (via fullyLock)
103 * or by acquiring the takeLock, and then reading n = count.get();
104 * this gives visibility to the first n items.
105 *
106 * To implement weakly consistent iterators, it appears we need to
107 * keep all Nodes GC-reachable from a predecessor dequeued Node.
108 * That would cause two problems:
109 * - allow a rogue Iterator to cause unbounded memory retention
110 * - cause cross-generational linking of old Nodes to new Nodes if
111 * a Node was tenured while live, which generational GCs have a
112 * hard time dealing with, causing repeated major collections.
113 * However, only non-deleted Nodes need to be reachable from
114 * dequeued Nodes, and reachability does not necessarily have to
115 * be of the kind understood by the GC. We use the trick of
116 * linking a Node that has just been dequeued to itself. Such a
117 * self-link implicitly means to advance to head.next.
118 */
119
120 /**
121 * Linked list node class.
122 */
123 static class Node<E> {
124 E item;
125
126 /**
127 * One of:
128 * - the real successor Node
129 * - this Node, meaning the successor is head.next
130 * - null, meaning there is no successor (this is the last node)
131 */
132 Node<E> next;
133
134 Node(E x) { item = x; }
135 }
136
137 /** The capacity bound, or Integer.MAX_VALUE if none */
138 private final int capacity;
139
140 /** Current number of elements */
141 private final AtomicInteger count = new AtomicInteger();
142
143 /**
144 * Head of linked list.
145 * Invariant: head.item == null
146 */
147 transient Node<E> head;
148
149 /**
150 * Tail of linked list.
151 * Invariant: last.next == null
152 */
153 private transient Node<E> last;
154
155 /** Lock held by take, poll, etc */
156 private final ReentrantLock takeLock = new ReentrantLock();
157
158 /** Wait queue for waiting takes */
159 private final Condition notEmpty = takeLock.newCondition();
160
161 /** Lock held by put, offer, etc */
162 private final ReentrantLock putLock = new ReentrantLock();
163
164 /** Wait queue for waiting puts */
165 private final Condition notFull = putLock.newCondition();
166
167 /**
168 * Signals a waiting take. Called only from put/offer (which do not
169 * otherwise ordinarily lock takeLock.)
170 */
171 private void signalNotEmpty() {
172 final ReentrantLock takeLock = this.takeLock;
173 takeLock.lock();
174 try {
175 notEmpty.signal();
176 } finally {
177 takeLock.unlock();
178 }
179 }
180
181 /**
182 * Signals a waiting put. Called only from take/poll.
183 */
184 private void signalNotFull() {
185 final ReentrantLock putLock = this.putLock;
186 putLock.lock();
187 try {
188 notFull.signal();
189 } finally {
190 putLock.unlock();
191 }
192 }
193
194 /**
195 * Links node at end of queue.
196 *
197 * @param node the node
198 */
199 private void enqueue(Node<E> node) {
200 // assert putLock.isHeldByCurrentThread();
201 // assert last.next == null;
202 last = last.next = node;
203 }
204
205 /**
206 * Removes a node from head of queue.
207 *
208 * @return the node
209 */
210 private E dequeue() {
211 // assert takeLock.isHeldByCurrentThread();
212 // assert head.item == null;
213 Node<E> h = head;
214 Node<E> first = h.next;
215 h.next = h; // help GC
216 head = first;
217 E x = first.item;
218 first.item = null;
219 return x;
220 }
221
222 /**
223 * Locks to prevent both puts and takes.
224 */
225 void fullyLock() {
226 putLock.lock();
227 takeLock.lock();
228 }
229
230 /**
231 * Unlocks to allow both puts and takes.
232 */
233 void fullyUnlock() {
234 takeLock.unlock();
235 putLock.unlock();
236 }
237
/**
 * Creates a {@code LinkedBlockingQueue} whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // Delegates to the bounded constructor with the largest possible bound.
    this(Integer.MAX_VALUE);
}
245
246 /**
247 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
248 *
249 * @param capacity the capacity of this queue
250 * @throws IllegalArgumentException if {@code capacity} is not greater
251 * than zero
252 */
253 public LinkedBlockingQueue(int capacity) {
254 if (capacity <= 0) throw new IllegalArgumentException();
255 this.capacity = capacity;
256 last = head = new Node<E>(null);
257 }
258
/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}, initially holding the elements of the
 * given collection, added in the traversal order of the collection's
 * iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.putLock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        int added = 0;
        final Iterator<? extends E> source = c.iterator();
        while (source.hasNext()) {
            final E element = source.next();
            if (element == null) {
                throw new NullPointerException();
            }
            if (added == capacity) {
                throw new IllegalStateException("Queue full");
            }
            enqueue(new Node<E>(element));
            added++;
        }
        // Publish the element count once, after all nodes are linked.
        count.set(added);
    } finally {
        lock.unlock();
    }
}
288
289 // this doc comment is overridden to remove the reference to collections
290 // greater in size than Integer.MAX_VALUE
291 /**
292 * Returns the number of elements in this queue.
293 *
294 * @return the number of elements in this queue
295 */
296 public int size() {
297 return count.get();
298 }
299
300 // this doc comment is a modified copy of the inherited doc comment,
301 // without the reference to unlimited queues.
302 /**
303 * Returns the number of additional elements that this queue can ideally
304 * (in the absence of memory or resource constraints) accept without
305 * blocking. This is always equal to the initial capacity of this queue
306 * less the current {@code size} of this queue.
307 *
308 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
309 * an element will succeed by inspecting {@code remainingCapacity}
310 * because it may be the case that another thread is about to
311 * insert or remove an element.
312 */
313 public int remainingCapacity() {
314 return capacity - count.get();
315 }
316
317 /**
318 * Inserts the specified element at the tail of this queue, waiting if
319 * necessary for space to become available.
320 *
321 * @throws InterruptedException {@inheritDoc}
322 * @throws NullPointerException {@inheritDoc}
323 */
324 public void put(E e) throws InterruptedException {
325 if (e == null) throw new NullPointerException();
326 final int c;
327 final Node<E> node = new Node<E>(e);
328 final ReentrantLock putLock = this.putLock;
329 final AtomicInteger count = this.count;
330 putLock.lockInterruptibly();
331 try {
332 /*
333 * Note that count is used in wait guard even though it is
334 * not protected by lock. This works because count can
335 * only decrease at this point (all other puts are shut
336 * out by lock), and we (or some other waiting put) are
337 * signalled if it ever changes from capacity. Similarly
338 * for all other uses of count in other wait guards.
339 */
340 while (count.get() == capacity) {
341 notFull.await();
342 }
343 enqueue(node);
344 c = count.getAndIncrement();
345 if (c + 1 < capacity)
346 notFull.signal();
347 } finally {
348 putLock.unlock();
349 }
350 if (c == 0)
351 signalNotEmpty();
352 }
353
354 /**
355 * Inserts the specified element at the tail of this queue, waiting if
356 * necessary up to the specified wait time for space to become available.
357 *
358 * @return {@code true} if successful, or {@code false} if
359 * the specified waiting time elapses before space is available
360 * @throws InterruptedException {@inheritDoc}
361 * @throws NullPointerException {@inheritDoc}
362 */
363 public boolean offer(E e, long timeout, TimeUnit unit)
364 throws InterruptedException {
365
366 if (e == null) throw new NullPointerException();
367 long nanos = unit.toNanos(timeout);
368 final int c;
369 final ReentrantLock putLock = this.putLock;
370 final AtomicInteger count = this.count;
371 putLock.lockInterruptibly();
372 try {
373 while (count.get() == capacity) {
374 if (nanos <= 0L)
375 return false;
376 nanos = notFull.awaitNanos(nanos);
377 }
378 enqueue(new Node<E>(e));
379 c = count.getAndIncrement();
380 if (c + 1 < capacity)
381 notFull.signal();
382 } finally {
383 putLock.unlock();
384 }
385 if (c == 0)
386 signalNotEmpty();
387 return true;
388 }
389
390 /**
391 * Inserts the specified element at the tail of this queue if it is
392 * possible to do so immediately without exceeding the queue's capacity,
393 * returning {@code true} upon success and {@code false} if this queue
394 * is full.
395 * When using a capacity-restricted queue, this method is generally
396 * preferable to method {@link BlockingQueue#add add}, which can fail to
397 * insert an element only by throwing an exception.
398 *
399 * @throws NullPointerException if the specified element is null
400 */
401 public boolean offer(E e) {
402 if (e == null) throw new NullPointerException();
403 final AtomicInteger count = this.count;
404 if (count.get() == capacity)
405 return false;
406 final int c;
407 final Node<E> node = new Node<E>(e);
408 final ReentrantLock putLock = this.putLock;
409 putLock.lock();
410 try {
411 if (count.get() == capacity)
412 return false;
413 enqueue(node);
414 c = count.getAndIncrement();
415 if (c + 1 < capacity)
416 notFull.signal();
417 } finally {
418 putLock.unlock();
419 }
420 if (c == 0)
421 signalNotEmpty();
422 return true;
423 }
424
425 public E take() throws InterruptedException {
426 final E x;
427 final int c;
428 final AtomicInteger count = this.count;
429 final ReentrantLock takeLock = this.takeLock;
430 takeLock.lockInterruptibly();
431 try {
432 while (count.get() == 0) {
433 notEmpty.await();
434 }
435 x = dequeue();
436 c = count.getAndDecrement();
437 if (c > 1)
438 notEmpty.signal();
439 } finally {
440 takeLock.unlock();
441 }
442 if (c == capacity)
443 signalNotFull();
444 return x;
445 }
446
447 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
448 final E x;
449 final int c;
450 long nanos = unit.toNanos(timeout);
451 final AtomicInteger count = this.count;
452 final ReentrantLock takeLock = this.takeLock;
453 takeLock.lockInterruptibly();
454 try {
455 while (count.get() == 0) {
456 if (nanos <= 0L)
457 return null;
458 nanos = notEmpty.awaitNanos(nanos);
459 }
460 x = dequeue();
461 c = count.getAndDecrement();
462 if (c > 1)
463 notEmpty.signal();
464 } finally {
465 takeLock.unlock();
466 }
467 if (c == capacity)
468 signalNotFull();
469 return x;
470 }
471
472 public E poll() {
473 final AtomicInteger count = this.count;
474 if (count.get() == 0)
475 return null;
476 final E x;
477 final int c;
478 final ReentrantLock takeLock = this.takeLock;
479 takeLock.lock();
480 try {
481 if (count.get() == 0)
482 return null;
483 x = dequeue();
484 c = count.getAndDecrement();
485 if (c > 1)
486 notEmpty.signal();
487 } finally {
488 takeLock.unlock();
489 }
490 if (c == capacity)
491 signalNotFull();
492 return x;
493 }
494
495 public E peek() {
496 final AtomicInteger count = this.count;
497 if (count.get() == 0)
498 return null;
499 final ReentrantLock takeLock = this.takeLock;
500 takeLock.lock();
501 try {
502 return (count.get() > 0) ? head.next.item : null;
503 } finally {
504 takeLock.unlock();
505 }
506 }
507
508 /**
509 * Unlinks interior Node p with predecessor pred.
510 */
511 void unlink(Node<E> p, Node<E> pred) {
512 // assert putLock.isHeldByCurrentThread();
513 // assert takeLock.isHeldByCurrentThread();
514 // p.next is not changed, to allow iterators that are
515 // traversing p to maintain their weak-consistency guarantee.
516 p.item = null;
517 pred.next = p.next;
518 if (last == p)
519 last = pred;
520 if (count.getAndDecrement() == capacity)
521 notFull.signal();
522 }
523
524 /**
525 * Removes a single instance of the specified element from this queue,
526 * if it is present. More formally, removes an element {@code e} such
527 * that {@code o.equals(e)}, if this queue contains one or more such
528 * elements.
529 * Returns {@code true} if this queue contained the specified element
530 * (or equivalently, if this queue changed as a result of the call).
531 *
532 * @param o element to be removed from this queue, if present
533 * @return {@code true} if this queue changed as a result of the call
534 */
535 public boolean remove(Object o) {
536 if (o == null) return false;
537 fullyLock();
538 try {
539 for (Node<E> pred = head, p = pred.next;
540 p != null;
541 pred = p, p = p.next) {
542 if (o.equals(p.item)) {
543 unlink(p, pred);
544 return true;
545 }
546 }
547 return false;
548 } finally {
549 fullyUnlock();
550 }
551 }
552
553 /**
554 * Returns {@code true} if this queue contains the specified element.
555 * More formally, returns {@code true} if and only if this queue contains
556 * at least one element {@code e} such that {@code o.equals(e)}.
557 *
558 * @param o object to be checked for containment in this queue
559 * @return {@code true} if this queue contains the specified element
560 */
561 public boolean contains(Object o) {
562 if (o == null) return false;
563 fullyLock();
564 try {
565 for (Node<E> p = head.next; p != null; p = p.next)
566 if (o.equals(p.item))
567 return true;
568 return false;
569 } finally {
570 fullyUnlock();
571 }
572 }
573
574 /**
575 * Returns an array containing all of the elements in this queue, in
576 * proper sequence.
577 *
578 * <p>The returned array will be "safe" in that no references to it are
579 * maintained by this queue. (In other words, this method must allocate
580 * a new array). The caller is thus free to modify the returned array.
581 *
582 * <p>This method acts as bridge between array-based and collection-based
583 * APIs.
584 *
585 * @return an array containing all of the elements in this queue
586 */
587 public Object[] toArray() {
588 fullyLock();
589 try {
590 int size = count.get();
591 Object[] a = new Object[size];
592 int k = 0;
593 for (Node<E> p = head.next; p != null; p = p.next)
594 a[k++] = p.item;
595 return a;
596 } finally {
597 fullyUnlock();
598 }
599 }
600
601 /**
602 * Returns an array containing all of the elements in this queue, in
603 * proper sequence; the runtime type of the returned array is that of
604 * the specified array. If the queue fits in the specified array, it
605 * is returned therein. Otherwise, a new array is allocated with the
606 * runtime type of the specified array and the size of this queue.
607 *
608 * <p>If this queue fits in the specified array with room to spare
609 * (i.e., the array has more elements than this queue), the element in
610 * the array immediately following the end of the queue is set to
611 * {@code null}.
612 *
613 * <p>Like the {@link #toArray()} method, this method acts as bridge between
614 * array-based and collection-based APIs. Further, this method allows
615 * precise control over the runtime type of the output array, and may,
616 * under certain circumstances, be used to save allocation costs.
617 *
618 * <p>Suppose {@code x} is a queue known to contain only strings.
619 * The following code can be used to dump the queue into a newly
620 * allocated array of {@code String}:
621 *
622 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
623 *
624 * Note that {@code toArray(new Object[0])} is identical in function to
625 * {@code toArray()}.
626 *
627 * @param a the array into which the elements of the queue are to
628 * be stored, if it is big enough; otherwise, a new array of the
629 * same runtime type is allocated for this purpose
630 * @return an array containing all of the elements in this queue
631 * @throws ArrayStoreException if the runtime type of the specified array
632 * is not a supertype of the runtime type of every element in
633 * this queue
634 * @throws NullPointerException if the specified array is null
635 */
636 @SuppressWarnings("unchecked")
637 public <T> T[] toArray(T[] a) {
638 fullyLock();
639 try {
640 int size = count.get();
641 if (a.length < size)
642 a = (T[])java.lang.reflect.Array.newInstance
643 (a.getClass().getComponentType(), size);
644
645 int k = 0;
646 for (Node<E> p = head.next; p != null; p = p.next)
647 a[k++] = (T)p.item;
648 if (a.length > k)
649 a[k] = null;
650 return a;
651 } finally {
652 fullyUnlock();
653 }
654 }
655
656 public String toString() {
657 return Helpers.collectionToString(this);
658 }
659
660 /**
661 * Atomically removes all of the elements from this queue.
662 * The queue will be empty after this call returns.
663 */
664 public void clear() {
665 fullyLock();
666 try {
667 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
668 h.next = h;
669 p.item = null;
670 }
671 head = last;
672 // assert head.item == null && head.next == null;
673 if (count.getAndSet(0) == capacity)
674 notFull.signal();
675 } finally {
676 fullyUnlock();
677 }
678 }
679
680 /**
681 * @throws UnsupportedOperationException {@inheritDoc}
682 * @throws ClassCastException {@inheritDoc}
683 * @throws NullPointerException {@inheritDoc}
684 * @throws IllegalArgumentException {@inheritDoc}
685 */
686 public int drainTo(Collection<? super E> c) {
687 return drainTo(c, Integer.MAX_VALUE);
688 }
689
690 /**
691 * @throws UnsupportedOperationException {@inheritDoc}
692 * @throws ClassCastException {@inheritDoc}
693 * @throws NullPointerException {@inheritDoc}
694 * @throws IllegalArgumentException {@inheritDoc}
695 */
696 public int drainTo(Collection<? super E> c, int maxElements) {
697 Objects.requireNonNull(c);
698 if (c == this)
699 throw new IllegalArgumentException();
700 if (maxElements <= 0)
701 return 0;
702 boolean signalNotFull = false;
703 final ReentrantLock takeLock = this.takeLock;
704 takeLock.lock();
705 try {
706 int n = Math.min(maxElements, count.get());
707 // count.get provides visibility to first n Nodes
708 Node<E> h = head;
709 int i = 0;
710 try {
711 while (i < n) {
712 Node<E> p = h.next;
713 c.add(p.item);
714 p.item = null;
715 h.next = h;
716 h = p;
717 ++i;
718 }
719 return n;
720 } finally {
721 // Restore invariants even if c.add() threw
722 if (i > 0) {
723 // assert h.item == null;
724 head = h;
725 signalNotFull = (count.getAndAdd(-i) == capacity);
726 }
727 }
728 } finally {
729 takeLock.unlock();
730 if (signalNotFull)
731 signalNotFull();
732 }
733 }
734
735 /**
736 * Used for any element traversal that is not entirely under lock.
737 * Such traversals must handle both:
738 * - dequeued nodes (p.next == p)
739 * - (possibly multiple) interior removed nodes (p.item == null)
740 */
741 Node<E> succ(Node<E> p) {
742 if (p == (p = p.next))
743 p = head.next;
744 return p;
745 }
746
747 /**
748 * Returns an iterator over the elements in this queue in proper sequence.
749 * The elements will be returned in order from first (head) to last (tail).
750 *
751 * <p>The returned iterator is
752 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
753 *
754 * @return an iterator over the elements in this queue in proper sequence
755 */
756 public Iterator<E> iterator() {
757 return new Itr();
758 }
759
760 /**
761 * Weakly-consistent iterator.
762 *
763 * Lazily updated ancestor field provides expected O(1) remove(),
764 * but still O(n) in the worst case, whenever the saved ancestor
765 * is concurrently deleted.
766 */
767 private class Itr implements Iterator<E> {
768 private Node<E> next; // Node holding nextItem
769 private E nextItem; // next item to hand out
770 private Node<E> lastRet;
771 private Node<E> ancestor; // Helps unlink lastRet on remove()
772
773 Itr() {
774 fullyLock();
775 try {
776 if ((next = head.next) != null)
777 nextItem = next.item;
778 } finally {
779 fullyUnlock();
780 }
781 }
782
783 public boolean hasNext() {
784 return next != null;
785 }
786
787 public E next() {
788 Node<E> p;
789 if ((p = next) == null)
790 throw new NoSuchElementException();
791 lastRet = p;
792 E x = nextItem;
793 fullyLock();
794 try {
795 E e = null;
796 for (p = p.next; p != null && (e = p.item) == null; )
797 p = succ(p);
798 next = p;
799 nextItem = e;
800 } finally {
801 fullyUnlock();
802 }
803 return x;
804 }
805
806 public void forEachRemaining(Consumer<? super E> action) {
807 // A variant of forEachFrom
808 Objects.requireNonNull(action);
809 Node<E> p;
810 if ((p = next) == null) return;
811 lastRet = p;
812 next = null;
813 final int batchSize = 64;
814 Object[] es = null;
815 int n, len = 1;
816 do {
817 fullyLock();
818 try {
819 if (es == null) {
820 p = p.next;
821 for (Node<E> q = p; q != null; q = succ(q))
822 if (q.item != null && ++len == batchSize)
823 break;
824 es = new Object[len];
825 es[0] = nextItem;
826 nextItem = null;
827 n = 1;
828 } else
829 n = 0;
830 for (; p != null && n < len; p = succ(p))
831 if ((es[n] = p.item) != null) {
832 lastRet = p;
833 n++;
834 }
835 } finally {
836 fullyUnlock();
837 }
838 for (int i = 0; i < n; i++) {
839 @SuppressWarnings("unchecked") E e = (E) es[i];
840 action.accept(e);
841 }
842 } while (n > 0 && p != null);
843 }
844
845 public void remove() {
846 Node<E> p = lastRet;
847 if (p == null)
848 throw new IllegalStateException();
849 lastRet = null;
850 fullyLock();
851 try {
852 if (p.item != null) {
853 if (ancestor == null)
854 ancestor = head;
855 ancestor = findPred(p, ancestor);
856 unlink(p, ancestor);
857 }
858 } finally {
859 fullyUnlock();
860 }
861 }
862 }
863
864 /**
865 * A customized variant of Spliterators.IteratorSpliterator.
866 * Keep this class in sync with (very similar) LBDSpliterator.
867 */
868 private final class LBQSpliterator implements Spliterator<E> {
869 static final int MAX_BATCH = 1 << 25; // max batch array size;
870 Node<E> current; // current node; null until initialized
871 int batch; // batch size for splits
872 boolean exhausted; // true when no more nodes
873 long est = size(); // size estimate
874
875 LBQSpliterator() {}
876
877 public long estimateSize() { return est; }
878
879 public Spliterator<E> trySplit() {
880 Node<E> h;
881 if (!exhausted &&
882 ((h = current) != null || (h = head.next) != null)
883 && h.next != null) {
884 int n = batch = Math.min(batch + 1, MAX_BATCH);
885 Object[] a = new Object[n];
886 int i = 0;
887 Node<E> p = current;
888 fullyLock();
889 try {
890 if (p != null || (p = head.next) != null)
891 for (; p != null && i < n; p = succ(p))
892 if ((a[i] = p.item) != null)
893 i++;
894 } finally {
895 fullyUnlock();
896 }
897 if ((current = p) == null) {
898 est = 0L;
899 exhausted = true;
900 }
901 else if ((est -= i) < 0L)
902 est = 0L;
903 if (i > 0)
904 return Spliterators.spliterator
905 (a, 0, i, (Spliterator.ORDERED |
906 Spliterator.NONNULL |
907 Spliterator.CONCURRENT));
908 }
909 return null;
910 }
911
912 public boolean tryAdvance(Consumer<? super E> action) {
913 Objects.requireNonNull(action);
914 if (!exhausted) {
915 E e = null;
916 fullyLock();
917 try {
918 Node<E> p;
919 if ((p = current) != null || (p = head.next) != null)
920 do {
921 e = p.item;
922 p = succ(p);
923 } while (e == null && p != null);
924 if ((current = p) == null)
925 exhausted = true;
926 } finally {
927 fullyUnlock();
928 }
929 if (e != null) {
930 action.accept(e);
931 return true;
932 }
933 }
934 return false;
935 }
936
937 public void forEachRemaining(Consumer<? super E> action) {
938 Objects.requireNonNull(action);
939 if (!exhausted) {
940 exhausted = true;
941 Node<E> p = current;
942 current = null;
943 forEachFrom(action, p);
944 }
945 }
946
947 public int characteristics() {
948 return (Spliterator.ORDERED |
949 Spliterator.NONNULL |
950 Spliterator.CONCURRENT);
951 }
952 }
953
954 /**
955 * Returns a {@link Spliterator} over the elements in this queue.
956 *
957 * <p>The returned spliterator is
958 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
959 *
960 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
961 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
962 *
963 * @implNote
964 * The {@code Spliterator} implements {@code trySplit} to permit limited
965 * parallelism.
966 *
967 * @return a {@code Spliterator} over the elements in this queue
968 * @since 1.8
969 */
970 public Spliterator<E> spliterator() {
971 return new LBQSpliterator();
972 }
973
974 /**
975 * @throws NullPointerException {@inheritDoc}
976 */
977 public void forEach(Consumer<? super E> action) {
978 Objects.requireNonNull(action);
979 forEachFrom(action, null);
980 }
981
982 /**
983 * Runs action on each element found during a traversal starting at p.
984 * If p is null, traversal starts at head.
985 */
986 void forEachFrom(Consumer<? super E> action, Node<E> p) {
987 // Extract batches of elements while holding the lock; then
988 // run the action on the elements while not
989 final int batchSize = 64; // max number of elements per batch
990 Object[] es = null; // container for batch of elements
991 int n, len = 0;
992 do {
993 fullyLock();
994 try {
995 if (es == null) {
996 if (p == null) p = head.next;
997 for (Node<E> q = p; q != null; q = succ(q))
998 if (q.item != null && ++len == batchSize)
999 break;
1000 es = new Object[len];
1001 }
1002 for (n = 0; p != null && n < len; p = succ(p))
1003 if ((es[n] = p.item) != null)
1004 n++;
1005 } finally {
1006 fullyUnlock();
1007 }
1008 for (int i = 0; i < n; i++) {
1009 @SuppressWarnings("unchecked") E e = (E) es[i];
1010 action.accept(e);
1011 }
1012 } while (n > 0 && p != null);
1013 }
1014
1015 /**
1016 * @throws NullPointerException {@inheritDoc}
1017 */
1018 public boolean removeIf(Predicate<? super E> filter) {
1019 Objects.requireNonNull(filter);
1020 return bulkRemove(filter);
1021 }
1022
1023 /**
1024 * @throws NullPointerException {@inheritDoc}
1025 */
1026 public boolean removeAll(Collection<?> c) {
1027 Objects.requireNonNull(c);
1028 return bulkRemove(e -> c.contains(e));
1029 }
1030
1031 /**
1032 * @throws NullPointerException {@inheritDoc}
1033 */
1034 public boolean retainAll(Collection<?> c) {
1035 Objects.requireNonNull(c);
1036 return bulkRemove(e -> !c.contains(e));
1037 }
1038
1039 /**
1040 * Returns the predecessor of live node p, given a node that was
1041 * once a live ancestor of p (or head); allows unlinking of p.
1042 */
1043 Node<E> findPred(Node<E> p, Node<E> ancestor) {
1044 // assert p.item != null;
1045 if (ancestor.item == null)
1046 ancestor = head;
1047 // Fails with NPE if precondition not satisfied
1048 for (Node<E> q; (q = ancestor.next) != p; )
1049 ancestor = q;
1050 return ancestor;
1051 }
1052
1053 /** Implementation of bulk remove methods. */
1054 @SuppressWarnings("unchecked")
1055 private boolean bulkRemove(Predicate<? super E> filter) {
1056 boolean removed = false;
1057 Node<E> p = null, ancestor = head;
1058 Node<E>[] nodes = null;
1059 int n, len = 0;
1060 do {
1061 // 1. Extract batch of up to 64 elements while holding the lock.
1062 fullyLock();
1063 try {
1064 if (nodes == null) { // first batch; initialize
1065 p = head.next;
1066 for (Node<E> q = p; q != null; q = succ(q))
1067 if (q.item != null && ++len == 64)
1068 break;
1069 nodes = (Node<E>[]) new Node<?>[len];
1070 }
1071 for (n = 0; p != null && n < len; p = succ(p))
1072 nodes[n++] = p;
1073 } finally {
1074 fullyUnlock();
1075 }
1076
1077 // 2. Run the filter on the elements while lock is free.
1078 long deathRow = 0L; // "bitset" of size 64
1079 for (int i = 0; i < n; i++) {
1080 final E e;
1081 if ((e = nodes[i].item) != null && filter.test(e))
1082 deathRow |= 1L << i;
1083 }
1084
1085 // 3. Remove any filtered elements while holding the lock.
1086 if (deathRow != 0) {
1087 fullyLock();
1088 try {
1089 for (int i = 0; i < n; i++) {
1090 final Node<E> q;
1091 if ((deathRow & (1L << i)) != 0L
1092 && (q = nodes[i]).item != null) {
1093 ancestor = findPred(q, ancestor);
1094 unlink(q, ancestor);
1095 removed = true;
1096 }
1097 nodes[i] = null; // help GC
1098 }
1099 } finally {
1100 fullyUnlock();
1101 }
1102 }
1103 } while (n > 0 && p != null);
1104 return removed;
1105 }
1106
1107 /**
1108 * Saves this queue to a stream (that is, serializes it).
1109 *
1110 * @param s the stream
1111 * @throws java.io.IOException if an I/O error occurs
1112 * @serialData The capacity is emitted (int), followed by all of
1113 * its elements (each an {@code Object}) in the proper order,
1114 * followed by a null
1115 */
1116 private void writeObject(java.io.ObjectOutputStream s)
1117 throws java.io.IOException {
1118
1119 fullyLock();
1120 try {
1121 // Write out any hidden stuff, plus capacity
1122 s.defaultWriteObject();
1123
1124 // Write out all elements in the proper order.
1125 for (Node<E> p = head.next; p != null; p = p.next)
1126 s.writeObject(p.item);
1127
1128 // Use trailing null as sentinel
1129 s.writeObject(null);
1130 } finally {
1131 fullyUnlock();
1132 }
1133 }
1134
1135 /**
1136 * Reconstitutes this queue from a stream (that is, deserializes it).
1137 * @param s the stream
1138 * @throws ClassNotFoundException if the class of a serialized object
1139 * could not be found
1140 * @throws java.io.IOException if an I/O error occurs
1141 */
1142 private void readObject(java.io.ObjectInputStream s)
1143 throws java.io.IOException, ClassNotFoundException {
1144 // Read in capacity, and any hidden stuff
1145 s.defaultReadObject();
1146
1147 count.set(0);
1148 last = head = new Node<E>(null);
1149
1150 // Read in all elements and place in queue
1151 for (;;) {
1152 @SuppressWarnings("unchecked")
1153 E item = (E)s.readObject();
1154 if (item == null)
1155 break;
1156 add(item);
1157 }
1158 }
1159 }
1160