/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 */

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate.  The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * <tt>poll()</tt> will return <tt>null</tt>.  For purposes of other
 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
 * <tt>SynchronousQueue</tt> acts as an empty collection.  This queue
 * does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada.  They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed.  However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * Java Collections Framework.
 *
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <E> the type of elements held in this collection
 */

    /*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott.  18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     *
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode.  The performance of the two is generally
     * similar.  Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty.  A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node.  The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.
     *
     * Both the queue and stack extend abstract class Transferer
     * defining the single method transfer that does a put or a
     * take.  These are unified into a single method because in dual
     * data structures, the put and take operations are symmetrical,
     * so nearly all code can be combined.  The resulting transfer
     * methods are on the long side, but are easier to follow than
     * they would be if broken up into nearly-duplicated parts.
     *
     * The queue and stack data structures share many conceptual
     * similarities but very few concrete details.  For simplicity,
     * they are kept distinct so that they can later evolve
     * separately.
     *
     * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation.  The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.
     *
     * Blocking is mainly accomplished using LockSupport park/unpark,
     * except that nodes that appear to be the next ones to become
     * fulfilled first spin a bit (on multiprocessors only).  On very
     * busy synchronous queues, spinning can dramatically improve
     * throughput.  And on less busy ones, the amount of spinning is
     * small enough not to be noticeable.
     *
     * Cleaning is done in different ways in queues vs stacks.  For
     * queues, we can almost always remove a node immediately in O(1)
     * time (modulo retries for consistency checks) when it is
     * cancelled.  But if it may be pinned as the current tail, it must
     * wait until some subsequent cancellation.  For stacks, we need a
     * potentially O(n) traversal to be sure that we can remove the
     * node, but this can run concurrently with other threads
     * accessing the stack.
     *
     * While garbage collection takes care of most node reclamation
     * issues that otherwise complicate nonblocking algorithms, care
     * is taken to "forget" references to data, other nodes, and
     * threads that might be held on to long-term by blocked
     * threads.  In cases where setting to null would otherwise
     * conflict with main algorithms, this is done by changing a
     * node's link to now point to the node itself.  This doesn't arise
     * much for Stack nodes (because blocked threads do not hang on to
     * old head pointers), but references in Queue nodes must be
     * aggressively forgotten to avoid reachability of everything any
     * node has ever referred to since arrival.
     */

    /**
     * Shared internal API for dual stacks and queues.
     */

        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */

    /** The number of CPUs, for spin control */

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes.  Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than the timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park.  A rough estimate suffices.
     */

    /**
     * This extends the Scherer-Scott dual stack algorithm, differing,
     * among other ways, by using "covering" nodes rather than
     * bit-marked pointers: fulfilling operations push on marker
     * nodes (with the FULFILLING bit set in mode) to reserve a spot
     * to match a waiting node.
     */

        /* Modes for SNodes, ORed together in node fields */
        /** Node represents an unfulfilled consumer */
        /** Node represents an unfulfilled producer */
        /** Node is fulfilling another unfulfilled DATA or REQUEST */

        /** Returns true if m has the fulfilling bit set. */

        /** Node class for TransferStacks. */
            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            /**
             * Tries to match node s to this node, if so, waking up thread.
             * Fulfillers call tryMatch to identify their waiters.
             * Waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }

            /**
             * Tries to cancel a wait by matching node to itself.
             */

        /** The head (top) of the stack */

        /**
         * Creates or resets fields of a node.  Called only from transfer
         * where the node to push on stack is lazily created and
         * reused when possible to help reduce intervals between reads
         * and CASes of head and to avoid surges of garbage when CASes
         * to push nodes fail due to contention.
         */

        /**
         * Puts or takes an item.
         */
            /* Basic algorithm is to loop trying one of three actions:
             *
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item.  The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue.  The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {               // wait was cancelled
                        clean(s);
                        return null;
                    }

                    for (;;) {  // loop until matched or waiters disappear
                        SNode m = s.next;       // m is s's match
                        if (m == null) {        // all waiters are gone
                            casHead(s, null);   // pop fulfill node
                            s = null;           // use new node next time
                            break;              // restart main loop
                        }
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {
                            casHead(s, mn);     // pop both s and m
                            return (mode == REQUEST) ? m.item : s.item;
                        } else                  // lost match
                            s.casNext(m, mn);   // help unlink
                    }

                    // else the top of stack holds another fulfilling
                    // node: help a fulfiller (same steps as above,
                    // without returning the item).

        /**
         * Spins/blocks until node s is matched by a fulfill operation.
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
            /*
             * When a node/thread is about to block, it sets its waiter
             * field and then rechecks state at least one more time
             * before actually parking, thus covering race vs
             * fulfiller noticing that waiter is non-null so should be
             * woken.
             *
             * When invoked by nodes that appear at the point of call
             * to be at the head of the stack, calls to park are
             * preceded by spins to avoid blocking when producers and
             * consumers are arriving very close in time.  This can
             * happen enough to bother only on multiprocessors.
             *
             * The order of checks for returning out of main loop
             * reflects fact that interrupts have precedence over
             * normal returns, which have precedence over
             * timeouts.  (So, on timeout, one last check for match is
             * done before giving up.)  Except that calls from untimed
             * SynchronousQueue.{poll,offer} do not check interrupts
             * and don't wait at all, so are trapped in transfer
             * method rather than calling awaitFulfill.
             */
                if (s.waiter == null)
                    s.waiter = w;   // establish waiter so can park next iter

        /**
         * Returns true if node s is at head or there is an active
         * fulfiller.
         */

        /**
         * Unlinks s from the stack.
         */
            /*
             * At worst we may need to traverse entire stack to unlink
             * s.  If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it.  But we can stop when we see any node known to
             * follow s.  We use s.next unless it too is cancelled, in
             * which case we try the node one past.  We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            // Absorb cancelled nodes at head

            // Unsplice embedded nodes

    /**
     * This extends the Scherer-Scott dual queue algorithm, differing,
     * among other ways, by using modes within nodes rather than
     * marked pointers.  The algorithm is a little simpler than
     * that for stacks because fulfillers do not need explicit
     * nodes, and matching is done by CAS'ing the QNode.item field
     * from non-null to null (for put) or vice versa (for take).
     */

        /** Node class for TransferQueue. */

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */

        /** Head of queue */
        /** Tail of queue */

        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled.
         */

        /**
         * Tries to cas nh as new head; if successful, unlinks
         * old head's next node to avoid garbage retention.
         */

        /**
         * Tries to cas nt as new tail.
         */

        /**
         * Tries to CAS cleanMe slot.
         */

        /**
         * Puts or takes an item.
         */
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2.
             *    If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values.  This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer.  The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */
                    if (t != tail)                  // inconsistent read
                        continue;

                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }
                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }

                    // Complementary-mode: fulfill the waiting node at head.
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read
                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
            /* Same idea as TransferStack.awaitFulfill */

        /**
         * Gets rid of cancelled node s with original predecessor pred.
         */
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node.  To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first.  At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode t = tail;      // ensure consistent read of tail
                if (s != t) {        // If not tail, try to unsplice
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                QNode dp = cleanMe;
                if (dp != null) {    // Try unlinking previous cancelled node
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }

    /**
     * The transferer.  Set only in constructor, but cannot be declared
     * as final without further complicating serialization.  Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
     */

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero
     */

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero
     */

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */

    /**
     * Returns a zero-length array.
     *
     * @return a zero-length array
     */

    /**
     * Sets the zeroeth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */

    /*
     * To cope with serialization strategy in the 1.5 version of
     * SynchronousQueue, we declare some unused classes and fields
     * that exist solely to enable serializability across versions.
     * These fields are never used, so are initialized only if this
     * object is ever serialized or deserialized.
     */

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */

            // Convert Exception to corresponding Error
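The class javadoc above promises pure rendezvous semantics: no internal capacity, `offer`/`poll` fail when no counterpart thread is waiting, and `put`/`take` block until a partner arrives. A minimal sketch of that behavior (the demo class name and the strings are ours, not part of this file; the `SynchronousQueue` API calls are the real ones documented above):

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Fair (FIFO) handoff channel with no internal capacity.
        SynchronousQueue<String> queue = new SynchronousQueue<>(true);

        // With no waiting consumer, a non-blocking insert fails
        // and a non-blocking remove finds nothing.
        System.out.println(queue.offer("dropped"));  // false: no taker waiting
        System.out.println(queue.poll());            // null: no putter waiting
        System.out.println(queue.size());            // 0: acts as empty collection

        // A consumer thread blocks in take() until a producer arrives.
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("received: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("handoff");   // blocks until the consumer takes the element
        consumer.join();
    }
}
```

With fairness set to <tt>true</tt> the queue uses the FIFO TransferQueue internally; with the default <tt>false</tt> it uses the LIFO TransferStack, trading FIFO ordering for better thread locality, as the implementation notes explain.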