1   /*
2    * %W% %E%
3    *
4    * Copyright (c) 2006, Oracle and/or its affiliates. All rights reserved.
5    * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6    */
7   
8   package java.util.concurrent;
9   import java.util.concurrent.atomic.*;
10  import java.util.concurrent.locks.*;
11  import java.util.*;
12  
13  /**
14   * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
15   * linked nodes.
16   * This queue orders elements FIFO (first-in-first-out).
17   * The <em>head</em> of the queue is that element that has been on the
18   * queue the longest time.
19   * The <em>tail</em> of the queue is that element that has been on the
20   * queue the shortest time. New elements
21   * are inserted at the tail of the queue, and the queue retrieval
22   * operations obtain elements at the head of the queue.
23   * Linked queues typically have higher throughput than array-based queues but
24   * less predictable performance in most concurrent applications.
25   *
26   * <p> The optional capacity bound constructor argument serves as a
27   * way to prevent excessive queue expansion. The capacity, if unspecified,
28   * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
29   * dynamically created upon each insertion unless this would bring the
30   * queue above capacity.
31   *
32   * <p>This class and its iterator implement all of the
33   * <em>optional</em> methods of the {@link Collection} and {@link
34   * Iterator} interfaces.
35   *
36   * <p>This class is a member of the
37   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
38   * Java Collections Framework</a>.
39   *
40   * @since 1.5
41   * @author Doug Lea
42   * @param <E> the type of elements held in this collection
43   *
44   */
45  public class LinkedBlockingQueue<E> extends AbstractQueue<E>
46          implements BlockingQueue<E>, java.io.Serializable {
47      private static final long serialVersionUID = -6903933977591709194L;
48  
49      /*
50       * A variant of the "two lock queue" algorithm.  The putLock gates
51       * entry to put (and offer), and has an associated condition for
52       * waiting puts.  Similarly for the takeLock.  The "count" field
53       * that they both rely on is maintained as an atomic to avoid
54       * needing to get both locks in most cases. Also, to minimize need
55       * for puts to get takeLock and vice-versa, cascading notifies are
56       * used. When a put notices that it has enabled at least one take,
57       * it signals taker. That taker in turn signals others if more
58       * items have been entered since the signal. And symmetrically for
59       * takes signalling puts. Operations such as remove(Object) and
60       * iterators acquire both locks.
61       *
62       * Visibility between writers and readers is provided as follows:
63       *
64       * Whenever an element is enqueued, the putLock is acquired and
65       * count updated.  A subsequent reader guarantees visibility to the
66       * enqueued Node by either acquiring the putLock (via fullyLock)
67       * or by acquiring the takeLock, and then reading n = count.get();
68       * this gives visibility to the first n items.
69       *
70       * To implement weakly consistent iterators, it appears we need to
71       * keep all Nodes GC-reachable from a predecessor dequeued Node.
72       * That would cause two problems:
73       * - allow a rogue Iterator to cause unbounded memory retention
74       * - cause cross-generational linking of old Nodes to new Nodes if
75       *   a Node was tenured while live, which generational GCs have a
76       *   hard time dealing with, causing repeated major collections.
77       * However, only non-deleted Nodes need to be reachable from
78       * dequeued Nodes, and reachability does not necessarily have to
79       * be of the kind understood by the GC.  We use the trick of
80       * linking a Node that has just been dequeued to itself.  Such a
81       * self-link implicitly means to advance to head.next.
82       */
83  
84      /**
85       * Linked list node class
86       */
87      static class Node<E> {
88          E item;
89          /**
90           * One of:
91           * - the real successor Node
92           * - this Node, meaning the successor is head.next
93           * - null, meaning there is no successor (this is the last node)
94           */
95  
96          Node<E> next;
97          Node(E x) { item = x; }
98      }
99  
100     /** The capacity bound, or Integer.MAX_VALUE if none */
101     private final int capacity;
102 
103     /** Current number of elements */
104     private final AtomicInteger count = new AtomicInteger(0);
105 
106     /** Head of linked list */
107     private transient Node<E> head;
108 
109     /** Tail of linked list */
110     private transient Node<E> last;
111 
112     /** Lock held by take, poll, etc */
113     private final ReentrantLock takeLock = new ReentrantLock();
114 
115     /** Wait queue for waiting takes */
116     private final Condition notEmpty = takeLock.newCondition();
117 
118     /** Lock held by put, offer, etc */
119     private final ReentrantLock putLock = new ReentrantLock();
120 
121     /** Wait queue for waiting puts */
122     private final Condition notFull = putLock.newCondition();
123 
124     /**
125      * Signals a waiting take. Called only from put/offer (which do not
126      * otherwise ordinarily lock takeLock.)
127      */
128     private void signalNotEmpty() {
129         final ReentrantLock takeLock = this.takeLock;
130         takeLock.lock();
131         try {
132             notEmpty.signal();
133         } finally {
134             takeLock.unlock();
135         }
136     }
137 
138     /**
139      * Signals a waiting put. Called only from take/poll.
140      */
141     private void signalNotFull() {
142         final ReentrantLock putLock = this.putLock;
143         putLock.lock();
144         try {
145             notFull.signal();
146         } finally {
147             putLock.unlock();
148         }
149     }
150 
151     /**
152      * Creates a node and links it at end of queue.
153      * @param x the item
154      */
155     private void enqueue(E x) {
156         // assert putLock.isHeldByCurrentThread();
157         last = last.next = new Node<E>(x);
158     }
159 
160     /**
161      * Removes a node from head of queue.
162      * @return the node
163      */
164     private E dequeue() {
165         // assert takeLock.isHeldByCurrentThread();
166         Node<E> h = head;
167         Node<E> first = h.next;
168         h.next = h; // help GC
169         head = first;
170         E x = first.item;
171         first.item = null;
172         return x;
173     }
174 
175     /**
176      * Lock to prevent both puts and takes.
177      */
178     void fullyLock() {
179         putLock.lock();
180         takeLock.lock();
181     }
182 
183     /**
184      * Unlock to allow both puts and takes.
185      */
186     void fullyUnlock() {
187         takeLock.unlock();
188         putLock.unlock();
189     }
190 
191     /**
192      * Tells whether both locks are held by current thread.
193      */
194     boolean isFullyLocked() {
195         return (putLock.isHeldByCurrentThread() &&
196                 takeLock.isHeldByCurrentThread());
197     }
198 
199 
200     /**
201      * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
202      * {@link Integer#MAX_VALUE}.
203      */
204     public LinkedBlockingQueue() {
205         this(Integer.MAX_VALUE);
206     }
207 
208     /**
209      * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
210      *
211      * @param capacity the capacity of this queue
212      * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
213      *         than zero
214      */
215     public LinkedBlockingQueue(int capacity) {
216         if (capacity <= 0) throw new IllegalArgumentException();
217         this.capacity = capacity;
218         last = head = new Node<E>(null);
219     }
220 
221     /**
222      * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
223      * {@link Integer#MAX_VALUE}, initially containing the elements of the
224      * given collection,
225      * added in traversal order of the collection's iterator.
226      *
227      * @param c the collection of elements to initially contain
228      * @throws NullPointerException if the specified collection or any
229      *         of its elements are null
230      */
231     public LinkedBlockingQueue(Collection<? extends E> c) {
232         this(Integer.MAX_VALUE);
233         final ReentrantLock putLock = this.putLock;
234         putLock.lock(); // Never contended, but necessary for visibility
235         try {
236             int n = 0;
237             for (E e : c) {
238                 if (e == null)
239                     throw new NullPointerException();
240                 if (n == capacity)
241                     throw new IllegalStateException("Queue full");
242                 enqueue(e);
243                 ++n;
244             }
245             count.set(n);
246         } finally {
247             putLock.unlock();
248         }
249     }
250 
251 
252     // this doc comment is overridden to remove the reference to collections
253     // greater in size than Integer.MAX_VALUE
254     /**
255      * Returns the number of elements in this queue.
256      *
257      * @return the number of elements in this queue
258      */
259     public int size() {
260         return count.get();
261     }
262 
263     // this doc comment is a modified copy of the inherited doc comment,
264     // without the reference to unlimited queues.
265     /**
266      * Returns the number of additional elements that this queue can ideally
267      * (in the absence of memory or resource constraints) accept without
268      * blocking. This is always equal to the initial capacity of this queue
269      * less the current <tt>size</tt> of this queue.
270      *
271      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
272      * an element will succeed by inspecting <tt>remainingCapacity</tt>
273      * because it may be the case that another thread is about to
274      * insert or remove an element.
275      */
276     public int remainingCapacity() {
277         return capacity - count.get();
278     }
279 
280     /**
281      * Inserts the specified element at the tail of this queue, waiting if
282      * necessary for space to become available.
283      *
284      * @throws InterruptedException {@inheritDoc}
285      * @throws NullPointerException {@inheritDoc}
286      */
287     public void put(E e) throws InterruptedException {
288         if (e == null) throw new NullPointerException();
289         // Note: convention in all put/take/etc is to preset local var
290         // holding count negative to indicate failure unless set.
291         int c = -1;
292         final ReentrantLock putLock = this.putLock;
293         final AtomicInteger count = this.count;
294         putLock.lockInterruptibly();
295         try {
296             /*
297              * Note that count is used in wait guard even though it is
298              * not protected by lock. This works because count can
299              * only decrease at this point (all other puts are shut
300              * out by lock), and we (or some other waiting put) are
301              * signalled if it ever changes from
302              * capacity. Similarly for all other uses of count in
303              * other wait guards.
304              */
305             while (count.get() == capacity) { 
306                     notFull.await();
307             }
308             enqueue(e);
309             c = count.getAndIncrement();
310             if (c + 1 < capacity)
311                 notFull.signal();
312         } finally {
313             putLock.unlock();
314         }
315         if (c == 0)
316             signalNotEmpty();
317     }
318 
319     /**
320      * Inserts the specified element at the tail of this queue, waiting if
321      * necessary up to the specified wait time for space to become available.
322      *
323      * @return <tt>true</tt> if successful, or <tt>false</tt> if
324      *         the specified waiting time elapses before space is available.
325      * @throws InterruptedException {@inheritDoc}
326      * @throws NullPointerException {@inheritDoc}
327      */
328     public boolean offer(E e, long timeout, TimeUnit unit)
329         throws InterruptedException {
330 
331         if (e == null) throw new NullPointerException();
332         long nanos = unit.toNanos(timeout);
333         int c = -1;
334         final ReentrantLock putLock = this.putLock;
335         final AtomicInteger count = this.count;
336         putLock.lockInterruptibly();
337         try {
338             while (count.get() == capacity) {
339 
340                 if (nanos <= 0)
341                     return false;
342                 nanos = notFull.awaitNanos(nanos);
343             }
344             enqueue(e);
345             c = count.getAndIncrement();
346            if (c + 1 < capacity)
347                notFull.signal();
348         } finally {
349             putLock.unlock();
350         }
351         if (c == 0)
352             signalNotEmpty();
353         return true;
354     }
355 
356     /**
357      * Inserts the specified element at the tail of this queue if it is
358      * possible to do so immediately without exceeding the queue's capacity,
359      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
360      * is full.
361      * When using a capacity-restricted queue, this method is generally
362      * preferable to method {@link BlockingQueue#add add}, which can fail to
363      * insert an element only by throwing an exception.
364      *
365      * @throws NullPointerException if the specified element is null
366      */
367     public boolean offer(E e) {
368         if (e == null) throw new NullPointerException();
369         final AtomicInteger count = this.count;
370         if (count.get() == capacity)
371             return false;
372         int c = -1;
373         final ReentrantLock putLock = this.putLock;
374         putLock.lock();
375         try {
376             if (count.get() < capacity) {
377                 enqueue(e);
378                 c = count.getAndIncrement();
379                 if (c + 1 < capacity)
380                     notFull.signal();
381             }
382         } finally {
383             putLock.unlock();
384         }
385         if (c == 0)
386             signalNotEmpty();
387         return c >= 0;
388     }
389 
390 
391     public E take() throws InterruptedException {
392         E x;
393         int c = -1;
394         final AtomicInteger count = this.count;
395         final ReentrantLock takeLock = this.takeLock;
396         takeLock.lockInterruptibly();
397         try {
398                 while (count.get() == 0) {
399                     notEmpty.await();
400                 }
401             x = dequeue();
402             c = count.getAndDecrement();
403             if (c > 1)
404                 notEmpty.signal();
405         } finally {
406             takeLock.unlock();
407         }
408         if (c == capacity)
409             signalNotFull();
410         return x;
411     }
412 
413     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
414         E x = null;
415         int c = -1;
416         long nanos = unit.toNanos(timeout);
417         final AtomicInteger count = this.count;
418         final ReentrantLock takeLock = this.takeLock;
419         takeLock.lockInterruptibly();
420         try {
421                 while (count.get() == 0) { 
422                   if (nanos <= 0)
423                     return null;
424                   nanos = notEmpty.awaitNanos(nanos);
425                 }
426                 x = dequeue();
427                 c = count.getAndDecrement();
428                 if (c > 1)
429                     notEmpty.signal();
430         } finally {
431             takeLock.unlock();
432         }
433         if (c == capacity)
434             signalNotFull();
435         return x;
436     }
437 
438     public E poll() {
439         final AtomicInteger count = this.count;
440         if (count.get() == 0)
441             return null;
442         E x = null;
443         int c = -1;
444         final ReentrantLock takeLock = this.takeLock;
445         takeLock.lock();
446         try {
447             if (count.get() > 0) {
448                 x = dequeue();
449                 c = count.getAndDecrement();
450                 if (c > 1)
451                     notEmpty.signal();
452             }
453         } finally {
454             takeLock.unlock();
455         }
456         if (c == capacity)
457             signalNotFull();
458         return x;
459     }
460 
461 
462     public E peek() {
463         if (count.get() == 0)
464             return null;
465         final ReentrantLock takeLock = this.takeLock;
466         takeLock.lock();
467         try {
468             Node<E> first = head.next;
469             if (first == null)
470                 return null;
471             else
472                 return first.item;
473         } finally {
474             takeLock.unlock();
475         }
476     }
477 
478     /*
479      * Unlinks interior Node p with predecessor trail.
480      */
481     void unlink(Node<E> p, Node<E> trail) {
482         // assert isFullyLocked();
483         // p.next is not changed, to allow iterators that are
484         // traversing p to maintain their weak-consistency guarantee.
485         p.item = null;
486         trail.next = p.next;
487         if (last == p)
488             last = trail;
489         if (count.getAndDecrement() == capacity)
490             notFull.signal();
491     }
492 
493     /**
494      * Removes a single instance of the specified element from this queue,
495      * if it is present.  More formally, removes an element <tt>e</tt> such
496      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
497      * elements.
498      * Returns <tt>true</tt> if this queue contained the specified element
499      * (or equivalently, if this queue changed as a result of the call).
500      *
501      * @param o element to be removed from this queue, if present
502      * @return <tt>true</tt> if this queue changed as a result of the call
503      */
504     public boolean remove(Object o) {
505         if (o == null) return false;
506         fullyLock();
507         try {
508             for (Node<E> trail = head, p = trail.next;
509                  p != null;
510                  trail = p, p = p.next) {
511                 if (o.equals(p.item)) {
512                     unlink(p, trail);
513                     return true;
514                 }
515             }
516             return false;
517         } finally {
518             fullyUnlock();
519         }
520     }
521 
522     /**
523      * Returns an array containing all of the elements in this queue, in
524      * proper sequence.
525      *
526      * <p>The returned array will be "safe" in that no references to it are
527      * maintained by this queue.  (In other words, this method must allocate
528      * a new array).  The caller is thus free to modify the returned array.
529      *
530      * <p>This method acts as bridge between array-based and collection-based
531      * APIs.
532      *
533      * @return an array containing all of the elements in this queue
534      */
535     public Object[] toArray() {
536         fullyLock();
537         try {
538             int size = count.get();
539             Object[] a = new Object[size];
540             int k = 0;
541             for (Node<E> p = head.next; p != null; p = p.next)
542                 a[k++] = p.item;
543             return a;
544         } finally {
545             fullyUnlock();
546         }
547     }
548 
549     /**
550      * Returns an array containing all of the elements in this queue, in
551      * proper sequence; the runtime type of the returned array is that of
552      * the specified array.  If the queue fits in the specified array, it
553      * is returned therein.  Otherwise, a new array is allocated with the
554      * runtime type of the specified array and the size of this queue.
555      *
556      * <p>If this queue fits in the specified array with room to spare
557      * (i.e., the array has more elements than this queue), the element in
558      * the array immediately following the end of the queue is set to
559      * <tt>null</tt>.
560      *
561      * <p>Like the {@link #toArray()} method, this method acts as bridge between
562      * array-based and collection-based APIs.  Further, this method allows
563      * precise control over the runtime type of the output array, and may,
564      * under certain circumstances, be used to save allocation costs.
565      *
566      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
567      * The following code can be used to dump the queue into a newly
568      * allocated array of <tt>String</tt>:
569      *
570      * <pre>
571      *     String[] y = x.toArray(new String[0]);</pre>
572      *
573      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
574      * <tt>toArray()</tt>.
575      *
576      * @param a the array into which the elements of the queue are to
577      *          be stored, if it is big enough; otherwise, a new array of the
578      *          same runtime type is allocated for this purpose
579      * @return an array containing all of the elements in this queue
580      * @throws ArrayStoreException if the runtime type of the specified array
581      *         is not a supertype of the runtime type of every element in
582      *         this queue
583      * @throws NullPointerException if the specified array is null
584      */
585     // @SuppressWarnings("unchecked")
586     public <T> T[] toArray(T[] a) {
587         fullyLock();
588         try {
589             int size = count.get();
590             if (a.length < size)
591                 a = (T[])java.lang.reflect.Array.newInstance
592                     (a.getClass().getComponentType(), size);
593 
594             int k = 0;
595             for (Node<E> p = head.next; p != null; p = p.next)
596                 a[k++] = (T)p.item;
597             if (a.length > k)
598                 a[k] = null;
599             return a;
600         } finally {
601             fullyUnlock();
602         }
603     }
604 
605     public String toString() {
606         fullyLock();
607         try {
608             return super.toString();
609         } finally {
610             fullyUnlock();
611         }
612     }
613 
614     /**
615      * Atomically removes all of the elements from this queue.
616      * The queue will be empty after this call returns.
617      */
618     public void clear() {
619         fullyLock();
620         try {
621             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
622                 h.next = h;
623                 p.item = null;
624             }
625             head = last;
626             // assert head.item == null && head.next == null;
627             if (count.getAndSet(0) == capacity)
628                 notFull.signal();
629         } finally {
630             fullyUnlock();
631         }
632     }
633 
634     /**
635      * @throws UnsupportedOperationException {@inheritDoc}
636      * @throws ClassCastException            {@inheritDoc}
637      * @throws NullPointerException          {@inheritDoc}
638      * @throws IllegalArgumentException      {@inheritDoc}
639      */
640     public int drainTo(Collection<? super E> c) {
641         return drainTo(c, Integer.MAX_VALUE);
642 
643     }
644 
645     /**
646      * @throws UnsupportedOperationException {@inheritDoc}
647      * @throws ClassCastException            {@inheritDoc}
648      * @throws NullPointerException          {@inheritDoc}
649      * @throws IllegalArgumentException      {@inheritDoc}
650      */
651     public int drainTo(Collection<? super E> c, int maxElements) {
652         if (c == null)
653             throw new NullPointerException();
654         if (c == this)
655             throw new IllegalArgumentException();
656         boolean signalNotFull = false;
657         final ReentrantLock takeLock = this.takeLock;
658         takeLock.lock();
659         try {
660             int n = Math.min(maxElements, count.get());
661             Node<E> h = head;
662             int i = 0;
663             try {
664                 while (i < n) {
665                     Node<E> p = h.next;
666                     c.add(p.item);
667                     p.item = null;
668                     h.next = h;
669                     h = p;
670                     ++i;
671                 }
672                 return n;
673             } finally {
674                 // Restore invariants even if c.add() threw
675                 if (i > 0) {
676                     // assert h.item == null;
677                     head = h;
678                     signalNotFull = (count.getAndAdd(-i) == capacity);
679                 }
680             }
681         } finally {
682             takeLock.unlock();
683             if (signalNotFull)
684                  signalNotFull();
685         }
686     }
687 
688     /**
689      * Returns an iterator over the elements in this queue in proper sequence.
690      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
691      * will never throw {@link ConcurrentModificationException},
692      * and guarantees to traverse elements as they existed upon
693      * construction of the iterator, and may (but is not guaranteed to)
694      * reflect any modifications subsequent to construction.
695      *
696      * @return an iterator over the elements in this queue in proper sequence
697      */
698     public Iterator<E> iterator() {
699       return new Itr();
700     }
701 
702     private class Itr implements Iterator<E> {
703         /*
704          * Basic weakly-consistent iterator.  At all times hold the next
705          * item to hand out so that if hasNext() reports true, we will
706          * still have it to return even if lost race with a take etc.
707          */
708         private Node<E> current;
709         private Node<E> lastRet;
710         private E currentElement;
711 
712         Itr() {
713             fullyLock();
714             try {
715                 current = head.next;
716                 if (current != null)
717                     currentElement = current.item;
718             } finally {
719                fullyUnlock();
720             }
721         }
722 
723         public boolean hasNext() {
724             return current != null;
725         }
726 
727         /**
728          * Returns the next live successor of p, or null if no such.
729          *  
730          * Unlike other traversal methods, iterators need to handle both:
731          * - dequeued nodes (p.next == p)
732          * -  (possibly multiple) interior removed nodes (p.item == null)
733          */
734         private Node<E> nextNode(Node<E> p) {
735             for (; ;) {   
736                 Node s = p.next;
737                 if (s == p)
738                    return head.next;
739                 if (s == null || s.item != null)
740                    return s;
741                 p = s;
742             }
743         }
744 
745         public E next() {
746             fullyLock();
747             try {
748                 if (current == null)
749                     throw new NoSuchElementException();
750                 E x = currentElement;
751                 lastRet = current;
752                 current = nextNode(current);
753                 currentElement = (current == null) ? null : current.item;
754                 return x;
755             } finally {
756                fullyUnlock();
757             }
758         }
759 
760         public void remove() {
761             if (lastRet == null)
762                 throw new IllegalStateException();
763             fullyLock();
764             try {
765                 Node<E> node = lastRet;
766                 lastRet = null;
767                 for (Node<E> trail = head, p = trail.next;
768                      p != null;
769                      trail = p, p = p.next) {
770                      if (p == node) {
771                          unlink(p, trail);
772                          break;
773                       }
774                }
775             } finally {
776                 fullyUnlock();
777             }
778         }
779     }
780 
781     /**
782      * Save the state to a stream (that is, serialize it).
783      *
784      * @serialData The capacity is emitted (int), followed by all of
785      * its elements (each an <tt>Object</tt>) in the proper order,
786      * followed by a null
787      * @param s the stream
788      */
789     private void writeObject(java.io.ObjectOutputStream s)
790         throws java.io.IOException {
791 
792         fullyLock();
793         try {
794             // Write out any hidden stuff, plus capacity
795             s.defaultWriteObject();
796 
797             // Write out all elements in the proper order.
798             for (Node<E> p = head.next; p != null; p = p.next)
799                 s.writeObject(p.item);
800 
801             // Use trailing null as sentinel
802             s.writeObject(null);
803         } finally {
804             fullyUnlock();
805         }
806     }
807 
808     /**
809      * Reconstitute this queue instance from a stream (that is,
810      * deserialize it).
811      * @param s the stream
812      */
813     private void readObject(java.io.ObjectInputStream s)
814         throws java.io.IOException, ClassNotFoundException {
815         // Read in capacity, and any hidden stuff
816         s.defaultReadObject();
817 
818         count.set(0);
819         last = head = new Node<E>(null);
820 
821         // Read in all elements and place in queue
822         for (;;) {
823             // @SuppressWarnings("unchecked")
824             E item = (E)s.readObject();
825             if (item == null)
826                 break;
827             add(item);
828         }
829     }
830 }
831