0N/A/*
2362N/A * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
0N/A * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
0N/A *
0N/A * This code is free software; you can redistribute it and/or modify it
0N/A * under the terms of the GNU General Public License version 2 only, as
2362N/A * published by the Free Software Foundation. Oracle designates this
0N/A * particular file as subject to the "Classpath" exception as provided
2362N/A * by Oracle in the LICENSE file that accompanied this code.
0N/A *
0N/A * This code is distributed in the hope that it will be useful, but WITHOUT
0N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
0N/A * version 2 for more details (a copy is included in the LICENSE file that
0N/A * accompanied this code).
0N/A *
0N/A * You should have received a copy of the GNU General Public License version
0N/A * 2 along with this work; if not, write to the Free Software Foundation,
0N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
0N/A *
2362N/A * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
2362N/A * or visit www.oracle.com if you need additional information or have any
2362N/A * questions.
0N/A */
0N/A
0N/Apackage sun.nio.ch;
0N/A
0N/Aimport java.nio.ByteBuffer;
0N/Aimport java.nio.channels.*;
0N/Aimport java.net.SocketOption;
0N/Aimport java.net.StandardSocketOptions;
0N/Aimport java.net.SocketAddress;
0N/Aimport java.net.InetSocketAddress;
0N/Aimport java.io.IOException;
0N/Aimport java.io.FileDescriptor;
0N/Aimport java.util.Set;
0N/Aimport java.util.HashSet;
0N/Aimport java.util.Collections;
0N/Aimport java.util.concurrent.*;
0N/Aimport java.util.concurrent.locks.*;
0N/Aimport sun.net.NetHooks;
0N/A
0N/A/**
0N/A * Base implementation of AsynchronousSocketChannel
0N/A */
0N/A
0N/Aabstract class AsynchronousSocketChannelImpl
0N/A extends AsynchronousSocketChannel
0N/A implements Cancellable, Groupable
0N/A{
0N/A protected final FileDescriptor fd;
0N/A
0N/A // protects state, localAddress, and remoteAddress
0N/A protected final Object stateLock = new Object();
0N/A
0N/A protected volatile InetSocketAddress localAddress = null;
0N/A protected volatile InetSocketAddress remoteAddress = null;
0N/A
0N/A // State, increases monotonically
0N/A static final int ST_UNINITIALIZED = -1;
0N/A static final int ST_UNCONNECTED = 0;
0N/A static final int ST_PENDING = 1;
0N/A static final int ST_CONNECTED = 2;
0N/A protected volatile int state = ST_UNINITIALIZED;
0N/A
0N/A // reading state
0N/A private final Object readLock = new Object();
0N/A private boolean reading;
0N/A private boolean readShutdown;
0N/A private boolean readKilled; // further reading disallowed due to timeout
0N/A
0N/A // writing state
0N/A private final Object writeLock = new Object();
0N/A private boolean writing;
0N/A private boolean writeShutdown;
0N/A private boolean writeKilled; // further writing disallowed due to timeout
0N/A
0N/A // close support
0N/A private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
0N/A private volatile boolean open = true;
0N/A
0N/A // set true when exclusive binding is on and SO_REUSEADDR is emulated
0N/A private boolean isReuseAddress;
0N/A
0N/A AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)
0N/A throws IOException
0N/A {
0N/A super(group.provider());
0N/A this.fd = Net.socket(true);
0N/A this.state = ST_UNCONNECTED;
0N/A }
0N/A
0N/A // Constructor for sockets obtained from AsynchronousServerSocketChannelImpl
0N/A AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group,
0N/A FileDescriptor fd,
0N/A InetSocketAddress remote)
0N/A throws IOException
0N/A {
0N/A super(group.provider());
0N/A this.fd = fd;
0N/A this.state = ST_CONNECTED;
0N/A this.localAddress = Net.localAddress(fd);
0N/A this.remoteAddress = remote;
0N/A }
0N/A
0N/A @Override
0N/A public final boolean isOpen() {
0N/A return open;
0N/A }
0N/A
0N/A /**
0N/A * Marks beginning of access to file descriptor/handle
0N/A */
0N/A final void begin() throws IOException {
0N/A closeLock.readLock().lock();
0N/A if (!isOpen())
0N/A throw new ClosedChannelException();
0N/A }
0N/A
0N/A /**
0N/A * Marks end of access to file descriptor/handle
0N/A */
0N/A final void end() {
0N/A closeLock.readLock().unlock();
0N/A }
0N/A
0N/A /**
0N/A * Invoked to close socket and release other resources.
0N/A */
0N/A abstract void implClose() throws IOException;
0N/A
0N/A @Override
0N/A public final void close() throws IOException {
0N/A // synchronize with any threads initiating asynchronous operations
0N/A closeLock.writeLock().lock();
0N/A try {
0N/A if (!open)
0N/A return; // already closed
0N/A open = false;
0N/A } finally {
0N/A closeLock.writeLock().unlock();
0N/A }
0N/A implClose();
0N/A }
0N/A
0N/A final void enableReading(boolean killed) {
0N/A synchronized (readLock) {
0N/A reading = false;
0N/A if (killed)
0N/A readKilled = true;
0N/A }
0N/A }
0N/A
0N/A final void enableReading() {
0N/A enableReading(false);
0N/A }
0N/A
0N/A final void enableWriting(boolean killed) {
0N/A synchronized (writeLock) {
0N/A writing = false;
0N/A if (killed)
0N/A writeKilled = true;
0N/A }
0N/A }
0N/A
0N/A final void enableWriting() {
0N/A enableWriting(false);
0N/A }
0N/A
0N/A final void killReading() {
0N/A synchronized (readLock) {
0N/A readKilled = true;
0N/A }
0N/A }
0N/A
0N/A final void killWriting() {
0N/A synchronized (writeLock) {
0N/A writeKilled = true;
0N/A }
0N/A }
0N/A
0N/A final void killConnect() {
0N/A // when a connect is cancelled then the connection may have been
0N/A // established so prevent reading or writing.
0N/A killReading();
0N/A killWriting();
0N/A }
0N/A
0N/A /**
0N/A * Invoked by connect to initiate the connect operation.
0N/A */
0N/A abstract <A> Future<Void> implConnect(SocketAddress remote,
0N/A A attachment,
0N/A CompletionHandler<Void,? super A> handler);
0N/A
0N/A @Override
0N/A public final Future<Void> connect(SocketAddress remote) {
0N/A return implConnect(remote, null, null);
0N/A }
0N/A
0N/A @Override
0N/A public final <A> void connect(SocketAddress remote,
0N/A A attachment,
0N/A CompletionHandler<Void,? super A> handler)
0N/A {
0N/A if (handler == null)
0N/A throw new NullPointerException("'handler' is null");
0N/A implConnect(remote, attachment, handler);
0N/A }
0N/A
0N/A /**
0N/A * Invoked by read to initiate the I/O operation.
0N/A */
0N/A abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
0N/A ByteBuffer dst,
0N/A ByteBuffer[] dsts,
0N/A long timeout,
0N/A TimeUnit unit,
0N/A A attachment,
0N/A CompletionHandler<V,? super A> handler);
0N/A
0N/A @SuppressWarnings("unchecked")
0N/A private <V extends Number,A> Future<V> read(boolean isScatteringRead,
0N/A ByteBuffer dst,
0N/A ByteBuffer[] dsts,
0N/A long timeout,
0N/A TimeUnit unit,
0N/A A att,
0N/A CompletionHandler<V,? super A> handler)
0N/A {
0N/A if (!isOpen()) {
0N/A Throwable e = new ClosedChannelException();
0N/A if (handler == null)
0N/A return CompletedFuture.withFailure(e);
0N/A Invoker.invoke(this, handler, att, null, e);
0N/A return null;
0N/A }
0N/A
0N/A if (remoteAddress == null)
0N/A throw new NotYetConnectedException();
0N/A
0N/A boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
0N/A boolean shutdown = false;
0N/A
0N/A // check and update state
0N/A synchronized (readLock) {
0N/A if (readKilled)
0N/A throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
0N/A if (reading)
0N/A throw new ReadPendingException();
0N/A if (readShutdown) {
0N/A shutdown = true;
0N/A } else {
0N/A if (hasSpaceToRead) {
0N/A reading = true;
0N/A }
0N/A }
0N/A }
0N/A
0N/A // immediately complete with -1 if shutdown for read
0N/A // immediately complete with 0 if no space remaining
0N/A if (shutdown || !hasSpaceToRead) {
0N/A Number result;
0N/A if (isScatteringRead) {
0N/A result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
0N/A } else {
0N/A result = (shutdown) ? -1 : 0;
0N/A }
0N/A if (handler == null)
0N/A return CompletedFuture.withResult((V)result);
0N/A Invoker.invoke(this, handler, att, (V)result, null);
0N/A return null;
0N/A }
0N/A
0N/A return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
0N/A }
0N/A
0N/A @Override
0N/A public final Future<Integer> read(ByteBuffer dst) {
0N/A if (dst.isReadOnly())
0N/A throw new IllegalArgumentException("Read-only buffer");
0N/A return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
0N/A }
0N/A
0N/A @Override
0N/A public final <A> void read(ByteBuffer dst,
0N/A long timeout,
0N/A TimeUnit unit,
0N/A A attachment,
0N/A CompletionHandler<Integer,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
read(false, dst, null, timeout, unit, attachment, handler);
}
@Override
public final <A> void read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
for (int i=0; i<bufs.length; i++) {
if (bufs[i].isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
}
read(true, null, bufs, timeout, unit, attachment, handler);
}
/**
* Invoked by write to initiate the I/O operation.
*/
abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
ByteBuffer src,
ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<V,? super A> handler);
@SuppressWarnings("unchecked")
private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
ByteBuffer src,
ByteBuffer[] srcs,
long timeout,
TimeUnit unit,
A att,
CompletionHandler<V,? super A> handler)
{
boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
boolean closed = false;
if (isOpen()) {
if (remoteAddress == null)
throw new NotYetConnectedException();
// check and update state
synchronized (writeLock) {
if (writeKilled)
throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
if (writing)
throw new WritePendingException();
if (writeShutdown) {
closed = true;
} else {
if (hasDataToWrite)
writing = true;
}
}
} else {
closed = true;
}
// channel is closed or shutdown for write
if (closed) {
Throwable e = new ClosedChannelException();
if (handler == null)
return CompletedFuture.withFailure(e);
Invoker.invoke(this, handler, att, null, e);
return null;
}
// nothing to write so complete immediately
if (!hasDataToWrite) {
Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
if (handler == null)
return CompletedFuture.withResult((V)result);
Invoker.invoke(this, handler, att, (V)result, null);
return null;
}
return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
}
@Override
public final Future<Integer> write(ByteBuffer src) {
return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
}
@Override
public final <A> void write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
write(false, src, null, timeout, unit, attachment, handler);
}
@Override
public final <A> void write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler)
{
if (handler == null)
throw new NullPointerException("'handler' is null");
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
srcs = Util.subsequence(srcs, offset, length);
write(true, null, srcs, timeout, unit, attachment, handler);
}
@Override
public final AsynchronousSocketChannel bind(SocketAddress local)
throws IOException
{
try {
begin();
synchronized (stateLock) {
if (state == ST_PENDING)
throw new ConnectionPendingException();
if (localAddress != null)
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null) ?
new InetSocketAddress(0) : Net.checkAddress(local);
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
localAddress = Net.localAddress(fd);
}
} finally {
end();
}
return this;
}
@Override
public final SocketAddress getLocalAddress() throws IOException {
if (!isOpen())
throw new ClosedChannelException();
return Net.getRevealedLocalAddress(localAddress);
}
@Override
public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value)
throws IOException
{
if (name == null)
throw new NullPointerException();
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
try {
begin();
if (writeShutdown)
throw new IOException("Connection has been shutdown for writing");
if (name == StandardSocketOptions.SO_REUSEADDR &&
Net.useExclusiveBind())
{
// SO_REUSEADDR emulated when using exclusive bind
isReuseAddress = (Boolean)value;
} else {
Net.setSocketOption(fd, Net.UNSPEC, name, value);
}
return this;
} finally {
end();
}
}
@Override
@SuppressWarnings("unchecked")
public final <T> T getOption(SocketOption<T> name) throws IOException {
if (name == null)
throw new NullPointerException();
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
try {
begin();
if (name == StandardSocketOptions.SO_REUSEADDR &&
Net.useExclusiveBind())
{
// SO_REUSEADDR emulated when using exclusive bind
return (T)Boolean.valueOf(isReuseAddress);
}
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
} finally {
end();
}
}
private static class DefaultOptionsHolder {
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
HashSet<SocketOption<?>> set = new HashSet<SocketOption<?>>(5);
set.add(StandardSocketOptions.SO_SNDBUF);
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_KEEPALIVE);
set.add(StandardSocketOptions.SO_REUSEADDR);
set.add(StandardSocketOptions.TCP_NODELAY);
return Collections.unmodifiableSet(set);
}
}
@Override
public final Set<SocketOption<?>> supportedOptions() {
return DefaultOptionsHolder.defaultOptions;
}
@Override
public final SocketAddress getRemoteAddress() throws IOException {
if (!isOpen())
throw new ClosedChannelException();
return remoteAddress;
}
@Override
public final AsynchronousSocketChannel shutdownInput() throws IOException {
try {
begin();
if (remoteAddress == null)
throw new NotYetConnectedException();
synchronized (readLock) {
if (!readShutdown) {
Net.shutdown(fd, Net.SHUT_RD);
readShutdown = true;
}
}
} finally {
end();
}
return this;
}
@Override
public final AsynchronousSocketChannel shutdownOutput() throws IOException {
try {
begin();
if (remoteAddress == null)
throw new NotYetConnectedException();
synchronized (writeLock) {
if (!writeShutdown) {
Net.shutdown(fd, Net.SHUT_WR);
writeShutdown = true;
}
}
} finally {
end();
}
return this;
}
@Override
public final String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getName());
sb.append('[');
synchronized (stateLock) {
if (!isOpen()) {
sb.append("closed");
} else {
switch (state) {
case ST_UNCONNECTED:
sb.append("unconnected");
break;
case ST_PENDING:
sb.append("connection-pending");
break;
case ST_CONNECTED:
sb.append("connected");
if (readShutdown)
sb.append(" ishut");
if (writeShutdown)
sb.append(" oshut");
break;
}
if (localAddress != null) {
sb.append(" local=");
sb.append(
Net.getRevealedLocalAddressAsString(localAddress));
}
if (remoteAddress != null) {
sb.append(" remote=");
sb.append(remoteAddress.toString());
}
}
}
sb.append(']');
return sb.toString();
}
}