1   /*
2    * %W% %E%
3    *
4    * Copyright (c) 2006, Oracle and/or its affiliates. All rights reserved.
5    * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6    */
7   
8   package java.util.concurrent;
9   import java.util.AbstractQueue;
10  import java.util.ArrayList;
11  import java.util.Collection;
12  import java.util.Iterator;
13  import java.util.NoSuchElementException;
14  import java.util.Queue; 
15  
/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A {@code ConcurrentLinkedQueue} is an appropriate choice when
 * many threads will share access to a common collection.
 * This queue does not permit {@code null} elements.
 *
 * <p>This implementation employs an efficient <em>non-blocking</em>
 * algorithm based on one described in <a
 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 *
 * <p>Beware that, unlike in most collections, the {@code size} method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code ConcurrentLinkedQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code ConcurrentLinkedQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
     * This is a modification of the Michael & Scott algorithm,
     * adapted for a garbage-collected environment, with support for
     * interior node deletion (to support remove(Object)). For
     * explanation, read the paper.
     *
     * Note that like most non-blocking algorithms in this package,
     * this implementation relies on the fact that in garbage
     * collected systems, there is no possibility of ABA problems due
     * to recycled nodes, so there is no need to use "counted
     * pointers" or related techniques seen in versions used in
     * non-GC'ed settings.
     *
     * The fundamental invariants are:
     * - There is exactly one (last) Node with a null next reference,
     *   which is CASed when enqueueing. This last Node can be
     *   reached in O(1) time from tail, but tail is merely an
     *   optimization - it can always be reached in O(N) time from
     *   head as well.
     * - The elements contained in the queue are the non-null items in
     *   Nodes that are reachable from head. CASing the item
     *   reference of a Node to null atomically removes it from the
     *   queue. Reachability of all elements from head must remain
     *   true even in the case of concurrent modifications that cause
     *   head to advance. A dequeued Node may remain in use
     *   indefinitely due to creation of an Iterator or simply a
     *   poll() that has lost its time slice.
     *
     * The above might appear to imply that all Nodes are GC-reachable
     * from a predecessor dequeued Node. That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC. We use the trick of
     * linking a Node that has just been dequeued to itself. Such a
     * self-link implicitly means to advance to head.
     *
     * Both head and tail are permitted to lag. In fact, failing to
     * update them every time one could is a significant optimization
     * (fewer CASes). This is controlled by local "hops" variables
     * that only trigger helping-CASes after experiencing multiple
     * lags.
     *
     * Since head and tail are updated concurrently and independently,
     * it is possible for tail to lag behind head (why not)?
     *
     * CASing a Node's item reference to null atomically removes the
     * element from the queue. Iterators skip over Nodes with null
     * items. Prior implementations of this class had a race between
     * poll() and remove(Object) where the same element would appear
     * to be successfully removed by two concurrent operations. The
     * method remove(Object) also lazily unlinks deleted Nodes that it
     * passes while searching. Unlinking is never needed for
     * correctness, but without it repeated offer/remove cycles behind
     * a long-lived element would leave an ever-growing chain of dead
     * interior Nodes.
     *
     * When constructing a Node (before enqueuing it) we avoid paying
     * for a volatile write to item by using lazySet instead of a
     * normal write. This allows the cost of enqueue to be
     * "one-and-a-half" CASes.
     *
     * Both head and tail may or may not point to a Node with a
     * non-null item. If the queue is empty, all items must of course
     * be null. Upon creation, both head and tail refer to a dummy
     * Node with null item. Both head and tail are only updated using
     * CAS, so they never regress, although again this is merely an
     * optimization.
     */

    /**
     * A singly-linked list cell. A node whose {@code item} is null is
     * either the initial dummy node or a node whose element has been
     * removed; a node whose {@code next} refers to itself has been
     * dropped from the front of the list (see {@link #succ}).
     */
    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        /**
         * Creates a node holding {@code item}, with a null successor.
         */
        Node(E item) {
            // Piggyback on imminent casNext()
            lazySetItem(item);
        }

        /** Volatile read of the item. */
        E getItem() {
            return item;
        }

        /** Atomically sets the item to {@code val} if it is currently {@code cmp}. */
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /** Volatile write of the item. */
        void setItem(E val) {
            item = val;
        }

        /** Ordered (lazy) write of the item. */
        void lazySetItem(E val) {
            UNSAFE.putOrderedObject(this, itemOffset, val);
        }

        /** Ordered (lazy) write of the successor link. */
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        /** Volatile read of the successor link. */
        Node<E> getNext() {
            return next;
        }

        /** Atomically sets the successor to {@code val} if it is currently {@code cmp}. */
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE =
            sun.misc.Unsafe.getUnsafe();
        private static final long nextOffset =
            objectFieldOffset(UNSAFE, "next", Node.class);
        private static final long itemOffset =
            objectFieldOffset(UNSAFE, "item", Node.class);
    }

    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head = new Node<E>(null);

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail = head;

    /**
     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
     */
    public ConcurrentLinkedQueue() {}

    /**
     * Creates a {@code ConcurrentLinkedQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        // NOTE(review): add() is overridable, so a subclass override
        // runs here before the subclass constructor body has executed.
        for (E e : c)
            add(e);
    }

    // Have to override just to update the javadoc

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * We don't bother to update head or tail pointers if fewer than
     * HOPS links from "true" location. We assume that volatile
     * writes are significantly more expensive than volatile reads.
     */
    private static final int HOPS = 1;

    /**
     * Try to CAS head to p. If successful, repoint old head to itself
     * as sentinel for succ(), below.
     */
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.getNext();
        return (p == next) ? head : next;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e);
        retry:
        for (;;) {
            Node<E> t = tail;
            Node<E> p = t;
            for (int hops = 0; ; hops++) {
                Node<E> next = succ(p);
                if (next != null) {
                    // p is not the last node. If we have already
                    // walked a while and tail has moved, restart
                    // from the fresher tail instead.
                    if (hops > HOPS && t != tail)
                        continue retry;
                    p = next;
                } else if (p.casNext(null, n)) {
                    // n is now linked in; only swing tail when it
                    // lags by at least HOPS links.
                    if (hops >= HOPS)
                        casTail(t, n); // Failure is OK.
                    return true;
                } else {
                    // Lost the CAS race to another enqueuer; move on.
                    p = succ(p);
                }
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue,
     * or returns {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue
     *         is empty
     */
    public E poll() {
        Node<E> h = head;
        Node<E> p = h;
        for (int hops = 0; ; hops++) {
            E item = p.getItem();

            // A successful CAS of item to null is the removal.
            if (item != null && p.casItem(item, null)) {
                if (hops >= HOPS) {
                    Node<E> q = p.getNext();
                    updateHead(h, (q != null) ? q : p);
                }
                return item;
            }
            Node<E> next = succ(p);
            if (next == null) {
                // Reached the last node without finding an element.
                updateHead(h, p);
                break;
            }
            p = next;
        }
        return null;
    }

    /**
     * Retrieves, but does not remove, the head of this queue,
     * or returns {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue
     *         is empty
     */
    public E peek() {
        Node<E> h = head;
        Node<E> p = h;
        E item;
        for (;;) {
            item = p.getItem();
            if (item != null)
                break;
            Node<E> next = succ(p);
            if (next == null) {
                break;
            }
            p = next;
        }
        // Help advance head past the dead nodes just skipped.
        updateHead(h, p);
        return item;
    }

    /**
     * Returns the first live (non-deleted) node on list, or null if none.
     * This is yet another variant of poll/peek; here returning the
     * first node, not element. We could make peek() a wrapper around
     * first(), but that would cost an extra volatile read of item,
     * and the need to add a retry loop to deal with the possibility
     * of losing a race to a concurrent poll().
     */
    Node<E> first() {
        Node<E> h = head;
        Node<E> p = h;
        Node<E> result;
        for (;;) {
            E item = p.getItem();
            if (item != null) {
                result = p;
                break;
            }
            Node<E> next = succ(p);
            if (next == null) {
                result = null;
                break;
            }
            p = next;
        }
        updateHead(h, p);
        return result;
    }

    /**
     * Returns {@code true} if this queue contains no elements.
     *
     * @return {@code true} if this queue contains no elements
     */
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
            }
        }
        return count;
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item))
                return true;
        }
        return false;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * <p>While searching, this method also attempts to unlink
     * already-deleted (null-item) interior nodes that it passes, so
     * that repeated {@code offer}/{@code remove} cycles do not build
     * up an ever-growing chain of dead nodes behind a long-lived
     * element.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        Node<E> next;
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.getItem();
            if (item != null) {
                if (!o.equals(item)) {
                    // Live node holding a different element: keep it.
                    next = succ(p);
                    continue;
                }
                // CAS to null is the removal; it fails only if a
                // concurrent poll()/remove() claimed the element first.
                removed = p.casItem(item, null);
            }

            // p is dead here (deleted by us or by someone else).
            // Unlink it when it has both a predecessor and a successor;
            // the last node must stay so that enqueuers can find it.
            next = succ(p);
            if (pred != null && next != null)
                pred.casNext(p, next); // Failure is OK.
            if (removed)
                return true;
        }
        return false;
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.getItem();
            if (item != null)
                al.add(item);
        }
        return al.toArray();
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = succ(p)) {
            E item = p.getItem();
            if (item != null)
                a[k++] = (T)item;
        }
        if (p == null) {
            // Whole list was traversed, so everything fit.
            if (k < a.length)
                a[k] = null;
            return a;
        }

        // If won't fit, use ArrayList version
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = succ(q)) {
            E item = q.getItem();
            if (item != null)
                al.add(item);
        }
        return al.toArray(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    /**
     * Iterator that looks one element ahead, so that an element
     * reported by {@code hasNext()} is still returned by {@code next()}
     * even if it is removed from the queue in between.
     */
    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> pred, p;
            if (nextNode == null) {
                p = first();
                pred = null;
            } else {
                pred = nextNode;
                p = succ(nextNode);
            }

            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.getItem();
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else {
                    // skip over nulls, unlinking them where possible
                    Node<E> next = succ(p);
                    if (pred != null && next != null)
                        pred.casNext(p, next);
                    p = next;
                }
            }
        }

        /**
         * Returns {@code true} if a further element has been
         * captured for the next call to {@link #next}.
         */
        public boolean hasNext() {
            return nextNode != null;
        }

        /**
         * Returns the captured element and looks ahead for the one
         * after it.
         *
         * @throws NoSuchElementException if the iteration has no more
         *         elements
         */
        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
            return advance();
        }

        /**
         * Removes the element last returned by {@link #next} by
         * nulling its node's item.
         *
         * @throws IllegalStateException if {@code next} has not been
         *         called, or {@code remove} was already called after
         *         the last call to {@code next}
         */
        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.setItem(null);
            lastRet = null;
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData All of the elements (each an {@code E}) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        // Write out any hidden stuff
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = first(); p != null; p = succ(p)) {
            Object item = p.getItem();
            if (item != null)
                s.writeObject(item);
        }

        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in any hidden stuff
        s.defaultReadObject();
        // head and tail are transient: start again from a fresh dummy node.
        head = new Node<E>(null);
        tail = head;
        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            else
                offer(item);
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long headOffset =
        objectFieldOffset(UNSAFE, "head", ConcurrentLinkedQueue.class);
    private static final long tailOffset =
        objectFieldOffset(UNSAFE, "tail", ConcurrentLinkedQueue.class);

    /** Atomically sets tail to {@code val} if it is currently {@code cmp}. */
    private boolean casTail(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    /** Atomically sets head to {@code val} if it is currently {@code cmp}. */
    private boolean casHead(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    /**
     * Looks up the Unsafe offset of the named field declared in
     * {@code klazz}.
     *
     * @param unsafe the Unsafe instance to query
     * @param field the name of the declared field
     * @param klazz the class declaring the field
     * @return the field offset
     * @throws NoSuchFieldError if {@code klazz} declares no such field
     */
    static long objectFieldOffset(sun.misc.Unsafe unsafe,
                                  String field, Class<?> klazz) {
        try {
            return unsafe.objectFieldOffset(klazz.getDeclaredField(field));
        } catch (NoSuchFieldException e) {
            // Convert Exception to corresponding Error
            NoSuchFieldError error = new NoSuchFieldError(field);
            error.initCause(e);
            throw error;
        }
    }
}
705