1   /*
2    * %W% %E%
3    *
4    * Copyright (c) 2006, Oracle and/or its affiliates. All rights reserved.
5    * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6    */
7   
8   package java.util.concurrent;
9   import java.util.*;
10  
11  /**
12   * Provides default implementations of {@link ExecutorService}
13   * execution methods. This class implements the <tt>submit</tt>,
14   * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
15   * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
16   * to the {@link FutureTask} class provided in this package.  For example,
17   * the implementation of <tt>submit(Runnable)</tt> creates an
18   * associated <tt>RunnableFuture</tt> that is executed and
19   * returned. Subclasses may override the <tt>newTaskFor</tt> methods
20   * to return <tt>RunnableFuture</tt> implementations other than
21   * <tt>FutureTask</tt>.
22   *
23   * <p> <b>Extension example</b>. Here is a sketch of a class
24   * that customizes {@link ThreadPoolExecutor} to use
25   * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
26   * <pre>
27   * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
28   *
29   *   static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
30   *
31   *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
32   *       return new CustomTask&lt;V&gt;(c);
33   *   }
34   *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
35   *       return new CustomTask&lt;V&gt;(r, v);
36   *   }
37   *   // ... add constructors, etc.
38   * }
39   * </pre>
40   * @since 1.5
41   * @author Doug Lea
42   */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @return a <tt>RunnableFuture</tt> which when run will run the
     * underlying runnable and which, as a <tt>Future</tt>, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a <tt>RunnableFuture</tt> for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a <tt>RunnableFuture</tt> which when run will call the
     * underlying callable and which, as a <tt>Future</tt>, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * Wraps the task with <tt>newTaskFor(task, null)</tt>, hands the
     * wrapper to <tt>execute</tt> and returns it.
     *
     * @param task the task to submit
     * @return the future created for the task; its result is <tt>null</tt>
     * @throws NullPointerException if the task is null
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Object> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task with <tt>newTaskFor(task, result)</tt>, hands the
     * wrapper to <tt>execute</tt> and returns it.
     *
     * @param task the task to submit
     * @param result the value the returned future is created with
     * @return the future created for the task
     * @throws NullPointerException if the task is null
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task with <tt>newTaskFor(task)</tt>, hands the wrapper
     * to <tt>execute</tt> and returns it.
     *
     * @param task the task to submit
     * @return the future created for the task
     * @throws NullPointerException if the task is null
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of both <tt>invokeAny</tt> variants.
     *
     * <p>Tasks are submitted one at a time through an
     * {@link ExecutorCompletionService}; before each further submission
     * the completion queue is polled so that an already finished task
     * can end the call early. The result of the first task whose
     * <tt>get</tt> succeeds is returned. On every exit path all futures
     * created so far are cancelled.
     *
     * @param tasks the candidate tasks
     * @param timed whether <tt>nanos</tt> limits the total waiting time
     * @param nanos the waiting budget in nanoseconds (used only if
     *        <tt>timed</tt>); negative values are treated as zero
     * @return the result of the first task that completed successfully
     * @throws NullPointerException if <tt>tasks</tt> is null
     * @throws IllegalArgumentException if <tt>tasks</tt> is empty
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if no task completed successfully; this
     *         is the exception recorded for the last failed task
     * @throws TimeoutException if <tt>timed</tt> and the budget ran out
     *         while waiting for a task to complete
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // A negative budget is already expired. Clamp it so that the
        // subtractions below cannot wrap a very negative value (such as
        // Long.MIN_VALUE) around to a huge positive wait.
        if (nanos < 0)
            nanos = 0;
        List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    } else if (active == 0) {
                        // Everything submitted has completed, none successfully.
                        break;
                    } else if (timed) {
                        // Charge the time already spent (submitting tasks,
                        // earlier waits) BEFORE blocking, so the total wait
                        // never exceeds the caller's budget.
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                    } else {
                        f = ecs.take();
                    }
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                    // InterruptedException is deliberately not caught here:
                    // it propagates to the caller unchanged.
                }
            }

            // Defensive only: every completed future either returned above
            // or recorded an exception in ee.
            if (ee == null)
                ee = new ExecutionException("No task completed successfully", null);
            throw ee;

        } finally {
            // Cancel whatever is still outstanding; cancelling a future
            // that has already completed has no effect.
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

    /**
     * Runs the given tasks via {@link #doInvokeAny} without a time limit
     * and returns the result of one that completed successfully.
     *
     * @param tasks the candidate tasks
     * @return the result returned by one of the tasks
     * @throws NullPointerException if <tt>tasks</tt> is null
     * @throws IllegalArgumentException if <tt>tasks</tt> is empty
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if no task completed successfully
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // doInvokeAny only times out when called with timed == true.
            assert false;
            return null;
        }
    }

    /**
     * Runs the given tasks via {@link #doInvokeAny} with the given time
     * limit and returns the result of one that completed successfully.
     *
     * @param tasks the candidate tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the result returned by one of the tasks
     * @throws NullPointerException if <tt>tasks</tt> or <tt>unit</tt> is null
     * @throws IllegalArgumentException if <tt>tasks</tt> is empty
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if no task completed successfully
     * @throws TimeoutException if the timeout elapsed while waiting for
     *         a task to complete
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * Executes all given tasks and waits until each of them is done.
     * The returned list holds one future per task, in the iteration
     * order of <tt>tasks</tt>. If this method exits abnormally, every
     * future created so far is cancelled.
     *
     * @param tasks the tasks to execute
     * @return the futures of the tasks, each of them done
     * @throws NullPointerException if <tt>tasks</tt> is null
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                        // Only completion matters here; the caller
                        // inspects the outcome through the future.
                    } catch (ExecutionException ignore) {
                        // Same: a failed task still counts as done.
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

    /**
     * Executes the given tasks and waits until each of them is done or
     * the timeout elapses, whichever happens first. The returned list
     * holds one future per task, in the iteration order of
     * <tt>tasks</tt>. Unless every task was found done before the
     * timeout, all futures are cancelled before this method exits, so
     * tasks that had not completed end up cancelled.
     *
     * @param tasks the tasks to execute
     * @param timeout the maximum time to wait; negative values are
     *        treated as zero
     * @param unit the time unit of the timeout argument
     * @return the futures of the tasks
     * @throws NullPointerException if <tt>tasks</tt> or <tt>unit</tt> is null
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        // A negative timeout is already expired. Clamp it so that
        // subtracting elapsed time cannot wrap a very negative value
        // (such as Long.MIN_VALUE) around to a huge positive wait.
        long nanos = Math.max(0L, unit.toNanos(timeout));
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                // Safe cast: every element was produced by newTaskFor,
                // which returns a RunnableFuture.
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                        // Only completion matters here.
                    } catch (ExecutionException ignore) {
                        // A failed task still counts as done.
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            // Early returns above leave done == false, so unfinished
            // tasks are cancelled here.
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

}
272