0N/A/*
0N/A * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
0N/A *
0N/A * This code is free software; you can redistribute it and/or modify it
0N/A * under the terms of the GNU General Public License version 2 only, as
2362N/A * published by the Free Software Foundation. Oracle designates this
0N/A * particular file as subject to the "Classpath" exception as provided
2362N/A * by Oracle in the LICENSE file that accompanied this code.
0N/A *
0N/A * This code is distributed in the hope that it will be useful, but WITHOUT
0N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
0N/A * version 2 for more details (a copy is included in the LICENSE file that
0N/A * accompanied this code).
0N/A *
0N/A * You should have received a copy of the GNU General Public License version
0N/A * 2 along with this work; if not, write to the Free Software Foundation,
0N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
0N/A *
2362N/A * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
2362N/A * or visit www.oracle.com if you need additional information or have any
2362N/A * questions.
0N/A */
0N/A
0N/A/*
0N/A * This file is available under and governed by the GNU General Public
0N/A * License version 2 only, as published by the Free Software Foundation.
0N/A * However, the following notice accompanied the original version of this
0N/A * file:
0N/A *
0N/A * Written by Doug Lea with assistance from members of JCP JSR-166
0N/A * Expert Group and released to the public domain, as explained at
3984N/A * http://creativecommons.org/publicdomain/zero/1.0/
0N/A */
0N/A
0N/Apackage java.util.concurrent;
5681N/Aimport java.util.concurrent.locks.AbstractQueuedSynchronizer;
5681N/Aimport java.util.concurrent.locks.Condition;
5681N/Aimport java.util.concurrent.locks.ReentrantLock;
5681N/Aimport java.util.concurrent.atomic.AtomicInteger;
0N/Aimport java.util.*;
0N/A
0N/A/**
0N/A * An {@link ExecutorService} that executes each submitted task using
0N/A * one of possibly several pooled threads, normally configured
0N/A * using {@link Executors} factory methods.
0N/A *
0N/A * <p>Thread pools address two different problems: they usually
0N/A * provide improved performance when executing large numbers of
0N/A * asynchronous tasks, due to reduced per-task invocation overhead,
0N/A * and they provide a means of bounding and managing the resources,
0N/A * including threads, consumed when executing a collection of tasks.
0N/A * Each {@code ThreadPoolExecutor} also maintains some basic
0N/A * statistics, such as the number of completed tasks.
0N/A *
0N/A * <p>To be useful across a wide range of contexts, this class
0N/A * provides many adjustable parameters and extensibility
0N/A * hooks. However, programmers are urged to use the more convenient
0N/A * {@link Executors} factory methods {@link
0N/A * Executors#newCachedThreadPool} (unbounded thread pool, with
0N/A * automatic thread reclamation), {@link Executors#newFixedThreadPool}
0N/A * (fixed size thread pool) and {@link
0N/A * Executors#newSingleThreadExecutor} (single background thread), that
0N/A * preconfigure settings for the most common usage
0N/A * scenarios. Otherwise, use the following guide when manually
0N/A * configuring and tuning this class:
0N/A *
0N/A * <dl>
0N/A *
0N/A * <dt>Core and maximum pool sizes</dt>
0N/A *
0N/A * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
0N/A * pool size (see {@link #getPoolSize})
0N/A * according to the bounds set by
0N/A * corePoolSize (see {@link #getCorePoolSize}) and
0N/A * maximumPoolSize (see {@link #getMaximumPoolSize}).
0N/A *
0N/A * When a new task is submitted in method {@link #execute}, and fewer
0N/A * than corePoolSize threads are running, a new thread is created to
0N/A * handle the request, even if other worker threads are idle. If
0N/A * there are more than corePoolSize but less than maximumPoolSize
0N/A * threads running, a new thread will be created only if the queue is
0N/A * full. By setting corePoolSize and maximumPoolSize the same, you
0N/A * create a fixed-size thread pool. By setting maximumPoolSize to an
0N/A * essentially unbounded value such as {@code Integer.MAX_VALUE}, you
0N/A * allow the pool to accommodate an arbitrary number of concurrent
0N/A * tasks. Most typically, core and maximum pool sizes are set only
0N/A * upon construction, but they may also be changed dynamically using
0N/A * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
0N/A *
0N/A * <dt>On-demand construction</dt>
0N/A *
0N/A * <dd> By default, even core threads are initially created and
0N/A * started only when new tasks arrive, but this can be overridden
0N/A * dynamically using method {@link #prestartCoreThread} or {@link
0N/A * #prestartAllCoreThreads}. You probably want to prestart threads if
0N/A * you construct the pool with a non-empty queue. </dd>
0N/A *
0N/A * <dt>Creating new threads</dt>
0N/A *
0N/A * <dd>New threads are created using a {@link ThreadFactory}. If not
0N/A * otherwise specified, a {@link Executors#defaultThreadFactory} is
0N/A * used, that creates threads to all be in the same {@link
0N/A * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
0N/A * non-daemon status. By supplying a different ThreadFactory, you can
0N/A * alter the thread's name, thread group, priority, daemon status,
0N/A * etc. If a {@code ThreadFactory} fails to create a thread when asked
0N/A * by returning null from {@code newThread}, the executor will
0N/A * continue, but might not be able to execute any tasks. Threads
0N/A * should possess the "modifyThread" {@code RuntimePermission}. If
0N/A * worker threads or other threads using the pool do not possess this
0N/A * permission, service may be degraded: configuration changes may not
0N/A * take effect in a timely manner, and a shutdown pool may remain in a
0N/A * state in which termination is possible but not completed.</dd>
0N/A *
0N/A * <dt>Keep-alive times</dt>
0N/A *
0N/A * <dd>If the pool currently has more than corePoolSize threads,
0N/A * excess threads will be terminated if they have been idle for more
0N/A * than the keepAliveTime (see {@link #getKeepAliveTime}). This
0N/A * provides a means of reducing resource consumption when the pool is
0N/A * not being actively used. If the pool becomes more active later, new
0N/A * threads will be constructed. This parameter can also be changed
0N/A * dynamically using method {@link #setKeepAliveTime}. Using a value
0N/A * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively
0N/A * disables idle threads from ever terminating prior to shut down. By
0N/A * default, the keep-alive policy applies only when there are more
0N/A * than corePoolSizeThreads. But method {@link
0N/A * #allowCoreThreadTimeOut(boolean)} can be used to apply this
0N/A * time-out policy to core threads as well, so long as the
0N/A * keepAliveTime value is non-zero. </dd>
0N/A *
0N/A * <dt>Queuing</dt>
0N/A *
0N/A * <dd>Any {@link BlockingQueue} may be used to transfer and hold
0N/A * submitted tasks. The use of this queue interacts with pool sizing:
0N/A *
0N/A * <ul>
0N/A *
0N/A * <li> If fewer than corePoolSize threads are running, the Executor
0N/A * always prefers adding a new thread
0N/A * rather than queuing.</li>
0N/A *
0N/A * <li> If corePoolSize or more threads are running, the Executor
0N/A * always prefers queuing a request rather than adding a new
0N/A * thread.</li>
0N/A *
0N/A * <li> If a request cannot be queued, a new thread is created unless
0N/A * this would exceed maximumPoolSize, in which case, the task will be
0N/A * rejected.</li>
0N/A *
0N/A * </ul>
0N/A *
0N/A * There are three general strategies for queuing:
0N/A * <ol>
0N/A *
0N/A * <li> <em> Direct handoffs.</em> A good default choice for a work
0N/A * queue is a {@link SynchronousQueue} that hands off tasks to threads
0N/A * without otherwise holding them. Here, an attempt to queue a task
0N/A * will fail if no threads are immediately available to run it, so a
0N/A * new thread will be constructed. This policy avoids lockups when
0N/A * handling sets of requests that might have internal dependencies.
0N/A * Direct handoffs generally require unbounded maximumPoolSizes to
0N/A * avoid rejection of new submitted tasks. This in turn admits the
0N/A * possibility of unbounded thread growth when commands continue to
0N/A * arrive on average faster than they can be processed. </li>
0N/A *
0N/A * <li><em> Unbounded queues.</em> Using an unbounded queue (for
0N/A * example a {@link LinkedBlockingQueue} without a predefined
0N/A * capacity) will cause new tasks to wait in the queue when all
0N/A * corePoolSize threads are busy. Thus, no more than corePoolSize
0N/A * threads will ever be created. (And the value of the maximumPoolSize
0N/A * therefore doesn't have any effect.) This may be appropriate when
0N/A * each task is completely independent of others, so tasks cannot
0N/A * affect each others execution; for example, in a web page server.
0N/A * While this style of queuing can be useful in smoothing out
0N/A * transient bursts of requests, it admits the possibility of
0N/A * unbounded work queue growth when commands continue to arrive on
0N/A * average faster than they can be processed. </li>
0N/A *
0N/A * <li><em>Bounded queues.</em> A bounded queue (for example, an
0N/A * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
0N/A * used with finite maximumPoolSizes, but can be more difficult to
0N/A * tune and control. Queue sizes and maximum pool sizes may be traded
0N/A * off for each other: Using large queues and small pools minimizes
0N/A * CPU usage, OS resources, and context-switching overhead, but can
0N/A * lead to artificially low throughput. If tasks frequently block (for
0N/A * example if they are I/O bound), a system may be able to schedule
0N/A * time for more threads than you otherwise allow. Use of small queues
0N/A * generally requires larger pool sizes, which keeps CPUs busier but
0N/A * may encounter unacceptable scheduling overhead, which also
0N/A * decreases throughput. </li>
0N/A *
0N/A * </ol>
0N/A *
0N/A * </dd>
0N/A *
0N/A * <dt>Rejected tasks</dt>
0N/A *
0N/A * <dd> New tasks submitted in method {@link #execute} will be
0N/A * <em>rejected</em> when the Executor has been shut down, and also
0N/A * when the Executor uses finite bounds for both maximum threads and
0N/A * work queue capacity, and is saturated. In either case, the {@code
0N/A * execute} method invokes the {@link
0N/A * RejectedExecutionHandler#rejectedExecution} method of its {@link
0N/A * RejectedExecutionHandler}. Four predefined handler policies are
0N/A * provided:
0N/A *
0N/A * <ol>
0N/A *
0N/A * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
0N/A * handler throws a runtime {@link RejectedExecutionException} upon
0N/A * rejection. </li>
0N/A *
0N/A * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
0N/A * that invokes {@code execute} itself runs the task. This provides a
0N/A * simple feedback control mechanism that will slow down the rate that
0N/A * new tasks are submitted. </li>
0N/A *
0N/A * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
0N/A * cannot be executed is simply dropped. </li>
0N/A *
0N/A * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
0N/A * executor is not shut down, the task at the head of the work queue
0N/A * is dropped, and then execution is retried (which can fail again,
0N/A * causing this to be repeated.) </li>
0N/A *
0N/A * </ol>
0N/A *
0N/A * It is possible to define and use other kinds of {@link
0N/A * RejectedExecutionHandler} classes. Doing so requires some care
0N/A * especially when policies are designed to work only under particular
0N/A * capacity or queuing policies. </dd>
0N/A *
0N/A * <dt>Hook methods</dt>
0N/A *
0N/A * <dd>This class provides {@code protected} overridable {@link
0N/A * #beforeExecute} and {@link #afterExecute} methods that are called
0N/A * before and after execution of each task. These can be used to
0N/A * manipulate the execution environment; for example, reinitializing
0N/A * ThreadLocals, gathering statistics, or adding log
0N/A * entries. Additionally, method {@link #terminated} can be overridden
0N/A * to perform any special processing that needs to be done once the
0N/A * Executor has fully terminated.
0N/A *
0N/A * <p>If hook or callback methods throw exceptions, internal worker
0N/A * threads may in turn fail and abruptly terminate.</dd>
0N/A *
0N/A * <dt>Queue maintenance</dt>
0N/A *
0N/A * <dd> Method {@link #getQueue} allows access to the work queue for
0N/A * purposes of monitoring and debugging. Use of this method for any
0N/A * other purpose is strongly discouraged. Two supplied methods,
0N/A * {@link #remove} and {@link #purge} are available to assist in
0N/A * storage reclamation when large numbers of queued tasks become
0N/A * cancelled.</dd>
0N/A *
0N/A * <dt>Finalization</dt>
0N/A *
0N/A * <dd> A pool that is no longer referenced in a program <em>AND</em>
0N/A * has no remaining threads will be {@code shutdown} automatically. If
0N/A * you would like to ensure that unreferenced pools are reclaimed even
0N/A * if users forget to call {@link #shutdown}, then you must arrange
0N/A * that unused threads eventually die, by setting appropriate
0N/A * keep-alive times, using a lower bound of zero core threads and/or
0N/A * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
0N/A *
0N/A * </dl>
0N/A *
0N/A * <p> <b>Extension example</b>. Most extensions of this class
0N/A * override one or more of the protected hook methods. For example,
0N/A * here is a subclass that adds a simple pause/resume feature:
0N/A *
0N/A * <pre> {@code
0N/A * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
0N/A * private boolean isPaused;
0N/A * private ReentrantLock pauseLock = new ReentrantLock();
0N/A * private Condition unpaused = pauseLock.newCondition();
0N/A *
0N/A * public PausableThreadPoolExecutor(...) { super(...); }
0N/A *
0N/A * protected void beforeExecute(Thread t, Runnable r) {
0N/A * super.beforeExecute(t, r);
0N/A * pauseLock.lock();
0N/A * try {
0N/A * while (isPaused) unpaused.await();
0N/A * } catch (InterruptedException ie) {
0N/A * t.interrupt();
0N/A * } finally {
0N/A * pauseLock.unlock();
0N/A * }
0N/A * }
0N/A *
0N/A * public void pause() {
0N/A * pauseLock.lock();
0N/A * try {
0N/A * isPaused = true;
0N/A * } finally {
0N/A * pauseLock.unlock();
0N/A * }
0N/A * }
0N/A *
0N/A * public void resume() {
0N/A * pauseLock.lock();
0N/A * try {
0N/A * isPaused = false;
0N/A * unpaused.signalAll();
0N/A * } finally {
0N/A * pauseLock.unlock();
0N/A * }
0N/A * }
0N/A * }}</pre>
0N/A *
0N/A * @since 1.5
0N/A * @author Doug Lea
0N/A */
0N/Apublic class ThreadPoolExecutor extends AbstractExecutorService {
0N/A /**
0N/A * The main pool control state, ctl, is an atomic integer packing
0N/A * two conceptual fields
0N/A * workerCount, indicating the effective number of threads
0N/A * runState, indicating whether running, shutting down etc
0N/A *
0N/A * In order to pack them into one int, we limit workerCount to
0N/A * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
0N/A * billion) otherwise representable. If this is ever an issue in
0N/A * the future, the variable can be changed to be an AtomicLong,
0N/A * and the shift/mask constants below adjusted. But until the need
0N/A * arises, this code is a bit faster and simpler using an int.
0N/A *
0N/A * The workerCount is the number of workers that have been
0N/A * permitted to start and not permitted to stop. The value may be
0N/A * transiently different from the actual number of live threads,
0N/A * for example when a ThreadFactory fails to create a thread when
0N/A * asked, and when exiting threads are still performing
0N/A * bookkeeping before terminating. The user-visible pool size is
0N/A * reported as the current size of the workers set.
0N/A *
0N/A * The runState provides the main lifecyle control, taking on values:
0N/A *
0N/A * RUNNING: Accept new tasks and process queued tasks
0N/A * SHUTDOWN: Don't accept new tasks, but process queued tasks
0N/A * STOP: Don't accept new tasks, don't process queued tasks,
0N/A * and interrupt in-progress tasks
0N/A * TIDYING: All tasks have terminated, workerCount is zero,
0N/A * the thread transitioning to state TIDYING
0N/A * will run the terminated() hook method
0N/A * TERMINATED: terminated() has completed
0N/A *
0N/A * The numerical order among these values matters, to allow
0N/A * ordered comparisons. The runState monotonically increases over
0N/A * time, but need not hit each state. The transitions are:
0N/A *
0N/A * RUNNING -> SHUTDOWN
0N/A * On invocation of shutdown(), perhaps implicitly in finalize()
0N/A * (RUNNING or SHUTDOWN) -> STOP
0N/A * On invocation of shutdownNow()
0N/A * SHUTDOWN -> TIDYING
0N/A * When both queue and pool are empty
0N/A * STOP -> TIDYING
0N/A * When pool is empty
0N/A * TIDYING -> TERMINATED
0N/A * When the terminated() hook method has completed
0N/A *
0N/A * Threads waiting in awaitTermination() will return when the
0N/A * state reaches TERMINATED.
0N/A *
0N/A * Detecting the transition from SHUTDOWN to TIDYING is less
0N/A * straightforward than you'd like because the queue may become
0N/A * empty after non-empty and vice versa during SHUTDOWN state, but
0N/A * we can only terminate if, after seeing that it is empty, we see
0N/A * that workerCount is 0 (which sometimes entails a recheck -- see
0N/A * below).
0N/A */
0N/A private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
0N/A private static final int COUNT_BITS = Integer.SIZE - 3;
0N/A private static final int CAPACITY = (1 << COUNT_BITS) - 1;
0N/A
0N/A // runState is stored in the high-order bits
0N/A private static final int RUNNING = -1 << COUNT_BITS;
0N/A private static final int SHUTDOWN = 0 << COUNT_BITS;
0N/A private static final int STOP = 1 << COUNT_BITS;
0N/A private static final int TIDYING = 2 << COUNT_BITS;
0N/A private static final int TERMINATED = 3 << COUNT_BITS;
0N/A
0N/A // Packing and unpacking ctl
0N/A private static int runStateOf(int c) { return c & ~CAPACITY; }
0N/A private static int workerCountOf(int c) { return c & CAPACITY; }
0N/A private static int ctlOf(int rs, int wc) { return rs | wc; }
0N/A
0N/A /*
0N/A * Bit field accessors that don't require unpacking ctl.
0N/A * These depend on the bit layout and on workerCount being never negative.
0N/A */
0N/A
0N/A private static boolean runStateLessThan(int c, int s) {
0N/A return c < s;
0N/A }
0N/A
0N/A private static boolean runStateAtLeast(int c, int s) {
0N/A return c >= s;
0N/A }
0N/A
0N/A private static boolean isRunning(int c) {
0N/A return c < SHUTDOWN;
0N/A }
0N/A
0N/A /**
0N/A * Attempt to CAS-increment the workerCount field of ctl.
0N/A */
0N/A private boolean compareAndIncrementWorkerCount(int expect) {
0N/A return ctl.compareAndSet(expect, expect + 1);
0N/A }
0N/A
0N/A /**
0N/A * Attempt to CAS-decrement the workerCount field of ctl.
0N/A */
0N/A private boolean compareAndDecrementWorkerCount(int expect) {
0N/A return ctl.compareAndSet(expect, expect - 1);
0N/A }
0N/A
0N/A /**
0N/A * Decrements the workerCount field of ctl. This is called only on
0N/A * abrupt termination of a thread (see processWorkerExit). Other
0N/A * decrements are performed within getTask.
0N/A */
0N/A private void decrementWorkerCount() {
0N/A do {} while (! compareAndDecrementWorkerCount(ctl.get()));
0N/A }
0N/A
0N/A /**
0N/A * The queue used for holding tasks and handing off to worker
0N/A * threads. We do not require that workQueue.poll() returning
0N/A * null necessarily means that workQueue.isEmpty(), so rely
0N/A * solely on isEmpty to see if the queue is empty (which we must
0N/A * do for example when deciding whether to transition from
0N/A * SHUTDOWN to TIDYING). This accommodates special-purpose
0N/A * queues such as DelayQueues for which poll() is allowed to
0N/A * return null even if it may later return non-null when delays
0N/A * expire.
0N/A */
0N/A private final BlockingQueue<Runnable> workQueue;
0N/A
0N/A /**
0N/A * Lock held on access to workers set and related bookkeeping.
0N/A * While we could use a concurrent set of some sort, it turns out
0N/A * to be generally preferable to use a lock. Among the reasons is
0N/A * that this serializes interruptIdleWorkers, which avoids
0N/A * unnecessary interrupt storms, especially during shutdown.
0N/A * Otherwise exiting threads would concurrently interrupt those
0N/A * that have not yet interrupted. It also simplifies some of the
0N/A * associated statistics bookkeeping of largestPoolSize etc. We
0N/A * also hold mainLock on shutdown and shutdownNow, for the sake of
0N/A * ensuring workers set is stable while separately checking
0N/A * permission to interrupt and actually interrupting.
0N/A */
0N/A private final ReentrantLock mainLock = new ReentrantLock();
0N/A
0N/A /**
0N/A * Set containing all worker threads in pool. Accessed only when
0N/A * holding mainLock.
0N/A */
0N/A private final HashSet<Worker> workers = new HashSet<Worker>();
0N/A
0N/A /**
0N/A * Wait condition to support awaitTermination
0N/A */
0N/A private final Condition termination = mainLock.newCondition();
0N/A
0N/A /**
0N/A * Tracks largest attained pool size. Accessed only under
0N/A * mainLock.
0N/A */
0N/A private int largestPoolSize;
0N/A
0N/A /**
0N/A * Counter for completed tasks. Updated only on termination of
0N/A * worker threads. Accessed only under mainLock.
0N/A */
0N/A private long completedTaskCount;
0N/A
0N/A /*
0N/A * All user control parameters are declared as volatiles so that
0N/A * ongoing actions are based on freshest values, but without need
0N/A * for locking, since no internal invariants depend on them
0N/A * changing synchronously with respect to other actions.
0N/A */
0N/A
0N/A /**
0N/A * Factory for new threads. All threads are created using this
0N/A * factory (via method addWorker). All callers must be prepared
0N/A * for addWorker to fail, which may reflect a system or user's
0N/A * policy limiting the number of threads. Even though it is not
0N/A * treated as an error, failure to create threads may result in
0N/A * new tasks being rejected or existing ones remaining stuck in
5681N/A * the queue.
5681N/A *
5681N/A * We go further and preserve pool invariants even in the face of
5681N/A * errors such as OutOfMemoryError, that might be thrown while
5681N/A * trying to create threads. Such errors are rather common due to
5681N/A * the need to allocate a native stack in Thread#start, and users
5681N/A * will want to perform clean pool shutdown to clean up. There
5681N/A * will likely be enough memory available for the cleanup code to
5681N/A * complete without encountering yet another OutOfMemoryError.
0N/A */
0N/A private volatile ThreadFactory threadFactory;
0N/A
0N/A /**
0N/A * Handler called when saturated or shutdown in execute.
0N/A */
0N/A private volatile RejectedExecutionHandler handler;
0N/A
0N/A /**
0N/A * Timeout in nanoseconds for idle threads waiting for work.
0N/A * Threads use this timeout when there are more than corePoolSize
0N/A * present or if allowCoreThreadTimeOut. Otherwise they wait
0N/A * forever for new work.
0N/A */
0N/A private volatile long keepAliveTime;
0N/A
0N/A /**
0N/A * If false (default), core threads stay alive even when idle.
0N/A * If true, core threads use keepAliveTime to time out waiting
0N/A * for work.
0N/A */
0N/A private volatile boolean allowCoreThreadTimeOut;
0N/A
0N/A /**
0N/A * Core pool size is the minimum number of workers to keep alive
0N/A * (and not allow to time out etc) unless allowCoreThreadTimeOut
0N/A * is set, in which case the minimum is zero.
0N/A */
0N/A private volatile int corePoolSize;
0N/A
0N/A /**
0N/A * Maximum pool size. Note that the actual maximum is internally
0N/A * bounded by CAPACITY.
0N/A */
0N/A private volatile int maximumPoolSize;
0N/A
0N/A /**
0N/A * The default rejected execution handler
0N/A */
0N/A private static final RejectedExecutionHandler defaultHandler =
0N/A new AbortPolicy();
0N/A
0N/A /**
0N/A * Permission required for callers of shutdown and shutdownNow.
0N/A * We additionally require (see checkShutdownAccess) that callers
0N/A * have permission to actually interrupt threads in the worker set
0N/A * (as governed by Thread.interrupt, which relies on
0N/A * ThreadGroup.checkAccess, which in turn relies on
0N/A * SecurityManager.checkAccess). Shutdowns are attempted only if
0N/A * these checks pass.
0N/A *
0N/A * All actual invocations of Thread.interrupt (see
0N/A * interruptIdleWorkers and interruptWorkers) ignore
0N/A * SecurityExceptions, meaning that the attempted interrupts
0N/A * silently fail. In the case of shutdown, they should not fail
0N/A * unless the SecurityManager has inconsistent policies, sometimes
0N/A * allowing access to a thread and sometimes not. In such cases,
0N/A * failure to actually interrupt threads may disable or delay full
0N/A * termination. Other uses of interruptIdleWorkers are advisory,
0N/A * and failure to actually interrupt will merely delay response to
0N/A * configuration changes so is not handled exceptionally.
0N/A */
0N/A private static final RuntimePermission shutdownPerm =
0N/A new RuntimePermission("modifyThread");
0N/A
0N/A /**
0N/A * Class Worker mainly maintains interrupt control state for
0N/A * threads running tasks, along with other minor bookkeeping.
0N/A * This class opportunistically extends AbstractQueuedSynchronizer
0N/A * to simplify acquiring and releasing a lock surrounding each
0N/A * task execution. This protects against interrupts that are
0N/A * intended to wake up a worker thread waiting for a task from
0N/A * instead interrupting a task being run. We implement a simple
5681N/A * non-reentrant mutual exclusion lock rather than use
5681N/A * ReentrantLock because we do not want worker tasks to be able to
5681N/A * reacquire the lock when they invoke pool control methods like
5681N/A * setCorePoolSize. Additionally, to suppress interrupts until
5681N/A * the thread actually starts running tasks, we initialize lock
5681N/A * state to a negative value, and clear it upon start (in
5681N/A * runWorker).
0N/A */
0N/A private final class Worker
0N/A extends AbstractQueuedSynchronizer
0N/A implements Runnable
0N/A {
0N/A /**
0N/A * This class will never be serialized, but we provide a
0N/A * serialVersionUID to suppress a javac warning.
0N/A */
0N/A private static final long serialVersionUID = 6138294804551838833L;
0N/A
0N/A /** Thread this worker is running in. Null if factory fails. */
0N/A final Thread thread;
0N/A /** Initial task to run. Possibly null. */
0N/A Runnable firstTask;
0N/A /** Per-thread task counter */
0N/A volatile long completedTasks;
0N/A
0N/A /**
0N/A * Creates with given first task and thread from ThreadFactory.
0N/A * @param firstTask the first task (null if none)
0N/A */
0N/A Worker(Runnable firstTask) {
5681N/A setState(-1); // inhibit interrupts until runWorker
0N/A this.firstTask = firstTask;
0N/A this.thread = getThreadFactory().newThread(this);
0N/A }
0N/A
0N/A /** Delegates main run loop to outer runWorker */
0N/A public void run() {
0N/A runWorker(this);
0N/A }
0N/A
0N/A // Lock methods
0N/A //
0N/A // The value 0 represents the unlocked state.
0N/A // The value 1 represents the locked state.
0N/A
0N/A protected boolean isHeldExclusively() {
5681N/A return getState() != 0;
0N/A }
0N/A
0N/A protected boolean tryAcquire(int unused) {
0N/A if (compareAndSetState(0, 1)) {
0N/A setExclusiveOwnerThread(Thread.currentThread());
0N/A return true;
0N/A }
0N/A return false;
0N/A }
0N/A
0N/A protected boolean tryRelease(int unused) {
0N/A setExclusiveOwnerThread(null);
0N/A setState(0);
0N/A return true;
0N/A }
0N/A
0N/A public void lock() { acquire(1); }
0N/A public boolean tryLock() { return tryAcquire(1); }
0N/A public void unlock() { release(1); }
0N/A public boolean isLocked() { return isHeldExclusively(); }
5681N/A
5681N/A void interruptIfStarted() {
5681N/A Thread t;
5681N/A if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
5681N/A try {
5681N/A t.interrupt();
5681N/A } catch (SecurityException ignore) {
5681N/A }
5681N/A }
5681N/A }
0N/A }
0N/A
0N/A /*
0N/A * Methods for setting control state
0N/A */
0N/A
0N/A /**
0N/A * Transitions runState to given target, or leaves it alone if
0N/A * already at least the given target.
0N/A *
0N/A * @param targetState the desired state, either SHUTDOWN or STOP
0N/A * (but not TIDYING or TERMINATED -- use tryTerminate for that)
0N/A */
0N/A private void advanceRunState(int targetState) {
0N/A for (;;) {
0N/A int c = ctl.get();
0N/A if (runStateAtLeast(c, targetState) ||
0N/A ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
0N/A break;
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Transitions to TERMINATED state if either (SHUTDOWN and pool
0N/A * and queue empty) or (STOP and pool empty). If otherwise
0N/A * eligible to terminate but workerCount is nonzero, interrupts an
0N/A * idle worker to ensure that shutdown signals propagate. This
0N/A * method must be called following any action that might make
0N/A * termination possible -- reducing worker count or removing tasks
0N/A * from the queue during shutdown. The method is non-private to
0N/A * allow access from ScheduledThreadPoolExecutor.
0N/A */
0N/A final void tryTerminate() {
0N/A for (;;) {
0N/A int c = ctl.get();
0N/A if (isRunning(c) ||
0N/A runStateAtLeast(c, TIDYING) ||
0N/A (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
0N/A return;
0N/A if (workerCountOf(c) != 0) { // Eligible to terminate
0N/A interruptIdleWorkers(ONLY_ONE);
0N/A return;
0N/A }
0N/A
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
0N/A try {
0N/A terminated();
0N/A } finally {
0N/A ctl.set(ctlOf(TERMINATED, 0));
0N/A termination.signalAll();
0N/A }
0N/A return;
0N/A }
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A // else retry on failed CAS
0N/A }
0N/A }
0N/A
0N/A /*
0N/A * Methods for controlling interrupts to worker threads.
0N/A */
0N/A
0N/A /**
0N/A * If there is a security manager, makes sure caller has
0N/A * permission to shut down threads in general (see shutdownPerm).
0N/A * If this passes, additionally makes sure the caller is allowed
0N/A * to interrupt each worker thread. This might not be true even if
0N/A * first check passed, if the SecurityManager treats some threads
0N/A * specially.
0N/A */
0N/A private void checkShutdownAccess() {
0N/A SecurityManager security = System.getSecurityManager();
0N/A if (security != null) {
0N/A security.checkPermission(shutdownPerm);
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A for (Worker w : workers)
0N/A security.checkAccess(w.thread);
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Interrupts all threads, even if active. Ignores SecurityExceptions
0N/A * (in which case some threads may remain uninterrupted).
0N/A */
0N/A private void interruptWorkers() {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
5681N/A for (Worker w : workers)
5681N/A w.interruptIfStarted();
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Interrupts threads that might be waiting for tasks (as
0N/A * indicated by not being locked) so they can check for
0N/A * termination or configuration changes. Ignores
0N/A * SecurityExceptions (in which case some threads may remain
0N/A * uninterrupted).
0N/A *
0N/A * @param onlyOne If true, interrupt at most one worker. This is
0N/A * called only from tryTerminate when termination is otherwise
0N/A * enabled but there are still other workers. In this case, at
0N/A * most one waiting worker is interrupted to propagate shutdown
0N/A * signals in case all threads are currently waiting.
0N/A * Interrupting any arbitrary thread ensures that newly arriving
0N/A * workers since shutdown began will also eventually exit.
0N/A * To guarantee eventual termination, it suffices to always
0N/A * interrupt only one idle worker, but shutdown() interrupts all
0N/A * idle workers so that redundant workers exit promptly, not
0N/A * waiting for a straggler task to finish.
0N/A */
0N/A private void interruptIdleWorkers(boolean onlyOne) {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A for (Worker w : workers) {
0N/A Thread t = w.thread;
0N/A if (!t.isInterrupted() && w.tryLock()) {
0N/A try {
0N/A t.interrupt();
0N/A } catch (SecurityException ignore) {
0N/A } finally {
0N/A w.unlock();
0N/A }
0N/A }
0N/A if (onlyOne)
0N/A break;
0N/A }
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Common form of interruptIdleWorkers, to avoid having to
0N/A * remember what the boolean argument means.
0N/A */
0N/A private void interruptIdleWorkers() {
0N/A interruptIdleWorkers(false);
0N/A }
0N/A
0N/A private static final boolean ONLY_ONE = true;
0N/A
0N/A /*
0N/A * Misc utilities, most of which are also exported to
0N/A * ScheduledThreadPoolExecutor
0N/A */
0N/A
0N/A /**
0N/A * Invokes the rejected execution handler for the given command.
0N/A * Package-protected for use by ScheduledThreadPoolExecutor.
0N/A */
0N/A final void reject(Runnable command) {
0N/A handler.rejectedExecution(command, this);
0N/A }
0N/A
0N/A /**
0N/A * Performs any further cleanup following run state transition on
0N/A * invocation of shutdown. A no-op here, but used by
0N/A * ScheduledThreadPoolExecutor to cancel delayed tasks.
0N/A */
0N/A void onShutdown() {
0N/A }
0N/A
0N/A /**
0N/A * State check needed by ScheduledThreadPoolExecutor to
0N/A * enable running tasks during shutdown.
0N/A *
0N/A * @param shutdownOK true if should return true if SHUTDOWN
0N/A */
0N/A final boolean isRunningOrShutdown(boolean shutdownOK) {
0N/A int rs = runStateOf(ctl.get());
0N/A return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
0N/A }
0N/A
0N/A /**
0N/A * Drains the task queue into a new list, normally using
0N/A * drainTo. But if the queue is a DelayQueue or any other kind of
0N/A * queue for which poll or drainTo may fail to remove some
0N/A * elements, it deletes them one by one.
0N/A */
0N/A private List<Runnable> drainQueue() {
0N/A BlockingQueue<Runnable> q = workQueue;
0N/A List<Runnable> taskList = new ArrayList<Runnable>();
0N/A q.drainTo(taskList);
0N/A if (!q.isEmpty()) {
0N/A for (Runnable r : q.toArray(new Runnable[0])) {
0N/A if (q.remove(r))
0N/A taskList.add(r);
0N/A }
0N/A }
0N/A return taskList;
0N/A }
0N/A
0N/A /*
0N/A * Methods for creating, running and cleaning up after workers
0N/A */
0N/A
0N/A /**
0N/A * Checks if a new worker can be added with respect to current
0N/A * pool state and the given bound (either core or maximum). If so,
0N/A * the worker count is adjusted accordingly, and, if possible, a
5681N/A * new worker is created and started, running firstTask as its
0N/A * first task. This method returns false if the pool is stopped or
0N/A * eligible to shut down. It also returns false if the thread
5681N/A * factory fails to create a thread when asked. If the thread
5681N/A * creation fails, either due to the thread factory returning
5681N/A * null, or due to an exception (typically OutOfMemoryError in
5681N/A * Thread#start), we roll back cleanly.
0N/A *
0N/A * @param firstTask the task the new thread should run first (or
0N/A * null if none). Workers are created with an initial first task
0N/A * (in method execute()) to bypass queuing when there are fewer
0N/A * than corePoolSize threads (in which case we always start one),
0N/A * or when the queue is full (in which case we must bypass queue).
0N/A * Initially idle threads are usually created via
0N/A * prestartCoreThread or to replace other dying workers.
0N/A *
0N/A * @param core if true use corePoolSize as bound, else
0N/A * maximumPoolSize. (A boolean indicator is used here rather than a
0N/A * value to ensure reads of fresh values after checking other pool
0N/A * state).
0N/A * @return true if successful
0N/A */
0N/A private boolean addWorker(Runnable firstTask, boolean core) {
0N/A retry:
0N/A for (;;) {
0N/A int c = ctl.get();
0N/A int rs = runStateOf(c);
0N/A
0N/A // Check if queue empty only if necessary.
0N/A if (rs >= SHUTDOWN &&
0N/A ! (rs == SHUTDOWN &&
0N/A firstTask == null &&
0N/A ! workQueue.isEmpty()))
0N/A return false;
0N/A
0N/A for (;;) {
0N/A int wc = workerCountOf(c);
0N/A if (wc >= CAPACITY ||
0N/A wc >= (core ? corePoolSize : maximumPoolSize))
0N/A return false;
0N/A if (compareAndIncrementWorkerCount(c))
0N/A break retry;
0N/A c = ctl.get(); // Re-read ctl
0N/A if (runStateOf(c) != rs)
0N/A continue retry;
0N/A // else CAS failed due to workerCount change; retry inner loop
0N/A }
0N/A }
0N/A
5681N/A boolean workerStarted = false;
5681N/A boolean workerAdded = false;
5681N/A Worker w = null;
5681N/A try {
5681N/A final ReentrantLock mainLock = this.mainLock;
5681N/A w = new Worker(firstTask);
5681N/A final Thread t = w.thread;
5681N/A if (t != null) {
5681N/A mainLock.lock();
5681N/A try {
5681N/A // Recheck while holding lock.
5681N/A // Back out on ThreadFactory failure or if
5681N/A // shut down before lock acquired.
5681N/A int c = ctl.get();
5681N/A int rs = runStateOf(c);
0N/A
5681N/A if (rs < SHUTDOWN ||
5681N/A (rs == SHUTDOWN && firstTask == null)) {
5681N/A if (t.isAlive()) // precheck that t is startable
5681N/A throw new IllegalThreadStateException();
5681N/A workers.add(w);
5681N/A int s = workers.size();
5681N/A if (s > largestPoolSize)
5681N/A largestPoolSize = s;
5681N/A workerAdded = true;
5681N/A }
5681N/A } finally {
5681N/A mainLock.unlock();
5681N/A }
5681N/A if (workerAdded) {
5681N/A t.start();
5681N/A workerStarted = true;
5681N/A }
5681N/A }
5681N/A } finally {
5681N/A if (! workerStarted)
5681N/A addWorkerFailed(w);
5681N/A }
5681N/A return workerStarted;
5681N/A }
5681N/A
5681N/A /**
5681N/A * Rolls back the worker thread creation.
5681N/A * - removes worker from workers, if present
5681N/A * - decrements worker count
5681N/A * - rechecks for termination, in case the existence of this
5681N/A * worker was holding up termination
5681N/A */
5681N/A private void addWorkerFailed(Worker w) {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
5681N/A if (w != null)
5681N/A workers.remove(w);
5681N/A decrementWorkerCount();
5681N/A tryTerminate();
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Performs cleanup and bookkeeping for a dying worker. Called
0N/A * only from worker threads. Unless completedAbruptly is set,
0N/A * assumes that workerCount has already been adjusted to account
0N/A * for exit. This method removes thread from worker set, and
0N/A * possibly terminates the pool or replaces the worker if either
0N/A * it exited due to user task exception or if fewer than
0N/A * corePoolSize workers are running or queue is non-empty but
0N/A * there are no workers.
0N/A *
0N/A * @param w the worker
0N/A * @param completedAbruptly if the worker died due to user exception
0N/A */
0N/A private void processWorkerExit(Worker w, boolean completedAbruptly) {
0N/A if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
0N/A decrementWorkerCount();
0N/A
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A completedTaskCount += w.completedTasks;
0N/A workers.remove(w);
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A
0N/A tryTerminate();
0N/A
0N/A int c = ctl.get();
0N/A if (runStateLessThan(c, STOP)) {
0N/A if (!completedAbruptly) {
0N/A int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
0N/A if (min == 0 && ! workQueue.isEmpty())
0N/A min = 1;
0N/A if (workerCountOf(c) >= min)
0N/A return; // replacement not needed
0N/A }
0N/A addWorker(null, false);
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Performs blocking or timed wait for a task, depending on
0N/A * current configuration settings, or returns null if this worker
0N/A * must exit because of any of:
0N/A * 1. There are more than maximumPoolSize workers (due to
0N/A * a call to setMaximumPoolSize).
0N/A * 2. The pool is stopped.
0N/A * 3. The pool is shutdown and the queue is empty.
0N/A * 4. This worker timed out waiting for a task, and timed-out
0N/A * workers are subject to termination (that is,
0N/A * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
0N/A * both before and after the timed wait.
0N/A *
0N/A * @return task, or null if the worker must exit, in which case
0N/A * workerCount is decremented
0N/A */
0N/A private Runnable getTask() {
0N/A boolean timedOut = false; // Did the last poll() time out?
0N/A
0N/A retry:
0N/A for (;;) {
0N/A int c = ctl.get();
0N/A int rs = runStateOf(c);
0N/A
0N/A // Check if queue empty only if necessary.
0N/A if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
0N/A decrementWorkerCount();
0N/A return null;
0N/A }
0N/A
0N/A boolean timed; // Are workers subject to culling?
0N/A
0N/A for (;;) {
0N/A int wc = workerCountOf(c);
0N/A timed = allowCoreThreadTimeOut || wc > corePoolSize;
0N/A
0N/A if (wc <= maximumPoolSize && ! (timedOut && timed))
0N/A break;
0N/A if (compareAndDecrementWorkerCount(c))
0N/A return null;
0N/A c = ctl.get(); // Re-read ctl
0N/A if (runStateOf(c) != rs)
0N/A continue retry;
0N/A // else CAS failed due to workerCount change; retry inner loop
0N/A }
0N/A
0N/A try {
0N/A Runnable r = timed ?
0N/A workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
0N/A workQueue.take();
0N/A if (r != null)
0N/A return r;
0N/A timedOut = true;
0N/A } catch (InterruptedException retry) {
0N/A timedOut = false;
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Main worker run loop. Repeatedly gets tasks from queue and
0N/A * executes them, while coping with a number of issues:
0N/A *
0N/A * 1. We may start out with an initial task, in which case we
0N/A * don't need to get the first one. Otherwise, as long as pool is
0N/A * running, we get tasks from getTask. If it returns null then the
0N/A * worker exits due to changed pool state or configuration
0N/A * parameters. Other exits result from exception throws in
0N/A * external code, in which case completedAbruptly holds, which
0N/A * usually leads processWorkerExit to replace this thread.
0N/A *
0N/A * 2. Before running any task, the lock is acquired to prevent
0N/A * other pool interrupts while the task is executing, and
0N/A * clearInterruptsForTaskRun called to ensure that unless pool is
0N/A * stopping, this thread does not have its interrupt set.
0N/A *
0N/A * 3. Each task run is preceded by a call to beforeExecute, which
0N/A * might throw an exception, in which case we cause thread to die
0N/A * (breaking loop with completedAbruptly true) without processing
0N/A * the task.
0N/A *
0N/A * 4. Assuming beforeExecute completes normally, we run the task,
0N/A * gathering any of its thrown exceptions to send to
0N/A * afterExecute. We separately handle RuntimeException, Error
0N/A * (both of which the specs guarantee that we trap) and arbitrary
0N/A * Throwables. Because we cannot rethrow Throwables within
0N/A * Runnable.run, we wrap them within Errors on the way out (to the
0N/A * thread's UncaughtExceptionHandler). Any thrown exception also
0N/A * conservatively causes thread to die.
0N/A *
0N/A * 5. After task.run completes, we call afterExecute, which may
0N/A * also throw an exception, which will also cause thread to
0N/A * die. According to JLS Sec 14.20, this exception is the one that
0N/A * will be in effect even if task.run throws.
0N/A *
0N/A * The net effect of the exception mechanics is that afterExecute
0N/A * and the thread's UncaughtExceptionHandler have as accurate
0N/A * information as we can provide about any problems encountered by
0N/A * user code.
0N/A *
0N/A * @param w the worker
0N/A */
0N/A final void runWorker(Worker w) {
5681N/A Thread wt = Thread.currentThread();
0N/A Runnable task = w.firstTask;
0N/A w.firstTask = null;
5681N/A w.unlock(); // allow interrupts
0N/A boolean completedAbruptly = true;
0N/A try {
0N/A while (task != null || (task = getTask()) != null) {
0N/A w.lock();
5681N/A // If pool is stopping, ensure thread is interrupted;
5681N/A // if not, ensure thread is not interrupted. This
5681N/A // requires a recheck in second case to deal with
5681N/A // shutdownNow race while clearing interrupt
5681N/A if ((runStateAtLeast(ctl.get(), STOP) ||
5681N/A (Thread.interrupted() &&
5681N/A runStateAtLeast(ctl.get(), STOP))) &&
5681N/A !wt.isInterrupted())
5681N/A wt.interrupt();
0N/A try {
5681N/A beforeExecute(wt, task);
0N/A Throwable thrown = null;
0N/A try {
0N/A task.run();
0N/A } catch (RuntimeException x) {
0N/A thrown = x; throw x;
0N/A } catch (Error x) {
0N/A thrown = x; throw x;
0N/A } catch (Throwable x) {
0N/A thrown = x; throw new Error(x);
0N/A } finally {
0N/A afterExecute(task, thrown);
0N/A }
0N/A } finally {
0N/A task = null;
0N/A w.completedTasks++;
0N/A w.unlock();
0N/A }
0N/A }
0N/A completedAbruptly = false;
0N/A } finally {
0N/A processWorkerExit(w, completedAbruptly);
0N/A }
0N/A }
0N/A
0N/A // Public constructors and methods
0N/A
0N/A /**
0N/A * Creates a new {@code ThreadPoolExecutor} with the given initial
0N/A * parameters and default thread factory and rejected execution handler.
0N/A * It may be more convenient to use one of the {@link Executors} factory
0N/A * methods instead of this general purpose constructor.
0N/A *
0N/A * @param corePoolSize the number of threads to keep in the pool, even
0N/A * if they are idle, unless {@code allowCoreThreadTimeOut} is set
0N/A * @param maximumPoolSize the maximum number of threads to allow in the
0N/A * pool
0N/A * @param keepAliveTime when the number of threads is greater than
0N/A * the core, this is the maximum time that excess idle threads
0N/A * will wait for new tasks before terminating.
0N/A * @param unit the time unit for the {@code keepAliveTime} argument
0N/A * @param workQueue the queue to use for holding tasks before they are
0N/A * executed. This queue will hold only the {@code Runnable}
0N/A * tasks submitted by the {@code execute} method.
0N/A * @throws IllegalArgumentException if one of the following holds:<br>
0N/A * {@code corePoolSize < 0}<br>
0N/A * {@code keepAliveTime < 0}<br>
0N/A * {@code maximumPoolSize <= 0}<br>
0N/A * {@code maximumPoolSize < corePoolSize}
0N/A * @throws NullPointerException if {@code workQueue} is null
0N/A */
0N/A public ThreadPoolExecutor(int corePoolSize,
0N/A int maximumPoolSize,
0N/A long keepAliveTime,
0N/A TimeUnit unit,
0N/A BlockingQueue<Runnable> workQueue) {
0N/A this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
0N/A Executors.defaultThreadFactory(), defaultHandler);
0N/A }
0N/A
0N/A /**
0N/A * Creates a new {@code ThreadPoolExecutor} with the given initial
0N/A * parameters and default rejected execution handler.
0N/A *
0N/A * @param corePoolSize the number of threads to keep in the pool, even
0N/A * if they are idle, unless {@code allowCoreThreadTimeOut} is set
0N/A * @param maximumPoolSize the maximum number of threads to allow in the
0N/A * pool
0N/A * @param keepAliveTime when the number of threads is greater than
0N/A * the core, this is the maximum time that excess idle threads
0N/A * will wait for new tasks before terminating.
0N/A * @param unit the time unit for the {@code keepAliveTime} argument
0N/A * @param workQueue the queue to use for holding tasks before they are
0N/A * executed. This queue will hold only the {@code Runnable}
0N/A * tasks submitted by the {@code execute} method.
0N/A * @param threadFactory the factory to use when the executor
0N/A * creates a new thread
0N/A * @throws IllegalArgumentException if one of the following holds:<br>
0N/A * {@code corePoolSize < 0}<br>
0N/A * {@code keepAliveTime < 0}<br>
0N/A * {@code maximumPoolSize <= 0}<br>
0N/A * {@code maximumPoolSize < corePoolSize}
0N/A * @throws NullPointerException if {@code workQueue}
0N/A * or {@code threadFactory} is null
0N/A */
0N/A public ThreadPoolExecutor(int corePoolSize,
0N/A int maximumPoolSize,
0N/A long keepAliveTime,
0N/A TimeUnit unit,
0N/A BlockingQueue<Runnable> workQueue,
0N/A ThreadFactory threadFactory) {
0N/A this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
0N/A threadFactory, defaultHandler);
0N/A }
0N/A
0N/A /**
0N/A * Creates a new {@code ThreadPoolExecutor} with the given initial
0N/A * parameters and default thread factory.
0N/A *
0N/A * @param corePoolSize the number of threads to keep in the pool, even
0N/A * if they are idle, unless {@code allowCoreThreadTimeOut} is set
0N/A * @param maximumPoolSize the maximum number of threads to allow in the
0N/A * pool
0N/A * @param keepAliveTime when the number of threads is greater than
0N/A * the core, this is the maximum time that excess idle threads
0N/A * will wait for new tasks before terminating.
0N/A * @param unit the time unit for the {@code keepAliveTime} argument
0N/A * @param workQueue the queue to use for holding tasks before they are
0N/A * executed. This queue will hold only the {@code Runnable}
0N/A * tasks submitted by the {@code execute} method.
0N/A * @param handler the handler to use when execution is blocked
0N/A * because the thread bounds and queue capacities are reached
0N/A * @throws IllegalArgumentException if one of the following holds:<br>
0N/A * {@code corePoolSize < 0}<br>
0N/A * {@code keepAliveTime < 0}<br>
0N/A * {@code maximumPoolSize <= 0}<br>
0N/A * {@code maximumPoolSize < corePoolSize}
0N/A * @throws NullPointerException if {@code workQueue}
0N/A * or {@code handler} is null
0N/A */
0N/A public ThreadPoolExecutor(int corePoolSize,
0N/A int maximumPoolSize,
0N/A long keepAliveTime,
0N/A TimeUnit unit,
0N/A BlockingQueue<Runnable> workQueue,
0N/A RejectedExecutionHandler handler) {
0N/A this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
0N/A Executors.defaultThreadFactory(), handler);
0N/A }
0N/A
0N/A /**
0N/A * Creates a new {@code ThreadPoolExecutor} with the given initial
0N/A * parameters.
0N/A *
0N/A * @param corePoolSize the number of threads to keep in the pool, even
0N/A * if they are idle, unless {@code allowCoreThreadTimeOut} is set
0N/A * @param maximumPoolSize the maximum number of threads to allow in the
0N/A * pool
0N/A * @param keepAliveTime when the number of threads is greater than
0N/A * the core, this is the maximum time that excess idle threads
0N/A * will wait for new tasks before terminating.
0N/A * @param unit the time unit for the {@code keepAliveTime} argument
0N/A * @param workQueue the queue to use for holding tasks before they are
0N/A * executed. This queue will hold only the {@code Runnable}
0N/A * tasks submitted by the {@code execute} method.
0N/A * @param threadFactory the factory to use when the executor
0N/A * creates a new thread
0N/A * @param handler the handler to use when execution is blocked
0N/A * because the thread bounds and queue capacities are reached
0N/A * @throws IllegalArgumentException if one of the following holds:<br>
0N/A * {@code corePoolSize < 0}<br>
0N/A * {@code keepAliveTime < 0}<br>
0N/A * {@code maximumPoolSize <= 0}<br>
0N/A * {@code maximumPoolSize < corePoolSize}
0N/A * @throws NullPointerException if {@code workQueue}
0N/A * or {@code threadFactory} or {@code handler} is null
0N/A */
0N/A public ThreadPoolExecutor(int corePoolSize,
0N/A int maximumPoolSize,
0N/A long keepAliveTime,
0N/A TimeUnit unit,
0N/A BlockingQueue<Runnable> workQueue,
0N/A ThreadFactory threadFactory,
0N/A RejectedExecutionHandler handler) {
0N/A if (corePoolSize < 0 ||
0N/A maximumPoolSize <= 0 ||
0N/A maximumPoolSize < corePoolSize ||
0N/A keepAliveTime < 0)
0N/A throw new IllegalArgumentException();
0N/A if (workQueue == null || threadFactory == null || handler == null)
0N/A throw new NullPointerException();
0N/A this.corePoolSize = corePoolSize;
0N/A this.maximumPoolSize = maximumPoolSize;
0N/A this.workQueue = workQueue;
0N/A this.keepAliveTime = unit.toNanos(keepAliveTime);
0N/A this.threadFactory = threadFactory;
0N/A this.handler = handler;
0N/A }
0N/A
0N/A /**
0N/A * Executes the given task sometime in the future. The task
0N/A * may execute in a new thread or in an existing pooled thread.
0N/A *
0N/A * If the task cannot be submitted for execution, either because this
0N/A * executor has been shutdown or because its capacity has been reached,
0N/A * the task is handled by the current {@code RejectedExecutionHandler}.
0N/A *
0N/A * @param command the task to execute
0N/A * @throws RejectedExecutionException at discretion of
0N/A * {@code RejectedExecutionHandler}, if the task
0N/A * cannot be accepted for execution
0N/A * @throws NullPointerException if {@code command} is null
0N/A */
0N/A public void execute(Runnable command) {
0N/A if (command == null)
0N/A throw new NullPointerException();
0N/A /*
0N/A * Proceed in 3 steps:
0N/A *
0N/A * 1. If fewer than corePoolSize threads are running, try to
0N/A * start a new thread with the given command as its first
0N/A * task. The call to addWorker atomically checks runState and
0N/A * workerCount, and so prevents false alarms that would add
0N/A * threads when it shouldn't, by returning false.
0N/A *
0N/A * 2. If a task can be successfully queued, then we still need
0N/A * to double-check whether we should have added a thread
0N/A * (because existing ones died since last checking) or that
0N/A * the pool shut down since entry into this method. So we
0N/A * recheck state and if necessary roll back the enqueuing if
0N/A * stopped, or start a new thread if there are none.
0N/A *
0N/A * 3. If we cannot queue task, then we try to add a new
0N/A * thread. If it fails, we know we are shut down or saturated
0N/A * and so reject the task.
0N/A */
0N/A int c = ctl.get();
0N/A if (workerCountOf(c) < corePoolSize) {
0N/A if (addWorker(command, true))
0N/A return;
0N/A c = ctl.get();
0N/A }
0N/A if (isRunning(c) && workQueue.offer(command)) {
0N/A int recheck = ctl.get();
0N/A if (! isRunning(recheck) && remove(command))
0N/A reject(command);
0N/A else if (workerCountOf(recheck) == 0)
0N/A addWorker(null, false);
0N/A }
0N/A else if (!addWorker(command, false))
0N/A reject(command);
0N/A }
0N/A
0N/A /**
0N/A * Initiates an orderly shutdown in which previously submitted
0N/A * tasks are executed, but no new tasks will be accepted.
0N/A * Invocation has no additional effect if already shut down.
0N/A *
38N/A * <p>This method does not wait for previously submitted tasks to
38N/A * complete execution. Use {@link #awaitTermination awaitTermination}
38N/A * to do that.
38N/A *
0N/A * @throws SecurityException {@inheritDoc}
0N/A */
0N/A public void shutdown() {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A checkShutdownAccess();
0N/A advanceRunState(SHUTDOWN);
0N/A interruptIdleWorkers();
0N/A onShutdown(); // hook for ScheduledThreadPoolExecutor
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A tryTerminate();
0N/A }
0N/A
0N/A /**
0N/A * Attempts to stop all actively executing tasks, halts the
0N/A * processing of waiting tasks, and returns a list of the tasks
0N/A * that were awaiting execution. These tasks are drained (removed)
0N/A * from the task queue upon return from this method.
0N/A *
38N/A * <p>This method does not wait for actively executing tasks to
38N/A * terminate. Use {@link #awaitTermination awaitTermination} to
38N/A * do that.
38N/A *
0N/A * <p>There are no guarantees beyond best-effort attempts to stop
0N/A * processing actively executing tasks. This implementation
0N/A * cancels tasks via {@link Thread#interrupt}, so any task that
0N/A * fails to respond to interrupts may never terminate.
0N/A *
0N/A * @throws SecurityException {@inheritDoc}
0N/A */
0N/A public List<Runnable> shutdownNow() {
0N/A List<Runnable> tasks;
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A checkShutdownAccess();
0N/A advanceRunState(STOP);
0N/A interruptWorkers();
0N/A tasks = drainQueue();
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A tryTerminate();
0N/A return tasks;
0N/A }
0N/A
0N/A public boolean isShutdown() {
0N/A return ! isRunning(ctl.get());
0N/A }
0N/A
0N/A /**
0N/A * Returns true if this executor is in the process of terminating
0N/A * after {@link #shutdown} or {@link #shutdownNow} but has not
0N/A * completely terminated. This method may be useful for
0N/A * debugging. A return of {@code true} reported a sufficient
0N/A * period after shutdown may indicate that submitted tasks have
0N/A * ignored or suppressed interruption, causing this executor not
0N/A * to properly terminate.
0N/A *
0N/A * @return true if terminating but not yet terminated
0N/A */
0N/A public boolean isTerminating() {
0N/A int c = ctl.get();
0N/A return ! isRunning(c) && runStateLessThan(c, TERMINATED);
0N/A }
0N/A
0N/A public boolean isTerminated() {
0N/A return runStateAtLeast(ctl.get(), TERMINATED);
0N/A }
0N/A
0N/A public boolean awaitTermination(long timeout, TimeUnit unit)
0N/A throws InterruptedException {
0N/A long nanos = unit.toNanos(timeout);
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A for (;;) {
0N/A if (runStateAtLeast(ctl.get(), TERMINATED))
0N/A return true;
0N/A if (nanos <= 0)
0N/A return false;
0N/A nanos = termination.awaitNanos(nanos);
0N/A }
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Invokes {@code shutdown} when this executor is no longer
0N/A * referenced and it has no threads.
0N/A */
0N/A protected void finalize() {
0N/A shutdown();
0N/A }
0N/A
0N/A /**
0N/A * Sets the thread factory used to create new threads.
0N/A *
0N/A * @param threadFactory the new thread factory
0N/A * @throws NullPointerException if threadFactory is null
0N/A * @see #getThreadFactory
0N/A */
0N/A public void setThreadFactory(ThreadFactory threadFactory) {
0N/A if (threadFactory == null)
0N/A throw new NullPointerException();
0N/A this.threadFactory = threadFactory;
0N/A }
0N/A
0N/A /**
0N/A * Returns the thread factory used to create new threads.
0N/A *
0N/A * @return the current thread factory
0N/A * @see #setThreadFactory
0N/A */
0N/A public ThreadFactory getThreadFactory() {
0N/A return threadFactory;
0N/A }
0N/A
0N/A /**
0N/A * Sets a new handler for unexecutable tasks.
0N/A *
0N/A * @param handler the new handler
0N/A * @throws NullPointerException if handler is null
0N/A * @see #getRejectedExecutionHandler
0N/A */
0N/A public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
0N/A if (handler == null)
0N/A throw new NullPointerException();
0N/A this.handler = handler;
0N/A }
0N/A
0N/A /**
0N/A * Returns the current handler for unexecutable tasks.
0N/A *
0N/A * @return the current handler
0N/A * @see #setRejectedExecutionHandler
0N/A */
0N/A public RejectedExecutionHandler getRejectedExecutionHandler() {
0N/A return handler;
0N/A }
0N/A
0N/A /**
0N/A * Sets the core number of threads. This overrides any value set
0N/A * in the constructor. If the new value is smaller than the
0N/A * current value, excess existing threads will be terminated when
0N/A * they next become idle. If larger, new threads will, if needed,
0N/A * be started to execute any queued tasks.
0N/A *
0N/A * @param corePoolSize the new core size
0N/A * @throws IllegalArgumentException if {@code corePoolSize < 0}
0N/A * @see #getCorePoolSize
0N/A */
0N/A public void setCorePoolSize(int corePoolSize) {
0N/A if (corePoolSize < 0)
0N/A throw new IllegalArgumentException();
0N/A int delta = corePoolSize - this.corePoolSize;
0N/A this.corePoolSize = corePoolSize;
0N/A if (workerCountOf(ctl.get()) > corePoolSize)
0N/A interruptIdleWorkers();
0N/A else if (delta > 0) {
0N/A // We don't really know how many new threads are "needed".
0N/A // As a heuristic, prestart enough new workers (up to new
0N/A // core size) to handle the current number of tasks in
0N/A // queue, but stop if queue becomes empty while doing so.
0N/A int k = Math.min(delta, workQueue.size());
0N/A while (k-- > 0 && addWorker(null, true)) {
0N/A if (workQueue.isEmpty())
0N/A break;
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Returns the core number of threads.
0N/A *
0N/A * @return the core number of threads
0N/A * @see #setCorePoolSize
0N/A */
0N/A public int getCorePoolSize() {
0N/A return corePoolSize;
0N/A }
0N/A
0N/A /**
0N/A * Starts a core thread, causing it to idly wait for work. This
0N/A * overrides the default policy of starting core threads only when
0N/A * new tasks are executed. This method will return {@code false}
0N/A * if all core threads have already been started.
0N/A *
0N/A * @return {@code true} if a thread was started
0N/A */
0N/A public boolean prestartCoreThread() {
0N/A return workerCountOf(ctl.get()) < corePoolSize &&
0N/A addWorker(null, true);
0N/A }
0N/A
0N/A /**
4783N/A * Same as prestartCoreThread except arranges that at least one
4783N/A * thread is started even if corePoolSize is 0.
4783N/A */
4783N/A void ensurePrestart() {
4783N/A int wc = workerCountOf(ctl.get());
4783N/A if (wc < corePoolSize)
4783N/A addWorker(null, true);
4783N/A else if (wc == 0)
4783N/A addWorker(null, false);
4783N/A }
4783N/A
4783N/A /**
0N/A * Starts all core threads, causing them to idly wait for work. This
0N/A * overrides the default policy of starting core threads only when
0N/A * new tasks are executed.
0N/A *
0N/A * @return the number of threads started
0N/A */
0N/A public int prestartAllCoreThreads() {
0N/A int n = 0;
0N/A while (addWorker(null, true))
0N/A ++n;
0N/A return n;
0N/A }
0N/A
0N/A /**
0N/A * Returns true if this pool allows core threads to time out and
0N/A * terminate if no tasks arrive within the keepAlive time, being
0N/A * replaced if needed when new tasks arrive. When true, the same
0N/A * keep-alive policy applying to non-core threads applies also to
0N/A * core threads. When false (the default), core threads are never
0N/A * terminated due to lack of incoming tasks.
0N/A *
0N/A * @return {@code true} if core threads are allowed to time out,
0N/A * else {@code false}
0N/A *
0N/A * @since 1.6
0N/A */
0N/A public boolean allowsCoreThreadTimeOut() {
0N/A return allowCoreThreadTimeOut;
0N/A }
0N/A
0N/A /**
0N/A * Sets the policy governing whether core threads may time out and
0N/A * terminate if no tasks arrive within the keep-alive time, being
0N/A * replaced if needed when new tasks arrive. When false, core
0N/A * threads are never terminated due to lack of incoming
0N/A * tasks. When true, the same keep-alive policy applying to
0N/A * non-core threads applies also to core threads. To avoid
0N/A * continual thread replacement, the keep-alive time must be
0N/A * greater than zero when setting {@code true}. This method
0N/A * should in general be called before the pool is actively used.
0N/A *
0N/A * @param value {@code true} if should time out, else {@code false}
0N/A * @throws IllegalArgumentException if value is {@code true}
0N/A * and the current keep-alive time is not greater than zero
0N/A *
0N/A * @since 1.6
0N/A */
0N/A public void allowCoreThreadTimeOut(boolean value) {
0N/A if (value && keepAliveTime <= 0)
0N/A throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
0N/A if (value != allowCoreThreadTimeOut) {
0N/A allowCoreThreadTimeOut = value;
0N/A if (value)
0N/A interruptIdleWorkers();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Sets the maximum allowed number of threads. This overrides any
0N/A * value set in the constructor. If the new value is smaller than
0N/A * the current value, excess existing threads will be
0N/A * terminated when they next become idle.
0N/A *
0N/A * @param maximumPoolSize the new maximum
0N/A * @throws IllegalArgumentException if the new maximum is
0N/A * less than or equal to zero, or
0N/A * less than the {@linkplain #getCorePoolSize core pool size}
0N/A * @see #getMaximumPoolSize
0N/A */
0N/A public void setMaximumPoolSize(int maximumPoolSize) {
0N/A if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
0N/A throw new IllegalArgumentException();
0N/A this.maximumPoolSize = maximumPoolSize;
0N/A if (workerCountOf(ctl.get()) > maximumPoolSize)
0N/A interruptIdleWorkers();
0N/A }
0N/A
0N/A /**
0N/A * Returns the maximum allowed number of threads.
0N/A *
0N/A * @return the maximum allowed number of threads
0N/A * @see #setMaximumPoolSize
0N/A */
0N/A public int getMaximumPoolSize() {
0N/A return maximumPoolSize;
0N/A }
0N/A
0N/A /**
0N/A * Sets the time limit for which threads may remain idle before
0N/A * being terminated. If there are more than the core number of
0N/A * threads currently in the pool, after waiting this amount of
0N/A * time without processing a task, excess threads will be
0N/A * terminated. This overrides any value set in the constructor.
0N/A *
0N/A * @param time the time to wait. A time value of zero will cause
0N/A * excess threads to terminate immediately after executing tasks.
0N/A * @param unit the time unit of the {@code time} argument
0N/A * @throws IllegalArgumentException if {@code time} less than zero or
0N/A * if {@code time} is zero and {@code allowsCoreThreadTimeOut}
0N/A * @see #getKeepAliveTime
0N/A */
0N/A public void setKeepAliveTime(long time, TimeUnit unit) {
0N/A if (time < 0)
0N/A throw new IllegalArgumentException();
0N/A if (time == 0 && allowsCoreThreadTimeOut())
0N/A throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
0N/A long keepAliveTime = unit.toNanos(time);
0N/A long delta = keepAliveTime - this.keepAliveTime;
0N/A this.keepAliveTime = keepAliveTime;
0N/A if (delta < 0)
0N/A interruptIdleWorkers();
0N/A }
0N/A
0N/A /**
0N/A * Returns the thread keep-alive time, which is the amount of time
0N/A * that threads in excess of the core pool size may remain
0N/A * idle before being terminated.
0N/A *
0N/A * @param unit the desired time unit of the result
0N/A * @return the time limit
0N/A * @see #setKeepAliveTime
0N/A */
0N/A public long getKeepAliveTime(TimeUnit unit) {
0N/A return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
0N/A }
0N/A
0N/A /* User-level queue utilities */
0N/A
0N/A /**
0N/A * Returns the task queue used by this executor. Access to the
0N/A * task queue is intended primarily for debugging and monitoring.
0N/A * This queue may be in active use. Retrieving the task queue
0N/A * does not prevent queued tasks from executing.
0N/A *
0N/A * @return the task queue
0N/A */
0N/A public BlockingQueue<Runnable> getQueue() {
0N/A return workQueue;
0N/A }
0N/A
0N/A /**
0N/A * Removes this task from the executor's internal queue if it is
0N/A * present, thus causing it not to be run if it has not already
0N/A * started.
0N/A *
0N/A * <p> This method may be useful as one part of a cancellation
0N/A * scheme. It may fail to remove tasks that have been converted
0N/A * into other forms before being placed on the internal queue. For
0N/A * example, a task entered using {@code submit} might be
0N/A * converted into a form that maintains {@code Future} status.
0N/A * However, in such cases, method {@link #purge} may be used to
0N/A * remove those Futures that have been cancelled.
0N/A *
0N/A * @param task the task to remove
0N/A * @return true if the task was removed
0N/A */
0N/A public boolean remove(Runnable task) {
0N/A boolean removed = workQueue.remove(task);
0N/A tryTerminate(); // In case SHUTDOWN and now empty
0N/A return removed;
0N/A }
0N/A
0N/A /**
0N/A * Tries to remove from the work queue all {@link Future}
0N/A * tasks that have been cancelled. This method can be useful as a
0N/A * storage reclamation operation, that has no other impact on
0N/A * functionality. Cancelled tasks are never executed, but may
0N/A * accumulate in work queues until worker threads can actively
0N/A * remove them. Invoking this method instead tries to remove them now.
0N/A * However, this method may fail to remove tasks in
0N/A * the presence of interference by other threads.
0N/A */
0N/A public void purge() {
0N/A final BlockingQueue<Runnable> q = workQueue;
0N/A try {
0N/A Iterator<Runnable> it = q.iterator();
0N/A while (it.hasNext()) {
0N/A Runnable r = it.next();
0N/A if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
0N/A it.remove();
0N/A }
0N/A } catch (ConcurrentModificationException fallThrough) {
0N/A // Take slow path if we encounter interference during traversal.
0N/A // Make copy for traversal and call remove for cancelled entries.
0N/A // The slow path is more likely to be O(N*N).
0N/A for (Object r : q.toArray())
0N/A if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
0N/A q.remove(r);
0N/A }
0N/A
0N/A tryTerminate(); // In case SHUTDOWN and now empty
0N/A }
0N/A
0N/A /* Statistics */
0N/A
0N/A /**
0N/A * Returns the current number of threads in the pool.
0N/A *
0N/A * @return the number of threads
0N/A */
0N/A public int getPoolSize() {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A // Remove rare and surprising possibility of
0N/A // isTerminated() && getPoolSize() > 0
0N/A return runStateAtLeast(ctl.get(), TIDYING) ? 0
0N/A : workers.size();
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Returns the approximate number of threads that are actively
0N/A * executing tasks.
0N/A *
0N/A * @return the number of threads
0N/A */
0N/A public int getActiveCount() {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A int n = 0;
0N/A for (Worker w : workers)
0N/A if (w.isLocked())
0N/A ++n;
0N/A return n;
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Returns the largest number of threads that have ever
0N/A * simultaneously been in the pool.
0N/A *
0N/A * @return the number of threads
0N/A */
0N/A public int getLargestPoolSize() {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A return largestPoolSize;
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Returns the approximate total number of tasks that have ever been
0N/A * scheduled for execution. Because the states of tasks and
0N/A * threads may change dynamically during computation, the returned
0N/A * value is only an approximation.
0N/A *
0N/A * @return the number of tasks
0N/A */
0N/A public long getTaskCount() {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A long n = completedTaskCount;
0N/A for (Worker w : workers) {
0N/A n += w.completedTasks;
0N/A if (w.isLocked())
0N/A ++n;
0N/A }
0N/A return n + workQueue.size();
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Returns the approximate total number of tasks that have
0N/A * completed execution. Because the states of tasks and threads
0N/A * may change dynamically during computation, the returned value
0N/A * is only an approximation, but one that does not ever decrease
0N/A * across successive calls.
0N/A *
0N/A * @return the number of tasks
0N/A */
0N/A public long getCompletedTaskCount() {
0N/A final ReentrantLock mainLock = this.mainLock;
0N/A mainLock.lock();
0N/A try {
0N/A long n = completedTaskCount;
0N/A for (Worker w : workers)
0N/A n += w.completedTasks;
0N/A return n;
0N/A } finally {
0N/A mainLock.unlock();
0N/A }
0N/A }
0N/A
3387N/A /**
3387N/A * Returns a string identifying this pool, as well as its state,
3387N/A * including indications of run state and estimated worker and
3387N/A * task counts.
3387N/A *
3387N/A * @return a string identifying this pool, as well as its state
3387N/A */
3387N/A public String toString() {
3387N/A long ncompleted;
3387N/A int nworkers, nactive;
3387N/A final ReentrantLock mainLock = this.mainLock;
3387N/A mainLock.lock();
3387N/A try {
3387N/A ncompleted = completedTaskCount;
3387N/A nactive = 0;
3387N/A nworkers = workers.size();
3387N/A for (Worker w : workers) {
3387N/A ncompleted += w.completedTasks;
3387N/A if (w.isLocked())
3387N/A ++nactive;
3387N/A }
3387N/A } finally {
3387N/A mainLock.unlock();
3387N/A }
3387N/A int c = ctl.get();
3387N/A String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
3387N/A (runStateAtLeast(c, TERMINATED) ? "Terminated" :
3387N/A "Shutting down"));
3387N/A return super.toString() +
3387N/A "[" + rs +
3387N/A ", pool size = " + nworkers +
3387N/A ", active threads = " + nactive +
3387N/A ", queued tasks = " + workQueue.size() +
3387N/A ", completed tasks = " + ncompleted +
3387N/A "]";
3387N/A }
3387N/A
0N/A /* Extension hooks */
0N/A
0N/A /**
0N/A * Method invoked prior to executing the given Runnable in the
0N/A * given thread. This method is invoked by thread {@code t} that
0N/A * will execute task {@code r}, and may be used to re-initialize
0N/A * ThreadLocals, or to perform logging.
0N/A *
0N/A * <p>This implementation does nothing, but may be customized in
0N/A * subclasses. Note: To properly nest multiple overridings, subclasses
0N/A * should generally invoke {@code super.beforeExecute} at the end of
0N/A * this method.
0N/A *
0N/A * @param t the thread that will run task {@code r}
0N/A * @param r the task that will be executed
0N/A */
0N/A protected void beforeExecute(Thread t, Runnable r) { }
0N/A
0N/A /**
0N/A * Method invoked upon completion of execution of the given Runnable.
0N/A * This method is invoked by the thread that executed the task. If
0N/A * non-null, the Throwable is the uncaught {@code RuntimeException}
0N/A * or {@code Error} that caused execution to terminate abruptly.
0N/A *
0N/A * <p>This implementation does nothing, but may be customized in
0N/A * subclasses. Note: To properly nest multiple overridings, subclasses
0N/A * should generally invoke {@code super.afterExecute} at the
0N/A * beginning of this method.
0N/A *
0N/A * <p><b>Note:</b> When actions are enclosed in tasks (such as
0N/A * {@link FutureTask}) either explicitly or via methods such as
0N/A * {@code submit}, these task objects catch and maintain
0N/A * computational exceptions, and so they do not cause abrupt
0N/A * termination, and the internal exceptions are <em>not</em>
0N/A * passed to this method. If you would like to trap both kinds of
0N/A * failures in this method, you can further probe for such cases,
0N/A * as in this sample subclass that prints either the direct cause
0N/A * or the underlying exception if a task has been aborted:
0N/A *
0N/A * <pre> {@code
0N/A * class ExtendedExecutor extends ThreadPoolExecutor {
0N/A * // ...
0N/A * protected void afterExecute(Runnable r, Throwable t) {
0N/A * super.afterExecute(r, t);
0N/A * if (t == null && r instanceof Future<?>) {
0N/A * try {
0N/A * Object result = ((Future<?>) r).get();
0N/A * } catch (CancellationException ce) {
0N/A * t = ce;
0N/A * } catch (ExecutionException ee) {
0N/A * t = ee.getCause();
0N/A * } catch (InterruptedException ie) {
0N/A * Thread.currentThread().interrupt(); // ignore/reset
0N/A * }
0N/A * }
0N/A * if (t != null)
0N/A * System.out.println(t);
0N/A * }
0N/A * }}</pre>
0N/A *
0N/A * @param r the runnable that has completed
0N/A * @param t the exception that caused termination, or null if
0N/A * execution completed normally
0N/A */
0N/A protected void afterExecute(Runnable r, Throwable t) { }
0N/A
0N/A /**
0N/A * Method invoked when the Executor has terminated. Default
0N/A * implementation does nothing. Note: To properly nest multiple
0N/A * overridings, subclasses should generally invoke
0N/A * {@code super.terminated} within this method.
0N/A */
0N/A protected void terminated() { }
0N/A
0N/A /* Predefined RejectedExecutionHandlers */
0N/A
0N/A /**
0N/A * A handler for rejected tasks that runs the rejected task
0N/A * directly in the calling thread of the {@code execute} method,
0N/A * unless the executor has been shut down, in which case the task
0N/A * is discarded.
0N/A */
0N/A public static class CallerRunsPolicy implements RejectedExecutionHandler {
0N/A /**
0N/A * Creates a {@code CallerRunsPolicy}.
0N/A */
0N/A public CallerRunsPolicy() { }
0N/A
0N/A /**
0N/A * Executes task r in the caller's thread, unless the executor
0N/A * has been shut down, in which case the task is discarded.
0N/A *
0N/A * @param r the runnable task requested to be executed
0N/A * @param e the executor attempting to execute this task
0N/A */
0N/A public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
0N/A if (!e.isShutdown()) {
0N/A r.run();
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * A handler for rejected tasks that throws a
0N/A * {@code RejectedExecutionException}.
0N/A */
0N/A public static class AbortPolicy implements RejectedExecutionHandler {
0N/A /**
0N/A * Creates an {@code AbortPolicy}.
0N/A */
0N/A public AbortPolicy() { }
0N/A
0N/A /**
0N/A * Always throws RejectedExecutionException.
0N/A *
0N/A * @param r the runnable task requested to be executed
0N/A * @param e the executor attempting to execute this task
0N/A * @throws RejectedExecutionException always.
0N/A */
0N/A public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
3387N/A throw new RejectedExecutionException("Task " + r.toString() +
3387N/A " rejected from " +
3387N/A e.toString());
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * A handler for rejected tasks that silently discards the
0N/A * rejected task.
0N/A */
0N/A public static class DiscardPolicy implements RejectedExecutionHandler {
0N/A /**
0N/A * Creates a {@code DiscardPolicy}.
0N/A */
0N/A public DiscardPolicy() { }
0N/A
0N/A /**
0N/A * Does nothing, which has the effect of discarding task r.
0N/A *
0N/A * @param r the runnable task requested to be executed
0N/A * @param e the executor attempting to execute this task
0N/A */
0N/A public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * A handler for rejected tasks that discards the oldest unhandled
0N/A * request and then retries {@code execute}, unless the executor
0N/A * is shut down, in which case the task is discarded.
0N/A */
0N/A public static class DiscardOldestPolicy implements RejectedExecutionHandler {
0N/A /**
0N/A * Creates a {@code DiscardOldestPolicy} for the given executor.
0N/A */
0N/A public DiscardOldestPolicy() { }
0N/A
0N/A /**
0N/A * Obtains and ignores the next task that the executor
0N/A * would otherwise execute, if one is immediately available,
0N/A * and then retries execution of task r, unless the executor
0N/A * is shut down, in which case task r is instead discarded.
0N/A *
0N/A * @param r the runnable task requested to be executed
0N/A * @param e the executor attempting to execute this task
0N/A */
0N/A public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
0N/A if (!e.isShutdown()) {
0N/A e.getQueue().poll();
0N/A e.execute(r);
0N/A }
0N/A }
0N/A }
0N/A}
5681N/A