/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.Collection;
import java.util.concurrent.RejectedExecutionException;

/**
 * A thread managed by a {@link ForkJoinPool}, which executes
 * {@link ForkJoinTask}s.
 * This class is subclassable solely for the sake of adding
 * functionality -- there are no overridable methods dealing with
 * scheduling or execution. However, you can override initialization
 * and termination methods surrounding the main task processing loop.
 * If you do create such a subclass, you will also need to supply a
 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
 * in a {@code ForkJoinPool}.
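 *
 * <p>For example, here is a minimal sketch of such a subclass and
 * factory (the class names here are illustrative, not part of this
 * API); the factory can then be supplied to a {@code ForkJoinPool}
 * constructor:
 *
 * <pre> {@code
 * class MyWorkerThread extends ForkJoinWorkerThread {
 *   MyWorkerThread(ForkJoinPool pool) { super(pool); }
 *   protected void onStart() {
 *     super.onStart();                // must run first
 *     // ... initialize per-thread state ...
 *   }
 *   protected void onTermination(Throwable exception) {
 *     // ... release per-thread state ...
 *     super.onTermination(exception); // must run last
 *   }
 * }
 *
 * class MyWorkerThreadFactory
 *     implements ForkJoinPool.ForkJoinWorkerThreadFactory {
 *   public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
 *     return new MyWorkerThread(pool);
 *   }
 * }}</pre>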
 *
 * @since 1.7
 * @author Doug Lea
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads. (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor Programming", chapter 16 describing these
     * in more detail before proceeding.) The main work-stealing
     * queue design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("queueBase" and "queueTop") to the slots themselves (mainly
     * via method "casSlotNull()"). So, both a successful pop and deq
     * mainly entail a CAS of a slot from non-null to null. Because
     * we rely on CASes of references, we do not need tag bits on
     * queueBase or queueTop. They are simple ints as used in any
     * circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that queueTop == queueBase means the queue is empty,
     * but otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq have not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes. However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete.
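     *
     * In sketch form (a simplification that elides the Unsafe-based
     * fencing used in method deqTask below), a steal attempt is:
     *
     *   int b = queueBase;
     *   int i = (q.length - 1) & b;
     *   ForkJoinTask<?> t = q[i];
     *   if (queueTop != b && t != null &&
     *       queueBase == b && casSlotNull(q, i, t))
     *       queueBase = b + 1;  // the slot CAS, not the index, decides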
     *
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO order; simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool). This allows use in message-passing
     * frameworks in which tasks are never joined. However, neither
     * mode considers affinities, loads, cache localities, etc., so
     * they rarely provide the best possible performance on a given
     * machine, but they portably provide good throughput by averaging
     * over these factors. (Further, even if we did try to use such
     * information, we do not usually have a basis for exploiting
     * it. For example, some sets of tasks profit from cache
     * affinities, but others are harmed by cache pollution effects.)
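     *
     * Concretely (as in method pollLocalTask below), a local poll
     * reduces to:
     *
     *   t = locallyFifo ? locallyDeqTask() : popTask();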
     *
     * When a worker would otherwise be blocked waiting to join a
     * task, it first tries a form of linear helping: Each worker
     * records (in field currentSteal) the most recent task it stole
     * from some other worker. Plus, it records (in field currentJoin)
     * the task it is currently actively joining. Method joinTask uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task. In essence, the joiner executes a task
     * that would be on its own local deque had the to-be-joined task
     * not been stolen. This may be seen as a conservative variant of
     * the approach in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * in that: (1) We only maintain dependency links across workers
     * upon steals, rather than use per-task bookkeeping. This may
     * require a linear scan of the workers array to locate stealers,
     * but usually doesn't because stealers leave hints (that may
     * become stale/wrong) of where to locate them. This isolates
     * cost to when it is needed, rather than adding to per-task
     * overhead. (2) It is "shallow", ignoring nesting and potentially
     * cyclic mutual steals. (3) It is intentionally racy: field
     * currentJoin is updated only while actively joining, which means
     * that we miss links in the chain during long-lived tasks, GC
     * stalls, etc. (which is OK since blocking in such cases is
     * usually a good idea). (4) We bound the number of attempts to
     * find work (see MAX_HELP) and fall back to suspending the
     * worker and, if necessary, replacing it with another.
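     *
     * Schematically, one helping pass (a simplification of method
     * helpJoinTask below, eliding its staleness and cycle checks) is:
     *
     *   for (task = joinMe; levels-- > 0; task = v.currentJoin) {
     *     find worker v with v.currentSteal == task; // hint or scan
     *     steal back and run tasks from v's queue;
     *   }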
     *
     * Efficient implementation of these algorithms currently relies
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable queueBase
     * require volatile ordering. Variable queueTop need not be
     * volatile because non-local reads always follow those of
     * queueBase. Similarly, because they are protected by volatile
     * queueBase reads, reads of the queue array and its slots by
     * other threads do not need volatile load semantics, but writes
     * (in push) require store order and CASes (in pop and deq)
     * require (volatile) CAS semantics. (Michael, Saraswat, and
     * Vechev's algorithm has similar properties, but without support
     * for nulling slots.) Since these combinations aren't supported
     * using ordinary volatiles, the only way to accomplish these
     * efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * placement and sizing of the (resizable) queue array. Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting.
     */

    /**
     * Mask for pool indices encoded as shorts.
     */
    private static final int SMASK = 0xffff;

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum size for queue array. Must be a power of two
     * less than or equal to 1 << (31 - width of array entry) to
     * ensure lack of index wraparound, but is capped at a lower
     * value to help users trap runaway computations.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when started (as opposed to when constructed), to
     * improve memory locality.
     */
    ForkJoinTask<?>[] queue;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) queueBase. Both queueTop
     * and queueBase are allowed to wrap around on overflow, but
     * (queueTop - queueBase) still estimates size.
     */
    int queueTop;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    volatile int queueBase;

    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    int stealHint;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    final int poolIndex;

    /**
     * Encoded record for pool task waits. Usages are always
     * surrounded by volatile reads/writes.
     */
    int nextWait;

    /**
     * Complement of poolIndex, offset by count of entries of task
     * waits. Accessed by ForkJoinPool to manage event waiters.
     */
    volatile int eventCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    int seed;

    /**
     * Number of steals. Directly accessed (and reset) by pool when
     * idle.
     */
    int stealCount;

    /**
     * True if this worker should or did terminate.
     */
    volatile boolean terminate;

    /**
     * Set to true before LockSupport.park; false on return.
     */
    volatile boolean parked;

    /**
     * True if using local fifo, not default lifo, for local polling.
     * Shadows value from ForkJoinPool.
     */
    final boolean locallyFifo;

    /**
     * The task most recently stolen from another worker (or
     * submission queue). All uses are surrounded by enough volatile
     * reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentSteal;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. All uses are surrounded
     * by enough volatile reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentJoin;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool.nextWorkerName());
        this.pool = pool;
        int k = pool.registerWorker(this);
        poolIndex = k;
        eventCount = ~k & SMASK; // clear wait count
        locallyFifo = pool.locallyFifo;
        Thread.UncaughtExceptionHandler ueh = pool.ueh;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        setDaemon(true);
    }

    // Public methods

    /**
     * Returns the pool hosting this thread.
     *
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool. The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     *
     * @return the index number
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    // Randomization

    /**
     * Computes next value for random victim probes and backoffs.
     * Scans don't require a very high quality generator, but also not
     * a crummy one. Marsaglia xor-shift is cheap and works well
     * enough. Note: This is manually inlined in FJP.scan() to avoid
     * writes inside busy loops.
     */
    private int nextSeed() {
        int r = seed;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return seed = r;
    }

    // Run State management

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        int r = pool.workerSeedGenerator.nextInt();
        seed = (r == 0) ? 1 : r; // must be nonzero
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread. If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        try {
            terminate = true;
            cancelTasks();
            pool.deregisterWorker(this, exception);
        } catch (Throwable ex) { // Shouldn't ever happen
            if (exception == null) // but if so, at least rethrow it
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * {@link ForkJoinTask}s.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.work(this);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    /*
     * Intrinsics-based atomic writes for queue slots. These are
     * basically the same as methods in AtomicReferenceArray, but
     * specialized for (1) ForkJoinTask elements (2) requirement that
     * nullness and bounds checks have already been performed by
     * callers and (3) effective offsets are known not to overflow
     * from int to long (because of MAXIMUM_QUEUE_CAPACITY). We don't
     * need a corresponding version for reads: plain array reads are
     * OK because they are protected by other volatile reads and are
     * confirmed by CASes.
     *
     * Most uses don't actually call these methods, but instead
     * contain inlined forms that enable more predictable
     * optimization. We don't define the version of write used in
     * pushTask at all, but instead inline there a store-fenced array
     * slot write.
     *
     * Also in most methods, as a performance (not correctness) issue,
     * we'd like to encourage compilers not to arbitrarily postpone
     * setting queueTop after writing slot. Currently there is no
     * intrinsic for arranging this, but using Unsafe putOrderedInt
     * may be a preferable strategy on some compilers even though its
     * main effect is a pre-, not post- fence. To simplify possible
     * changes, the option is left in comments next to the associated
     * assignments.
     */

    /**
     * CASes slot i of array q from t to null. Caller must ensure q is
     * non-null and index is in range.
     */
    private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                             ForkJoinTask<?> t) {
        return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
    }

    /**
     * Performs a volatile write of the given task at given slot of
     * array q. Caller must ensure q is non-null and index is in
     * range. This method is used only during resets and backouts.
     */
    private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                        ForkJoinTask<?> t) {
        UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
    }

    // queue methods

    /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) { // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1; // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

    /**
     * Creates or doubles queue array. Transfers elements by
     * emulating steals (deqs) from old array and placing, oldest
     * first, into new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     *
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        if (queueTop != (b = queueBase) &&
            (q = queue) != null && // must read q after b
            (i = (q.length - 1) & b) >= 0 &&
            (t = q[i]) != null && queueBase == b &&
            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
            queueBase = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Tries to take a task from the base of its own queue. Called
     * only by this thread.
     *
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?> t; int m, b, i;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            while (queueTop != (b = queueBase)) {
                if ((t = q[i = m & b]) != null &&
                    queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
                                                t, null)) {
                    queueBase = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty.
     * Called only by this thread.
     */
    private ForkJoinTask<?> popTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            for (int s; (s = queueTop) != queueBase;) {
                int i = m & --s;
                long u = (i << ASHIFT) + ABASE; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null) // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueTop = s; // or putOrderedInt
                    return t;
                }
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q;
        int s;
        if ((q = queue) != null && (s = queueTop) != queueBase &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            queueTop = s; // or putOrderedInt
            return true;
        }
        return false;
    }

    /**
     * Returns next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q == null || (m = q.length - 1) < 0)
            return null;
        int i = locallyFifo ? queueBase : (queueTop - 1);
        return q[i & m];
    }

    // Support methods for ForkJoinPool

    /**
     * Runs the given task, plus any local tasks until queue is empty.
     */
    final void execTask(ForkJoinTask<?> t) {
        currentSteal = t;
        for (;;) {
            if (t != null)
                t.doExec();
            if (queueTop == queueBase)
                break;
            t = locallyFifo ? locallyDeqTask() : popTask();
        }
        ++stealCount;
        currentSteal = null;
    }

    /**
     * Removes and cancels all tasks in queue. Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
        if (cj != null && cj.status >= 0)
            cj.cancelIgnoringExceptions();
        ForkJoinTask<?> cs = currentSteal;
        if (cs != null && cs.status >= 0)
            cs.cancelIgnoringExceptions();
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     *
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }

    // Support methods for ForkJoinTask

    /**
     * Returns an estimate of the number of tasks in the queue.
     */
    final int getQueueSize() {
        return queueTop - queueBase;
    }

    /**
     * Gets and removes a local task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        return locallyFifo ? locallyDeqTask() : popTask();
    }

    /**
     * Gets and removes a local or stolen task.
     *
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinWorkerThread[] ws;
        ForkJoinTask<?> t = pollLocalTask();
        if (t != null || (ws = pool.workers) == null)
            return t;
        int n = ws.length; // cheap version of FJP.scan
        int steps = n << 1;
        int r = nextSeed();
        int i = 0;
        while (i < steps) {
            ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
            if (w != null && w.queueBase != w.queueTop && w.queue != null) {
                if ((t = w.deqTask()) != null)
                    return t;
                i = 0;
            }
        }
        return null;
    }

    /**
     * The maximum stolen->joining link depth allowed in helpJoinTask,
     * as well as the maximum number of retries (allowing on average
     * one staleness retry per level) per attempt to instead try
     * compensation. Depths for legitimate chains are unbounded, but
     * we use a fixed constant to avoid (otherwise unchecked) cycles
     * and bound staleness of traversal parameters at the expense of
     * sometimes blocking when we could be helping.
     */
    private static final int MAX_HELP = 16;

    /**
     * Possibly runs some tasks and/or blocks, until joinMe is done.
     *
     * @param joinMe the task to join
     * @return completion status on exit
     */
    final int joinTask(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> prevJoin = currentJoin;
        currentJoin = joinMe;
        for (int s, retries = MAX_HELP;;) {
            if ((s = joinMe.status) < 0) {
                currentJoin = prevJoin;
                return s;
            }
            if (retries > 0) {
                if (queueTop != queueBase) {
                    if (!localHelpJoinTask(joinMe))
                        retries = 0; // cannot help
                }
                else if (retries == MAX_HELP >>> 1) {
                    --retries; // check uncommon case
                    if (tryDeqAndExec(joinMe) >= 0)
                        Thread.yield(); // for politeness
                }
                else
                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
            }
            else {
                retries = MAX_HELP; // restart if not done
                pool.tryAwaitJoin(joinMe);
            }
        }
    }

    /**
     * If present, pops and executes the given task, or any other
     * cancelled task.
     *
     * @return false if any other non-cancelled task exists in local queue
     */
    private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
        int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
        if ((s = queueTop) != queueBase && (q = queue) != null &&
            (i = (q.length - 1) & --s) >= 0 &&
            (t = q[i]) != null) {
            if (t != joinMe && t.status >= 0)
                return false;
            if (UNSAFE.compareAndSwapObject
                (q, (i << ASHIFT) + ABASE, t, null)) {
                queueTop = s; // or putOrderedInt
                t.doExec();
            }
        }
        return true;
    }

    /**
     * Tries to locate and execute tasks for a stealer of the given
     * task, or in turn one of its stealers. Traces
     * currentSteal->currentJoin links looking for a thread working on
     * a descendant of the given task and with a non-empty queue to
     * steal back and execute tasks from. The implementation is very
     * branchy to cope with potential inconsistencies or loops
     * encountering chains that are stale, unknown, or of length
     * greater than MAX_HELP links. All of these cases are dealt with
     * by the caller just retrying.
     *
     * @param joinMe the task to join
     * @return true if ran a task
     */
    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
        boolean helped = false;
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && joinMe.status >= 0) {
            int levels = MAX_HELP; // remaining chain length
            ForkJoinTask<?> task = joinMe; // base of chain
            outer: for (ForkJoinWorkerThread thread = this;;) {
                // Try to find v, the stealer of task, by first using hint
                ForkJoinWorkerThread v = ws[thread.stealHint & m];
                if (v == null || v.currentSteal != task) {
                    for (int j = 0; ;) { // search array
                        if ((v = ws[j]) != null && v.currentSteal == task) {
                            thread.stealHint = j;
                            break; // save hint for next time
                        }
                        if (++j > m)
                            break outer; // can't find stealer
                    }
                }
                // Try to help v, using specialized form of deqTask
                for (;;) {
                    ForkJoinTask<?>[] q; int b, i;
                    if (joinMe.status < 0)
                        break outer;
                    if ((b = v.queueBase) == v.queueTop ||
                        (q = v.queue) == null ||
                        (i = (q.length - 1) & b) < 0)
                        break; // empty
                    long u = (i << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = q[i];
                    if (task.status < 0)
                        break outer; // stale
                    if (t != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                        helped = true;
                    }
                }
                // Try to descend to find v's stealer
                ForkJoinTask<?> next = v.currentJoin;
                if (--levels > 0 && task.status >= 0 &&
                    next != null && next != task) {
                    task = next;
                    thread = v;
                }
                else
                    break; // max levels, stale, dead-end, or cyclic
            }
        }
        return helped;
    }

    /**
     * Performs an uncommon case for joinTask: If task t is at base of
     * some worker's queue, steals and executes it.
     *
     * @param t the task
     * @return t's status
     */
    private int tryDeqAndExec(ForkJoinTask<?> t) {
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && t.status >= 0) {
            for (int j = 0; j <= m; ++j) {
                ForkJoinTask<?>[] q; int b, i;
                ForkJoinWorkerThread v = ws[j];
                if (v != null &&
                    (b = v.queueBase) != v.queueTop &&
                    (q = v.queue) != null &&
                    (i = (q.length - 1) & b) >= 0 &&
                    q[i] == t) {
                    long u = (i << ASHIFT) + ABASE;
                    if (v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                    }
                    break;
                }
            }
        }
        return t.status;
    }

    /**
     * Implements ForkJoinTask.getSurplusQueuedTaskCount(). Returns
     * an estimate of the number of tasks, offset by a function of
     * number of idle workers.
     *
     * This method provides a cheap heuristic guide for task
     * partitioning when programmers, frameworks, tools, or languages
     * have little or no idea about task granularity. In essence, by
     * offering this method, we ask users only about tradeoffs in
     * overhead vs expected throughput and its variance, rather than
     * how finely to partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible. Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of the computation tree should generate more than those nearer
     * the bottom. In perfect steady state, each thread is at
     * approximately the same level of the computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger, but not much larger
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress, as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length (although note that (queueTop -
     * queueBase) can be an overestimate because of stealers lagging
     * increments of queueBase). However, this strategy alone leads
     * to serious mis-estimates in some non-steady-state conditions
     * (ramp-up, ramp-down, other stalls). We can detect many of these
     * by further considering the number of "idle" threads, which are
     * known to have zero queued tasks, and compensate by a factor of
     * (#idle/#active) threads.
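     *
     * For example (a sketch using the public counterpart in
     * ForkJoinTask and the threshold recommended above), a
     * divide-and-conquer computation might split as:
     *
     *   while (problem is splittable &&
     *          ForkJoinTask.getSurplusQueuedTaskCount() <= 3)
     *     fork a subtask and continue with the remainder;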
     */
    final int getEstimatedSurplusTaskCount() {
        return queueTop - queueBase - pool.idlePerActive();
    }

    /**
     * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
     * pool's active count ctl maintenance, but rather than blocking
     * when tasks cannot be found, we rescan until all others cannot
     * find tasks either. The bracketing by pool quiescerCounts
     * updates suppresses pool auto-shutdown mechanics that could
     * otherwise prematurely terminate the pool because all threads
     * appear to be inactive.
     */
    final void helpQuiescePool() {
        boolean active = true;
        ForkJoinTask<?> ps = currentSteal; // to restore below
        ForkJoinPool p = pool;
        p.addQuiescerCount(1);
        for (;;) {
            ForkJoinWorkerThread[] ws = p.workers;
            ForkJoinWorkerThread v = null;
            int n;
            if (queueTop != queueBase)
                v = this;
            else if (ws != null && (n = ws.length) > 1) {
                ForkJoinWorkerThread w;
                int r = nextSeed(); // cheap version of FJP.scan
                int steps = n << 1;
                for (int i = 0; i < steps; ++i) {
                    if ((w = ws[(i + r) & (n - 1)]) != null &&
                        w.queueBase != w.queueTop) {
                        v = w;
                        break;
                    }
                }
            }
            if (v != null) {
                ForkJoinTask<?> t;
                if (!active) {
                    active = true;
                    p.addActiveCount(1);
                }
                if ((t = (v != this) ? v.deqTask() :
                     locallyFifo ? locallyDeqTask() : popTask()) != null) {
                    currentSteal = t;
                    t.doExec();
                    currentSteal = ps;
                }
            }
            else {
                if (active) {
                    active = false;
                    p.addActiveCount(-1);
                }
                if (p.isQuiescent()) {
                    p.addActiveCount(1);
                    p.addQuiescerCount(-1);
                    break;
                }
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        int s;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> a = ForkJoinTask[].class;
            ABASE = UNSAFE.arrayBaseOffset(a);
            s = UNSAFE.arrayIndexScale(a);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
    }

}