| ScheduledThreadPoolExecutor.java |
1 /*
2 * %W% %E%
3 *
4 * Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved.
5 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.atomic.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
27 * of the inherited tuning methods are not useful for it. In
28 * particular, because it acts as a fixed-sized pool using
29 * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
30 * to <tt>maximumPoolSize</tt> have no useful effect.
31 *
32 * <p><b>Extension notes:</b> This class overrides {@link
33 * AbstractExecutorService} <tt>submit</tt> methods to generate
34 * internal objects to control per-task delays and scheduling. To
35 * preserve functionality, any further overrides of these methods in
36 * subclasses must invoke superclass versions, which effectively
37 * disables additional task customization. However, this class
38 * provides an alternative protected extension method
39 * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
40 * <tt>Callable</tt>) that can be used to customize the concrete task
41 * types used to execute commands entered via <tt>execute</tt>,
42 * <tt>submit</tt>, <tt>schedule</tt>, <tt>scheduleAtFixedRate</tt>,
43 * and <tt>scheduleWithFixedDelay</tt>. By default, a
44 * <tt>ScheduledThreadPoolExecutor</tt> uses a task type extending
45 * {@link FutureTask}. However, this may be modified or replaced using
46 * subclasses of the form:
47 *
48 * <pre>
49 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
50 *
51 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
52 *
53 * protected <V> RunnableScheduledFuture<V> decorateTask(
54 * Runnable r, RunnableScheduledFuture<V> task) {
55 * return new CustomTask<V>(r, task);
56 * }
57 *
58 * protected <V> RunnableScheduledFuture<V> decorateTask(
59 * Callable<V> c, RunnableScheduledFuture<V> task) {
60 * return new CustomTask<V>(c, task);
61 * }
62 * // ... add constructors, etc.
63 * }
64 * </pre>
65 * @since 1.5
66 * @author Doug Lea
67 */
68 public class ScheduledThreadPoolExecutor
69 extends ThreadPoolExecutor
70 implements ScheduledExecutorService {
71
72 /**
73 * False if should cancel/suppress periodic tasks on shutdown.
74 */
75 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
76
77 /**
78 * False if should cancel non-periodic tasks on shutdown.
79 */
80 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
81
82 /**
83 * Sequence number to break scheduling ties, and in turn to
84 * guarantee FIFO order among tied entries.
85 */
86 private static final AtomicLong sequencer = new AtomicLong(0);
87
88 /** Base of nanosecond timings, to avoid wrapping */
89 private static final long NANO_ORIGIN = System.nanoTime();
90
91 /**
92 * Returns nanosecond time offset by origin
93 */
94 final long now() {
95 return System.nanoTime() - NANO_ORIGIN;
96 }
97
98 private class ScheduledFutureTask<V>
99 extends FutureTask<V> implements RunnableScheduledFuture<V> {
100
101 /** Sequence number to break ties FIFO */
102 private final long sequenceNumber;
103 /** The time the task is enabled to execute in nanoTime units */
104 private long time;
105 /**
106 * Period in nanoseconds for repeating tasks. A positive
107 * value indicates fixed-rate execution. A negative value
108 * indicates fixed-delay execution. A value of 0 indicates a
109 * non-repeating task.
110 */
111 private final long period;
112
113 /**
114 * Creates a one-shot action with given nanoTime-based trigger time.
115 */
116 ScheduledFutureTask(Runnable r, V result, long ns) {
117 super(r, result);
118 this.time = ns;
119 this.period = 0;
120 this.sequenceNumber = sequencer.getAndIncrement();
121 }
122
123 /**
124 * Creates a periodic action with given nano time and period.
125 */
126 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
127 super(r, result);
128 this.time = ns;
129 this.period = period;
130 this.sequenceNumber = sequencer.getAndIncrement();
131 }
132
133 /**
134 * Creates a one-shot action with given nanoTime-based trigger.
135 */
136 ScheduledFutureTask(Callable<V> callable, long ns) {
137 super(callable);
138 this.time = ns;
139 this.period = 0;
140 this.sequenceNumber = sequencer.getAndIncrement();
141 }
142
143 public long getDelay(TimeUnit unit) {
144 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
145 }
146
147 public int compareTo(Delayed other) {
148 if (other == this) // compare zero ONLY if same object
149 return 0;
150 if (other instanceof ScheduledFutureTask) {
151 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
152 long diff = time - x.time;
153 if (diff < 0)
154 return -1;
155 else if (diff > 0)
156 return 1;
157 else if (sequenceNumber < x.sequenceNumber)
158 return -1;
159 else
160 return 1;
161 }
162 long d = (getDelay(TimeUnit.NANOSECONDS) -
163 other.getDelay(TimeUnit.NANOSECONDS));
164 return (d == 0)? 0 : ((d < 0)? -1 : 1);
165 }
166
167 /**
168 * Returns true if this is a periodic (not a one-shot) action.
169 *
170 * @return true if periodic
171 */
172 public boolean isPeriodic() {
173 return period != 0;
174 }
175
176 /**
177 * Runs a periodic task.
178 */
179 private void runPeriodic() {
180 boolean ok = ScheduledFutureTask.super.runAndReset();
181 boolean down = isShutdown();
182 // Reschedule if not cancelled and not shutdown or policy allows
183 if (ok && (!down ||
184 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
185 !isStopped()))) {
186 long p = period;
187 if (p > 0)
188 time += p;
189 else
190 time = triggerTime(-p);
191 ScheduledThreadPoolExecutor.super.getQueue().add(this);
192 }
193 // This might have been the final executed delayed
194 // task. Wake up threads to check.
195 else if (down)
196 interruptIdleWorkers();
197 }
198
199 /**
200 * Overrides FutureTask version so as to reset/requeue if periodic.
201 */
202 public void run() {
203 if (isPeriodic())
204 runPeriodic();
205 else
206 ScheduledFutureTask.super.run();
207 }
208 }
209
210 /**
211 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
212 */
213 private void delayedExecute(Runnable command) {
214 if (isShutdown()) {
215 reject(command);
216 return;
217 }
218 // Prestart a thread if necessary. We cannot prestart it
219 // running the task because the task (probably) shouldn't be
220 // run yet, so thread will just idle until delay elapses.
221 if (getPoolSize() < getCorePoolSize())
222 prestartCoreThread();
223
224 super.getQueue().add(command);
225 }
226
227 /**
228 * Cancels and clears the queue of all tasks that should not be run
229 * due to shutdown policy.
230 */
231 private void cancelUnwantedTasks() {
232 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
233 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
234 if (!keepDelayed && !keepPeriodic)
235 super.getQueue().clear();
236 else if (keepDelayed || keepPeriodic) {
237 Object[] entries = super.getQueue().toArray();
238 for (int i = 0; i < entries.length; ++i) {
239 Object e = entries[i];
240 if (e instanceof RunnableScheduledFuture) {
241 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
242 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
243 t.cancel(false);
244 }
245 }
246 entries = null;
247 purge();
248 }
249 }
250
251 public boolean remove(Runnable task) {
252 if (!(task instanceof RunnableScheduledFuture))
253 return false;
254 return getQueue().remove(task);
255 }
256
257 /**
258 * Modifies or replaces the task used to execute a runnable.
259 * This method can be used to override the concrete
260 * class used for managing internal tasks.
261 * The default implementation simply returns the given task.
262 *
263 * @param runnable the submitted Runnable
264 * @param task the task created to execute the runnable
265 * @return a task that can execute the runnable
266 * @since 1.6
267 */
268 protected <V> RunnableScheduledFuture<V> decorateTask(
269 Runnable runnable, RunnableScheduledFuture<V> task) {
270 return task;
271 }
272
273 /**
274 * Modifies or replaces the task used to execute a callable.
275 * This method can be used to override the concrete
276 * class used for managing internal tasks.
277 * The default implementation simply returns the given task.
278 *
279 * @param callable the submitted Callable
280 * @param task the task created to execute the callable
281 * @return a task that can execute the callable
282 * @since 1.6
283 */
284 protected <V> RunnableScheduledFuture<V> decorateTask(
285 Callable<V> callable, RunnableScheduledFuture<V> task) {
286 return task;
287 }
288
/**
 * Constructs an executor with the given core pool size, backed by a
 * fresh delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
301
/**
 * Constructs an executor with the given core pool size and thread
 * factory, backed by a fresh delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
318
/**
 * Constructs an executor with the given core pool size and rejection
 * handler, backed by a fresh delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
335
336 /**
337 * Returns the trigger time of a delayed action.
338 */
339 private long triggerTime(long delay, TimeUnit unit) {
340 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
341 }
342
343 /**
344 * Returns the trigger time of a delayed action.
345 */
346 long triggerTime(long delay) {
347 return now() +
348 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
349 }
350
351 /**
352 * Constrains the values of all delays in the queue to be within
353 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
354 * This may occur if a task is eligible to be dequeued, but has
355 * not yet been, while some other task is added with a delay of
356 * Long.MAX_VALUE.
357 */
358 private long overflowFree(long delay) {
359 Delayed head = (Delayed) super.getQueue().peek();
360 if (head != null) {
361 long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
362 if (headDelay < 0 && (delay - headDelay < 0))
363 delay = Long.MAX_VALUE + headDelay;
364 }
365 return delay;
366 }
367
/**
 * Constructs an executor with the given core pool size, thread factory
 * and rejection handler, backed by a fresh delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 *        even if they are idle
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory or handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
387
388 public ScheduledFuture<?> schedule(Runnable command,
389 long delay,
390 TimeUnit unit) {
391 if (command == null || unit == null)
392 throw new NullPointerException();
393 RunnableScheduledFuture<?> t = decorateTask(command,
394 new ScheduledFutureTask<Void>(command, null,
395 triggerTime(delay, unit)));
396
397 delayedExecute(t);
398 return t;
399 }
400
401 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
402 long delay,
403 TimeUnit unit) {
404 if (callable == null || unit == null)
405 throw new NullPointerException();
406 RunnableScheduledFuture<V> t = decorateTask(callable,
407 new ScheduledFutureTask<V>(callable,
408 triggerTime(delay, unit)));
409 delayedExecute(t);
410 return t;
411 }
412
413 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
414 long initialDelay,
415 long period,
416 TimeUnit unit) {
417 if (command == null || unit == null)
418 throw new NullPointerException();
419 if (period <= 0)
420 throw new IllegalArgumentException();
421 RunnableScheduledFuture<?> t = decorateTask(command,
422 new ScheduledFutureTask<Object>(command,
423 null,
424 triggerTime(initialDelay, unit),
425 unit.toNanos(period)));
426 delayedExecute(t);
427 return t;
428 }
429
430 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
431 long initialDelay,
432 long delay,
433 TimeUnit unit) {
434 if (command == null || unit == null)
435 throw new NullPointerException();
436 if (delay <= 0)
437 throw new IllegalArgumentException();
438 RunnableScheduledFuture<?> t = decorateTask(command,
439 new ScheduledFutureTask<Boolean>(command,
440 null,
441 triggerTime(initialDelay, unit),
442 unit.toNanos(-delay)));
443 delayedExecute(t);
444 return t;
445 }
446
447
448 /**
449 * Executes command with zero required delay. This has effect
450 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
451 * that inspections of the queue and of the list returned by
452 * <tt>shutdownNow</tt> will access the zero-delayed
453 * {@link ScheduledFuture}, not the <tt>command</tt> itself.
454 *
455 * @param command the task to execute
456 * @throws RejectedExecutionException at discretion of
457 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
458 * for execution because the executor has been shut down.
459 * @throws NullPointerException if command is null
460 */
461 public void execute(Runnable command) {
462 if (command == null)
463 throw new NullPointerException();
464 schedule(command, 0, TimeUnit.NANOSECONDS);
465 }
466
467 // Override AbstractExecutorService methods
468
469 public Future<?> submit(Runnable task) {
470 return schedule(task, 0, TimeUnit.NANOSECONDS);
471 }
472
473 public <T> Future<T> submit(Runnable task, T result) {
474 return schedule(Executors.callable(task, result),
475 0, TimeUnit.NANOSECONDS);
476 }
477
478 public <T> Future<T> submit(Callable<T> task) {
479 return schedule(task, 0, TimeUnit.NANOSECONDS);
480 }
481
482 /**
483 * Sets the policy on whether to continue executing existing periodic
484 * tasks even when this executor has been <tt>shutdown</tt>. In
485 * this case, these tasks will only terminate upon
486 * <tt>shutdownNow</tt>, or after setting the policy to
487 * <tt>false</tt> when already shutdown. This value is by default
488 * false.
489 *
490 * @param value if true, continue after shutdown, else don't.
491 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
492 */
493 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
494 continueExistingPeriodicTasksAfterShutdown = value;
495 if (!value && isShutdown())
496 cancelUnwantedTasks();
497 }
498
499 /**
500 * Gets the policy on whether to continue executing existing
501 * periodic tasks even when this executor has been
502 * <tt>shutdown</tt>. In this case, these tasks will only
503 * terminate upon <tt>shutdownNow</tt> or after setting the policy
504 * to <tt>false</tt> when already shutdown. This value is by
505 * default false.
506 *
507 * @return true if will continue after shutdown
508 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
509 */
510 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
511 return continueExistingPeriodicTasksAfterShutdown;
512 }
513
514 /**
515 * Sets the policy on whether to execute existing delayed
516 * tasks even when this executor has been <tt>shutdown</tt>. In
517 * this case, these tasks will only terminate upon
518 * <tt>shutdownNow</tt>, or after setting the policy to
519 * <tt>false</tt> when already shutdown. This value is by default
520 * true.
521 *
522 * @param value if true, execute after shutdown, else don't.
523 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
524 */
525 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
526 executeExistingDelayedTasksAfterShutdown = value;
527 if (!value && isShutdown())
528 cancelUnwantedTasks();
529 }
530
531 /**
532 * Gets the policy on whether to execute existing delayed
533 * tasks even when this executor has been <tt>shutdown</tt>. In
534 * this case, these tasks will only terminate upon
535 * <tt>shutdownNow</tt>, or after setting the policy to
536 * <tt>false</tt> when already shutdown. This value is by default
537 * true.
538 *
539 * @return true if will execute after shutdown
540 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
541 */
542 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
543 return executeExistingDelayedTasksAfterShutdown;
544 }
545
546
547 /**
548 * Initiates an orderly shutdown in which previously submitted
549 * tasks are executed, but no new tasks will be accepted. If the
550 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
551 * been set <tt>false</tt>, existing delayed tasks whose delays
552 * have not yet elapsed are cancelled. And unless the
553 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
554 * been set <tt>true</tt>, future executions of existing periodic
555 * tasks will be cancelled.
556 */
557 public void shutdown() {
558 cancelUnwantedTasks();
559 super.shutdown();
560 }
561
562 /**
563 * Attempts to stop all actively executing tasks, halts the
564 * processing of waiting tasks, and returns a list of the tasks
565 * that were awaiting execution.
566 *
567 * <p>There are no guarantees beyond best-effort attempts to stop
568 * processing actively executing tasks. This implementation
569 * cancels tasks via {@link Thread#interrupt}, so any task that
570 * fails to respond to interrupts may never terminate.
571 *
572 * @return list of tasks that never commenced execution. Each
573 * element of this list is a {@link ScheduledFuture},
574 * including those tasks submitted using <tt>execute</tt>, which
575 * are for scheduling purposes used as the basis of a zero-delay
576 * <tt>ScheduledFuture</tt>.
577 * @throws SecurityException {@inheritDoc}
578 */
579 public List<Runnable> shutdownNow() {
580 return super.shutdownNow();
581 }
582
583 /**
584 * Returns the task queue used by this executor. Each element of
585 * this queue is a {@link ScheduledFuture}, including those
586 * tasks submitted using <tt>execute</tt> which are for scheduling
587 * purposes used as the basis of a zero-delay
588 * <tt>ScheduledFuture</tt>. Iteration over this queue is
589 * <em>not</em> guaranteed to traverse tasks in the order in
590 * which they will execute.
591 *
592 * @return the task queue
593 */
594 public BlockingQueue<Runnable> getQueue() {
595 return super.getQueue();
596 }
597
598 /**
599 * An annoying wrapper class to convince javac to use a
600 * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
601 */
602 private static class DelayedWorkQueue
603 extends AbstractCollection<Runnable>
604 implements BlockingQueue<Runnable> {
605
606 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
607 public Runnable poll() { return dq.poll(); }
608 public Runnable peek() { return dq.peek(); }
609 public Runnable take() throws InterruptedException { return dq.take(); }
610 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
611 return dq.poll(timeout, unit);
612 }
613
614 public boolean add(Runnable x) {
615 return dq.add((RunnableScheduledFuture)x);
616 }
617 public boolean offer(Runnable x) {
618 return dq.offer((RunnableScheduledFuture)x);
619 }
620 public void put(Runnable x) {
621 dq.put((RunnableScheduledFuture)x);
622 }
623 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
624 return dq.offer((RunnableScheduledFuture)x, timeout, unit);
625 }
626
627 public Runnable remove() { return dq.remove(); }
628 public Runnable element() { return dq.element(); }
629 public void clear() { dq.clear(); }
630 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
631 public int drainTo(Collection<? super Runnable> c, int maxElements) {
632 return dq.drainTo(c, maxElements);
633 }
634
635 public int remainingCapacity() { return dq.remainingCapacity(); }
636 public boolean remove(Object x) { return dq.remove(x); }
637 public boolean contains(Object x) { return dq.contains(x); }
638 public int size() { return dq.size(); }
639 public boolean isEmpty() { return dq.isEmpty(); }
640 public Object[] toArray() { return dq.toArray(); }
641 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
642 public Iterator<Runnable> iterator() {
643 return new Iterator<Runnable>() {
644 private Iterator<RunnableScheduledFuture> it = dq.iterator();
645 public boolean hasNext() { return it.hasNext(); }
646 public Runnable next() { return it.next(); }
647 public void remove() { it.remove(); }
648 };
649 }
650 }
651 }
652