| ThreadPoolExecutor.java |
1 /*
2 * %W% %E%
3 *
4 * Copyright (c) 2006, Oracle and/or its affiliates. All rights reserved.
5 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6 */
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An {@link ExecutorService} that executes each submitted task using
14 * one of possibly several pooled threads, normally configured
15 * using {@link Executors} factory methods.
16 *
17 * <p>Thread pools address two different problems: they usually
18 * provide improved performance when executing large numbers of
19 * asynchronous tasks, due to reduced per-task invocation overhead,
20 * and they provide a means of bounding and managing the resources,
21 * including threads, consumed when executing a collection of tasks.
22 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
23 * statistics, such as the number of completed tasks.
24 *
25 * <p>To be useful across a wide range of contexts, this class
26 * provides many adjustable parameters and extensibility
27 * hooks. However, programmers are urged to use the more convenient
28 * {@link Executors} factory methods {@link
29 * Executors#newCachedThreadPool} (unbounded thread pool, with
30 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
31 * (fixed size thread pool) and {@link
32 * Executors#newSingleThreadExecutor} (single background thread), that
33 * preconfigure settings for the most common usage
34 * scenarios. Otherwise, use the following guide when manually
35 * configuring and tuning this class:
36 *
37 * <dl>
38 *
39 * <dt>Core and maximum pool sizes</dt>
40 *
41 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
42 * pool size
43 * (see {@link ThreadPoolExecutor#getPoolSize})
44 * according to the bounds set by corePoolSize
45 * (see {@link ThreadPoolExecutor#getCorePoolSize})
46 * and
47 * maximumPoolSize
48 * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
49 * When a new task is submitted in method {@link
50 * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
51 * are running, a new thread is created to handle the request, even if
52 * other worker threads are idle. If there are more than
53 * corePoolSize but less than maximumPoolSize threads running, a new
54 * thread will be created only if the queue is full. By setting
55 * corePoolSize and maximumPoolSize the same, you create a fixed-size
56 * thread pool. By setting maximumPoolSize to an essentially unbounded
57 * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
58 * accommodate an arbitrary number of concurrent tasks. Most typically,
59 * core and maximum pool sizes are set only upon construction, but they
60 * may also be changed dynamically using {@link
61 * ThreadPoolExecutor#setCorePoolSize} and {@link
62 * ThreadPoolExecutor#setMaximumPoolSize}. </dd>
63 *
64 * <dt>On-demand construction</dt>
65 *
66 * <dd> By default, even core threads are initially created and
67 * started only when new tasks arrive, but this can be overridden
68 * dynamically using method {@link
69 * ThreadPoolExecutor#prestartCoreThread} or
70 * {@link ThreadPoolExecutor#prestartAllCoreThreads}.
71 * You probably want to prestart threads if you construct the
72 * pool with a non-empty queue. </dd>
73 *
74 * <dt>Creating new threads</dt>
75 *
76 * <dd>New threads are created using a {@link
77 * java.util.concurrent.ThreadFactory}. If not otherwise specified, a
78 * {@link Executors#defaultThreadFactory} is used, that creates threads to all
79 * be in the same {@link ThreadGroup} and with the same
80 * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
81 * a different ThreadFactory, you can alter the thread's name, thread
82 * group, priority, daemon status, etc. If a <tt>ThreadFactory</tt> fails to create
83 * a thread when asked by returning null from <tt>newThread</tt>,
84 * the executor will continue, but might
85 * not be able to execute any tasks. </dd>
86 *
87 * <dt>Keep-alive times</dt>
88 *
89 * <dd>If the pool currently has more than corePoolSize threads,
90 * excess threads will be terminated if they have been idle for more
91 * than the keepAliveTime (see {@link
92 * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
93 * reducing resource consumption when the pool is not being actively
94 * used. If the pool becomes more active later, new threads will be
95 * constructed. This parameter can also be changed dynamically using
96 * method {@link ThreadPoolExecutor#setKeepAliveTime}. Using a value
97 * of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS} effectively
98 * disables idle threads from ever terminating prior to shut down. By
99 * default, the keep-alive policy applies only when there are more
100 * than corePoolSizeThreads. But method {@link
101 * ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)} can be used to apply
102 * this time-out policy to core threads as well, so long as
103 * the keepAliveTime value is non-zero. </dd>
104 *
105 * <dt>Queuing</dt>
106 *
107 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
108 * submitted tasks. The use of this queue interacts with pool sizing:
109 *
110 * <ul>
111 *
112 * <li> If fewer than corePoolSize threads are running, the Executor
113 * always prefers adding a new thread
114 * rather than queuing.</li>
115 *
116 * <li> If corePoolSize or more threads are running, the Executor
117 * always prefers queuing a request rather than adding a new
118 * thread.</li>
119 *
120 * <li> If a request cannot be queued, a new thread is created unless
121 * this would exceed maximumPoolSize, in which case, the task will be
122 * rejected.</li>
123 *
124 * </ul>
125 *
126 * There are three general strategies for queuing:
127 * <ol>
128 *
129 * <li> <em> Direct handoffs.</em> A good default choice for a work
130 * queue is a {@link SynchronousQueue} that hands off tasks to threads
131 * without otherwise holding them. Here, an attempt to queue a task
132 * will fail if no threads are immediately available to run it, so a
133 * new thread will be constructed. This policy avoids lockups when
134 * handling sets of requests that might have internal dependencies.
135 * Direct handoffs generally require unbounded maximumPoolSizes to
136 * avoid rejection of new submitted tasks. This in turn admits the
137 * possibility of unbounded thread growth when commands continue to
138 * arrive on average faster than they can be processed. </li>
139 *
140 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
141 * example a {@link LinkedBlockingQueue} without a predefined
142 * capacity) will cause new tasks to wait in the queue when all
143 * corePoolSize threads are busy. Thus, no more than corePoolSize
144 * threads will ever be created. (And the value of the maximumPoolSize
145 * therefore doesn't have any effect.) This may be appropriate when
146 * each task is completely independent of others, so tasks cannot
147 * affect each others execution; for example, in a web page server.
148 * While this style of queuing can be useful in smoothing out
149 * transient bursts of requests, it admits the possibility of
150 * unbounded work queue growth when commands continue to arrive on
151 * average faster than they can be processed. </li>
152 *
153 * <li><em>Bounded queues.</em> A bounded queue (for example, an
154 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
155 * used with finite maximumPoolSizes, but can be more difficult to
156 * tune and control. Queue sizes and maximum pool sizes may be traded
157 * off for each other: Using large queues and small pools minimizes
158 * CPU usage, OS resources, and context-switching overhead, but can
159 * lead to artificially low throughput. If tasks frequently block (for
160 * example if they are I/O bound), a system may be able to schedule
161 * time for more threads than you otherwise allow. Use of small queues
162 * generally requires larger pool sizes, which keeps CPUs busier but
163 * may encounter unacceptable scheduling overhead, which also
164 * decreases throughput. </li>
165 *
166 * </ol>
167 *
168 * </dd>
169 *
170 * <dt>Rejected tasks</dt>
171 *
172 * <dd> New tasks submitted in method {@link
173 * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
174 * Executor has been shut down, and also when the Executor uses finite
175 * bounds for both maximum threads and work queue capacity, and is
176 * saturated. In either case, the <tt>execute</tt> method invokes the
177 * {@link RejectedExecutionHandler#rejectedExecution} method of its
178 * {@link RejectedExecutionHandler}. Four predefined handler policies
179 * are provided:
180 *
181 * <ol>
182 *
183 * <li> In the
184 * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
185 * runtime {@link RejectedExecutionException} upon rejection. </li>
186 *
187 * <li> In {@link
188 * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
189 * <tt>execute</tt> itself runs the task. This provides a simple
190 * feedback control mechanism that will slow down the rate that new
191 * tasks are submitted. </li>
192 *
193 * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
194 * a task that cannot be executed is simply dropped. </li>
195 *
196 * <li>In {@link
197 * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
198 * shut down, the task at the head of the work queue is dropped, and
199 * then execution is retried (which can fail again, causing this to be
200 * repeated.) </li>
201 *
202 * </ol>
203 *
204 * It is possible to define and use other kinds of {@link
205 * RejectedExecutionHandler} classes. Doing so requires some care
206 * especially when policies are designed to work only under particular
207 * capacity or queuing policies. </dd>
208 *
209 * <dt>Hook methods</dt>
210 *
211 * <dd>This class provides <tt>protected</tt> overridable {@link
212 * ThreadPoolExecutor#beforeExecute} and {@link
213 * ThreadPoolExecutor#afterExecute} methods that are called before and
214 * after execution of each task. These can be used to manipulate the
215 * execution environment; for example, reinitializing ThreadLocals,
216 * gathering statistics, or adding log entries. Additionally, method
217 * {@link ThreadPoolExecutor#terminated} can be overridden to perform
218 * any special processing that needs to be done once the Executor has
219 * fully terminated.
220 *
221 * <p>If hook or callback methods throw
222 * exceptions, internal worker threads may in turn fail and
223 * abruptly terminate.</dd>
224 *
225 * <dt>Queue maintenance</dt>
226 *
227 * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
228 * the work queue for purposes of monitoring and debugging. Use of
229 * this method for any other purpose is strongly discouraged. Two
230 * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
231 * ThreadPoolExecutor#purge} are available to assist in storage
232 * reclamation when large numbers of queued tasks become
233 * cancelled.</dd>
234 *
235 * <dt>Finalization</dt>
236 *
237 * <dd> A pool that is no longer referenced in a program <em>AND</em>
238 * has no remaining threads will be <tt>shutdown</tt>
239 * automatically. If you would like to ensure that unreferenced pools
240 * are reclaimed even if users forget to call {@link
241 * ThreadPoolExecutor#shutdown}, then you must arrange that unused
242 * threads eventually die, by setting appropriate keep-alive times,
243 * using a lower bound of zero core threads and/or setting {@link
244 * ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)}. </dd> </dl>
245 *
246 * <p> <b>Extension example</b>. Most extensions of this class
247 * override one or more of the protected hook methods. For example,
248 * here is a subclass that adds a simple pause/resume feature:
249 *
250 * <pre>
251 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
252 * private boolean isPaused;
253 * private ReentrantLock pauseLock = new ReentrantLock();
254 * private Condition unpaused = pauseLock.newCondition();
255 *
256 * public PausableThreadPoolExecutor(...) { super(...); }
257 *
258 * protected void beforeExecute(Thread t, Runnable r) {
259 * super.beforeExecute(t, r);
260 * pauseLock.lock();
261 * try {
262 * while (isPaused) unpaused.await();
263 * } catch (InterruptedException ie) {
264 * t.interrupt();
265 * } finally {
266 * pauseLock.unlock();
267 * }
268 * }
269 *
270 * public void pause() {
271 * pauseLock.lock();
272 * try {
273 * isPaused = true;
274 * } finally {
275 * pauseLock.unlock();
276 * }
277 * }
278 *
279 * public void resume() {
280 * pauseLock.lock();
281 * try {
282 * isPaused = false;
283 * unpaused.signalAll();
284 * } finally {
285 * pauseLock.unlock();
286 * }
287 * }
288 * }
289 * </pre>
290 * @since 1.5
291 * @author Doug Lea
292 */
293 public class ThreadPoolExecutor extends AbstractExecutorService {
294
295 /**
296 * Permission for checking shutdown
297 */
298 private static final RuntimePermission shutdownPerm =
299 new RuntimePermission("modifyThread");
300
301 /*
302 * A ThreadPoolExecutor manages a largish set of control fields.
303 * State changes in fields that affect execution control
304 * guarantees only occur within mainLock regions. These include
305 * fields runState, poolSize, corePoolSize, and maximumPoolSize
306 * However, these fields are also declared volatile, so can be
307 * read outside of locked regions. (Also, the workers Set is
308 * accessed only under lock).
309 *
310 * The other fields representing user control parameters do not
311 * affect execution invariants, so are declared volatile and
312 * allowed to change (via user methods) asynchronously with
313 * execution. These fields: allowCoreThreadTimeOut, keepAliveTime,
314 * the rejected execution handler, and threadFactory, are not
315 * updated within locks.
316 *
317 * The extensive use of volatiles here enables the most
318 * performance-critical actions, such as enqueuing and dequeuing
319 * tasks in the workQueue, to normally proceed without holding the
320 * mainLock when they see that the state allows actions, although,
321 * as described below, sometimes at the expense of re-checks
322 * following these actions.
323 */
324
325 /**
326 * runState provides the main lifecyle control, taking on values:
327 *
328 * RUNNING: Accept new tasks and process queued tasks
329 * SHUTDOWN: Don't accept new tasks, but process queued tasks
330 * STOP: Don't accept new tasks, don't process queued tasks,
331 * and interrupt in-progress tasks
332 * TERMINATED: Same as STOP, plus all threads have terminated
333 *
334 * The numerical order among these values matters, to allow
335 * ordered comparisons. The runState monotonically increases over
336 * time, but need not hit each state. The transitions are:
337 *
338 * RUNNING -> SHUTDOWN
339 * On invocation of shutdown(), perhaps implicitly in finalize()
340 * (RUNNING or SHUTDOWN) -> STOP
341 * On invocation of shutdownNow()
342 * SHUTDOWN -> TERMINATED
343 * When both queue and pool are empty
344 * STOP -> TERMINATED
345 * When pool is empty
346 */
347 volatile int runState;
348 static final int RUNNING = 0;
349 static final int SHUTDOWN = 1;
350 static final int STOP = 2;
351 static final int TERMINATED = 3;
352
353 /**
354 * The queue used for holding tasks and handing off to worker
355 * threads. Note that when using this queue, we do not require
356 * that workQueue.poll() returning null necessarily means that
357 * workQueue.isEmpty(), so must sometimes check both. This
358 * accommodates special-purpose queues such as DelayQueues for
359 * which poll() is allowed to return null even if it may later
360 * return non-null when delays expire.
361 */
362 private final BlockingQueue<Runnable> workQueue;
363
364 /**
365 * Lock held on updates to poolSize, corePoolSize,
366 * maximumPoolSize, runState, and workers set.
367 */
368 private final ReentrantLock mainLock = new ReentrantLock();
369
370 /**
371 * Wait condition to support awaitTermination
372 */
373 private final Condition termination = mainLock.newCondition();
374
375 /**
376 * Set containing all worker threads in pool. Accessed only when
377 * holding mainLock.
378 */
379 private final HashSet<Worker> workers = new HashSet<Worker>();
380
381 /**
382 * Timeout in nanoseconds for idle threads waiting for work.
383 * Threads use this timeout when there are more than corePoolSize
384 * present or if allowCoreThreadTimeOut. Otherwise they wait
385 * forever for new work.
386 */
387 private volatile long keepAliveTime;
388
389 /**
390 * If false (default) core threads stay alive even when idle. If
391 * true, core threads use keepAliveTime to time out waiting for
392 * work.
393 */
394 private volatile boolean allowCoreThreadTimeOut;
395
396 /**
397 * Core pool size, updated only while holding mainLock, but
398 * volatile to allow concurrent readability even during updates.
399 */
400 private volatile int corePoolSize;
401
402 /**
403 * Maximum pool size, updated only while holding mainLock but
404 * volatile to allow concurrent readability even during updates.
405 */
406 private volatile int maximumPoolSize;
407
408 /**
409 * Current pool size, updated only while holding mainLock but
410 * volatile to allow concurrent readability even during updates.
411 */
412 private volatile int poolSize;
413
414 /**
415 * Handler called when saturated or shutdown in execute.
416 */
417 private volatile RejectedExecutionHandler handler;
418
419 /**
420 * Factory for new threads. All threads are created using this
421 * factory (via method addThread). All callers must be prepared
422 * for addThread to fail by returning null, which may reflect a
423 * system or user's policy limiting the number of threads. Even
424 * though it is not treated as an error, failure to create threads
425 * may result in new tasks being rejected or existing ones
426 * remaining stuck in the queue. On the other hand, no special
427 * precautions exist to handle OutOfMemoryErrors that might be
428 * thrown while trying to create threads, since there is generally
429 * no recourse from within this class.
430 */
431 private volatile ThreadFactory threadFactory;
432
433 /**
434 * Tracks largest attained pool size.
435 */
436 private int largestPoolSize;
437
438 /**
439 * Counter for completed tasks. Updated only on termination of
440 * worker threads.
441 */
442 private long completedTaskCount;
443
444 /**
445 * The default rejected execution handler
446 */
447 private static final RejectedExecutionHandler defaultHandler =
448 new AbortPolicy();
449
450 // Constructors
451
452 /**
453 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
454 * parameters and default thread factory and rejected execution handler.
455 * It may be more convenient to use one of the {@link Executors} factory
456 * methods instead of this general purpose constructor.
457 *
458 * @param corePoolSize the number of threads to keep in the
459 * pool, even if they are idle.
460 * @param maximumPoolSize the maximum number of threads to allow in the
461 * pool.
462 * @param keepAliveTime when the number of threads is greater than
463 * the core, this is the maximum time that excess idle threads
464 * will wait for new tasks before terminating.
465 * @param unit the time unit for the keepAliveTime
466 * argument.
467 * @param workQueue the queue to use for holding tasks before they
468 * are executed. This queue will hold only the <tt>Runnable</tt>
469 * tasks submitted by the <tt>execute</tt> method.
470 * @throws IllegalArgumentException if corePoolSize or
471 * keepAliveTime less than zero, or if maximumPoolSize less than or
472 * equal to zero, or if corePoolSize greater than maximumPoolSize.
473 * @throws NullPointerException if <tt>workQueue</tt> is null
474 */
475 public ThreadPoolExecutor(int corePoolSize,
476 int maximumPoolSize,
477 long keepAliveTime,
478 TimeUnit unit,
479 BlockingQueue<Runnable> workQueue) {
480 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
481 Executors.defaultThreadFactory(), defaultHandler);
482 }
483
484 /**
485 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
486 * parameters and default rejected execution handler.
487 *
488 * @param corePoolSize the number of threads to keep in the
489 * pool, even if they are idle.
490 * @param maximumPoolSize the maximum number of threads to allow in the
491 * pool.
492 * @param keepAliveTime when the number of threads is greater than
493 * the core, this is the maximum time that excess idle threads
494 * will wait for new tasks before terminating.
495 * @param unit the time unit for the keepAliveTime
496 * argument.
497 * @param workQueue the queue to use for holding tasks before they
498 * are executed. This queue will hold only the <tt>Runnable</tt>
499 * tasks submitted by the <tt>execute</tt> method.
500 * @param threadFactory the factory to use when the executor
501 * creates a new thread.
502 * @throws IllegalArgumentException if corePoolSize or
503 * keepAliveTime less than zero, or if maximumPoolSize less than or
504 * equal to zero, or if corePoolSize greater than maximumPoolSize.
505 * @throws NullPointerException if <tt>workQueue</tt>
506 * or <tt>threadFactory</tt> are null.
507 */
508 public ThreadPoolExecutor(int corePoolSize,
509 int maximumPoolSize,
510 long keepAliveTime,
511 TimeUnit unit,
512 BlockingQueue<Runnable> workQueue,
513 ThreadFactory threadFactory) {
514 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
515 threadFactory, defaultHandler);
516 }
517
518 /**
519 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
520 * parameters and default thread factory.
521 *
522 * @param corePoolSize the number of threads to keep in the
523 * pool, even if they are idle.
524 * @param maximumPoolSize the maximum number of threads to allow in the
525 * pool.
526 * @param keepAliveTime when the number of threads is greater than
527 * the core, this is the maximum time that excess idle threads
528 * will wait for new tasks before terminating.
529 * @param unit the time unit for the keepAliveTime
530 * argument.
531 * @param workQueue the queue to use for holding tasks before they
532 * are executed. This queue will hold only the <tt>Runnable</tt>
533 * tasks submitted by the <tt>execute</tt> method.
534 * @param handler the handler to use when execution is blocked
535 * because the thread bounds and queue capacities are reached.
536 * @throws IllegalArgumentException if corePoolSize or
537 * keepAliveTime less than zero, or if maximumPoolSize less than or
538 * equal to zero, or if corePoolSize greater than maximumPoolSize.
539 * @throws NullPointerException if <tt>workQueue</tt>
540 * or <tt>handler</tt> are null.
541 */
542 public ThreadPoolExecutor(int corePoolSize,
543 int maximumPoolSize,
544 long keepAliveTime,
545 TimeUnit unit,
546 BlockingQueue<Runnable> workQueue,
547 RejectedExecutionHandler handler) {
548 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
549 Executors.defaultThreadFactory(), handler);
550 }
551
552 /**
553 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
554 * parameters.
555 *
556 * @param corePoolSize the number of threads to keep in the
557 * pool, even if they are idle.
558 * @param maximumPoolSize the maximum number of threads to allow in the
559 * pool.
560 * @param keepAliveTime when the number of threads is greater than
561 * the core, this is the maximum time that excess idle threads
562 * will wait for new tasks before terminating.
563 * @param unit the time unit for the keepAliveTime
564 * argument.
565 * @param workQueue the queue to use for holding tasks before they
566 * are executed. This queue will hold only the <tt>Runnable</tt>
567 * tasks submitted by the <tt>execute</tt> method.
568 * @param threadFactory the factory to use when the executor
569 * creates a new thread.
570 * @param handler the handler to use when execution is blocked
571 * because the thread bounds and queue capacities are reached.
572 * @throws IllegalArgumentException if corePoolSize or
573 * keepAliveTime less than zero, or if maximumPoolSize less than or
574 * equal to zero, or if corePoolSize greater than maximumPoolSize.
575 * @throws NullPointerException if <tt>workQueue</tt>
576 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
577 */
578 public ThreadPoolExecutor(int corePoolSize,
579 int maximumPoolSize,
580 long keepAliveTime,
581 TimeUnit unit,
582 BlockingQueue<Runnable> workQueue,
583 ThreadFactory threadFactory,
584 RejectedExecutionHandler handler) {
585 if (corePoolSize < 0 ||
586 maximumPoolSize <= 0 ||
587 maximumPoolSize < corePoolSize ||
588 keepAliveTime < 0)
589 throw new IllegalArgumentException();
590 if (workQueue == null || threadFactory == null || handler == null)
591 throw new NullPointerException();
592 this.corePoolSize = corePoolSize;
593 this.maximumPoolSize = maximumPoolSize;
594 this.workQueue = workQueue;
595 this.keepAliveTime = unit.toNanos(keepAliveTime);
596 this.threadFactory = threadFactory;
597 this.handler = handler;
598 }
599
600 /*
601 * Support for execute().
602 *
603 * Method execute() and its helper methods handle the various
604 * cases encountered when new tasks are submitted. The main
605 * execute() method proceeds in 3 steps:
606 *
607 * 1. If it appears that fewer than corePoolSize threads are
608 * running, try to start a new thread with the given command as
609 * its first task. The check here errs on the side of caution.
610 * The call to addIfUnderCorePoolSize rechecks runState and pool
611 * size under lock (they change only under lock) so prevents false
612 * alarms that would add threads when it shouldn't, but may also
613 * fail to add them when they should. This is compensated within
614 * the following steps.
615 *
616 * 2. If a task can be successfully queued, then we are done, but
617 * still need to compensate for missing the fact that we should
618 * have added a thread (because existing ones died) or that
619 * shutdown occurred since entry into this method. So we recheck
620 * state and if necessary (in ensureQueuedTaskHandled) roll back
621 * the enqueuing if shut down, or start a new thread if there are
622 * none.
623 *
624 * 3. If we cannot queue task, then we try to add a new
625 * thread. There's no guesswork here (addIfUnderMaximumPoolSize)
626 * since it is performed under lock. If it fails, we know we are
627 * shut down or saturated.
628 *
629 * The reason for taking this overall approach is to normally
630 * avoid holding mainLock during this method, which would be a
631 * serious scalability bottleneck. After warmup, almost all calls
632 * take step 2 in a way that entails no locking.
633 */
634
635 /**
636 * Executes the given task sometime in the future. The task
637 * may execute in a new thread or in an existing pooled thread.
638 *
639 * If the task cannot be submitted for execution, either because this
640 * executor has been shutdown or because its capacity has been reached,
641 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
642 *
643 * @param command the task to execute
644 * @throws RejectedExecutionException at discretion of
645 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
646 * for execution
647 * @throws NullPointerException if command is null
648 */
649 public void execute(Runnable command) {
650 if (command == null)
651 throw new NullPointerException();
652 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
653 if (runState == RUNNING && workQueue.offer(command)) {
654 if (runState != RUNNING || poolSize == 0)
655 ensureQueuedTaskHandled(command);
656 }
657 else if (!addIfUnderMaximumPoolSize(command))
658 reject(command); // is shutdown or saturated
659 }
660 }
661
662 /**
663 * Creates and returns a new thread running firstTask as its first
664 * task. Call only while holding mainLock.
665 *
666 * @param firstTask the task the new thread should run first (or
667 * null if none)
668 * @return the new thread, or null if threadFactory fails to create thread
669 */
670 private Thread addThread(Runnable firstTask) {
671 Worker w = new Worker(firstTask);
672 Thread t = threadFactory.newThread(w);
673 if (t != null) {
674 w.thread = t;
675 workers.add(w);
676 int nt = ++poolSize;
677 if (nt > largestPoolSize)
678 largestPoolSize = nt;
679 }
680 return t;
681 }
682
683 /**
684 * Creates and starts a new thread running firstTask as its first
685 * task, only if fewer than corePoolSize threads are running
686 * and the pool is not shut down.
687 * @param firstTask the task the new thread should run first (or
688 * null if none)
689 * @return true if successful
690 */
691 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
692 Thread t = null;
693 final ReentrantLock mainLock = this.mainLock;
694 mainLock.lock();
695 try {
696 if (poolSize < corePoolSize && runState == RUNNING)
697 t = addThread(firstTask);
698 } finally {
699 mainLock.unlock();
700 }
701 if (t == null)
702 return false;
703 t.start();
704 return true;
705 }
706
707 /**
708 * Creates and starts a new thread running firstTask as its first
709 * task, only if fewer than maximumPoolSize threads are running
710 * and pool is not shut down.
711 * @param firstTask the task the new thread should run first (or
712 * null if none)
713 * @return true if successful
714 */
715 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
716 Thread t = null;
717 final ReentrantLock mainLock = this.mainLock;
718 mainLock.lock();
719 try {
720 if (poolSize < maximumPoolSize && runState == RUNNING)
721 t = addThread(firstTask);
722 } finally {
723 mainLock.unlock();
724 }
725 if (t == null)
726 return false;
727 t.start();
728 return true;
729 }
730
731 /**
732 * Rechecks state after queuing a task. Called from execute when
733 * pool state has been observed to change after queuing a task. If
734 * the task was queued concurrently with a call to shutdownNow,
735 * and is still present in the queue, this task must be removed
736 * and rejected to preserve shutdownNow guarantees. Otherwise,
737 * this method ensures (unless addThread fails) that there is at
738 * least one live thread to handle this task
739 * @param command the task
740 */
741 private void ensureQueuedTaskHandled(Runnable command) {
742 final ReentrantLock mainLock = this.mainLock;
743 mainLock.lock();
744 boolean reject = false;
745 Thread t = null;
746 try {
747 int state = runState;
748 if (state != RUNNING && workQueue.remove(command))
749 reject = true;
750 else if (state < STOP &&
751 poolSize < Math.max(corePoolSize, 1) &&
752 !workQueue.isEmpty())
753 t = addThread(null);
754 } finally {
755 mainLock.unlock();
756 }
757 if (reject)
758 reject(command);
759 else if (t != null)
760 t.start();
761 }
762
763 /**
764 * Invokes the rejected execution handler for the given command.
765 */
766 void reject(Runnable command) {
767 handler.rejectedExecution(command, this);
768 }
769
770
771 /**
772 * Worker threads.
773 *
774 * Worker threads can start out life either with an initial first
775 * task, or without one. Normally, they are started with a first
776 * task. This enables execute(), etc to bypass queuing when there
777 * are fewer than corePoolSize threads (in which case we always
778 * start one), or when the queue is full.(in which case we must
779 * bypass queue.) Initially idle threads are created either by
780 * users (prestartCoreThread and setCorePoolSize) or when methods
781 * ensureQueuedTaskHandled and tryTerminate notice that the queue
782 * is not empty but there are no active threads to handle them.
783 *
784 * After completing a task, workers try to get another one,
785 * via method getTask. If they cannot (i.e., getTask returns
786 * null), they exit, calling workerDone to update pool state.
787 *
788 * When starting to run a task, unless the pool is stopped, each
789 * worker thread ensures that it is not interrupted, and uses
790 * runLock to prevent the pool from interrupting it in the midst
791 * of execution. This shields user tasks from any interrupts that
792 * may otherwise be needed during shutdown (see method
793 * interruptIdleWorkers), unless the pool is stopping (via
794 * shutdownNow) in which case interrupts are let through to affect
795 * both tasks and workers. However, this shielding does not
796 * necessarily protect the workers from lagging interrupts from
797 * other user threads directed towards tasks that have already
798 * been completed. Thus, a worker thread may be interrupted
799 * needlessly (for example in getTask), in which case it rechecks
800 * pool state to see if it should exit.
801 */
802 private final class Worker implements Runnable {
803 /**
804 * The runLock is acquired and released surrounding each task
805 * execution. It mainly protects against interrupts that are
806 * intended to cancel the worker thread from instead
807 * interrupting the task being run.
808 */
809 private final ReentrantLock runLock = new ReentrantLock();
810
811 /**
812 * Initial task to run before entering run loop. Possibly null.
813 */
814 private Runnable firstTask;
815
816 /**
817 * Per thread completed task counter; accumulated
818 * into completedTaskCount upon termination.
819 */
820 volatile long completedTasks;
821
822 /**
823 * Thread this worker is running in. Acts as a final field,
824 * but cannot be set until thread is created.
825 */
826 Thread thread;
827
828 Worker(Runnable firstTask) {
829 this.firstTask = firstTask;
830 }
831
832 boolean isActive() {
833 return runLock.isLocked();
834 }
835
836 /**
837 * Interrupts thread if not running a task.
838 */
839 void interruptIfIdle() {
840 final ReentrantLock runLock = this.runLock;
841 if (runLock.tryLock()) {
842 try {
843 if (thread != Thread.currentThread())
844 thread.interrupt();
845 } finally {
846 runLock.unlock();
847 }
848 }
849 }
850
851 /**
852 * Interrupts thread even if running a task.
853 */
854 void interruptNow() {
855 thread.interrupt();
856 }
857
858 /**
859 * Runs a single task between before/after methods.
860 */
861 private void runTask(Runnable task) {
862 final ReentrantLock runLock = this.runLock;
863 runLock.lock();
864 try {
865 /*
866 * Ensure that unless pool is stopping, this thread
867 * does not have its interrupt set. This requires a
868 * double-check of state in case the interrupt was
869 * cleared concurrently with a shutdownNow -- if so,
870 * the interrupt is re-enabled.
871 */
872 if (runState < STOP &&
873 Thread.interrupted() &&
874 runState >= STOP)
875 thread.interrupt();
876 /*
877 * Track execution state to ensure that afterExecute
878 * is called only if task completed or threw
879 * exception. Otherwise, the caught runtime exception
880 * will have been thrown by afterExecute itself, in
881 * which case we don't want to call it again.
882 */
883 boolean ran = false;
884 beforeExecute(thread, task);
885 try {
886 task.run();
887 ran = true;
888 afterExecute(task, null);
889 ++completedTasks;
890 } catch (RuntimeException ex) {
891 if (!ran)
892 afterExecute(task, ex);
893 throw ex;
894 }
895 } finally {
896 runLock.unlock();
897 }
898 }
899
900 /**
901 * Main run loop
902 */
903 public void run() {
904 try {
905 Runnable task = firstTask;
906 firstTask = null;
907 while (task != null || (task = getTask()) != null) {
908 runTask(task);
909 task = null;
910 }
911 } finally {
912 workerDone(this);
913 }
914 }
915 }
916
917 /* Utilities for worker thread control */
918
919 /**
920 * Gets the next task for a worker thread to run. The general
921 * approach is similar to execute() in that worker threads trying
922 * to get a task to run do so on the basis of prevailing state
923 * accessed outside of locks. This may cause them to choose the
924 * "wrong" action, such as trying to exit because no tasks
925 * appear to be available, or entering a take when the pool is in
926 * the process of being shut down. These potential problems are
927 * countered by (1) rechecking pool state (in workerCanExit)
928 * before giving up, and (2) interrupting other workers upon
929 * shutdown, so they can recheck state. All other user-based state
930 * changes (to allowCoreThreadTimeOut etc) are OK even when
931 * performed asynchronously wrt getTask.
932 *
933 * @return the task
934 */
935 Runnable getTask() {
936 for (;;) {
937 try {
938 int state = runState;
939 if (state > SHUTDOWN)
940 return null;
941 Runnable r;
942 if (state == SHUTDOWN) // Help drain queue
943 r = workQueue.poll();
944 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
945 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
946 else
947 r = workQueue.take();
948 if (r != null)
949 return r;
950 if (workerCanExit()) {
951 if (runState >= SHUTDOWN) // Wake up others
952 interruptIdleWorkers();
953 return null;
954 }
955 // Else retry
956 } catch (InterruptedException ie) {
957 // On interruption, re-check runState
958 }
959 }
960 }
961
962 /**
963 * Check whether a worker thread that fails to get a task can
964 * exit. We allow a worker thread to die if the pool is stopping,
965 * or the queue is empty, or there is at least one thread to
966 * handle possibly non-empty queue, even if core timeouts are
967 * allowed.
968 */
969 private boolean workerCanExit() {
970 final ReentrantLock mainLock = this.mainLock;
971 mainLock.lock();
972 boolean canExit;
973 try {
974 canExit = runState >= STOP ||
975 workQueue.isEmpty() ||
976 (allowCoreThreadTimeOut &&
977 poolSize > Math.max(1, corePoolSize));
978 } finally {
979 mainLock.unlock();
980 }
981 return canExit;
982 }
983
984 /**
985 * Wakes up all threads that might be waiting for tasks so they
986 * can check for termination. Note: this method is also called by
987 * ScheduledThreadPoolExecutor.
988 */
989 void interruptIdleWorkers() {
990 final ReentrantLock mainLock = this.mainLock;
991 mainLock.lock();
992 try {
993 for (Worker w : workers)
994 w.interruptIfIdle();
995 } finally {
996 mainLock.unlock();
997 }
998 }
999
1000 /**
1001 * Performs bookkeeping for an exiting worker thread.
1002 * @param w the worker
1003 */
1004 void workerDone(Worker w) {
1005 final ReentrantLock mainLock = this.mainLock;
1006 mainLock.lock();
1007 try {
1008 completedTaskCount += w.completedTasks;
1009 workers.remove(w);
1010 if (--poolSize == 0)
1011 tryTerminate();
1012 } finally {
1013 mainLock.unlock();
1014 }
1015 }
1016
1017 /* Termination support. */
1018
1019 /**
1020 * Transitions to TERMINATED state if either (SHUTDOWN and pool
1021 * and queue empty) or (STOP and pool empty), otherwise unless
1022 * stopped, ensuring that there is at least one live thread to
1023 * handle queued tasks.
1024 *
1025 * This method is called from the three places in which
1026 * termination can occur: in workerDone on exit of the last thread
1027 * after pool has been shut down, or directly within calls to
1028 * shutdown or shutdownNow, if there are no live threads.
1029 */
1030 private void tryTerminate() {
1031 if (poolSize == 0) {
1032 int state = runState;
1033 if (state < STOP && !workQueue.isEmpty()) {
1034 state = RUNNING; // disable termination check below
1035 Thread t = addThread(null);
1036 if (t != null)
1037 t.start();
1038 }
1039 if (state == STOP || state == SHUTDOWN) {
1040 runState = TERMINATED;
1041 termination.signalAll();
1042 terminated();
1043 }
1044 }
1045 }
1046
1047 /**
1048 * Initiates an orderly shutdown in which previously submitted
1049 * tasks are executed, but no new tasks will be
1050 * accepted. Invocation has no additional effect if already shut
1051 * down.
1052 * @throws SecurityException if a security manager exists and
1053 * shutting down this ExecutorService may manipulate threads that
1054 * the caller is not permitted to modify because it does not hold
1055 * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
1056 * or the security manager's <tt>checkAccess</tt> method denies access.
1057 */
1058 public void shutdown() {
1059 /*
1060 * Conceptually, shutdown is just a matter of changing the
1061 * runState to SHUTDOWN, and then interrupting any worker
1062 * threads that might be blocked in getTask() to wake them up
1063 * so they can exit. Then, if there happen not to be any
1064 * threads or tasks, we can directly terminate pool via
1065 * tryTerminate. Else, the last worker to leave the building
1066 * turns off the lights (in workerDone).
1067 *
1068 * But this is made more delicate because we must cooperate
1069 * with the security manager (if present), which may implement
1070 * policies that make more sense for operations on Threads
1071 * than they do for ThreadPools. This requires 3 steps:
1072 *
1073 * 1. Making sure caller has permission to shut down threads
1074 * in general (see shutdownPerm).
1075 *
1076 * 2. If (1) passes, making sure the caller is allowed to
1077 * modify each of our threads. This might not be true even if
1078 * first check passed, if the SecurityManager treats some
1079 * threads specially. If this check passes, then we can try
1080 * to set runState.
1081 *
1082 * 3. If both (1) and (2) pass, dealing with inconsistent
1083 * security managers that allow checkAccess but then throw a
1084 * SecurityException when interrupt() is invoked. In this
1085 * third case, because we have already set runState, we can
1086 * only try to back out from the shutdown as cleanly as
1087 * possible. Some workers may have been killed but we remain
1088 * in non-shutdown state (which may entail tryTerminate from
1089 * workerDone starting a new worker to maintain liveness.)
1090 */
1091
1092 SecurityManager security = System.getSecurityManager();
1093 if (security != null)
1094 security.checkPermission(shutdownPerm);
1095
1096 final ReentrantLock mainLock = this.mainLock;
1097 mainLock.lock();
1098 try {
1099 if (security != null) { // Check if caller can modify our threads
1100 for (Worker w : workers)
1101 security.checkAccess(w.thread);
1102 }
1103
1104 int state = runState;
1105 if (state < SHUTDOWN)
1106 runState = SHUTDOWN;
1107
1108 try {
1109 for (Worker w : workers) {
1110 w.interruptIfIdle();
1111 }
1112 } catch (SecurityException se) { // Try to back out
1113 runState = state;
1114 // tryTerminate() here would be a no-op
1115 throw se;
1116 }
1117
1118 tryTerminate(); // Terminate now if pool and queue empty
1119 } finally {
1120 mainLock.unlock();
1121 }
1122 }
1123
1124 /**
1125 * Attempts to stop all actively executing tasks, halts the
1126 * processing of waiting tasks, and returns a list of the tasks
1127 * that were awaiting execution. These tasks are drained (removed)
1128 * from the task queue upon return from this method.
1129 *
1130 * <p>There are no guarantees beyond best-effort attempts to stop
1131 * processing actively executing tasks. This implementation
1132 * cancels tasks via {@link Thread#interrupt}, so any task that
1133 * fails to respond to interrupts may never terminate.
1134 *
1135 * @return list of tasks that never commenced execution
1136 * @throws SecurityException if a security manager exists and
1137 * shutting down this ExecutorService may manipulate threads that
1138 * the caller is not permitted to modify because it does not hold
1139 * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
1140 * or the security manager's <tt>checkAccess</tt> method denies access.
1141 */
1142 public List<Runnable> shutdownNow() {
1143 /*
1144 * shutdownNow differs from shutdown only in that
1145 * 1. runState is set to STOP,
1146 * 2. all worker threads are interrupted, not just the idle ones, and
1147 * 3. the queue is drained and returned.
1148 */
1149 SecurityManager security = System.getSecurityManager();
1150 if (security != null)
1151 security.checkPermission(shutdownPerm);
1152
1153 final ReentrantLock mainLock = this.mainLock;
1154 mainLock.lock();
1155 try {
1156 if (security != null) { // Check if caller can modify our threads
1157 for (Worker w : workers)
1158 security.checkAccess(w.thread);
1159 }
1160
1161 int state = runState;
1162 if (state < STOP)
1163 runState = STOP;
1164
1165 try {
1166 for (Worker w : workers) {
1167 w.interruptNow();
1168 }
1169 } catch (SecurityException se) { // Try to back out
1170 runState = state;
1171 // tryTerminate() here would be a no-op
1172 throw se;
1173 }
1174
1175 List<Runnable> tasks = drainQueue();
1176 tryTerminate(); // Terminate now if pool and queue empty
1177 return tasks;
1178 } finally {
1179 mainLock.unlock();
1180 }
1181 }
1182
1183 /**
1184 * Drains the task queue into a new list. Used by shutdownNow.
1185 * Call only while holding main lock.
1186 */
1187 private List<Runnable> drainQueue() {
1188 List<Runnable> taskList = new ArrayList<Runnable>();
1189 workQueue.drainTo(taskList);
1190 /*
1191 * If the queue is a DelayQueue or any other kind of queue
1192 * for which poll or drainTo may fail to remove some elements,
1193 * we need to manually traverse and remove remaining tasks.
1194 * To guarantee atomicity wrt other threads using this queue,
1195 * we need to create a new iterator for each element removed.
1196 */
1197 while (!workQueue.isEmpty()) {
1198 Iterator<Runnable> it = workQueue.iterator();
1199 try {
1200 if (it.hasNext()) {
1201 Runnable r = it.next();
1202 if (workQueue.remove(r))
1203 taskList.add(r);
1204 }
1205 } catch (ConcurrentModificationException ignore) {
1206 }
1207 }
1208 return taskList;
1209 }
1210
1211 public boolean isShutdown() {
1212 return runState != RUNNING;
1213 }
1214
1215 /**
1216 * Returns true if shutdownNow has been invoked but this executor
1217 * has not completely terminated.
1218 */
1219 boolean isStopped() {
1220 return runState == STOP;
1221 }
1222
1223 /**
1224 * Returns true if this executor is in the process of terminating
1225 * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
1226 * completely terminated. This method may be useful for
1227 * debugging. A return of <tt>true</tt> reported a sufficient
1228 * period after shutdown may indicate that submitted tasks have
1229 * ignored or suppressed interruption, causing this executor not
1230 * to properly terminate.
1231 * @return true if terminating but not yet terminated
1232 */
1233 public boolean isTerminating() {
1234 int state = runState;
1235 return state == SHUTDOWN || state == STOP;
1236 }
1237
1238 public boolean isTerminated() {
1239 return runState == TERMINATED;
1240 }
1241
1242 public boolean awaitTermination(long timeout, TimeUnit unit)
1243 throws InterruptedException {
1244 long nanos = unit.toNanos(timeout);
1245 final ReentrantLock mainLock = this.mainLock;
1246 mainLock.lock();
1247 try {
1248 for (;;) {
1249 if (runState == TERMINATED)
1250 return true;
1251 if (nanos <= 0)
1252 return false;
1253 nanos = termination.awaitNanos(nanos);
1254 }
1255 } finally {
1256 mainLock.unlock();
1257 }
1258 }
1259
1260 /**
1261 * Invokes <tt>shutdown</tt> when this executor is no longer
1262 * referenced.
1263 */
1264 protected void finalize() {
1265 shutdown();
1266 }
1267
1268 /* Getting and setting tunable parameters */
1269
1270 /**
1271 * Sets the thread factory used to create new threads.
1272 *
1273 * @param threadFactory the new thread factory
1274 * @throws NullPointerException if threadFactory is null
1275 * @see #getThreadFactory
1276 */
1277 public void setThreadFactory(ThreadFactory threadFactory) {
1278 if (threadFactory == null)
1279 throw new NullPointerException();
1280 this.threadFactory = threadFactory;
1281 }
1282
1283 /**
1284 * Returns the thread factory used to create new threads.
1285 *
1286 * @return the current thread factory
1287 * @see #setThreadFactory
1288 */
1289 public ThreadFactory getThreadFactory() {
1290 return threadFactory;
1291 }
1292
1293 /**
1294 * Sets a new handler for unexecutable tasks.
1295 *
1296 * @param handler the new handler
1297 * @throws NullPointerException if handler is null
1298 * @see #getRejectedExecutionHandler
1299 */
1300 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
1301 if (handler == null)
1302 throw new NullPointerException();
1303 this.handler = handler;
1304 }
1305
1306 /**
1307 * Returns the current handler for unexecutable tasks.
1308 *
1309 * @return the current handler
1310 * @see #setRejectedExecutionHandler
1311 */
1312 public RejectedExecutionHandler getRejectedExecutionHandler() {
1313 return handler;
1314 }
1315
1316 /**
1317 * Sets the core number of threads. This overrides any value set
1318 * in the constructor. If the new value is smaller than the
1319 * current value, excess existing threads will be terminated when
1320 * they next become idle. If larger, new threads will, if needed,
1321 * be started to execute any queued tasks.
1322 *
1323 * @param corePoolSize the new core size
1324 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
1325 * less than zero
1326 * @see #getCorePoolSize
1327 */
1328 public void setCorePoolSize(int corePoolSize) {
1329 if (corePoolSize < 0)
1330 throw new IllegalArgumentException();
1331 final ReentrantLock mainLock = this.mainLock;
1332 mainLock.lock();
1333 try {
1334 int extra = this.corePoolSize - corePoolSize;
1335 this.corePoolSize = corePoolSize;
1336 if (extra < 0) {
1337 int n = workQueue.size(); // don't add more threads than tasks
1338 while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
1339 Thread t = addThread(null);
1340 if (t != null)
1341 t.start();
1342 else
1343 break;
1344 }
1345 }
1346 else if (extra > 0 && poolSize > corePoolSize) {
1347 try {
1348 Iterator<Worker> it = workers.iterator();
1349 while (it.hasNext() &&
1350 extra-- > 0 &&
1351 poolSize > corePoolSize &&
1352 workQueue.remainingCapacity() == 0)
1353 it.next().interruptIfIdle();
1354 } catch (SecurityException ignore) {
1355 // Not an error; it is OK if the threads stay live
1356 }
1357 }
1358 } finally {
1359 mainLock.unlock();
1360 }
1361 }
1362
1363 /**
1364 * Returns the core number of threads.
1365 *
1366 * @return the core number of threads
1367 * @see #setCorePoolSize
1368 */
1369 public int getCorePoolSize() {
1370 return corePoolSize;
1371 }
1372
1373 /**
1374 * Starts a core thread, causing it to idly wait for work. This
1375 * overrides the default policy of starting core threads only when
1376 * new tasks are executed. This method will return <tt>false</tt>
1377 * if all core threads have already been started.
1378 * @return true if a thread was started
1379 */
1380 public boolean prestartCoreThread() {
1381 return addIfUnderCorePoolSize(null);
1382 }
1383
1384 /**
1385 * Starts all core threads, causing them to idly wait for work. This
1386 * overrides the default policy of starting core threads only when
1387 * new tasks are executed.
1388 * @return the number of threads started
1389 */
1390 public int prestartAllCoreThreads() {
1391 int n = 0;
1392 while (addIfUnderCorePoolSize(null))
1393 ++n;
1394 return n;
1395 }
1396
1397 /**
1398 * Returns true if this pool allows core threads to time out and
1399 * terminate if no tasks arrive within the keepAlive time, being
1400 * replaced if needed when new tasks arrive. When true, the same
1401 * keep-alive policy applying to non-core threads applies also to
1402 * core threads. When false (the default), core threads are never
1403 * terminated due to lack of incoming tasks.
1404 * @return <tt>true</tt> if core threads are allowed to time out,
1405 * else <tt>false</tt>
1406 *
1407 * @since 1.6
1408 */
1409 public boolean allowsCoreThreadTimeOut() {
1410 return allowCoreThreadTimeOut;
1411 }
1412
1413 /**
1414 * Sets the policy governing whether core threads may time out and
1415 * terminate if no tasks arrive within the keep-alive time, being
1416 * replaced if needed when new tasks arrive. When false, core
1417 * threads are never terminated due to lack of incoming
1418 * tasks. When true, the same keep-alive policy applying to
1419 * non-core threads applies also to core threads. To avoid
1420 * continual thread replacement, the keep-alive time must be
1421 * greater than zero when setting <tt>true</tt>. This method
1422 * should in general be called before the pool is actively used.
1423 * @param value <tt>true</tt> if should time out, else <tt>false</tt>
1424 * @throws IllegalArgumentException if value is <tt>true</tt>
1425 * and the current keep-alive time is not greater than zero.
1426 *
1427 * @since 1.6
1428 */
1429 public void allowCoreThreadTimeOut(boolean value) {
1430 if (value && keepAliveTime <= 0)
1431 throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1432
1433 allowCoreThreadTimeOut = value;
1434 }
1435
1436 /**
1437 * Sets the maximum allowed number of threads. This overrides any
1438 * value set in the constructor. If the new value is smaller than
1439 * the current value, excess existing threads will be
1440 * terminated when they next become idle.
1441 *
1442 * @param maximumPoolSize the new maximum
1443 * @throws IllegalArgumentException if the new maximum is
1444 * less than or equal to zero, or
1445 * less than the {@linkplain #getCorePoolSize core pool size}
1446 * @see #getMaximumPoolSize
1447 */
1448 public void setMaximumPoolSize(int maximumPoolSize) {
1449 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1450 throw new IllegalArgumentException();
1451 final ReentrantLock mainLock = this.mainLock;
1452 mainLock.lock();
1453 try {
1454 int extra = this.maximumPoolSize - maximumPoolSize;
1455 this.maximumPoolSize = maximumPoolSize;
1456 if (extra > 0 && poolSize > maximumPoolSize) {
1457 try {
1458 Iterator<Worker> it = workers.iterator();
1459 while (it.hasNext() &&
1460 extra > 0 &&
1461 poolSize > maximumPoolSize) {
1462 it.next().interruptIfIdle();
1463 --extra;
1464 }
1465 } catch (SecurityException ignore) {
1466 // Not an error; it is OK if the threads stay live
1467 }
1468 }
1469 } finally {
1470 mainLock.unlock();
1471 }
1472 }
1473
1474 /**
1475 * Returns the maximum allowed number of threads.
1476 *
1477 * @return the maximum allowed number of threads
1478 * @see #setMaximumPoolSize
1479 */
1480 public int getMaximumPoolSize() {
1481 return maximumPoolSize;
1482 }
1483
1484 /**
1485 * Sets the time limit for which threads may remain idle before
1486 * being terminated. If there are more than the core number of
1487 * threads currently in the pool, after waiting this amount of
1488 * time without processing a task, excess threads will be
1489 * terminated. This overrides any value set in the constructor.
1490 * @param time the time to wait. A time value of zero will cause
1491 * excess threads to terminate immediately after executing tasks.
1492 * @param unit the time unit of the time argument
1493 * @throws IllegalArgumentException if time less than zero or
1494 * if time is zero and allowsCoreThreadTimeOut
1495 * @see #getKeepAliveTime
1496 */
1497 public void setKeepAliveTime(long time, TimeUnit unit) {
1498 if (time < 0)
1499 throw new IllegalArgumentException();
1500 if (time == 0 && allowsCoreThreadTimeOut())
1501 throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1502 this.keepAliveTime = unit.toNanos(time);
1503 }
1504
1505 /**
1506 * Returns the thread keep-alive time, which is the amount of time
1507 * that threads in excess of the core pool size may remain
1508 * idle before being terminated.
1509 *
1510 * @param unit the desired time unit of the result
1511 * @return the time limit
1512 * @see #setKeepAliveTime
1513 */
1514 public long getKeepAliveTime(TimeUnit unit) {
1515 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1516 }
1517
1518 /* User-level queue utilities */
1519
1520 /**
1521 * Returns the task queue used by this executor. Access to the
1522 * task queue is intended primarily for debugging and monitoring.
1523 * This queue may be in active use. Retrieving the task queue
1524 * does not prevent queued tasks from executing.
1525 *
1526 * @return the task queue
1527 */
1528 public BlockingQueue<Runnable> getQueue() {
1529 return workQueue;
1530 }
1531
1532 /**
1533 * Removes this task from the executor's internal queue if it is
1534 * present, thus causing it not to be run if it has not already
1535 * started.
1536 *
1537 * <p> This method may be useful as one part of a cancellation
1538 * scheme. It may fail to remove tasks that have been converted
1539 * into other forms before being placed on the internal queue. For
1540 * example, a task entered using <tt>submit</tt> might be
1541 * converted into a form that maintains <tt>Future</tt> status.
1542 * However, in such cases, method {@link ThreadPoolExecutor#purge}
1543 * may be used to remove those Futures that have been cancelled.
1544 *
1545 * @param task the task to remove
1546 * @return true if the task was removed
1547 */
1548 public boolean remove(Runnable task) {
1549 return getQueue().remove(task);
1550 }
1551
1552 /**
1553 * Tries to remove from the work queue all {@link Future}
1554 * tasks that have been cancelled. This method can be useful as a
1555 * storage reclamation operation, that has no other impact on
1556 * functionality. Cancelled tasks are never executed, but may
1557 * accumulate in work queues until worker threads can actively
1558 * remove them. Invoking this method instead tries to remove them now.
1559 * However, this method may fail to remove tasks in
1560 * the presence of interference by other threads.
1561 */
1562 public void purge() {
1563 // Fail if we encounter interference during traversal
1564 try {
1565 Iterator<Runnable> it = getQueue().iterator();
1566 while (it.hasNext()) {
1567 Runnable r = it.next();
1568 if (r instanceof Future<?>) {
1569 Future<?> c = (Future<?>)r;
1570 if (c.isCancelled())
1571 it.remove();
1572 }
1573 }
1574 }
1575 catch (ConcurrentModificationException ex) {
1576 return;
1577 }
1578 }
1579
1580 /* Statistics */
1581
1582 /**
1583 * Returns the current number of threads in the pool.
1584 *
1585 * @return the number of threads
1586 */
1587 public int getPoolSize() {
1588 return poolSize;
1589 }
1590
1591 /**
1592 * Returns the approximate number of threads that are actively
1593 * executing tasks.
1594 *
1595 * @return the number of threads
1596 */
1597 public int getActiveCount() {
1598 final ReentrantLock mainLock = this.mainLock;
1599 mainLock.lock();
1600 try {
1601 int n = 0;
1602 for (Worker w : workers) {
1603 if (w.isActive())
1604 ++n;
1605 }
1606 return n;
1607 } finally {
1608 mainLock.unlock();
1609 }
1610 }
1611
1612 /**
1613 * Returns the largest number of threads that have ever
1614 * simultaneously been in the pool.
1615 *
1616 * @return the number of threads
1617 */
1618 public int getLargestPoolSize() {
1619 final ReentrantLock mainLock = this.mainLock;
1620 mainLock.lock();
1621 try {
1622 return largestPoolSize;
1623 } finally {
1624 mainLock.unlock();
1625 }
1626 }
1627
1628 /**
1629 * Returns the approximate total number of tasks that have ever been
1630 * scheduled for execution. Because the states of tasks and
1631 * threads may change dynamically during computation, the returned
1632 * value is only an approximation.
1633 *
1634 * @return the number of tasks
1635 */
1636 public long getTaskCount() {
1637 final ReentrantLock mainLock = this.mainLock;
1638 mainLock.lock();
1639 try {
1640 long n = completedTaskCount;
1641 for (Worker w : workers) {
1642 n += w.completedTasks;
1643 if (w.isActive())
1644 ++n;
1645 }
1646 return n + workQueue.size();
1647 } finally {
1648 mainLock.unlock();
1649 }
1650 }
1651
1652 /**
1653 * Returns the approximate total number of tasks that have
1654 * completed execution. Because the states of tasks and threads
1655 * may change dynamically during computation, the returned value
1656 * is only an approximation, but one that does not ever decrease
1657 * across successive calls.
1658 *
1659 * @return the number of tasks
1660 */
1661 public long getCompletedTaskCount() {
1662 final ReentrantLock mainLock = this.mainLock;
1663 mainLock.lock();
1664 try {
1665 long n = completedTaskCount;
1666 for (Worker w : workers)
1667 n += w.completedTasks;
1668 return n;
1669 } finally {
1670 mainLock.unlock();
1671 }
1672 }
1673
1674 /* Extension hooks */
1675
1676 /**
1677 * Method invoked prior to executing the given Runnable in the
1678 * given thread. This method is invoked by thread <tt>t</tt> that
1679 * will execute task <tt>r</tt>, and may be used to re-initialize
1680 * ThreadLocals, or to perform logging.
1681 *
1682 * <p>This implementation does nothing, but may be customized in
1683 * subclasses. Note: To properly nest multiple overridings, subclasses
1684 * should generally invoke <tt>super.beforeExecute</tt> at the end of
1685 * this method.
1686 *
1687 * @param t the thread that will run task r.
1688 * @param r the task that will be executed.
1689 */
1690 protected void beforeExecute(Thread t, Runnable r) { }
1691
1692 /**
1693 * Method invoked upon completion of execution of the given Runnable.
1694 * This method is invoked by the thread that executed the task. If
1695 * non-null, the Throwable is the uncaught <tt>RuntimeException</tt>
1696 * or <tt>Error</tt> that caused execution to terminate abruptly.
1697 *
1698 * <p><b>Note:</b> When actions are enclosed in tasks (such as
1699 * {@link FutureTask}) either explicitly or via methods such as
1700 * <tt>submit</tt>, these task objects catch and maintain
1701 * computational exceptions, and so they do not cause abrupt
1702 * termination, and the internal exceptions are <em>not</em>
1703 * passed to this method.
1704 *
1705 * <p>This implementation does nothing, but may be customized in
1706 * subclasses. Note: To properly nest multiple overridings, subclasses
1707 * should generally invoke <tt>super.afterExecute</tt> at the
1708 * beginning of this method.
1709 *
1710 * @param r the runnable that has completed.
1711 * @param t the exception that caused termination, or null if
1712 * execution completed normally.
1713 */
1714 protected void afterExecute(Runnable r, Throwable t) { }
1715
1716 /**
1717 * Method invoked when the Executor has terminated. Default
1718 * implementation does nothing. Note: To properly nest multiple
1719 * overridings, subclasses should generally invoke
1720 * <tt>super.terminated</tt> within this method.
1721 */
1722 protected void terminated() { }
1723
1724 /* Predefined RejectedExecutionHandlers */
1725
1726 /**
1727 * A handler for rejected tasks that runs the rejected task
1728 * directly in the calling thread of the <tt>execute</tt> method,
1729 * unless the executor has been shut down, in which case the task
1730 * is discarded.
1731 */
1732 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1733 /**
1734 * Creates a <tt>CallerRunsPolicy</tt>.
1735 */
1736 public CallerRunsPolicy() { }
1737
1738 /**
1739 * Executes task r in the caller's thread, unless the executor
1740 * has been shut down, in which case the task is discarded.
1741 * @param r the runnable task requested to be executed
1742 * @param e the executor attempting to execute this task
1743 */
1744 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1745 if (!e.isShutdown()) {
1746 r.run();
1747 }
1748 }
1749 }
1750
1751 /**
1752 * A handler for rejected tasks that throws a
1753 * <tt>RejectedExecutionException</tt>.
1754 */
1755 public static class AbortPolicy implements RejectedExecutionHandler {
1756 /**
1757 * Creates an <tt>AbortPolicy</tt>.
1758 */
1759 public AbortPolicy() { }
1760
1761 /**
1762 * Always throws RejectedExecutionException.
1763 * @param r the runnable task requested to be executed
1764 * @param e the executor attempting to execute this task
1765 * @throws RejectedExecutionException always.
1766 */
1767 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1768 throw new RejectedExecutionException();
1769 }
1770 }
1771
1772 /**
1773 * A handler for rejected tasks that silently discards the
1774 * rejected task.
1775 */
1776 public static class DiscardPolicy implements RejectedExecutionHandler {
1777 /**
1778 * Creates a <tt>DiscardPolicy</tt>.
1779 */
1780 public DiscardPolicy() { }
1781
1782 /**
1783 * Does nothing, which has the effect of discarding task r.
1784 * @param r the runnable task requested to be executed
1785 * @param e the executor attempting to execute this task
1786 */
1787 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1788 }
1789 }
1790
1791 /**
1792 * A handler for rejected tasks that discards the oldest unhandled
1793 * request and then retries <tt>execute</tt>, unless the executor
1794 * is shut down, in which case the task is discarded.
1795 */
1796 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1797 /**
1798 * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
1799 */
1800 public DiscardOldestPolicy() { }
1801
1802 /**
1803 * Obtains and ignores the next task that the executor
1804 * would otherwise execute, if one is immediately available,
1805 * and then retries execution of task r, unless the executor
1806 * is shut down, in which case task r is instead discarded.
1807 * @param r the runnable task requested to be executed
1808 * @param e the executor attempting to execute this task
1809 */
1810 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1811 if (!e.isShutdown()) {
1812 e.getQueue().poll();
1813 e.execute(r);
1814 }
1815 }
1816 }
1817}
1818