| LinkedBlockingQueue.java |
1 /*
2 * %W% %E%
3 *
4 * Copyright (c) 2006, Oracle and/or its affiliates. All rights reserved.
5 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.atomic.*;
10 import java.util.concurrent.locks.*;
11 import java.util.*;
12
13 /**
14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
15 * linked nodes.
16 * This queue orders elements FIFO (first-in-first-out).
17 * The <em>head</em> of the queue is that element that has been on the
18 * queue the longest time.
19 * The <em>tail</em> of the queue is that element that has been on the
20 * queue the shortest time. New elements
21 * are inserted at the tail of the queue, and the queue retrieval
22 * operations obtain elements at the head of the queue.
23 * Linked queues typically have higher throughput than array-based queues but
24 * less predictable performance in most concurrent applications.
25 *
26 * <p> The optional capacity bound constructor argument serves as a
27 * way to prevent excessive queue expansion. The capacity, if unspecified,
28 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
29 * dynamically created upon each insertion unless this would bring the
30 * queue above capacity.
31 *
32 * <p>This class and its iterator implement all of the
33 * <em>optional</em> methods of the {@link Collection} and {@link
34 * Iterator} interfaces.
35 *
36 * <p>This class is a member of the
37 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
38 * Java Collections Framework</a>.
39 *
40 * @since 1.5
41 * @author Doug Lea
42 * @param <E> the type of elements held in this collection
43 *
44 */
45 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
46 implements BlockingQueue<E>, java.io.Serializable {
47 private static final long serialVersionUID = -6903933977591709194L;
48
49 /*
50 * A variant of the "two lock queue" algorithm. The putLock gates
51 * entry to put (and offer), and has an associated condition for
52 * waiting puts. Similarly for the takeLock. The "count" field
53 * that they both rely on is maintained as an atomic to avoid
54 * needing to get both locks in most cases. Also, to minimize need
55 * for puts to get takeLock and vice-versa, cascading notifies are
56 * used. When a put notices that it has enabled at least one take,
57 * it signals taker. That taker in turn signals others if more
58 * items have been entered since the signal. And symmetrically for
59 * takes signalling puts. Operations such as remove(Object) and
60 * iterators acquire both locks.
61 *
62 * Visibility between writers and readers is provided as follows:
63 *
64 * Whenever an element is enqueued, the putLock is acquired and
65 * count updated. A subsequent reader guarantees visibility to the
66 * enqueued Node by either acquiring the putLock (via fullyLock)
67 * or by acquiring the takeLock, and then reading n = count.get();
68 * this gives visibility to the first n items.
69 *
70 * To implement weakly consistent iterators, it appears we need to
71 * keep all Nodes GC-reachable from a predecessor dequeued Node.
72 * That would cause two problems:
73 * - allow a rogue Iterator to cause unbounded memory retention
74 * - cause cross-generational linking of old Nodes to new Nodes if
75 * a Node was tenured while live, which generational GCs have a
76 * hard time dealing with, causing repeated major collections.
77 * However, only non-deleted Nodes need to be reachable from
78 * dequeued Nodes, and reachability does not necessarily have to
79 * be of the kind understood by the GC. We use the trick of
80 * linking a Node that has just been dequeued to itself. Such a
81 * self-link implicitly means to advance to head.next.
82 */
83
84 /**
85 * Linked list node class
86 */
87 static class Node<E> {
88 E item;
89 /**
90 * One of:
91 * - the real successor Node
92 * - this Node, meaning the successor is head.next
93 * - null, meaning there is no successor (this is the last node)
94 */
95
96 Node<E> next;
97 Node(E x) { item = x; }
98 }
99
100 /** The capacity bound, or Integer.MAX_VALUE if none */
101 private final int capacity;
102
103 /** Current number of elements */
104 private final AtomicInteger count = new AtomicInteger(0);
105
106 /** Head of linked list */
107 private transient Node<E> head;
108
109 /** Tail of linked list */
110 private transient Node<E> last;
111
112 /** Lock held by take, poll, etc */
113 private final ReentrantLock takeLock = new ReentrantLock();
114
115 /** Wait queue for waiting takes */
116 private final Condition notEmpty = takeLock.newCondition();
117
118 /** Lock held by put, offer, etc */
119 private final ReentrantLock putLock = new ReentrantLock();
120
121 /** Wait queue for waiting puts */
122 private final Condition notFull = putLock.newCondition();
123
124 /**
125 * Signals a waiting take. Called only from put/offer (which do not
126 * otherwise ordinarily lock takeLock.)
127 */
128 private void signalNotEmpty() {
129 final ReentrantLock takeLock = this.takeLock;
130 takeLock.lock();
131 try {
132 notEmpty.signal();
133 } finally {
134 takeLock.unlock();
135 }
136 }
137
138 /**
139 * Signals a waiting put. Called only from take/poll.
140 */
141 private void signalNotFull() {
142 final ReentrantLock putLock = this.putLock;
143 putLock.lock();
144 try {
145 notFull.signal();
146 } finally {
147 putLock.unlock();
148 }
149 }
150
151 /**
152 * Creates a node and links it at end of queue.
153 * @param x the item
154 */
155 private void enqueue(E x) {
156 // assert putLock.isHeldByCurrentThread();
157 last = last.next = new Node<E>(x);
158 }
159
160 /**
161 * Removes a node from head of queue.
162 * @return the node
163 */
164 private E dequeue() {
165 // assert takeLock.isHeldByCurrentThread();
166 Node<E> h = head;
167 Node<E> first = h.next;
168 h.next = h; // help GC
169 head = first;
170 E x = first.item;
171 first.item = null;
172 return x;
173 }
174
175 /**
176 * Lock to prevent both puts and takes.
177 */
178 void fullyLock() {
179 putLock.lock();
180 takeLock.lock();
181 }
182
183 /**
184 * Unlock to allow both puts and takes.
185 */
186 void fullyUnlock() {
187 takeLock.unlock();
188 putLock.unlock();
189 }
190
191 /**
192 * Tells whether both locks are held by current thread.
193 */
194 boolean isFullyLocked() {
195 return (putLock.isHeldByCurrentThread() &&
196 takeLock.isHeldByCurrentThread());
197 }
198
199
200 /**
201 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
202 * {@link Integer#MAX_VALUE}.
203 */
204 public LinkedBlockingQueue() {
205 this(Integer.MAX_VALUE);
206 }
207
208 /**
209 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
210 *
211 * @param capacity the capacity of this queue
212 * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
213 * than zero
214 */
215 public LinkedBlockingQueue(int capacity) {
216 if (capacity <= 0) throw new IllegalArgumentException();
217 this.capacity = capacity;
218 last = head = new Node<E>(null);
219 }
220
221 /**
222 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
223 * {@link Integer#MAX_VALUE}, initially containing the elements of the
224 * given collection,
225 * added in traversal order of the collection's iterator.
226 *
227 * @param c the collection of elements to initially contain
228 * @throws NullPointerException if the specified collection or any
229 * of its elements are null
230 */
231 public LinkedBlockingQueue(Collection<? extends E> c) {
232 this(Integer.MAX_VALUE);
233 final ReentrantLock putLock = this.putLock;
234 putLock.lock(); // Never contended, but necessary for visibility
235 try {
236 int n = 0;
237 for (E e : c) {
238 if (e == null)
239 throw new NullPointerException();
240 if (n == capacity)
241 throw new IllegalStateException("Queue full");
242 enqueue(e);
243 ++n;
244 }
245 count.set(n);
246 } finally {
247 putLock.unlock();
248 }
249 }
250
251
252 // this doc comment is overridden to remove the reference to collections
253 // greater in size than Integer.MAX_VALUE
254 /**
255 * Returns the number of elements in this queue.
256 *
257 * @return the number of elements in this queue
258 */
259 public int size() {
260 return count.get();
261 }
262
263 // this doc comment is a modified copy of the inherited doc comment,
264 // without the reference to unlimited queues.
265 /**
266 * Returns the number of additional elements that this queue can ideally
267 * (in the absence of memory or resource constraints) accept without
268 * blocking. This is always equal to the initial capacity of this queue
269 * less the current <tt>size</tt> of this queue.
270 *
271 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
272 * an element will succeed by inspecting <tt>remainingCapacity</tt>
273 * because it may be the case that another thread is about to
274 * insert or remove an element.
275 */
276 public int remainingCapacity() {
277 return capacity - count.get();
278 }
279
280 /**
281 * Inserts the specified element at the tail of this queue, waiting if
282 * necessary for space to become available.
283 *
284 * @throws InterruptedException {@inheritDoc}
285 * @throws NullPointerException {@inheritDoc}
286 */
287 public void put(E e) throws InterruptedException {
288 if (e == null) throw new NullPointerException();
289 // Note: convention in all put/take/etc is to preset local var
290 // holding count negative to indicate failure unless set.
291 int c = -1;
292 final ReentrantLock putLock = this.putLock;
293 final AtomicInteger count = this.count;
294 putLock.lockInterruptibly();
295 try {
296 /*
297 * Note that count is used in wait guard even though it is
298 * not protected by lock. This works because count can
299 * only decrease at this point (all other puts are shut
300 * out by lock), and we (or some other waiting put) are
301 * signalled if it ever changes from
302 * capacity. Similarly for all other uses of count in
303 * other wait guards.
304 */
305 while (count.get() == capacity) {
306 notFull.await();
307 }
308 enqueue(e);
309 c = count.getAndIncrement();
310 if (c + 1 < capacity)
311 notFull.signal();
312 } finally {
313 putLock.unlock();
314 }
315 if (c == 0)
316 signalNotEmpty();
317 }
318
319 /**
320 * Inserts the specified element at the tail of this queue, waiting if
321 * necessary up to the specified wait time for space to become available.
322 *
323 * @return <tt>true</tt> if successful, or <tt>false</tt> if
324 * the specified waiting time elapses before space is available.
325 * @throws InterruptedException {@inheritDoc}
326 * @throws NullPointerException {@inheritDoc}
327 */
328 public boolean offer(E e, long timeout, TimeUnit unit)
329 throws InterruptedException {
330
331 if (e == null) throw new NullPointerException();
332 long nanos = unit.toNanos(timeout);
333 int c = -1;
334 final ReentrantLock putLock = this.putLock;
335 final AtomicInteger count = this.count;
336 putLock.lockInterruptibly();
337 try {
338 while (count.get() == capacity) {
339
340 if (nanos <= 0)
341 return false;
342 nanos = notFull.awaitNanos(nanos);
343 }
344 enqueue(e);
345 c = count.getAndIncrement();
346 if (c + 1 < capacity)
347 notFull.signal();
348 } finally {
349 putLock.unlock();
350 }
351 if (c == 0)
352 signalNotEmpty();
353 return true;
354 }
355
356 /**
357 * Inserts the specified element at the tail of this queue if it is
358 * possible to do so immediately without exceeding the queue's capacity,
359 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
360 * is full.
361 * When using a capacity-restricted queue, this method is generally
362 * preferable to method {@link BlockingQueue#add add}, which can fail to
363 * insert an element only by throwing an exception.
364 *
365 * @throws NullPointerException if the specified element is null
366 */
367 public boolean offer(E e) {
368 if (e == null) throw new NullPointerException();
369 final AtomicInteger count = this.count;
370 if (count.get() == capacity)
371 return false;
372 int c = -1;
373 final ReentrantLock putLock = this.putLock;
374 putLock.lock();
375 try {
376 if (count.get() < capacity) {
377 enqueue(e);
378 c = count.getAndIncrement();
379 if (c + 1 < capacity)
380 notFull.signal();
381 }
382 } finally {
383 putLock.unlock();
384 }
385 if (c == 0)
386 signalNotEmpty();
387 return c >= 0;
388 }
389
390
391 public E take() throws InterruptedException {
392 E x;
393 int c = -1;
394 final AtomicInteger count = this.count;
395 final ReentrantLock takeLock = this.takeLock;
396 takeLock.lockInterruptibly();
397 try {
398 while (count.get() == 0) {
399 notEmpty.await();
400 }
401 x = dequeue();
402 c = count.getAndDecrement();
403 if (c > 1)
404 notEmpty.signal();
405 } finally {
406 takeLock.unlock();
407 }
408 if (c == capacity)
409 signalNotFull();
410 return x;
411 }
412
413 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
414 E x = null;
415 int c = -1;
416 long nanos = unit.toNanos(timeout);
417 final AtomicInteger count = this.count;
418 final ReentrantLock takeLock = this.takeLock;
419 takeLock.lockInterruptibly();
420 try {
421 while (count.get() == 0) {
422 if (nanos <= 0)
423 return null;
424 nanos = notEmpty.awaitNanos(nanos);
425 }
426 x = dequeue();
427 c = count.getAndDecrement();
428 if (c > 1)
429 notEmpty.signal();
430 } finally {
431 takeLock.unlock();
432 }
433 if (c == capacity)
434 signalNotFull();
435 return x;
436 }
437
438 public E poll() {
439 final AtomicInteger count = this.count;
440 if (count.get() == 0)
441 return null;
442 E x = null;
443 int c = -1;
444 final ReentrantLock takeLock = this.takeLock;
445 takeLock.lock();
446 try {
447 if (count.get() > 0) {
448 x = dequeue();
449 c = count.getAndDecrement();
450 if (c > 1)
451 notEmpty.signal();
452 }
453 } finally {
454 takeLock.unlock();
455 }
456 if (c == capacity)
457 signalNotFull();
458 return x;
459 }
460
461
462 public E peek() {
463 if (count.get() == 0)
464 return null;
465 final ReentrantLock takeLock = this.takeLock;
466 takeLock.lock();
467 try {
468 Node<E> first = head.next;
469 if (first == null)
470 return null;
471 else
472 return first.item;
473 } finally {
474 takeLock.unlock();
475 }
476 }
477
478 /*
479 * Unlinks interior Node p with predecessor trail.
480 */
481 void unlink(Node<E> p, Node<E> trail) {
482 // assert isFullyLocked();
483 // p.next is not changed, to allow iterators that are
484 // traversing p to maintain their weak-consistency guarantee.
485 p.item = null;
486 trail.next = p.next;
487 if (last == p)
488 last = trail;
489 if (count.getAndDecrement() == capacity)
490 notFull.signal();
491 }
492
493 /**
494 * Removes a single instance of the specified element from this queue,
495 * if it is present. More formally, removes an element <tt>e</tt> such
496 * that <tt>o.equals(e)</tt>, if this queue contains one or more such
497 * elements.
498 * Returns <tt>true</tt> if this queue contained the specified element
499 * (or equivalently, if this queue changed as a result of the call).
500 *
501 * @param o element to be removed from this queue, if present
502 * @return <tt>true</tt> if this queue changed as a result of the call
503 */
504 public boolean remove(Object o) {
505 if (o == null) return false;
506 fullyLock();
507 try {
508 for (Node<E> trail = head, p = trail.next;
509 p != null;
510 trail = p, p = p.next) {
511 if (o.equals(p.item)) {
512 unlink(p, trail);
513 return true;
514 }
515 }
516 return false;
517 } finally {
518 fullyUnlock();
519 }
520 }
521
522 /**
523 * Returns an array containing all of the elements in this queue, in
524 * proper sequence.
525 *
526 * <p>The returned array will be "safe" in that no references to it are
527 * maintained by this queue. (In other words, this method must allocate
528 * a new array). The caller is thus free to modify the returned array.
529 *
530 * <p>This method acts as bridge between array-based and collection-based
531 * APIs.
532 *
533 * @return an array containing all of the elements in this queue
534 */
535 public Object[] toArray() {
536 fullyLock();
537 try {
538 int size = count.get();
539 Object[] a = new Object[size];
540 int k = 0;
541 for (Node<E> p = head.next; p != null; p = p.next)
542 a[k++] = p.item;
543 return a;
544 } finally {
545 fullyUnlock();
546 }
547 }
548
549 /**
550 * Returns an array containing all of the elements in this queue, in
551 * proper sequence; the runtime type of the returned array is that of
552 * the specified array. If the queue fits in the specified array, it
553 * is returned therein. Otherwise, a new array is allocated with the
554 * runtime type of the specified array and the size of this queue.
555 *
556 * <p>If this queue fits in the specified array with room to spare
557 * (i.e., the array has more elements than this queue), the element in
558 * the array immediately following the end of the queue is set to
559 * <tt>null</tt>.
560 *
561 * <p>Like the {@link #toArray()} method, this method acts as bridge between
562 * array-based and collection-based APIs. Further, this method allows
563 * precise control over the runtime type of the output array, and may,
564 * under certain circumstances, be used to save allocation costs.
565 *
566 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
567 * The following code can be used to dump the queue into a newly
568 * allocated array of <tt>String</tt>:
569 *
570 * <pre>
571 * String[] y = x.toArray(new String[0]);</pre>
572 *
573 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
574 * <tt>toArray()</tt>.
575 *
576 * @param a the array into which the elements of the queue are to
577 * be stored, if it is big enough; otherwise, a new array of the
578 * same runtime type is allocated for this purpose
579 * @return an array containing all of the elements in this queue
580 * @throws ArrayStoreException if the runtime type of the specified array
581 * is not a supertype of the runtime type of every element in
582 * this queue
583 * @throws NullPointerException if the specified array is null
584 */
585 // @SuppressWarnings("unchecked")
586 public <T> T[] toArray(T[] a) {
587 fullyLock();
588 try {
589 int size = count.get();
590 if (a.length < size)
591 a = (T[])java.lang.reflect.Array.newInstance
592 (a.getClass().getComponentType(), size);
593
594 int k = 0;
595 for (Node<E> p = head.next; p != null; p = p.next)
596 a[k++] = (T)p.item;
597 if (a.length > k)
598 a[k] = null;
599 return a;
600 } finally {
601 fullyUnlock();
602 }
603 }
604
605 public String toString() {
606 fullyLock();
607 try {
608 return super.toString();
609 } finally {
610 fullyUnlock();
611 }
612 }
613
614 /**
615 * Atomically removes all of the elements from this queue.
616 * The queue will be empty after this call returns.
617 */
618 public void clear() {
619 fullyLock();
620 try {
621 for (Node<E> p, h = head; (p = h.next) != null; h = p) {
622 h.next = h;
623 p.item = null;
624 }
625 head = last;
626 // assert head.item == null && head.next == null;
627 if (count.getAndSet(0) == capacity)
628 notFull.signal();
629 } finally {
630 fullyUnlock();
631 }
632 }
633
634 /**
635 * @throws UnsupportedOperationException {@inheritDoc}
636 * @throws ClassCastException {@inheritDoc}
637 * @throws NullPointerException {@inheritDoc}
638 * @throws IllegalArgumentException {@inheritDoc}
639 */
640 public int drainTo(Collection<? super E> c) {
641 return drainTo(c, Integer.MAX_VALUE);
642
643 }
644
645 /**
646 * @throws UnsupportedOperationException {@inheritDoc}
647 * @throws ClassCastException {@inheritDoc}
648 * @throws NullPointerException {@inheritDoc}
649 * @throws IllegalArgumentException {@inheritDoc}
650 */
651 public int drainTo(Collection<? super E> c, int maxElements) {
652 if (c == null)
653 throw new NullPointerException();
654 if (c == this)
655 throw new IllegalArgumentException();
656 boolean signalNotFull = false;
657 final ReentrantLock takeLock = this.takeLock;
658 takeLock.lock();
659 try {
660 int n = Math.min(maxElements, count.get());
661 Node<E> h = head;
662 int i = 0;
663 try {
664 while (i < n) {
665 Node<E> p = h.next;
666 c.add(p.item);
667 p.item = null;
668 h.next = h;
669 h = p;
670 ++i;
671 }
672 return n;
673 } finally {
674 // Restore invariants even if c.add() threw
675 if (i > 0) {
676 // assert h.item == null;
677 head = h;
678 signalNotFull = (count.getAndAdd(-i) == capacity);
679 }
680 }
681 } finally {
682 takeLock.unlock();
683 if (signalNotFull)
684 signalNotFull();
685 }
686 }
687
688 /**
689 * Returns an iterator over the elements in this queue in proper sequence.
690 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
691 * will never throw {@link ConcurrentModificationException},
692 * and guarantees to traverse elements as they existed upon
693 * construction of the iterator, and may (but is not guaranteed to)
694 * reflect any modifications subsequent to construction.
695 *
696 * @return an iterator over the elements in this queue in proper sequence
697 */
698 public Iterator<E> iterator() {
699 return new Itr();
700 }
701
702 private class Itr implements Iterator<E> {
703 /*
704 * Basic weakly-consistent iterator. At all times hold the next
705 * item to hand out so that if hasNext() reports true, we will
706 * still have it to return even if lost race with a take etc.
707 */
708 private Node<E> current;
709 private Node<E> lastRet;
710 private E currentElement;
711
712 Itr() {
713 fullyLock();
714 try {
715 current = head.next;
716 if (current != null)
717 currentElement = current.item;
718 } finally {
719 fullyUnlock();
720 }
721 }
722
723 public boolean hasNext() {
724 return current != null;
725 }
726
727 /**
728 * Returns the next live successor of p, or null if no such.
729 *
730 * Unlike other traversal methods, iterators need to handle both:
731 * - dequeued nodes (p.next == p)
732 * - (possibly multiple) interior removed nodes (p.item == null)
733 */
734 private Node<E> nextNode(Node<E> p) {
735 for (; ;) {
736 Node s = p.next;
737 if (s == p)
738 return head.next;
739 if (s == null || s.item != null)
740 return s;
741 p = s;
742 }
743 }
744
745 public E next() {
746 fullyLock();
747 try {
748 if (current == null)
749 throw new NoSuchElementException();
750 E x = currentElement;
751 lastRet = current;
752 current = nextNode(current);
753 currentElement = (current == null) ? null : current.item;
754 return x;
755 } finally {
756 fullyUnlock();
757 }
758 }
759
760 public void remove() {
761 if (lastRet == null)
762 throw new IllegalStateException();
763 fullyLock();
764 try {
765 Node<E> node = lastRet;
766 lastRet = null;
767 for (Node<E> trail = head, p = trail.next;
768 p != null;
769 trail = p, p = p.next) {
770 if (p == node) {
771 unlink(p, trail);
772 break;
773 }
774 }
775 } finally {
776 fullyUnlock();
777 }
778 }
779 }
780
781 /**
782 * Save the state to a stream (that is, serialize it).
783 *
784 * @serialData The capacity is emitted (int), followed by all of
785 * its elements (each an <tt>Object</tt>) in the proper order,
786 * followed by a null
787 * @param s the stream
788 */
789 private void writeObject(java.io.ObjectOutputStream s)
790 throws java.io.IOException {
791
792 fullyLock();
793 try {
794 // Write out any hidden stuff, plus capacity
795 s.defaultWriteObject();
796
797 // Write out all elements in the proper order.
798 for (Node<E> p = head.next; p != null; p = p.next)
799 s.writeObject(p.item);
800
801 // Use trailing null as sentinel
802 s.writeObject(null);
803 } finally {
804 fullyUnlock();
805 }
806 }
807
808 /**
809 * Reconstitute this queue instance from a stream (that is,
810 * deserialize it).
811 * @param s the stream
812 */
813 private void readObject(java.io.ObjectInputStream s)
814 throws java.io.IOException, ClassNotFoundException {
815 // Read in capacity, and any hidden stuff
816 s.defaultReadObject();
817
818 count.set(0);
819 last = head = new Node<E>(null);
820
821 // Read in all elements and place in queue
822 for (;;) {
823 // @SuppressWarnings("unchecked")
824 E item = (E)s.readObject();
825 if (item == null)
826 break;
827 add(item);
828 }
829 }
830 }
831