1   /*
2    * %W% %E%
3    *
4    * Copyright (c) 2006, Oracle and/or its affiliates. All rights reserved.
5    * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6    */
7   
8   package java.util.concurrent;
9   
10  /**
11   * A {@link CompletionService} that uses a supplied {@link Executor}
12   * to execute tasks.  This class arranges that submitted tasks are,
13   * upon completion, placed on a queue accessible using <tt>take</tt>.
14   * The class is lightweight enough to be suitable for transient use
15   * when processing groups of tasks.
16   *
17   * <p>
18   *
19   * <b>Usage Examples.</b>
20   *
21   * Suppose you have a set of solvers for a certain problem, each
22   * returning a value of some type <tt>Result</tt>, and would like to
23   * run them concurrently, processing the results of each of them that
24   * return a non-null value, in some method <tt>use(Result r)</tt>. You
25   * could write this as:
26   *
27   * <pre>
28   *   void solve(Executor e,
29   *              Collection&lt;Callable&lt;Result&gt;&gt; solvers)
30   *     throws InterruptedException, ExecutionException {
31   *       CompletionService&lt;Result&gt; ecs
32   *           = new ExecutorCompletionService&lt;Result&gt;(e);
33   *       for (Callable&lt;Result&gt; s : solvers)
34   *           ecs.submit(s);
35   *       int n = solvers.size();
36   *       for (int i = 0; i &lt; n; ++i) {
37   *           Result r = ecs.take().get();
38   *           if (r != null)
39   *               use(r);
40   *       }
41   *   }
42   * </pre>
43   *
44   * Suppose instead that you would like to use the first non-null result
45   * of the set of tasks, ignoring any that encounter exceptions,
46   * and cancelling all other tasks when the first one is ready:
47   *
48   * <pre>
49   *   void solve(Executor e,
50   *              Collection&lt;Callable&lt;Result&gt;&gt; solvers)
51   *     throws InterruptedException {
52   *       CompletionService&lt;Result&gt; ecs
53   *           = new ExecutorCompletionService&lt;Result&gt;(e);
54   *       int n = solvers.size();
55   *       List&lt;Future&lt;Result&gt;&gt; futures
56   *           = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
57   *       Result result = null;
58   *       try {
59   *           for (Callable&lt;Result&gt; s : solvers)
60   *               futures.add(ecs.submit(s));
61   *           for (int i = 0; i &lt; n; ++i) {
62   *               try {
63   *                   Result r = ecs.take().get();
64   *                   if (r != null) {
65   *                       result = r;
66   *                       break;
67   *                   }
68   *               } catch (ExecutionException ignore) {}
69   *           }
70   *       }
71   *       finally {
72   *           for (Future&lt;Result&gt; f : futures)
73   *               f.cancel(true);
74   *       }
75   *
76   *       if (result != null)
77   *           use(result);
78   *   }
79   * </pre>
80   */
81  public class ExecutorCompletionService<V> implements CompletionService<V> {
82      private final Executor executor;
83      private final AbstractExecutorService aes;
84      private final BlockingQueue<Future<V>> completionQueue;
85  
86      /**
87       * FutureTask extension to enqueue upon completion
88       */
89      private class QueueingFuture extends FutureTask<Void> {
90          QueueingFuture(RunnableFuture<V> task) {
91              super(task, null);
92              this.task = task;
93          }
94          protected void done() { completionQueue.add(task); }
95          private final Future<V> task;
96      }
97  
98      private RunnableFuture<V> newTaskFor(Callable<V> task) {
99          if (aes == null)
100             return new FutureTask<V>(task);
101         else
102             return aes.newTaskFor(task);
103     }
104 
105     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
106         if (aes == null)
107             return new FutureTask<V>(task, result);
108         else
109             return aes.newTaskFor(task, result);
110     }
111 
112     /**
113      * Creates an ExecutorCompletionService using the supplied
114      * executor for base task execution and a
115      * {@link LinkedBlockingQueue} as a completion queue.
116      *
117      * @param executor the executor to use
118      * @throws NullPointerException if executor is <tt>null</tt>
119      */
120     public ExecutorCompletionService(Executor executor) {
121         if (executor == null)
122             throw new NullPointerException();
123         this.executor = executor;
124         this.aes = (executor instanceof AbstractExecutorService) ?
125             (AbstractExecutorService) executor : null;
126         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
127     }
128 
129     /**
130      * Creates an ExecutorCompletionService using the supplied
131      * executor for base task execution and the supplied queue as its
132      * completion queue.
133      *
134      * @param executor the executor to use
135      * @param completionQueue the queue to use as the completion queue
136      * normally one dedicated for use by this service
137      * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
138      */
139     public ExecutorCompletionService(Executor executor,
140                                      BlockingQueue<Future<V>> completionQueue) {
141         if (executor == null || completionQueue == null)
142             throw new NullPointerException();
143         this.executor = executor;
144         this.aes = (executor instanceof AbstractExecutorService) ?
145             (AbstractExecutorService) executor : null;
146         this.completionQueue = completionQueue;
147     }
148 
149     public Future<V> submit(Callable<V> task) {
150         if (task == null) throw new NullPointerException();
151         RunnableFuture<V> f = newTaskFor(task);
152         executor.execute(new QueueingFuture(f));
153         return f;
154     }
155 
156     public Future<V> submit(Runnable task, V result) {
157         if (task == null) throw new NullPointerException();
158         RunnableFuture<V> f = newTaskFor(task, result);
159         executor.execute(new QueueingFuture(f));
160         return f;
161     }
162 
163     public Future<V> take() throws InterruptedException {
164         return completionQueue.take();
165     }
166 
167     public Future<V> poll() {
168         return completionQueue.poll();
169     }
170 
171     public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
172         return completionQueue.poll(timeout, unit);
173     }
174 
175 }
176