LinkedBlockingQueue.java revision 1468
216N/A * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 0N/A * This code is free software; you can redistribute it and/or modify it 0N/A * under the terms of the GNU General Public License version 2 only, as 0N/A * published by the Free Software Foundation. Sun designates this 0N/A * particular file as subject to the "Classpath" exception as provided 0N/A * by Sun in the LICENSE file that accompanied this code. 0N/A * This code is distributed in the hope that it will be useful, but WITHOUT 0N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 0N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 0N/A * version 2 for more details (a copy is included in the LICENSE file that 0N/A * accompanied this code). 0N/A * You should have received a copy of the GNU General Public License version 0N/A * 2 along with this work; if not, write to the Free Software Foundation, 0N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 0N/A * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 0N/A * CA 95054 USA or visit www.sun.com if you need additional information or 0N/A * have any questions. 0N/A * This file is available under and governed by the GNU General Public 0N/A * License version 2 only, as published by the Free Software Foundation. 0N/A * However, the following notice accompanied the original version of this 0N/A * Written by Doug Lea with assistance from members of JCP JSR-166 0N/A * Expert Group and released to the public domain, as explained at 0N/A * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on 0N/A * This queue orders elements FIFO (first-in-first-out). 0N/A * The <em>head</em> of the queue is that element that has been on the 0N/A * queue the longest time. 0N/A * The <em>tail</em> of the queue is that element that has been on the 0N/A * queue the shortest time. New elements 0N/A * are inserted at the tail of the queue, and the queue retrieval 0N/A * operations obtain elements at the head of the queue. 0N/A * Linked queues typically have higher throughput than array-based queues but 0N/A * less predictable performance in most concurrent applications. 0N/A * <p> The optional capacity bound constructor argument serves as a 216N/A * way to prevent excessive queue expansion. The capacity, if unspecified, 0N/A * is equal to {@link Integer#MAX_VALUE}. Linked nodes are 0N/A * dynamically created upon each insertion unless this would bring the 0N/A * queue above capacity. 0N/A * <p>This class and its iterator implement all of the 1696N/A * <em>optional</em> methods of the {@link Collection} and {@link 0N/A * Iterator} interfaces. 0N/A * <p>This class is a member of the 0N/A * Java Collections Framework</a>. 0N/A * @param <E> the type of elements held in this collection 0N/A * A variant of the "two lock queue" algorithm. The putLock gates 0N/A * entry to put (and offer), and has an associated condition for 0N/A * waiting puts. Similarly for the takeLock. The "count" field 0N/A * that they both rely on is maintained as an atomic to avoid 0N/A * needing to get both locks in most cases. Also, to minimize need 0N/A * for puts to get takeLock and vice-versa, cascading notifies are 0N/A * used. When a put notices that it has enabled at least one take, 0N/A * it signals taker. That taker in turn signals others if more 0N/A * items have been entered since the signal. And symmetrically for 0N/A * takes signalling puts. Operations such as remove(Object) and 0N/A * iterators acquire both locks. 0N/A * Visibility between writers and readers is provided as follows: 0N/A * Whenever an element is enqueued, the putLock is acquired and 0N/A * count updated. A subsequent reader guarantees visibility to the 0N/A * enqueued Node by either acquiring the putLock (via fullyLock) 0N/A * or by acquiring the takeLock, and then reading n = count.get(); 0N/A * this gives visibility to the first n items. 0N/A * To implement weakly consistent iterators, it appears we need to 0N/A * keep all Nodes GC-reachable from a predecessor dequeued Node. 0N/A * That would cause two problems: 0N/A * - allow a rogue Iterator to cause unbounded memory retention 0N/A * - cause cross-generational linking of old Nodes to new Nodes if 0N/A * a Node was tenured while live, which generational GCs have a 0N/A * hard time dealing with, causing repeated major collections. 0N/A * However, only non-deleted Nodes need to be reachable from 0N/A * dequeued Nodes, and reachability does not necessarily have to 0N/A * be of the kind understood by the GC. We use the trick of 0N/A * linking a Node that has just been dequeued to itself. Such a 1696N/A * self-link implicitly means to advance to head.next. 0N/A * Linked list node class 0N/A * - the real successor Node 0N/A * - this Node, meaning the successor is head.next 0N/A * - null, meaning there is no successor (this is the last node) 0N/A /** The capacity bound, or Integer.MAX_VALUE if none */ 0N/A /** Current number of elements */ 0N/A * Head of linked list. 0N/A * Invariant: head.item == null 0N/A * Tail of linked list. 0N/A * Invariant: last.next == null 216N/A /** Lock held by take, poll, etc */ 0N/A /** Wait queue for waiting takes */ 0N/A /** Lock held by put, offer, etc */ 0N/A /** Wait queue for waiting puts */ 0N/A * Signals a waiting take. Called only from put/offer (which do not 216N/A * otherwise ordinarily lock takeLock.) 0N/A * Creates a node and links it at end of queue. 0N/A // assert putLock.isHeldByCurrentThread(); 0N/A // assert last.next == null; 0N/A * Removes a node from head of queue. 0N/A // assert takeLock.isHeldByCurrentThread(); 1696N/A // assert head.item == null; * Lock to prevent both puts and takes. * Unlock to allow both puts and takes. // * Tells whether both locks are held by current thread. // boolean isFullyLocked() { // return (putLock.isHeldByCurrentThread() && // takeLock.isHeldByCurrentThread()); * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * added in traversal order of the collection's iterator. * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null putLock.
lock();
// Never contended, but necessary for visibility // this doc comment is overridden to remove the reference to collections // greater in size than Integer.MAX_VALUE * Returns the number of elements in this queue. * @return the number of elements in this queue // this doc comment is a modified copy of the inherited doc comment, // without the reference to unlimited queues. * Returns the number of additional elements that this queue can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this queue * less the current {@code size} of this queue. * <p>Note that you <em>cannot</em> always tell if an attempt to insert * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. * Inserts the specified element at the tail of this queue, waiting if * necessary up to the specified wait time for space to become available. * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before space is available. * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to * insert an element only by throwing an exception. * @throws NullPointerException if the specified element is null public boolean offer(E e) {
* Unlinks interior Node p with predecessor trail. // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. * Removes a single instance of the specified element from this queue, * if it is present. More formally, removes an element {@code e} such * that {@code o.equals(e)}, if this queue contains one or more such * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call if (o ==
null)
return false;
* Returns an array containing all of the elements in this queue, in * <p>The returned array will be "safe" in that no references to it are * maintained by this queue. (In other words, this method must allocate * a new array). The caller is thus free to modify the returned array. * <p>This method acts as bridge between array-based and collection-based * @return an array containing all of the elements in this queue * Returns an array containing all of the elements in this queue, in * proper sequence; the runtime type of the returned array is that of * the specified array. If the queue fits in the specified array, it * is returned therein. Otherwise, a new array is allocated with the * runtime type of the specified array and the size of this queue. * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * <p>Suppose {@code x} is a queue known to contain only strings. * The following code can be used to dump the queue into a newly * allocated array of {@code String}: * String[] y = x.toArray(new String[0]);</pre> * Note that {@code toArray(new Object[0])} is identical in function to * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the * same runtime type is allocated for this purpose * @return an array containing all of the elements in this queue * @throws ArrayStoreException if the runtime type of the specified array * is not a supertype of the runtime type of every element in * @throws NullPointerException if the specified array is null * Atomically removes all of the elements from this queue. * The queue will be empty after this call returns. // assert head.item == null && head.next == null; * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} // count.get provides visibility to first n Nodes // Restore invariants even if c.add() threw // assert h.item == null; * Returns an iterator over the elements in this queue in proper sequence. * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) * reflect any modifications subsequent to construction. * @return an iterator over the elements in this queue in proper sequence * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. * Unlike other traversal methods, iterators need to handle: * - dequeued nodes (p.next == p) * - interior removed nodes (p.item == null) // Skip over removed nodes. // May be necessary if multiple interior Nodes are removed. * Save the state to a stream (that is, serialize it). * @serialData The capacity is emitted (int), followed by all of * its elements (each an {@code Object}) in the proper order, // Write out any hidden stuff, plus capacity // Write out all elements in the proper order. // Use trailing null as sentinel * Reconstitute this queue instance from a stream (that is, // Read in capacity, and any hidden stuff // Read in all elements and place in queue