1096N/A/*
3261N/A * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
1096N/A * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1096N/A *
1096N/A * This code is free software; you can redistribute it and/or modify it
1096N/A * under the terms of the GNU General Public License version 2 only, as
2362N/A * published by the Free Software Foundation. Oracle designates this
1096N/A * particular file as subject to the "Classpath" exception as provided
2362N/A * by Oracle in the LICENSE file that accompanied this code.
1096N/A *
1096N/A * This code is distributed in the hope that it will be useful, but WITHOUT
1096N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1096N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1096N/A * version 2 for more details (a copy is included in the LICENSE file that
1096N/A * accompanied this code).
1096N/A *
1096N/A * You should have received a copy of the GNU General Public License version
1096N/A * 2 along with this work; if not, write to the Free Software Foundation,
1096N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1096N/A *
2362N/A * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
2362N/A * or visit www.oracle.com if you need additional information or have any
2362N/A * questions.
1096N/A */
1096N/Apackage sun.nio.ch;
1096N/A
1096N/Aimport java.net.InetAddress;
1096N/Aimport java.net.SocketAddress;
1326N/Aimport java.net.SocketException;
1096N/Aimport java.net.InetSocketAddress;
1096N/Aimport java.io.FileDescriptor;
1096N/Aimport java.io.IOException;
1096N/Aimport java.util.Collections;
1096N/Aimport java.util.Set;
1096N/Aimport java.util.HashSet;
1096N/Aimport java.nio.ByteBuffer;
1096N/Aimport java.nio.channels.SelectionKey;
1096N/Aimport java.nio.channels.ClosedChannelException;
1096N/Aimport java.nio.channels.ConnectionPendingException;
1096N/Aimport java.nio.channels.NoConnectionPendingException;
1096N/Aimport java.nio.channels.AlreadyConnectedException;
1096N/Aimport java.nio.channels.NotYetBoundException;
1096N/Aimport java.nio.channels.NotYetConnectedException;
1096N/Aimport java.nio.channels.spi.SelectorProvider;
1096N/Aimport com.sun.nio.sctp.AbstractNotificationHandler;
1096N/Aimport com.sun.nio.sctp.Association;
1096N/Aimport com.sun.nio.sctp.AssociationChangeNotification;
1096N/Aimport com.sun.nio.sctp.HandlerResult;
1096N/Aimport com.sun.nio.sctp.IllegalReceiveException;
1096N/Aimport com.sun.nio.sctp.InvalidStreamException;
1096N/Aimport com.sun.nio.sctp.IllegalUnbindException;
1096N/Aimport com.sun.nio.sctp.MessageInfo;
1096N/Aimport com.sun.nio.sctp.NotificationHandler;
1096N/Aimport com.sun.nio.sctp.SctpChannel;
1096N/Aimport com.sun.nio.sctp.SctpSocketOption;
1096N/Aimport sun.nio.ch.PollArrayWrapper;
1096N/Aimport sun.nio.ch.SelChImpl;
4216N/Aimport static com.sun.nio.sctp.SctpStandardSocketOptions.*;
1096N/Aimport static sun.nio.ch.SctpResultContainer.SEND_FAILED;
1096N/Aimport static sun.nio.ch.SctpResultContainer.ASSOCIATION_CHANGED;
1096N/Aimport static sun.nio.ch.SctpResultContainer.PEER_ADDRESS_CHANGED;
1096N/Aimport static sun.nio.ch.SctpResultContainer.SHUTDOWN;
1096N/A
1096N/A/**
1096N/A * An implementation of an SctpChannel
1096N/A */
1096N/Apublic class SctpChannelImpl extends SctpChannel
1096N/A implements SelChImpl
1096N/A{
1096N/A private final FileDescriptor fd;
1096N/A
1096N/A private final int fdVal;
1096N/A
1096N/A /* IDs of native threads doing send and receivess, for signalling */
1096N/A private volatile long receiverThread = 0;
1096N/A private volatile long senderThread = 0;
1096N/A
1096N/A /* Lock held by current receiving or connecting thread */
1096N/A private final Object receiveLock = new Object();
1096N/A
1096N/A /* Lock held by current sending or connecting thread */
1096N/A private final Object sendLock = new Object();
1096N/A
1096N/A private final ThreadLocal<Boolean> receiveInvoked =
1096N/A new ThreadLocal<Boolean>() {
1096N/A @Override protected Boolean initialValue() {
1096N/A return Boolean.FALSE;
1096N/A }
1096N/A };
1096N/A
1096N/A /* Lock held by any thread that modifies the state fields declared below
1096N/A DO NOT invoke a blocking I/O operation while holding this lock! */
1096N/A private final Object stateLock = new Object();
1096N/A
1096N/A private enum ChannelState {
1096N/A UNINITIALIZED,
1096N/A UNCONNECTED,
1096N/A PENDING,
1096N/A CONNECTED,
1096N/A KILLPENDING,
1096N/A KILLED,
1096N/A }
1096N/A /* -- The following fields are protected by stateLock -- */
1096N/A private ChannelState state = ChannelState.UNINITIALIZED;
1096N/A
1096N/A /* Binding; Once bound the port will remain constant. */
1096N/A int port = -1;
1096N/A private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
1096N/A /* Has the channel been bound to the wildcard address */
1096N/A private boolean wildcard; /* false */
1096N/A //private InetSocketAddress remoteAddress = null;
1096N/A
1096N/A /* Input/Output open */
1096N/A private boolean readyToConnect;
1096N/A
1096N/A /* Shutdown */
1096N/A private boolean isShutdown;
1096N/A
1096N/A private Association association;
1096N/A
1326N/A private Set<SocketAddress> remoteAddresses = Collections.EMPTY_SET;
1326N/A
1096N/A /* -- End of fields protected by stateLock -- */
1096N/A
1096N/A /**
1096N/A * Constructor for normal connecting sockets
1096N/A */
1096N/A public SctpChannelImpl(SelectorProvider provider) throws IOException {
1096N/A //TODO: update provider remove public modifier
1096N/A super(provider);
1096N/A this.fd = SctpNet.socket(true);
1096N/A this.fdVal = IOUtil.fdVal(fd);
1096N/A this.state = ChannelState.UNCONNECTED;
1096N/A }
1096N/A
1096N/A /**
1096N/A * Constructor for sockets obtained from server sockets
1096N/A */
1096N/A public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
1096N/A throws IOException {
1326N/A this(provider, fd, null);
1326N/A }
1326N/A
1326N/A /**
1326N/A * Constructor for sockets obtained from branching
1326N/A */
1326N/A public SctpChannelImpl(SelectorProvider provider,
1326N/A FileDescriptor fd,
1326N/A Association association)
1326N/A throws IOException {
1096N/A super(provider);
1096N/A this.fd = fd;
1096N/A this.fdVal = IOUtil.fdVal(fd);
1096N/A this.state = ChannelState.CONNECTED;
1096N/A port = (Net.localAddress(fd)).getPort();
1096N/A
1326N/A if (association != null) { /* branched */
1326N/A this.association = association;
1326N/A } else { /* obtained from server channel */
1326N/A /* Receive COMM_UP */
1326N/A ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
1326N/A try {
1326N/A receive(buf, null, null, true);
1326N/A } finally {
1326N/A Util.releaseTemporaryDirectBuffer(buf);
1326N/A }
1096N/A }
1096N/A }
1096N/A
1096N/A /**
1096N/A * Binds the channel's socket to a local address.
1096N/A */
1096N/A @Override
1096N/A public SctpChannel bind(SocketAddress local) throws IOException {
1096N/A synchronized (receiveLock) {
1096N/A synchronized (sendLock) {
1096N/A synchronized (stateLock) {
1096N/A ensureOpenAndUnconnected();
1096N/A if (isBound())
2084N/A SctpNet.throwAlreadyBoundException();
1096N/A InetSocketAddress isa = (local == null) ?
1096N/A new InetSocketAddress(0) : Net.checkAddress(local);
1096N/A Net.bind(fd, isa.getAddress(), isa.getPort());
1096N/A InetSocketAddress boundIsa = Net.localAddress(fd);
1096N/A port = boundIsa.getPort();
1096N/A localAddresses.add(isa);
1096N/A if (isa.getAddress().isAnyLocalAddress())
1096N/A wildcard = true;
1096N/A }
1096N/A }
1096N/A }
1096N/A return this;
1096N/A }
1096N/A
1096N/A @Override
1096N/A public SctpChannel bindAddress(InetAddress address)
1096N/A throws IOException {
1096N/A bindUnbindAddress(address, true);
1096N/A localAddresses.add(new InetSocketAddress(address, port));
1096N/A return this;
1096N/A }
1096N/A
1096N/A @Override
1096N/A public SctpChannel unbindAddress(InetAddress address)
1096N/A throws IOException {
1096N/A bindUnbindAddress(address, false);
1096N/A localAddresses.remove(new InetSocketAddress(address, port));
1096N/A return this;
1096N/A }
1096N/A
1096N/A private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
1096N/A throws IOException {
1096N/A if (address == null)
1096N/A throw new IllegalArgumentException();
1096N/A
1096N/A synchronized (receiveLock) {
1096N/A synchronized (sendLock) {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A if (!isBound())
1096N/A throw new NotYetBoundException();
1096N/A if (wildcard)
1096N/A throw new IllegalStateException(
1096N/A "Cannot add or remove addresses from a channel that is bound to the wildcard address");
1096N/A if (address.isAnyLocalAddress())
1096N/A throw new IllegalArgumentException(
1096N/A "Cannot add or remove the wildcard address");
1096N/A if (add) {
1096N/A for (InetSocketAddress addr : localAddresses) {
1096N/A if (addr.getAddress().equals(address)) {
2084N/A SctpNet.throwAlreadyBoundException();
1096N/A }
1096N/A }
1096N/A } else { /*removing */
1096N/A /* Verify that there is more than one address
1096N/A * and that address is already bound */
1096N/A if (localAddresses.size() <= 1)
1096N/A throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
1096N/A boolean foundAddress = false;
1096N/A for (InetSocketAddress addr : localAddresses) {
1096N/A if (addr.getAddress().equals(address)) {
1096N/A foundAddress = true;
1096N/A break;
1096N/A }
1096N/A }
1096N/A if (!foundAddress )
1096N/A throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
1096N/A }
1096N/A
1096N/A SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
1096N/A
1096N/A /* Update our internal Set to reflect the addition/removal */
1096N/A if (add)
1096N/A localAddresses.add(new InetSocketAddress(address, port));
1096N/A else {
1096N/A for (InetSocketAddress addr : localAddresses) {
1096N/A if (addr.getAddress().equals(address)) {
1096N/A localAddresses.remove(addr);
1096N/A break;
1096N/A }
1096N/A }
1096N/A }
1096N/A }
1096N/A }
1096N/A }
1096N/A return this;
1096N/A }
1096N/A
1096N/A private boolean isBound() {
1096N/A synchronized (stateLock) {
1096N/A return port == -1 ? false : true;
1096N/A }
1096N/A }
1096N/A
1096N/A private boolean isConnected() {
1096N/A synchronized (stateLock) {
1096N/A return (state == ChannelState.CONNECTED);
1096N/A }
1096N/A }
1096N/A
1096N/A private void ensureOpenAndUnconnected() throws IOException {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A if (isConnected())
1096N/A throw new AlreadyConnectedException();
1096N/A if (state == ChannelState.PENDING)
1096N/A throw new ConnectionPendingException();
1096N/A }
1096N/A }
1096N/A
1096N/A private boolean ensureReceiveOpen() throws ClosedChannelException {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A if (!isConnected())
1096N/A throw new NotYetConnectedException();
1096N/A else
1096N/A return true;
1096N/A }
1096N/A }
1096N/A
1096N/A private void ensureSendOpen() throws ClosedChannelException {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A if (isShutdown)
1096N/A throw new ClosedChannelException();
1096N/A if (!isConnected())
1096N/A throw new NotYetConnectedException();
1096N/A }
1096N/A }
1096N/A
1096N/A private void receiverCleanup() throws IOException {
1096N/A synchronized (stateLock) {
1096N/A receiverThread = 0;
1096N/A if (state == ChannelState.KILLPENDING)
1096N/A kill();
1096N/A }
1096N/A }
1096N/A
1096N/A private void senderCleanup() throws IOException {
1096N/A synchronized (stateLock) {
1096N/A senderThread = 0;
1096N/A if (state == ChannelState.KILLPENDING)
1096N/A kill();
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public Association association() throws ClosedChannelException {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A if (!isConnected())
1096N/A return null;
1096N/A
1096N/A return association;
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public boolean connect(SocketAddress endpoint) throws IOException {
1096N/A synchronized (receiveLock) {
1096N/A synchronized (sendLock) {
1096N/A ensureOpenAndUnconnected();
1096N/A InetSocketAddress isa = Net.checkAddress(endpoint);
1096N/A SecurityManager sm = System.getSecurityManager();
1096N/A if (sm != null)
1096N/A sm.checkConnect(isa.getAddress().getHostAddress(),
1096N/A isa.getPort());
1096N/A synchronized (blockingLock()) {
1096N/A int n = 0;
1096N/A try {
1096N/A try {
1096N/A begin();
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen()) {
1096N/A return false;
1096N/A }
1096N/A receiverThread = NativeThread.current();
1096N/A }
1096N/A for (;;) {
1096N/A InetAddress ia = isa.getAddress();
1096N/A if (ia.isAnyLocalAddress())
1096N/A ia = InetAddress.getLocalHost();
2084N/A n = SctpNet.connect(fdVal, ia, isa.getPort());
1096N/A if ( (n == IOStatus.INTERRUPTED)
1096N/A && isOpen())
1096N/A continue;
1096N/A break;
1096N/A }
1096N/A } finally {
1096N/A receiverCleanup();
1096N/A end((n > 0) || (n == IOStatus.UNAVAILABLE));
1096N/A assert IOStatus.check(n);
1096N/A }
1096N/A } catch (IOException x) {
1096N/A /* If an exception was thrown, close the channel after
1096N/A * invoking end() so as to avoid bogus
1096N/A * AsynchronousCloseExceptions */
1096N/A close();
1096N/A throw x;
1096N/A }
1096N/A
1096N/A if (n > 0) {
1096N/A synchronized (stateLock) {
1096N/A /* Connection succeeded */
1096N/A state = ChannelState.CONNECTED;
1096N/A if (!isBound()) {
1096N/A InetSocketAddress boundIsa =
1096N/A Net.localAddress(fd);
1096N/A port = boundIsa.getPort();
1096N/A }
1096N/A
1096N/A /* Receive COMM_UP */
1096N/A ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
1096N/A try {
1096N/A receive(buf, null, null, true);
1096N/A } finally {
1096N/A Util.releaseTemporaryDirectBuffer(buf);
1096N/A }
1326N/A
1326N/A /* cache remote addresses */
1326N/A try {
1326N/A remoteAddresses = getRemoteAddresses();
1326N/A } catch (IOException unused) { /* swallow exception */ }
1326N/A
1096N/A return true;
1096N/A }
1096N/A } else {
1096N/A synchronized (stateLock) {
1096N/A /* If nonblocking and no exception then connection
1096N/A * pending; disallow another invocation */
1096N/A if (!isBlocking())
1096N/A state = ChannelState.PENDING;
1096N/A else
1096N/A assert false;
1096N/A }
1096N/A }
1096N/A }
1096N/A return false;
1096N/A }
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public boolean connect(SocketAddress endpoint,
1096N/A int maxOutStreams,
1096N/A int maxInStreams)
1096N/A throws IOException {
1326N/A ensureOpenAndUnconnected();
1096N/A return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
1096N/A create(maxInStreams, maxOutStreams)).connect(endpoint);
1096N/A
1096N/A }
1096N/A
1096N/A @Override
1096N/A public boolean isConnectionPending() {
1096N/A synchronized (stateLock) {
1096N/A return (state == ChannelState.PENDING);
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public boolean finishConnect() throws IOException {
1096N/A synchronized (receiveLock) {
1096N/A synchronized (sendLock) {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A if (isConnected())
1096N/A return true;
1096N/A if (state != ChannelState.PENDING)
1096N/A throw new NoConnectionPendingException();
1096N/A }
1096N/A int n = 0;
1096N/A try {
1096N/A try {
1096N/A begin();
1096N/A synchronized (blockingLock()) {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen()) {
1096N/A return false;
1096N/A }
1096N/A receiverThread = NativeThread.current();
1096N/A }
1096N/A if (!isBlocking()) {
1096N/A for (;;) {
1096N/A n = checkConnect(fd, false, readyToConnect);
1096N/A if ( (n == IOStatus.INTERRUPTED)
1096N/A && isOpen())
1096N/A continue;
1096N/A break;
1096N/A }
1096N/A } else {
1096N/A for (;;) {
1096N/A n = checkConnect(fd, true, readyToConnect);
1096N/A if (n == 0) {
1096N/A // Loop in case of
1096N/A // spurious notifications
1096N/A continue;
1096N/A }
1096N/A if ( (n == IOStatus.INTERRUPTED)
1096N/A && isOpen())
1096N/A continue;
1096N/A break;
1096N/A }
1096N/A }
1096N/A }
1096N/A } finally {
1096N/A synchronized (stateLock) {
1096N/A receiverThread = 0;
1096N/A if (state == ChannelState.KILLPENDING) {
1096N/A kill();
1096N/A /* poll()/getsockopt() does not report
1096N/A * error (throws exception, with n = 0)
1096N/A * on Linux platform after dup2 and
1096N/A * signal-wakeup. Force n to 0 so the
1096N/A * end() can throw appropriate exception */
1096N/A n = 0;
1096N/A }
1096N/A }
1096N/A end((n > 0) || (n == IOStatus.UNAVAILABLE));
1096N/A assert IOStatus.check(n);
1096N/A }
1096N/A } catch (IOException x) {
1096N/A /* If an exception was thrown, close the channel after
1096N/A * invoking end() so as to avoid bogus
1096N/A * AsynchronousCloseExceptions */
1096N/A close();
1096N/A throw x;
1096N/A }
1096N/A
1096N/A if (n > 0) {
1096N/A synchronized (stateLock) {
1096N/A state = ChannelState.CONNECTED;
1096N/A if (!isBound()) {
1096N/A InetSocketAddress boundIsa =
1096N/A Net.localAddress(fd);
1096N/A port = boundIsa.getPort();
1096N/A }
1096N/A
1096N/A /* Receive COMM_UP */
1096N/A ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
1096N/A try {
1096N/A receive(buf, null, null, true);
1096N/A } finally {
1096N/A Util.releaseTemporaryDirectBuffer(buf);
1096N/A }
1326N/A
1326N/A /* cache remote addresses */
1326N/A try {
1326N/A remoteAddresses = getRemoteAddresses();
1326N/A } catch (IOException unused) { /* swallow exception */ }
1326N/A
1096N/A return true;
1096N/A }
1096N/A }
1096N/A }
1096N/A }
1096N/A return false;
1096N/A }
1096N/A
1096N/A @Override
1096N/A protected void implConfigureBlocking(boolean block) throws IOException {
1096N/A IOUtil.configureBlocking(fd, block);
1096N/A }
1096N/A
1096N/A @Override
1096N/A public void implCloseSelectableChannel() throws IOException {
1096N/A synchronized (stateLock) {
2084N/A SctpNet.preClose(fdVal);
1096N/A
1096N/A if (receiverThread != 0)
1096N/A NativeThread.signal(receiverThread);
1096N/A
1096N/A if (senderThread != 0)
1096N/A NativeThread.signal(senderThread);
1096N/A
1096N/A if (!isRegistered())
1096N/A kill();
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public FileDescriptor getFD() {
1096N/A return fd;
1096N/A }
1096N/A
1096N/A @Override
1096N/A public int getFDVal() {
1096N/A return fdVal;
1096N/A }
1096N/A
1096N/A /**
1096N/A * Translates native poll revent ops into a ready operation ops
1096N/A */
1096N/A private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
1096N/A int intOps = sk.nioInterestOps();
1096N/A int oldOps = sk.nioReadyOps();
1096N/A int newOps = initialOps;
1096N/A
1096N/A if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
1096N/A /* This should only happen if this channel is pre-closed while a
1096N/A * selection operation is in progress
1096N/A * ## Throw an error if this channel has not been pre-closed */
1096N/A return false;
1096N/A }
1096N/A
1096N/A if ((ops & (PollArrayWrapper.POLLERR
1096N/A | PollArrayWrapper.POLLHUP)) != 0) {
1096N/A newOps = intOps;
1096N/A sk.nioReadyOps(newOps);
1096N/A /* No need to poll again in checkConnect,
1096N/A * the error will be detected there */
1096N/A readyToConnect = true;
1096N/A return (newOps & ~oldOps) != 0;
1096N/A }
1096N/A
1096N/A if (((ops & PollArrayWrapper.POLLIN) != 0) &&
1096N/A ((intOps & SelectionKey.OP_READ) != 0) &&
1096N/A isConnected())
1096N/A newOps |= SelectionKey.OP_READ;
1096N/A
1096N/A if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
1096N/A ((intOps & SelectionKey.OP_CONNECT) != 0) &&
1096N/A ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
1096N/A newOps |= SelectionKey.OP_CONNECT;
1096N/A readyToConnect = true;
1096N/A }
1096N/A
1096N/A if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
1096N/A ((intOps & SelectionKey.OP_WRITE) != 0) &&
1096N/A isConnected())
1096N/A newOps |= SelectionKey.OP_WRITE;
1096N/A
1096N/A sk.nioReadyOps(newOps);
1096N/A return (newOps & ~oldOps) != 0;
1096N/A }
1096N/A
1096N/A @Override
1096N/A public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
1096N/A return translateReadyOps(ops, sk.nioReadyOps(), sk);
1096N/A }
1096N/A
1096N/A @Override
1096N/A @SuppressWarnings("all")
1096N/A public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
1096N/A return translateReadyOps(ops, 0, sk);
1096N/A }
1096N/A
1096N/A @Override
1096N/A public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
1096N/A int newOps = 0;
1096N/A if ((ops & SelectionKey.OP_READ) != 0)
1096N/A newOps |= PollArrayWrapper.POLLIN;
1096N/A if ((ops & SelectionKey.OP_WRITE) != 0)
1096N/A newOps |= PollArrayWrapper.POLLOUT;
1096N/A if ((ops & SelectionKey.OP_CONNECT) != 0)
1096N/A newOps |= PollArrayWrapper.POLLCONN;
1096N/A sk.selector.putEventOps(sk, newOps);
1096N/A }
1096N/A
1096N/A @Override
1096N/A public void kill() throws IOException {
1096N/A synchronized (stateLock) {
1096N/A if (state == ChannelState.KILLED)
1096N/A return;
1096N/A if (state == ChannelState.UNINITIALIZED) {
1096N/A state = ChannelState.KILLED;
1096N/A return;
1096N/A }
1096N/A assert !isOpen() && !isRegistered();
1096N/A
1096N/A /* Postpone the kill if there is a waiting reader
1096N/A * or writer thread. */
1096N/A if (receiverThread == 0 && senderThread == 0) {
2084N/A SctpNet.close(fdVal);
1096N/A state = ChannelState.KILLED;
1096N/A } else {
1096N/A state = ChannelState.KILLPENDING;
1096N/A }
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
1096N/A throws IOException {
1096N/A if (name == null)
1096N/A throw new NullPointerException();
1096N/A if (!supportedOptions().contains(name))
1096N/A throw new UnsupportedOperationException("'" + name + "' not supported");
1096N/A
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A
1096N/A SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
1096N/A }
1096N/A return this;
1096N/A }
1096N/A
1096N/A @Override
1096N/A @SuppressWarnings("unchecked")
1096N/A public <T> T getOption(SctpSocketOption<T> name) throws IOException {
1096N/A if (name == null)
1096N/A throw new NullPointerException();
1096N/A if (!supportedOptions().contains(name))
1096N/A throw new UnsupportedOperationException("'" + name + "' not supported");
1096N/A
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A
1096N/A return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
1096N/A }
1096N/A }
1096N/A
1096N/A private static class DefaultOptionsHolder {
1096N/A static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
1096N/A
1096N/A private static Set<SctpSocketOption<?>> defaultOptions() {
1096N/A HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
1096N/A set.add(SCTP_DISABLE_FRAGMENTS);
1096N/A set.add(SCTP_EXPLICIT_COMPLETE);
1096N/A set.add(SCTP_FRAGMENT_INTERLEAVE);
1096N/A set.add(SCTP_INIT_MAXSTREAMS);
1096N/A set.add(SCTP_NODELAY);
1096N/A set.add(SCTP_PRIMARY_ADDR);
1096N/A set.add(SCTP_SET_PEER_PRIMARY_ADDR);
1096N/A set.add(SO_SNDBUF);
1096N/A set.add(SO_RCVBUF);
1096N/A set.add(SO_LINGER);
1096N/A return Collections.unmodifiableSet(set);
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public final Set<SctpSocketOption<?>> supportedOptions() {
1096N/A return DefaultOptionsHolder.defaultOptions;
1096N/A }
1096N/A
1096N/A @Override
1096N/A public <T> MessageInfo receive(ByteBuffer buffer,
1096N/A T attachment,
1096N/A NotificationHandler<T> handler)
1096N/A throws IOException {
1096N/A return receive(buffer, attachment, handler, false);
1096N/A }
1096N/A
1096N/A private <T> MessageInfo receive(ByteBuffer buffer,
1096N/A T attachment,
1096N/A NotificationHandler<T> handler,
1096N/A boolean fromConnect)
1096N/A throws IOException {
1096N/A if (buffer == null)
1096N/A throw new IllegalArgumentException("buffer cannot be null");
1096N/A
1096N/A if (buffer.isReadOnly())
1096N/A throw new IllegalArgumentException("Read-only buffer");
1096N/A
1096N/A if (receiveInvoked.get())
1096N/A throw new IllegalReceiveException(
1096N/A "cannot invoke receive from handler");
1096N/A receiveInvoked.set(Boolean.TRUE);
1096N/A
1096N/A try {
1096N/A SctpResultContainer resultContainer = new SctpResultContainer();
1096N/A do {
1096N/A resultContainer.clear();
1096N/A synchronized (receiveLock) {
1096N/A if (!ensureReceiveOpen())
1096N/A return null;
1096N/A
1096N/A int n = 0;
1096N/A try {
1096N/A begin();
1096N/A
1096N/A synchronized (stateLock) {
1096N/A if(!isOpen())
1096N/A return null;
1096N/A receiverThread = NativeThread.current();
1096N/A }
1096N/A
1096N/A do {
1427N/A n = receive(fdVal, buffer, resultContainer, fromConnect);
1096N/A } while ((n == IOStatus.INTERRUPTED) && isOpen());
1096N/A } finally {
1096N/A receiverCleanup();
1096N/A end((n > 0) || (n == IOStatus.UNAVAILABLE));
1096N/A assert IOStatus.check(n);
1096N/A }
1096N/A
1096N/A if (!resultContainer.isNotification()) {
1096N/A /* message or nothing */
1096N/A if (resultContainer.hasSomething()) {
1096N/A /* Set the association before returning */
1096N/A SctpMessageInfoImpl info =
1096N/A resultContainer.getMessageInfo();
1096N/A synchronized (stateLock) {
1096N/A assert association != null;
1096N/A info.setAssociation(association);
1096N/A }
1096N/A return info;
1096N/A } else
1096N/A /* Non-blocking may return null if nothing available*/
1096N/A return null;
1096N/A } else { /* notification */
1096N/A synchronized (stateLock) {
1096N/A handleNotificationInternal(
1096N/A resultContainer);
1096N/A }
1096N/A }
1096N/A
1096N/A if (fromConnect) {
1096N/A /* If we reach here, then it was connect that invoked
1427N/A * receive and received the COMM_UP. We have already
1427N/A * handled the COMM_UP with the internal notification
1427N/A * handler. Simply return. */
1096N/A return null;
1096N/A }
1096N/A } /* receiveLock */
1096N/A } while (handler == null ? true :
1096N/A (invokeNotificationHandler(resultContainer, handler, attachment)
1096N/A == HandlerResult.CONTINUE));
1096N/A
1096N/A return null;
1096N/A } finally {
1096N/A receiveInvoked.set(Boolean.FALSE);
1096N/A }
1096N/A }
1096N/A
1096N/A private int receive(int fd,
1096N/A ByteBuffer dst,
1427N/A SctpResultContainer resultContainer,
1427N/A boolean peek)
1096N/A throws IOException {
1096N/A int pos = dst.position();
1096N/A int lim = dst.limit();
1096N/A assert (pos <= lim);
1096N/A int rem = (pos <= lim ? lim - pos : 0);
1096N/A if (dst instanceof DirectBuffer && rem > 0)
1427N/A return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
1096N/A
1096N/A /* Substitute a native buffer */
1096N/A int newSize = Math.max(rem, 1);
1096N/A ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
1096N/A try {
1427N/A int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
1096N/A bb.flip();
1096N/A if (n > 0 && rem > 0)
1096N/A dst.put(bb);
1096N/A return n;
1096N/A } finally {
1096N/A Util.releaseTemporaryDirectBuffer(bb);
1096N/A }
1096N/A }
1096N/A
1096N/A private int receiveIntoNativeBuffer(int fd,
1096N/A SctpResultContainer resultContainer,
1096N/A ByteBuffer bb,
1096N/A int rem,
1427N/A int pos,
1427N/A boolean peek)
1096N/A throws IOException
1096N/A {
1427N/A int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
1096N/A
1096N/A if (n > 0)
1096N/A bb.position(pos + n);
1096N/A return n;
1096N/A }
1096N/A
1096N/A private InternalNotificationHandler<?> internalNotificationHandler =
1096N/A new InternalNotificationHandler();
1096N/A
1096N/A private void handleNotificationInternal(SctpResultContainer resultContainer)
1096N/A {
1096N/A invokeNotificationHandler(resultContainer,
1096N/A internalNotificationHandler, null);
1096N/A }
1096N/A
1096N/A private class InternalNotificationHandler<T>
1096N/A extends AbstractNotificationHandler<T>
1096N/A {
1096N/A @Override
1096N/A public HandlerResult handleNotification(
1096N/A AssociationChangeNotification not, T unused) {
1096N/A if (not.event().equals(
2092N/A AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
2092N/A association == null) {
1096N/A SctpAssocChange sac = (SctpAssocChange) not;
1096N/A association = new SctpAssociationImpl
1096N/A (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
1096N/A }
1096N/A return HandlerResult.CONTINUE;
1096N/A }
1096N/A }
1096N/A
1096N/A private <T> HandlerResult invokeNotificationHandler
1096N/A (SctpResultContainer resultContainer,
1096N/A NotificationHandler<T> handler,
1096N/A T attachment) {
1096N/A SctpNotification notification = resultContainer.notification();
1096N/A synchronized (stateLock) {
1096N/A notification.setAssociation(association);
1096N/A }
1096N/A
1096N/A if (!(handler instanceof AbstractNotificationHandler)) {
1096N/A return handler.handleNotification(notification, attachment);
1096N/A }
1096N/A
1096N/A /* AbstractNotificationHandler */
1096N/A AbstractNotificationHandler absHandler =
1096N/A (AbstractNotificationHandler)handler;
1096N/A switch(resultContainer.type()) {
1096N/A case ASSOCIATION_CHANGED :
1096N/A return absHandler.handleNotification(
1096N/A resultContainer.getAssociationChanged(), attachment);
1096N/A case PEER_ADDRESS_CHANGED :
1096N/A return absHandler.handleNotification(
1096N/A resultContainer.getPeerAddressChanged(), attachment);
1096N/A case SEND_FAILED :
1096N/A return absHandler.handleNotification(
1096N/A resultContainer.getSendFailed(), attachment);
1096N/A case SHUTDOWN :
1096N/A return absHandler.handleNotification(
1096N/A resultContainer.getShutdown(), attachment);
1096N/A default :
1096N/A /* implementation specific handlers */
1096N/A return absHandler.handleNotification(
1096N/A resultContainer.notification(), attachment);
1096N/A }
1096N/A }
1096N/A
1096N/A private void checkAssociation(Association sendAssociation) {
1096N/A synchronized (stateLock) {
1096N/A if (sendAssociation != null && !sendAssociation.equals(association)) {
1096N/A throw new IllegalArgumentException(
1096N/A "Cannot send to another association");
1096N/A }
1096N/A }
1096N/A }
1096N/A
1096N/A private void checkStreamNumber(int streamNumber) {
1096N/A synchronized (stateLock) {
1096N/A if (association != null) {
1096N/A if (streamNumber < 0 ||
1096N/A streamNumber >= association.maxOutboundStreams())
1096N/A throw new InvalidStreamException();
1096N/A }
1096N/A }
1096N/A }
1096N/A
1096N/A /* TODO: Add support for ttl and isComplete to both 121 12M
1096N/A * SCTP_EOR not yet supported on reference platforms
1096N/A * TTL support limited...
1096N/A */
1096N/A @Override
1096N/A public int send(ByteBuffer buffer, MessageInfo messageInfo)
1096N/A throws IOException {
1096N/A if (buffer == null)
1096N/A throw new IllegalArgumentException("buffer cannot be null");
1096N/A
1096N/A if (messageInfo == null)
1096N/A throw new IllegalArgumentException("messageInfo cannot be null");
1096N/A
1096N/A checkAssociation(messageInfo.association());
1096N/A checkStreamNumber(messageInfo.streamNumber());
1096N/A
1096N/A synchronized (sendLock) {
1096N/A ensureSendOpen();
1096N/A
1096N/A int n = 0;
1096N/A try {
1096N/A begin();
1096N/A
1096N/A synchronized (stateLock) {
1096N/A if(!isOpen())
1096N/A return 0;
1096N/A senderThread = NativeThread.current();
1096N/A }
1096N/A
1096N/A do {
1096N/A n = send(fdVal, buffer, messageInfo);
1096N/A } while ((n == IOStatus.INTERRUPTED) && isOpen());
1096N/A
1096N/A return IOStatus.normalize(n);
1096N/A } finally {
1096N/A senderCleanup();
1096N/A end((n > 0) || (n == IOStatus.UNAVAILABLE));
1096N/A assert IOStatus.check(n);
1096N/A }
1096N/A }
1096N/A }
1096N/A
1096N/A private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
1096N/A throws IOException {
1096N/A int streamNumber = messageInfo.streamNumber();
1096N/A SocketAddress target = messageInfo.address();
1096N/A boolean unordered = messageInfo.isUnordered();
1096N/A int ppid = messageInfo.payloadProtocolID();
1096N/A
1096N/A if (src instanceof DirectBuffer)
2092N/A return sendFromNativeBuffer(fd, src, target, streamNumber,
1096N/A unordered, ppid);
1096N/A
1096N/A /* Substitute a native buffer */
2092N/A int pos = src.position();
2092N/A int lim = src.limit();
2092N/A assert (pos <= lim && streamNumber >= 0);
2092N/A
2092N/A int rem = (pos <= lim ? lim - pos : 0);
1096N/A ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
1096N/A try {
1096N/A bb.put(src);
1096N/A bb.flip();
1096N/A /* Do not update src until we see how many bytes were written */
1096N/A src.position(pos);
1096N/A
2092N/A int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
1096N/A unordered, ppid);
1096N/A if (n > 0) {
1096N/A /* now update src */
1096N/A src.position(pos + n);
1096N/A }
1096N/A return n;
1096N/A } finally {
1096N/A Util.releaseTemporaryDirectBuffer(bb);
1096N/A }
1096N/A }
1096N/A
1096N/A private int sendFromNativeBuffer(int fd,
1096N/A ByteBuffer bb,
1096N/A SocketAddress target,
1096N/A int streamNumber,
1096N/A boolean unordered,
1096N/A int ppid)
1096N/A throws IOException {
5697N/A InetAddress addr = null; // no preferred address
5697N/A int port = 0;
5697N/A if (target != null) {
5697N/A InetSocketAddress isa = Net.checkAddress(target);
5697N/A addr = isa.getAddress();
5697N/A port = isa.getPort();
5697N/A }
5697N/A
2092N/A int pos = bb.position();
2092N/A int lim = bb.limit();
2092N/A assert (pos <= lim);
2092N/A int rem = (pos <= lim ? lim - pos : 0);
2092N/A
5697N/A int written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, addr,
5697N/A port, -1 /*121*/, streamNumber, unordered, ppid);
1096N/A if (written > 0)
1096N/A bb.position(pos + written);
1096N/A return written;
1096N/A }
1096N/A
1096N/A @Override
1096N/A public SctpChannel shutdown() throws IOException {
1096N/A synchronized(stateLock) {
1096N/A if (isShutdown)
1096N/A return this;
1096N/A
1096N/A ensureSendOpen();
1096N/A SctpNet.shutdown(fdVal, -1);
1096N/A if (senderThread != 0)
1096N/A NativeThread.signal(senderThread);
1096N/A isShutdown = true;
1096N/A }
1096N/A return this;
1096N/A }
1096N/A
1096N/A @Override
1096N/A public Set<SocketAddress> getAllLocalAddresses()
1096N/A throws IOException {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1096N/A if (!isBound())
1096N/A return Collections.EMPTY_SET;
1096N/A
1096N/A return SctpNet.getLocalAddresses(fdVal);
1096N/A }
1096N/A }
1096N/A
1096N/A @Override
1096N/A public Set<SocketAddress> getRemoteAddresses()
1096N/A throws IOException {
1096N/A synchronized (stateLock) {
1096N/A if (!isOpen())
1096N/A throw new ClosedChannelException();
1326N/A if (!isConnected() || isShutdown)
1096N/A return Collections.EMPTY_SET;
1096N/A
1326N/A try {
1326N/A return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
1326N/A } catch (SocketException unused) {
1326N/A /* an open connected channel should always have remote addresses */
1326N/A return remoteAddresses;
1326N/A }
1096N/A }
1096N/A }
1096N/A
1096N/A /* Native methods */
/* One-time native initialisation; invoked from the static initializer
 * after the native libraries have been loaded.
 * NOTE(review): presumably caches JNI field/method IDs; confirm in the
 * native source. */
1096N/A private static native void initIDs();
1096N/A
/* Native receive entry point.
 * NOTE(review): address/length presumably describe a native buffer to
 * fill, by analogy with send0, and peek presumably leaves the message
 * queued; none of this is visible here, confirm in the native source. */
1096N/A static native int receive0(int fd, SctpResultContainer resultContainer,
1427N/A long address, int length, boolean peek) throws IOException;
1096N/A
/* Native send entry point. As called from sendFromNativeBuffer:
 *   address/length - native address of the first unsent byte of a direct
 *                    buffer, and the number of remaining bytes
 *   addr/port      - destination, or null/0 for no preferred address
 *   assocId        - -1 is passed by this class
 *   streamNumber, unordered, ppid - taken from the message's MessageInfo
 * A positive return value is treated by the caller as the number of bytes
 * written. */
1096N/A static native int send0(int fd, long address, int length,
5697N/A InetAddress addr, int port, int assocId, int streamNumber,
1096N/A boolean unordered, int ppid) throws IOException;
1096N/A
/* Native connect-completion check.
 * NOTE(review): the meaning of block/ready and of the return value is not
 * visible in this part of the file; confirm against the caller. */
1096N/A private static native int checkConnect(FileDescriptor fd, boolean block,
1096N/A boolean ready) throws IOException;
1096N/A
1096N/A static {
1096N/A Util.load(); /* loads nio & net native libraries */
1096N/A java.security.AccessController.doPrivileged(
1096N/A new sun.security.action.LoadLibraryAction("sctp"));
1096N/A initIDs();
1096N/A }
1096N/A}