0N/A/*
0N/A * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
0N/A *
0N/A * This code is free software; you can redistribute it and/or modify it
0N/A * under the terms of the GNU General Public License version 2 only, as
2362N/A * published by the Free Software Foundation. Oracle designates this
0N/A * particular file as subject to the "Classpath" exception as provided
2362N/A * by Oracle in the LICENSE file that accompanied this code.
0N/A *
0N/A * This code is distributed in the hope that it will be useful, but WITHOUT
0N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
0N/A * version 2 for more details (a copy is included in the LICENSE file that
0N/A * accompanied this code).
0N/A *
0N/A * You should have received a copy of the GNU General Public License version
0N/A * 2 along with this work; if not, write to the Free Software Foundation,
0N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
0N/A *
2362N/A * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
2362N/A * or visit www.oracle.com if you need additional information or have any
2362N/A * questions.
0N/A */
0N/A
0N/A/*
0N/A * This file is available under and governed by the GNU General Public
0N/A * License version 2 only, as published by the Free Software Foundation.
0N/A * However, the following notice accompanied the original version of this
0N/A * file:
0N/A *
0N/A * Written by Doug Lea with assistance from members of JCP JSR-166
0N/A * Expert Group and released to the public domain, as explained at
3984N/A * http://creativecommons.org/publicdomain/zero/1.0/
0N/A */
0N/A
0N/Apackage java.util.concurrent;
1468N/A
1468N/Aimport java.util.concurrent.atomic.AtomicInteger;
1468N/Aimport java.util.concurrent.locks.Condition;
1468N/Aimport java.util.concurrent.locks.ReentrantLock;
1468N/Aimport java.util.AbstractQueue;
1468N/Aimport java.util.Collection;
1468N/Aimport java.util.Iterator;
1468N/Aimport java.util.NoSuchElementException;
0N/A
0N/A/**
0N/A * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
0N/A * linked nodes.
0N/A * This queue orders elements FIFO (first-in-first-out).
0N/A * The <em>head</em> of the queue is that element that has been on the
0N/A * queue the longest time.
0N/A * The <em>tail</em> of the queue is that element that has been on the
0N/A * queue the shortest time. New elements
0N/A * are inserted at the tail of the queue, and the queue retrieval
0N/A * operations obtain elements at the head of the queue.
0N/A * Linked queues typically have higher throughput than array-based queues but
0N/A * less predictable performance in most concurrent applications.
0N/A *
0N/A * <p> The optional capacity bound constructor argument serves as a
0N/A * way to prevent excessive queue expansion. The capacity, if unspecified,
0N/A * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
0N/A * dynamically created upon each insertion unless this would bring the
0N/A * queue above capacity.
0N/A *
0N/A * <p>This class and its iterator implement all of the
0N/A * <em>optional</em> methods of the {@link Collection} and {@link
0N/A * Iterator} interfaces.
0N/A *
0N/A * <p>This class is a member of the
0N/A * <a href="{@docRoot}/../technotes/guides/collections/index.html">
0N/A * Java Collections Framework</a>.
0N/A *
0N/A * @since 1.5
0N/A * @author Doug Lea
0N/A * @param <E> the type of elements held in this collection
0N/A *
0N/A */
0N/Apublic class LinkedBlockingQueue<E> extends AbstractQueue<E>
0N/A implements BlockingQueue<E>, java.io.Serializable {
0N/A private static final long serialVersionUID = -6903933977591709194L;
0N/A
0N/A /*
0N/A * A variant of the "two lock queue" algorithm. The putLock gates
0N/A * entry to put (and offer), and has an associated condition for
0N/A * waiting puts. Similarly for the takeLock. The "count" field
0N/A * that they both rely on is maintained as an atomic to avoid
0N/A * needing to get both locks in most cases. Also, to minimize need
0N/A * for puts to get takeLock and vice-versa, cascading notifies are
0N/A * used. When a put notices that it has enabled at least one take,
0N/A * it signals taker. That taker in turn signals others if more
0N/A * items have been entered since the signal. And symmetrically for
0N/A * takes signalling puts. Operations such as remove(Object) and
0N/A * iterators acquire both locks.
1468N/A *
1468N/A * Visibility between writers and readers is provided as follows:
1468N/A *
1468N/A * Whenever an element is enqueued, the putLock is acquired and
1468N/A * count updated. A subsequent reader guarantees visibility to the
1468N/A * enqueued Node by either acquiring the putLock (via fullyLock)
1468N/A * or by acquiring the takeLock, and then reading n = count.get();
1468N/A * this gives visibility to the first n items.
1468N/A *
1468N/A * To implement weakly consistent iterators, it appears we need to
1468N/A * keep all Nodes GC-reachable from a predecessor dequeued Node.
1468N/A * That would cause two problems:
1468N/A * - allow a rogue Iterator to cause unbounded memory retention
1468N/A * - cause cross-generational linking of old Nodes to new Nodes if
1468N/A * a Node was tenured while live, which generational GCs have a
1468N/A * hard time dealing with, causing repeated major collections.
1468N/A * However, only non-deleted Nodes need to be reachable from
1468N/A * dequeued Nodes, and reachability does not necessarily have to
1468N/A * be of the kind understood by the GC. We use the trick of
1468N/A * linking a Node that has just been dequeued to itself. Such a
1468N/A * self-link implicitly means to advance to head.next.
0N/A */
0N/A
0N/A /**
0N/A * Linked list node class
0N/A */
0N/A static class Node<E> {
1468N/A E item;
1468N/A
1468N/A /**
1468N/A * One of:
1468N/A * - the real successor Node
1468N/A * - this Node, meaning the successor is head.next
1468N/A * - null, meaning there is no successor (this is the last node)
1468N/A */
0N/A Node<E> next;
1468N/A
0N/A Node(E x) { item = x; }
0N/A }
0N/A
0N/A /** The capacity bound, or Integer.MAX_VALUE if none */
0N/A private final int capacity;
0N/A
0N/A /** Current number of elements */
0N/A private final AtomicInteger count = new AtomicInteger(0);
0N/A
1468N/A /**
1468N/A * Head of linked list.
1468N/A * Invariant: head.item == null
1468N/A */
0N/A private transient Node<E> head;
0N/A
1468N/A /**
1468N/A * Tail of linked list.
1468N/A * Invariant: last.next == null
1468N/A */
0N/A private transient Node<E> last;
0N/A
0N/A /** Lock held by take, poll, etc */
0N/A private final ReentrantLock takeLock = new ReentrantLock();
0N/A
0N/A /** Wait queue for waiting takes */
0N/A private final Condition notEmpty = takeLock.newCondition();
0N/A
0N/A /** Lock held by put, offer, etc */
0N/A private final ReentrantLock putLock = new ReentrantLock();
0N/A
0N/A /** Wait queue for waiting puts */
0N/A private final Condition notFull = putLock.newCondition();
0N/A
0N/A /**
0N/A * Signals a waiting take. Called only from put/offer (which do not
0N/A * otherwise ordinarily lock takeLock.)
0N/A */
0N/A private void signalNotEmpty() {
0N/A final ReentrantLock takeLock = this.takeLock;
0N/A takeLock.lock();
0N/A try {
0N/A notEmpty.signal();
0N/A } finally {
0N/A takeLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Signals a waiting put. Called only from take/poll.
0N/A */
0N/A private void signalNotFull() {
0N/A final ReentrantLock putLock = this.putLock;
0N/A putLock.lock();
0N/A try {
0N/A notFull.signal();
0N/A } finally {
0N/A putLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
3387N/A * Links node at end of queue.
1468N/A *
3387N/A * @param node the node
0N/A */
3387N/A private void enqueue(Node<E> node) {
1468N/A // assert putLock.isHeldByCurrentThread();
1468N/A // assert last.next == null;
3387N/A last = last.next = node;
0N/A }
0N/A
0N/A /**
1468N/A * Removes a node from head of queue.
1468N/A *
0N/A * @return the node
0N/A */
1468N/A private E dequeue() {
1468N/A // assert takeLock.isHeldByCurrentThread();
1468N/A // assert head.item == null;
1468N/A Node<E> h = head;
1468N/A Node<E> first = h.next;
1468N/A h.next = h; // help GC
0N/A head = first;
0N/A E x = first.item;
0N/A first.item = null;
0N/A return x;
0N/A }
0N/A
0N/A /**
0N/A * Lock to prevent both puts and takes.
0N/A */
1468N/A void fullyLock() {
0N/A putLock.lock();
0N/A takeLock.lock();
0N/A }
0N/A
0N/A /**
0N/A * Unlock to allow both puts and takes.
0N/A */
1468N/A void fullyUnlock() {
0N/A takeLock.unlock();
0N/A putLock.unlock();
0N/A }
0N/A
1468N/A// /**
1468N/A// * Tells whether both locks are held by current thread.
1468N/A// */
1468N/A// boolean isFullyLocked() {
1468N/A// return (putLock.isHeldByCurrentThread() &&
1468N/A// takeLock.isHeldByCurrentThread());
1468N/A// }
0N/A
0N/A /**
1468N/A * Creates a {@code LinkedBlockingQueue} with a capacity of
0N/A * {@link Integer#MAX_VALUE}.
0N/A */
0N/A public LinkedBlockingQueue() {
0N/A this(Integer.MAX_VALUE);
0N/A }
0N/A
0N/A /**
1468N/A * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
0N/A *
0N/A * @param capacity the capacity of this queue
1468N/A * @throws IllegalArgumentException if {@code capacity} is not greater
0N/A * than zero
0N/A */
0N/A public LinkedBlockingQueue(int capacity) {
0N/A if (capacity <= 0) throw new IllegalArgumentException();
0N/A this.capacity = capacity;
0N/A last = head = new Node<E>(null);
0N/A }
0N/A
0N/A /**
1468N/A * Creates a {@code LinkedBlockingQueue} with a capacity of
0N/A * {@link Integer#MAX_VALUE}, initially containing the elements of the
0N/A * given collection,
0N/A * added in traversal order of the collection's iterator.
0N/A *
0N/A * @param c the collection of elements to initially contain
0N/A * @throws NullPointerException if the specified collection or any
0N/A * of its elements are null
0N/A */
0N/A public LinkedBlockingQueue(Collection<? extends E> c) {
0N/A this(Integer.MAX_VALUE);
1468N/A final ReentrantLock putLock = this.putLock;
1468N/A putLock.lock(); // Never contended, but necessary for visibility
1468N/A try {
1468N/A int n = 0;
1468N/A for (E e : c) {
1468N/A if (e == null)
1468N/A throw new NullPointerException();
1468N/A if (n == capacity)
1468N/A throw new IllegalStateException("Queue full");
3387N/A enqueue(new Node<E>(e));
1468N/A ++n;
1468N/A }
1468N/A count.set(n);
1468N/A } finally {
1468N/A putLock.unlock();
1468N/A }
0N/A }
0N/A
0N/A
0N/A // this doc comment is overridden to remove the reference to collections
0N/A // greater in size than Integer.MAX_VALUE
0N/A /**
0N/A * Returns the number of elements in this queue.
0N/A *
0N/A * @return the number of elements in this queue
0N/A */
0N/A public int size() {
0N/A return count.get();
0N/A }
0N/A
0N/A // this doc comment is a modified copy of the inherited doc comment,
0N/A // without the reference to unlimited queues.
0N/A /**
0N/A * Returns the number of additional elements that this queue can ideally
0N/A * (in the absence of memory or resource constraints) accept without
0N/A * blocking. This is always equal to the initial capacity of this queue
1468N/A * less the current {@code size} of this queue.
0N/A *
0N/A * <p>Note that you <em>cannot</em> always tell if an attempt to insert
1468N/A * an element will succeed by inspecting {@code remainingCapacity}
0N/A * because it may be the case that another thread is about to
0N/A * insert or remove an element.
0N/A */
0N/A public int remainingCapacity() {
0N/A return capacity - count.get();
0N/A }
0N/A
0N/A /**
0N/A * Inserts the specified element at the tail of this queue, waiting if
0N/A * necessary for space to become available.
0N/A *
0N/A * @throws InterruptedException {@inheritDoc}
0N/A * @throws NullPointerException {@inheritDoc}
0N/A */
0N/A public void put(E e) throws InterruptedException {
0N/A if (e == null) throw new NullPointerException();
1468N/A // Note: convention in all put/take/etc is to preset local var
1468N/A // holding count negative to indicate failure unless set.
0N/A int c = -1;
3387N/A Node<E> node = new Node(e);
0N/A final ReentrantLock putLock = this.putLock;
0N/A final AtomicInteger count = this.count;
0N/A putLock.lockInterruptibly();
0N/A try {
0N/A /*
0N/A * Note that count is used in wait guard even though it is
0N/A * not protected by lock. This works because count can
0N/A * only decrease at this point (all other puts are shut
0N/A * out by lock), and we (or some other waiting put) are
1468N/A * signalled if it ever changes from capacity. Similarly
1468N/A * for all other uses of count in other wait guards.
0N/A */
1468N/A while (count.get() == capacity) {
1468N/A notFull.await();
0N/A }
3387N/A enqueue(node);
0N/A c = count.getAndIncrement();
0N/A if (c + 1 < capacity)
0N/A notFull.signal();
0N/A } finally {
0N/A putLock.unlock();
0N/A }
0N/A if (c == 0)
0N/A signalNotEmpty();
0N/A }
0N/A
0N/A /**
0N/A * Inserts the specified element at the tail of this queue, waiting if
0N/A * necessary up to the specified wait time for space to become available.
0N/A *
1468N/A * @return {@code true} if successful, or {@code false} if
0N/A * the specified waiting time elapses before space is available.
0N/A * @throws InterruptedException {@inheritDoc}
0N/A * @throws NullPointerException {@inheritDoc}
0N/A */
0N/A public boolean offer(E e, long timeout, TimeUnit unit)
0N/A throws InterruptedException {
0N/A
0N/A if (e == null) throw new NullPointerException();
0N/A long nanos = unit.toNanos(timeout);
0N/A int c = -1;
0N/A final ReentrantLock putLock = this.putLock;
0N/A final AtomicInteger count = this.count;
0N/A putLock.lockInterruptibly();
0N/A try {
1468N/A while (count.get() == capacity) {
0N/A if (nanos <= 0)
0N/A return false;
1468N/A nanos = notFull.awaitNanos(nanos);
0N/A }
3387N/A enqueue(new Node<E>(e));
1468N/A c = count.getAndIncrement();
1468N/A if (c + 1 < capacity)
1468N/A notFull.signal();
0N/A } finally {
0N/A putLock.unlock();
0N/A }
0N/A if (c == 0)
0N/A signalNotEmpty();
0N/A return true;
0N/A }
0N/A
0N/A /**
0N/A * Inserts the specified element at the tail of this queue if it is
0N/A * possible to do so immediately without exceeding the queue's capacity,
1468N/A * returning {@code true} upon success and {@code false} if this queue
0N/A * is full.
0N/A * When using a capacity-restricted queue, this method is generally
0N/A * preferable to method {@link BlockingQueue#add add}, which can fail to
0N/A * insert an element only by throwing an exception.
0N/A *
0N/A * @throws NullPointerException if the specified element is null
0N/A */
0N/A public boolean offer(E e) {
0N/A if (e == null) throw new NullPointerException();
0N/A final AtomicInteger count = this.count;
0N/A if (count.get() == capacity)
0N/A return false;
0N/A int c = -1;
3387N/A Node<E> node = new Node(e);
0N/A final ReentrantLock putLock = this.putLock;
0N/A putLock.lock();
0N/A try {
0N/A if (count.get() < capacity) {
3387N/A enqueue(node);
0N/A c = count.getAndIncrement();
0N/A if (c + 1 < capacity)
0N/A notFull.signal();
0N/A }
0N/A } finally {
0N/A putLock.unlock();
0N/A }
0N/A if (c == 0)
0N/A signalNotEmpty();
0N/A return c >= 0;
0N/A }
0N/A
0N/A
0N/A public E take() throws InterruptedException {
0N/A E x;
0N/A int c = -1;
0N/A final AtomicInteger count = this.count;
0N/A final ReentrantLock takeLock = this.takeLock;
0N/A takeLock.lockInterruptibly();
0N/A try {
1468N/A while (count.get() == 0) {
1468N/A notEmpty.await();
0N/A }
1468N/A x = dequeue();
0N/A c = count.getAndDecrement();
0N/A if (c > 1)
0N/A notEmpty.signal();
0N/A } finally {
0N/A takeLock.unlock();
0N/A }
0N/A if (c == capacity)
0N/A signalNotFull();
0N/A return x;
0N/A }
0N/A
0N/A public E poll(long timeout, TimeUnit unit) throws InterruptedException {
0N/A E x = null;
0N/A int c = -1;
0N/A long nanos = unit.toNanos(timeout);
0N/A final AtomicInteger count = this.count;
0N/A final ReentrantLock takeLock = this.takeLock;
0N/A takeLock.lockInterruptibly();
0N/A try {
1468N/A while (count.get() == 0) {
0N/A if (nanos <= 0)
0N/A return null;
1468N/A nanos = notEmpty.awaitNanos(nanos);
0N/A }
1468N/A x = dequeue();
1468N/A c = count.getAndDecrement();
1468N/A if (c > 1)
1468N/A notEmpty.signal();
0N/A } finally {
0N/A takeLock.unlock();
0N/A }
0N/A if (c == capacity)
0N/A signalNotFull();
0N/A return x;
0N/A }
0N/A
0N/A public E poll() {
0N/A final AtomicInteger count = this.count;
0N/A if (count.get() == 0)
0N/A return null;
0N/A E x = null;
0N/A int c = -1;
0N/A final ReentrantLock takeLock = this.takeLock;
0N/A takeLock.lock();
0N/A try {
0N/A if (count.get() > 0) {
1468N/A x = dequeue();
0N/A c = count.getAndDecrement();
0N/A if (c > 1)
0N/A notEmpty.signal();
0N/A }
0N/A } finally {
0N/A takeLock.unlock();
0N/A }
0N/A if (c == capacity)
0N/A signalNotFull();
0N/A return x;
0N/A }
0N/A
0N/A public E peek() {
0N/A if (count.get() == 0)
0N/A return null;
0N/A final ReentrantLock takeLock = this.takeLock;
0N/A takeLock.lock();
0N/A try {
0N/A Node<E> first = head.next;
0N/A if (first == null)
0N/A return null;
0N/A else
0N/A return first.item;
0N/A } finally {
0N/A takeLock.unlock();
0N/A }
0N/A }
0N/A
0N/A /**
1468N/A * Unlinks interior Node p with predecessor trail.
1468N/A */
1468N/A void unlink(Node<E> p, Node<E> trail) {
1468N/A // assert isFullyLocked();
1468N/A // p.next is not changed, to allow iterators that are
1468N/A // traversing p to maintain their weak-consistency guarantee.
1468N/A p.item = null;
1468N/A trail.next = p.next;
1468N/A if (last == p)
1468N/A last = trail;
1468N/A if (count.getAndDecrement() == capacity)
1468N/A notFull.signal();
1468N/A }
1468N/A
1468N/A /**
0N/A * Removes a single instance of the specified element from this queue,
1468N/A * if it is present. More formally, removes an element {@code e} such
1468N/A * that {@code o.equals(e)}, if this queue contains one or more such
0N/A * elements.
1468N/A * Returns {@code true} if this queue contained the specified element
0N/A * (or equivalently, if this queue changed as a result of the call).
0N/A *
0N/A * @param o element to be removed from this queue, if present
1468N/A * @return {@code true} if this queue changed as a result of the call
0N/A */
0N/A public boolean remove(Object o) {
0N/A if (o == null) return false;
0N/A fullyLock();
0N/A try {
1468N/A for (Node<E> trail = head, p = trail.next;
1468N/A p != null;
1468N/A trail = p, p = p.next) {
0N/A if (o.equals(p.item)) {
1468N/A unlink(p, trail);
1468N/A return true;
0N/A }
0N/A }
1468N/A return false;
0N/A } finally {
0N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A /**
3387N/A * Returns {@code true} if this queue contains the specified element.
3387N/A * More formally, returns {@code true} if and only if this queue contains
3387N/A * at least one element {@code e} such that {@code o.equals(e)}.
3387N/A *
3387N/A * @param o object to be checked for containment in this queue
3387N/A * @return {@code true} if this queue contains the specified element
3387N/A */
3387N/A public boolean contains(Object o) {
3387N/A if (o == null) return false;
3387N/A fullyLock();
3387N/A try {
3387N/A for (Node<E> p = head.next; p != null; p = p.next)
3387N/A if (o.equals(p.item))
3387N/A return true;
3387N/A return false;
3387N/A } finally {
3387N/A fullyUnlock();
3387N/A }
3387N/A }
3387N/A
3387N/A /**
0N/A * Returns an array containing all of the elements in this queue, in
0N/A * proper sequence.
0N/A *
0N/A * <p>The returned array will be "safe" in that no references to it are
0N/A * maintained by this queue. (In other words, this method must allocate
0N/A * a new array). The caller is thus free to modify the returned array.
0N/A *
0N/A * <p>This method acts as bridge between array-based and collection-based
0N/A * APIs.
0N/A *
0N/A * @return an array containing all of the elements in this queue
0N/A */
0N/A public Object[] toArray() {
0N/A fullyLock();
0N/A try {
0N/A int size = count.get();
0N/A Object[] a = new Object[size];
0N/A int k = 0;
0N/A for (Node<E> p = head.next; p != null; p = p.next)
0N/A a[k++] = p.item;
0N/A return a;
0N/A } finally {
0N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Returns an array containing all of the elements in this queue, in
0N/A * proper sequence; the runtime type of the returned array is that of
0N/A * the specified array. If the queue fits in the specified array, it
0N/A * is returned therein. Otherwise, a new array is allocated with the
0N/A * runtime type of the specified array and the size of this queue.
0N/A *
0N/A * <p>If this queue fits in the specified array with room to spare
0N/A * (i.e., the array has more elements than this queue), the element in
0N/A * the array immediately following the end of the queue is set to
1468N/A * {@code null}.
0N/A *
0N/A * <p>Like the {@link #toArray()} method, this method acts as bridge between
0N/A * array-based and collection-based APIs. Further, this method allows
0N/A * precise control over the runtime type of the output array, and may,
0N/A * under certain circumstances, be used to save allocation costs.
0N/A *
1468N/A * <p>Suppose {@code x} is a queue known to contain only strings.
0N/A * The following code can be used to dump the queue into a newly
1468N/A * allocated array of {@code String}:
0N/A *
0N/A * <pre>
0N/A * String[] y = x.toArray(new String[0]);</pre>
0N/A *
1468N/A * Note that {@code toArray(new Object[0])} is identical in function to
1468N/A * {@code toArray()}.
0N/A *
0N/A * @param a the array into which the elements of the queue are to
0N/A * be stored, if it is big enough; otherwise, a new array of the
0N/A * same runtime type is allocated for this purpose
0N/A * @return an array containing all of the elements in this queue
0N/A * @throws ArrayStoreException if the runtime type of the specified array
0N/A * is not a supertype of the runtime type of every element in
0N/A * this queue
0N/A * @throws NullPointerException if the specified array is null
0N/A */
1468N/A @SuppressWarnings("unchecked")
0N/A public <T> T[] toArray(T[] a) {
0N/A fullyLock();
0N/A try {
0N/A int size = count.get();
0N/A if (a.length < size)
0N/A a = (T[])java.lang.reflect.Array.newInstance
0N/A (a.getClass().getComponentType(), size);
0N/A
0N/A int k = 0;
1468N/A for (Node<E> p = head.next; p != null; p = p.next)
0N/A a[k++] = (T)p.item;
0N/A if (a.length > k)
0N/A a[k] = null;
0N/A return a;
0N/A } finally {
0N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A public String toString() {
0N/A fullyLock();
0N/A try {
3387N/A Node<E> p = head.next;
3387N/A if (p == null)
3387N/A return "[]";
3387N/A
3387N/A StringBuilder sb = new StringBuilder();
3387N/A sb.append('[');
3387N/A for (;;) {
3387N/A E e = p.item;
3387N/A sb.append(e == this ? "(this Collection)" : e);
3387N/A p = p.next;
3387N/A if (p == null)
3387N/A return sb.append(']').toString();
3387N/A sb.append(',').append(' ');
3387N/A }
0N/A } finally {
0N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Atomically removes all of the elements from this queue.
0N/A * The queue will be empty after this call returns.
0N/A */
0N/A public void clear() {
0N/A fullyLock();
0N/A try {
1468N/A for (Node<E> p, h = head; (p = h.next) != null; h = p) {
1468N/A h.next = h;
1468N/A p.item = null;
1468N/A }
1468N/A head = last;
1468N/A // assert head.item == null && head.next == null;
0N/A if (count.getAndSet(0) == capacity)
1468N/A notFull.signal();
0N/A } finally {
0N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * @throws UnsupportedOperationException {@inheritDoc}
0N/A * @throws ClassCastException {@inheritDoc}
0N/A * @throws NullPointerException {@inheritDoc}
0N/A * @throws IllegalArgumentException {@inheritDoc}
0N/A */
0N/A public int drainTo(Collection<? super E> c) {
1468N/A return drainTo(c, Integer.MAX_VALUE);
0N/A }
0N/A
0N/A /**
0N/A * @throws UnsupportedOperationException {@inheritDoc}
0N/A * @throws ClassCastException {@inheritDoc}
0N/A * @throws NullPointerException {@inheritDoc}
0N/A * @throws IllegalArgumentException {@inheritDoc}
0N/A */
0N/A public int drainTo(Collection<? super E> c, int maxElements) {
0N/A if (c == null)
0N/A throw new NullPointerException();
0N/A if (c == this)
0N/A throw new IllegalArgumentException();
1468N/A boolean signalNotFull = false;
1468N/A final ReentrantLock takeLock = this.takeLock;
1468N/A takeLock.lock();
0N/A try {
1468N/A int n = Math.min(maxElements, count.get());
1468N/A // count.get provides visibility to first n Nodes
1468N/A Node<E> h = head;
1468N/A int i = 0;
1468N/A try {
1468N/A while (i < n) {
1468N/A Node<E> p = h.next;
1468N/A c.add(p.item);
1468N/A p.item = null;
1468N/A h.next = h;
1468N/A h = p;
1468N/A ++i;
1468N/A }
1468N/A return n;
1468N/A } finally {
1468N/A // Restore invariants even if c.add() threw
1468N/A if (i > 0) {
1468N/A // assert h.item == null;
1468N/A head = h;
1468N/A signalNotFull = (count.getAndAdd(-i) == capacity);
1468N/A }
0N/A }
0N/A } finally {
1468N/A takeLock.unlock();
1468N/A if (signalNotFull)
1468N/A signalNotFull();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Returns an iterator over the elements in this queue in proper sequence.
3387N/A * The elements will be returned in order from first (head) to last (tail).
3387N/A *
3387N/A * <p>The returned iterator is a "weakly consistent" iterator that
1472N/A * will never throw {@link java.util.ConcurrentModificationException
3387N/A * ConcurrentModificationException}, and guarantees to traverse
3387N/A * elements as they existed upon construction of the iterator, and
3387N/A * may (but is not guaranteed to) reflect any modifications
3387N/A * subsequent to construction.
0N/A *
0N/A * @return an iterator over the elements in this queue in proper sequence
0N/A */
0N/A public Iterator<E> iterator() {
0N/A return new Itr();
0N/A }
0N/A
0N/A private class Itr implements Iterator<E> {
0N/A /*
1468N/A * Basic weakly-consistent iterator. At all times hold the next
0N/A * item to hand out so that if hasNext() reports true, we will
0N/A * still have it to return even if lost race with a take etc.
0N/A */
0N/A private Node<E> current;
0N/A private Node<E> lastRet;
0N/A private E currentElement;
0N/A
0N/A Itr() {
1468N/A fullyLock();
0N/A try {
0N/A current = head.next;
0N/A if (current != null)
0N/A currentElement = current.item;
0N/A } finally {
1468N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A public boolean hasNext() {
0N/A return current != null;
0N/A }
0N/A
1468N/A /**
1595N/A * Returns the next live successor of p, or null if no such.
1595N/A *
1595N/A * Unlike other traversal methods, iterators need to handle both:
1468N/A * - dequeued nodes (p.next == p)
1595N/A * - (possibly multiple) interior removed nodes (p.item == null)
1468N/A */
1468N/A private Node<E> nextNode(Node<E> p) {
1595N/A for (;;) {
1595N/A Node<E> s = p.next;
1595N/A if (s == p)
1595N/A return head.next;
1595N/A if (s == null || s.item != null)
1595N/A return s;
1595N/A p = s;
1595N/A }
1468N/A }
1468N/A
0N/A public E next() {
1468N/A fullyLock();
0N/A try {
0N/A if (current == null)
0N/A throw new NoSuchElementException();
0N/A E x = currentElement;
0N/A lastRet = current;
1468N/A current = nextNode(current);
1468N/A currentElement = (current == null) ? null : current.item;
0N/A return x;
0N/A } finally {
1468N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A public void remove() {
0N/A if (lastRet == null)
0N/A throw new IllegalStateException();
1468N/A fullyLock();
0N/A try {
0N/A Node<E> node = lastRet;
0N/A lastRet = null;
1468N/A for (Node<E> trail = head, p = trail.next;
1468N/A p != null;
1468N/A trail = p, p = p.next) {
1468N/A if (p == node) {
1468N/A unlink(p, trail);
1468N/A break;
1468N/A }
0N/A }
0N/A } finally {
1468N/A fullyUnlock();
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Save the state to a stream (that is, serialize it).
0N/A *
0N/A * @serialData The capacity is emitted (int), followed by all of
1468N/A * its elements (each an {@code Object}) in the proper order,
0N/A * followed by a null
0N/A * @param s the stream
0N/A */
0N/A private void writeObject(java.io.ObjectOutputStream s)
0N/A throws java.io.IOException {
0N/A
0N/A fullyLock();
0N/A try {
0N/A // Write out any hidden stuff, plus capacity
0N/A s.defaultWriteObject();
0N/A
0N/A // Write out all elements in the proper order.
0N/A for (Node<E> p = head.next; p != null; p = p.next)
0N/A s.writeObject(p.item);
0N/A
0N/A // Use trailing null as sentinel
0N/A s.writeObject(null);
0N/A } finally {
0N/A fullyUnlock();
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Reconstitute this queue instance from a stream (that is,
0N/A * deserialize it).
1468N/A *
0N/A * @param s the stream
0N/A */
0N/A private void readObject(java.io.ObjectInputStream s)
0N/A throws java.io.IOException, ClassNotFoundException {
0N/A // Read in capacity, and any hidden stuff
0N/A s.defaultReadObject();
0N/A
0N/A count.set(0);
0N/A last = head = new Node<E>(null);
0N/A
0N/A // Read in all elements and place in queue
0N/A for (;;) {
1468N/A @SuppressWarnings("unchecked")
0N/A E item = (E)s.readObject();
0N/A if (item == null)
0N/A break;
0N/A add(item);
0N/A }
0N/A }
0N/A}