1   /*
2    * %W% %E%
3    *
4    * Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved.
5    * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6    */
7   
8   package java.util.concurrent;
9   import java.util.concurrent.atomic.*;
10  import java.util.*;
11  
12  /**
13   * A {@link ThreadPoolExecutor} that can additionally schedule
14   * commands to run after a given delay, or to execute
15   * periodically. This class is preferable to {@link java.util.Timer}
16   * when multiple worker threads are needed, or when the additional
17   * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18   * this class extends) are required.
19   *
20   * <p> Delayed tasks execute no sooner than they are enabled, but
21   * without any real-time guarantees about when, after they are
22   * enabled, they will commence. Tasks scheduled for exactly the same
23   * execution time are enabled in first-in-first-out (FIFO) order of
24   * submission.
25   *
26   * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
27   * of the inherited tuning methods are not useful for it. In
28   * particular, because it acts as a fixed-sized pool using
29   * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
30   * to <tt>maximumPoolSize</tt> have no useful effect.
31   *
32   * <p><b>Extension notes:</b> This class overrides {@link
33   * AbstractExecutorService} <tt>submit</tt> methods to generate
34   * internal objects to control per-task delays and scheduling. To
35   * preserve functionality, any further overrides of these methods in
36   * subclasses must invoke superclass versions, which effectively
37   * disables additional task customization. However, this class
38   * provides alternative protected extension method
39   * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
40   * <tt>Callable</tt>) that can be used to customize the concrete task
41   * types used to execute commands entered via <tt>execute</tt>,
42   * <tt>submit</tt>, <tt>schedule</tt>, <tt>scheduleAtFixedRate</tt>,
43   * and <tt>scheduleWithFixedDelay</tt>.  By default, a
44   * <tt>ScheduledThreadPoolExecutor</tt> uses a task type extending
45   * {@link FutureTask}. However, this may be modified or replaced using
46   * subclasses of the form:
47   *
48   * <pre>
49   * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
50   *
51   *   static class CustomTask&lt;V&gt; implements RunnableScheduledFuture&lt;V&gt; { ... }
52   *
53   *   protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
54   *                Runnable r, RunnableScheduledFuture&lt;V&gt; task) {
55   *       return new CustomTask&lt;V&gt;(r, task);
56   *   }
57   *
58   *   protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
59   *                Callable&lt;V&gt; c, RunnableScheduledFuture&lt;V&gt; task) {
60   *       return new CustomTask&lt;V&gt;(c, task);
61   *   }
62   *   // ... add constructors, etc.
63   * }
64   * </pre>
65   * @since 1.5
66   * @author Doug Lea
67   */
68  public class ScheduledThreadPoolExecutor
69          extends ThreadPoolExecutor
70          implements ScheduledExecutorService {
71  
72      /**
73       * False if should cancel/suppress periodic tasks on shutdown.
74       */
75      private volatile boolean continueExistingPeriodicTasksAfterShutdown;
76  
77      /**
78       * False if should cancel non-periodic tasks on shutdown.
79       */
80      private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
81  
82      /**
83       * Sequence number to break scheduling ties, and in turn to
84       * guarantee FIFO order among tied entries.
85       */
86      private static final AtomicLong sequencer = new AtomicLong(0);
87  
88      /** Base of nanosecond timings, to avoid wrapping */
89      private static final long NANO_ORIGIN = System.nanoTime();
90  
91      /**
92       * Returns nanosecond time offset by origin
93       */
94      final long now() {
95          return System.nanoTime() - NANO_ORIGIN;
96      }
97  
98      private class ScheduledFutureTask<V>
99              extends FutureTask<V> implements RunnableScheduledFuture<V> {
100 
101         /** Sequence number to break ties FIFO */
102         private final long sequenceNumber;
103         /** The time the task is enabled to execute in nanoTime units */
104         private long time;
105         /**
106          * Period in nanoseconds for repeating tasks.  A positive
107          * value indicates fixed-rate execution.  A negative value
108          * indicates fixed-delay execution.  A value of 0 indicates a
109          * non-repeating task.
110          */
111         private final long period;
112 
113         /**
114          * Creates a one-shot action with given nanoTime-based trigger time.
115          */
116         ScheduledFutureTask(Runnable r, V result, long ns) {
117             super(r, result);
118             this.time = ns;
119             this.period = 0;
120             this.sequenceNumber = sequencer.getAndIncrement();
121         }
122 
123         /**
124          * Creates a periodic action with given nano time and period.
125          */
126         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
127             super(r, result);
128             this.time = ns;
129             this.period = period;
130             this.sequenceNumber = sequencer.getAndIncrement();
131         }
132 
133         /**
134          * Creates a one-shot action with given nanoTime-based trigger.
135          */
136         ScheduledFutureTask(Callable<V> callable, long ns) {
137             super(callable);
138             this.time = ns;
139             this.period = 0;
140             this.sequenceNumber = sequencer.getAndIncrement();
141         }
142 
143         public long getDelay(TimeUnit unit) {
144             return unit.convert(time - now(), TimeUnit.NANOSECONDS);
145         }
146 
147         public int compareTo(Delayed other) {
148             if (other == this) // compare zero ONLY if same object
149                 return 0;
150             if (other instanceof ScheduledFutureTask) {
151                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
152                 long diff = time - x.time;
153                 if (diff < 0)
154                     return -1;
155                 else if (diff > 0)
156                     return 1;
157                 else if (sequenceNumber < x.sequenceNumber)
158                     return -1;
159                 else
160                     return 1;
161             }
162             long d = (getDelay(TimeUnit.NANOSECONDS) -
163                       other.getDelay(TimeUnit.NANOSECONDS));
164             return (d == 0)? 0 : ((d < 0)? -1 : 1);
165         }
166 
167         /**
168          * Returns true if this is a periodic (not a one-shot) action.
169          *
170          * @return true if periodic
171          */
172         public boolean isPeriodic() {
173             return period != 0;
174         }
175 
176         /**
177          * Runs a periodic task.
178          */
179         private void runPeriodic() {
180             boolean ok = ScheduledFutureTask.super.runAndReset();
181             boolean down = isShutdown();
182             // Reschedule if not cancelled and not shutdown or policy allows
183             if (ok && (!down ||
184                        (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
185                         !isStopped()))) {
186                 long p = period;
187                 if (p > 0)
188                     time += p;
189                 else
190                     time = triggerTime(-p);
191                 ScheduledThreadPoolExecutor.super.getQueue().add(this);
192             }
193             // This might have been the final executed delayed
194             // task.  Wake up threads to check.
195             else if (down)
196                 interruptIdleWorkers();
197         }
198 
199         /**
200          * Overrides FutureTask version so as to reset/requeue if periodic.
201          */
202         public void run() {
203             if (isPeriodic())
204                 runPeriodic();
205             else
206                 ScheduledFutureTask.super.run();
207         }
208     }
209 
210     /**
211      * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
212      */
213     private void delayedExecute(Runnable command) {
214         if (isShutdown()) {
215             reject(command);
216             return;
217         }
218         // Prestart a thread if necessary. We cannot prestart it
219         // running the task because the task (probably) shouldn't be
220         // run yet, so thread will just idle until delay elapses.
221         if (getPoolSize() < getCorePoolSize())
222             prestartCoreThread();
223 
224         super.getQueue().add(command);
225     }
226 
227     /**
228      * Cancels and clears the queue of all tasks that should not be run
229      * due to shutdown policy.
230      */
231     private void cancelUnwantedTasks() {
232         boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
233         boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
234         if (!keepDelayed && !keepPeriodic)
235             super.getQueue().clear();
236         else if (keepDelayed || keepPeriodic) {
237             Object[] entries = super.getQueue().toArray();
238             for (int i = 0; i < entries.length; ++i) {
239                 Object e = entries[i];
240                 if (e instanceof RunnableScheduledFuture) {
241                     RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
242                     if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
243                         t.cancel(false);
244                 }
245             }
246             entries = null;
247             purge();
248         }
249     }
250 
251     public boolean remove(Runnable task) {
252         if (!(task instanceof RunnableScheduledFuture))
253             return false;
254         return getQueue().remove(task);
255     }
256 
257     /**
258      * Modifies or replaces the task used to execute a runnable.
259      * This method can be used to override the concrete
260      * class used for managing internal tasks.
261      * The default implementation simply returns the given task.
262      *
263      * @param runnable the submitted Runnable
264      * @param task the task created to execute the runnable
265      * @return a task that can execute the runnable
266      * @since 1.6
267      */
268     protected <V> RunnableScheduledFuture<V> decorateTask(
269         Runnable runnable, RunnableScheduledFuture<V> task) {
270         return task;
271     }
272 
273     /**
274      * Modifies or replaces the task used to execute a callable.
275      * This method can be used to override the concrete
276      * class used for managing internal tasks.
277      * The default implementation simply returns the given task.
278      *
279      * @param callable the submitted Callable
280      * @param task the task created to execute the callable
281      * @return a task that can execute the callable
282      * @since 1.6
283      */
284     protected <V> RunnableScheduledFuture<V> decorateTask(
285         Callable<V> callable, RunnableScheduledFuture<V> task) {
286         return task;
287     }
288 
289     /**
290      * Creates a new ScheduledThreadPoolExecutor with the given core
291      * pool size.
292      *
293      * @param corePoolSize the number of threads to keep in the pool,
294      * even if they are idle
295      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
296      */
297     public ScheduledThreadPoolExecutor(int corePoolSize) {
298         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
299               new DelayedWorkQueue());
300     }
301 
302     /**
303      * Creates a new ScheduledThreadPoolExecutor with the given
304      * initial parameters.
305      *
306      * @param corePoolSize the number of threads to keep in the pool,
307      * even if they are idle
308      * @param threadFactory the factory to use when the executor
309      * creates a new thread
310      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
311      * @throws NullPointerException if threadFactory is null
312      */
313     public ScheduledThreadPoolExecutor(int corePoolSize,
314                              ThreadFactory threadFactory) {
315         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
316               new DelayedWorkQueue(), threadFactory);
317     }
318 
319     /**
320      * Creates a new ScheduledThreadPoolExecutor with the given
321      * initial parameters.
322      *
323      * @param corePoolSize the number of threads to keep in the pool,
324      * even if they are idle
325      * @param handler the handler to use when execution is blocked
326      * because the thread bounds and queue capacities are reached
327      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
328      * @throws NullPointerException if handler is null
329      */
330     public ScheduledThreadPoolExecutor(int corePoolSize,
331                               RejectedExecutionHandler handler) {
332         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
333               new DelayedWorkQueue(), handler);
334     }
335 
336     /**
337      * Returns the trigger time of a delayed action.
338      */
339     private long triggerTime(long delay, TimeUnit unit) {
340          return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
341     }
342 
343     /**
344      * Returns the trigger time of a delayed action.
345      */
346     long triggerTime(long delay) {
347          return now() +
348              ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
349     }
350 
351     /**
352      * Constrains the values of all delays in the queue to be within
353      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
354      * This may occur if a task is eligible to be dequeued, but has
355      * not yet been, while some other task is added with a delay of
356      * Long.MAX_VALUE.
357      */
358     private long overflowFree(long delay) {
359         Delayed head = (Delayed) super.getQueue().peek();
360         if (head != null) {
361             long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
362             if (headDelay < 0 && (delay - headDelay < 0))
363                 delay = Long.MAX_VALUE + headDelay;
364         }
365         return delay;
366     }
367  
368     /**
369      * Creates a new ScheduledThreadPoolExecutor with the given
370      * initial parameters.
371      *
372      * @param corePoolSize the number of threads to keep in the pool,
373      * even if they are idle
374      * @param threadFactory the factory to use when the executor
375      * creates a new thread
376      * @param handler the handler to use when execution is blocked
377      * because the thread bounds and queue capacities are reached.
378      * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
379      * @throws NullPointerException if threadFactory or handler is null
380      */
381     public ScheduledThreadPoolExecutor(int corePoolSize,
382                               ThreadFactory threadFactory,
383                               RejectedExecutionHandler handler) {
384         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
385               new DelayedWorkQueue(), threadFactory, handler);
386     }
387 
388     public ScheduledFuture<?> schedule(Runnable command,
389                                        long delay,
390                                        TimeUnit unit) {
391         if (command == null || unit == null)
392             throw new NullPointerException();
393         RunnableScheduledFuture<?> t = decorateTask(command,
394             new ScheduledFutureTask<Void>(command, null,
395                                           triggerTime(delay, unit)));
396 
397         delayedExecute(t);
398         return t;
399     }
400 
401     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
402                                            long delay,
403                                            TimeUnit unit) {
404         if (callable == null || unit == null)
405             throw new NullPointerException();
406         RunnableScheduledFuture<V> t = decorateTask(callable,
407             new ScheduledFutureTask<V>(callable,
408                        triggerTime(delay, unit)));
409         delayedExecute(t);
410         return t;
411     }
412 
413     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
414                                                   long initialDelay,
415                                                   long period,
416                                                   TimeUnit unit) {
417         if (command == null || unit == null)
418             throw new NullPointerException();
419         if (period <= 0)
420             throw new IllegalArgumentException();
421         RunnableScheduledFuture<?> t = decorateTask(command,
422             new ScheduledFutureTask<Object>(command,
423                                             null,
424                                             triggerTime(initialDelay, unit),
425                                             unit.toNanos(period)));
426         delayedExecute(t);
427         return t;
428     }
429 
430     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
431                                                      long initialDelay,
432                                                      long delay,
433                                                      TimeUnit unit) {
434         if (command == null || unit == null)
435             throw new NullPointerException();
436         if (delay <= 0)
437             throw new IllegalArgumentException();
438         RunnableScheduledFuture<?> t = decorateTask(command,
439             new ScheduledFutureTask<Boolean>(command,
440                                              null,
441                                              triggerTime(initialDelay, unit),
442                                              unit.toNanos(-delay)));
443         delayedExecute(t);
444         return t;
445     }
446 
447 
448     /**
449      * Executes command with zero required delay. This has effect
450      * equivalent to <tt>schedule(command, 0, anyUnit)</tt>.  Note
451      * that inspections of the queue and of the list returned by
452      * <tt>shutdownNow</tt> will access the zero-delayed
453      * {@link ScheduledFuture}, not the <tt>command</tt> itself.
454      *
455      * @param command the task to execute
456      * @throws RejectedExecutionException at discretion of
457      * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
458      * for execution because the executor has been shut down.
459      * @throws NullPointerException if command is null
460      */
461     public void execute(Runnable command) {
462         if (command == null)
463             throw new NullPointerException();
464         schedule(command, 0, TimeUnit.NANOSECONDS);
465     }
466 
467     // Override AbstractExecutorService methods
468 
469     public Future<?> submit(Runnable task) {
470         return schedule(task, 0, TimeUnit.NANOSECONDS);
471     }
472 
473     public <T> Future<T> submit(Runnable task, T result) {
474         return schedule(Executors.callable(task, result),
475                         0, TimeUnit.NANOSECONDS);
476     }
477 
478     public <T> Future<T> submit(Callable<T> task) {
479         return schedule(task, 0, TimeUnit.NANOSECONDS);
480     }
481 
482     /**
483      * Sets the policy on whether to continue executing existing periodic
484      * tasks even when this executor has been <tt>shutdown</tt>. In
485      * this case, these tasks will only terminate upon
486      * <tt>shutdownNow</tt>, or after setting the policy to
487      * <tt>false</tt> when already shutdown. This value is by default
488      * false.
489      *
490      * @param value if true, continue after shutdown, else don't.
491      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
492      */
493     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
494         continueExistingPeriodicTasksAfterShutdown = value;
495         if (!value && isShutdown())
496             cancelUnwantedTasks();
497     }
498 
499     /**
500      * Gets the policy on whether to continue executing existing
501      * periodic tasks even when this executor has been
502      * <tt>shutdown</tt>. In this case, these tasks will only
503      * terminate upon <tt>shutdownNow</tt> or after setting the policy
504      * to <tt>false</tt> when already shutdown. This value is by
505      * default false.
506      *
507      * @return true if will continue after shutdown
508      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
509      */
510     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
511         return continueExistingPeriodicTasksAfterShutdown;
512     }
513 
514     /**
515      * Sets the policy on whether to execute existing delayed
516      * tasks even when this executor has been <tt>shutdown</tt>. In
517      * this case, these tasks will only terminate upon
518      * <tt>shutdownNow</tt>, or after setting the policy to
519      * <tt>false</tt> when already shutdown. This value is by default
520      * true.
521      *
522      * @param value if true, execute after shutdown, else don't.
523      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
524      */
525     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
526         executeExistingDelayedTasksAfterShutdown = value;
527         if (!value && isShutdown())
528             cancelUnwantedTasks();
529     }
530 
531     /**
532      * Gets the policy on whether to execute existing delayed
533      * tasks even when this executor has been <tt>shutdown</tt>. In
534      * this case, these tasks will only terminate upon
535      * <tt>shutdownNow</tt>, or after setting the policy to
536      * <tt>false</tt> when already shutdown. This value is by default
537      * true.
538      *
539      * @return true if will execute after shutdown
540      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
541      */
542     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
543         return executeExistingDelayedTasksAfterShutdown;
544     }
545 
546 
547     /**
548      * Initiates an orderly shutdown in which previously submitted
549      * tasks are executed, but no new tasks will be accepted. If the
550      * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
551      * been set <tt>false</tt>, existing delayed tasks whose delays
552      * have not yet elapsed are cancelled. And unless the
553      * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
554      * been set <tt>true</tt>, future executions of existing periodic
555      * tasks will be cancelled.
556      */
557     public void shutdown() {
558         cancelUnwantedTasks();
559         super.shutdown();
560     }
561 
562     /**
563      * Attempts to stop all actively executing tasks, halts the
564      * processing of waiting tasks, and returns a list of the tasks
565      * that were awaiting execution.
566      *
567      * <p>There are no guarantees beyond best-effort attempts to stop
568      * processing actively executing tasks.  This implementation
569      * cancels tasks via {@link Thread#interrupt}, so any task that
570      * fails to respond to interrupts may never terminate.
571      *
572      * @return list of tasks that never commenced execution.  Each
573      * element of this list is a {@link ScheduledFuture},
574      * including those tasks submitted using <tt>execute</tt>, which
575      * are for scheduling purposes used as the basis of a zero-delay
576      * <tt>ScheduledFuture</tt>.
577      * @throws SecurityException {@inheritDoc}
578      */
579     public List<Runnable> shutdownNow() {
580         return super.shutdownNow();
581     }
582 
583     /**
584      * Returns the task queue used by this executor.  Each element of
585      * this queue is a {@link ScheduledFuture}, including those
586      * tasks submitted using <tt>execute</tt> which are for scheduling
587      * purposes used as the basis of a zero-delay
588      * <tt>ScheduledFuture</tt>. Iteration over this queue is
589      * <em>not</em> guaranteed to traverse tasks in the order in
590      * which they will execute.
591      *
592      * @return the task queue
593      */
594     public BlockingQueue<Runnable> getQueue() {
595         return super.getQueue();
596     }
597 
598     /**
599      * An annoying wrapper class to convince javac to use a
600      * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
601      */
602     private static class DelayedWorkQueue
603         extends AbstractCollection<Runnable>
604         implements BlockingQueue<Runnable> {
605 
606         private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
607         public Runnable poll() { return dq.poll(); }
608         public Runnable peek() { return dq.peek(); }
609         public Runnable take() throws InterruptedException { return dq.take(); }
610         public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
611             return dq.poll(timeout, unit);
612         }
613 
614         public boolean add(Runnable x) {
615         return dq.add((RunnableScheduledFuture)x);
616     }
617         public boolean offer(Runnable x) {
618         return dq.offer((RunnableScheduledFuture)x);
619     }
620         public void put(Runnable x) {
621             dq.put((RunnableScheduledFuture)x);
622         }
623         public boolean offer(Runnable x, long timeout, TimeUnit unit) {
624             return dq.offer((RunnableScheduledFuture)x, timeout, unit);
625         }
626 
627         public Runnable remove() { return dq.remove(); }
628         public Runnable element() { return dq.element(); }
629         public void clear() { dq.clear(); }
630         public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
631         public int drainTo(Collection<? super Runnable> c, int maxElements) {
632             return dq.drainTo(c, maxElements);
633         }
634 
635         public int remainingCapacity() { return dq.remainingCapacity(); }
636         public boolean remove(Object x) { return dq.remove(x); }
637         public boolean contains(Object x) { return dq.contains(x); }
638         public int size() { return dq.size(); }
639         public boolean isEmpty() { return dq.isEmpty(); }
640         public Object[] toArray() { return dq.toArray(); }
641         public <T> T[] toArray(T[] array) { return dq.toArray(array); }
642         public Iterator<Runnable> iterator() {
643             return new Iterator<Runnable>() {
644                 private Iterator<RunnableScheduledFuture> it = dq.iterator();
645                 public boolean hasNext() { return it.hasNext(); }
646                 public Runnable next() { return it.next(); }
647                 public void remove() { it.remove(); }
648             };
649         }
650     }
651 }
652