/*
 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
893N/A
893N/Apackage sun.nio.ch;
893N/A
893N/Aimport java.nio.channels.*;
893N/Aimport java.nio.ByteBuffer;
893N/Aimport java.nio.BufferOverflowException;
893N/Aimport java.net.*;
893N/Aimport java.util.concurrent.*;
893N/Aimport java.io.IOException;
893N/Aimport sun.misc.Unsafe;
893N/A
893N/A/**
893N/A * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
893N/A */
893N/A
893N/Aclass WindowsAsynchronousSocketChannelImpl
893N/A extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
893N/A{
893N/A private static final Unsafe unsafe = Unsafe.getUnsafe();
893N/A private static int addressSize = unsafe.addressSize();
893N/A
893N/A private static int dependsArch(int value32, int value64) {
893N/A return (addressSize == 4) ? value32 : value64;
893N/A }
893N/A
893N/A /*
893N/A * typedef struct _WSABUF {
893N/A * u_long len;
893N/A * char FAR * buf;
893N/A * } WSABUF;
893N/A */
893N/A private static final int SIZEOF_WSABUF = dependsArch(8, 16);
893N/A private static final int OFFSETOF_LEN = 0;
893N/A private static final int OFFSETOF_BUF = dependsArch(4, 8);
893N/A
893N/A // maximum vector size for scatter/gather I/O
893N/A private static final int MAX_WSABUF = 16;
893N/A
893N/A private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
893N/A
893N/A
893N/A // socket handle. Use begin()/end() around each usage of this handle.
893N/A final long handle;
893N/A
893N/A // I/O completion port that the socket is associated with
893N/A private final Iocp iocp;
893N/A
893N/A // completion key to identify channel when I/O completes
893N/A private final int completionKey;
893N/A
893N/A // Pending I/O operations are tied to an OVERLAPPED structure that can only
893N/A // be released when the I/O completion event is posted to the completion
893N/A // port. Where I/O operations complete immediately then it is possible
893N/A // there may be more than two OVERLAPPED structures in use.
893N/A private final PendingIoCache ioCache;
893N/A
893N/A // per-channel arrays of WSABUF structures
893N/A private final long readBufferArray;
893N/A private final long writeBufferArray;
893N/A
893N/A
893N/A WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
893N/A throws IOException
893N/A {
893N/A super(iocp);
893N/A
893N/A // associate socket with default completion port
893N/A long h = IOUtil.fdVal(fd);
893N/A int key = 0;
893N/A try {
893N/A key = iocp.associate(this, h);
893N/A } catch (ShutdownChannelGroupException x) {
893N/A if (failIfGroupShutdown) {
893N/A closesocket0(h);
893N/A throw x;
893N/A }
893N/A } catch (IOException x) {
893N/A closesocket0(h);
893N/A throw x;
893N/A }
893N/A
893N/A this.handle = h;
893N/A this.iocp = iocp;
893N/A this.completionKey = key;
893N/A this.ioCache = new PendingIoCache();
893N/A
893N/A // allocate WSABUF arrays
893N/A this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
893N/A this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
893N/A }
893N/A
893N/A WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
893N/A this(iocp, true);
893N/A }
893N/A
893N/A @Override
893N/A public AsynchronousChannelGroupImpl group() {
893N/A return iocp;
893N/A }
893N/A
893N/A /**
893N/A * Invoked by Iocp when an I/O operation competes.
893N/A */
893N/A @Override
893N/A public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
893N/A return ioCache.remove(overlapped);
893N/A }
893N/A
893N/A // invoked by WindowsAsynchronousServerSocketChannelImpl
893N/A long handle() {
893N/A return handle;
893N/A }
893N/A
893N/A // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
893N/A // accept
6319N/A void setConnected(InetSocketAddress localAddress,
6319N/A InetSocketAddress remoteAddress)
6319N/A {
893N/A synchronized (stateLock) {
893N/A state = ST_CONNECTED;
893N/A this.localAddress = localAddress;
893N/A this.remoteAddress = remoteAddress;
893N/A }
893N/A }
893N/A
893N/A @Override
893N/A void implClose() throws IOException {
893N/A // close socket (may cause outstanding async I/O operations to fail).
893N/A closesocket0(handle);
893N/A
893N/A // waits until all I/O operations have completed
893N/A ioCache.close();
893N/A
893N/A // release arrays of WSABUF structures
893N/A unsafe.freeMemory(readBufferArray);
893N/A unsafe.freeMemory(writeBufferArray);
893N/A
893N/A // finally disassociate from the completion port (key can be 0 if
893N/A // channel created when group is shutdown)
893N/A if (completionKey != 0)
893N/A iocp.disassociate(completionKey);
893N/A }
893N/A
893N/A @Override
893N/A public void onCancel(PendingFuture<?,?> task) {
893N/A if (task.getContext() instanceof ConnectTask)
893N/A killConnect();
893N/A if (task.getContext() instanceof ReadTask)
893N/A killReading();
893N/A if (task.getContext() instanceof WriteTask)
893N/A killWriting();
893N/A }
893N/A
893N/A /**
893N/A * Implements the task to initiate a connection and the handler to
893N/A * consume the result when the connection is established (or fails).
893N/A */
893N/A private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
893N/A private final InetSocketAddress remote;
893N/A private final PendingFuture<Void,A> result;
893N/A
893N/A ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
893N/A this.remote = remote;
893N/A this.result = result;
893N/A }
893N/A
893N/A private void closeChannel() {
893N/A try {
893N/A close();
893N/A } catch (IOException ignore) { }
893N/A }
893N/A
893N/A private IOException toIOException(Throwable x) {
893N/A if (x instanceof IOException) {
893N/A if (x instanceof ClosedChannelException)
893N/A x = new AsynchronousCloseException();
893N/A return (IOException)x;
893N/A }
893N/A return new IOException(x);
893N/A }
893N/A
893N/A /**
893N/A * Invoke after a connection is successfully established.
893N/A */
893N/A private void afterConnect() throws IOException {
893N/A updateConnectContext(handle);
893N/A synchronized (stateLock) {
893N/A state = ST_CONNECTED;
893N/A remoteAddress = remote;
893N/A }
893N/A }
893N/A
893N/A /**
893N/A * Task to initiate a connection.
893N/A */
893N/A @Override
893N/A public void run() {
893N/A long overlapped = 0L;
893N/A Throwable exc = null;
893N/A try {
893N/A begin();
893N/A
893N/A // synchronize on result to allow this thread handle the case
893N/A // where the connection is established immediately.
893N/A synchronized (result) {
893N/A overlapped = ioCache.add(result);
893N/A // initiate the connection
893N/A int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
893N/A remote.getPort(), overlapped);
893N/A if (n == IOStatus.UNAVAILABLE) {
893N/A // connection is pending
893N/A return;
893N/A }
893N/A
893N/A // connection established immediately
893N/A afterConnect();
893N/A result.setResult(null);
893N/A }
893N/A } catch (Throwable x) {
3671N/A if (overlapped != 0L)
3671N/A ioCache.remove(overlapped);
893N/A exc = x;
893N/A } finally {
893N/A end();
893N/A }
893N/A
893N/A if (exc != null) {
893N/A closeChannel();
893N/A result.setFailure(toIOException(exc));
893N/A }
1580N/A Invoker.invoke(result);
893N/A }
893N/A
893N/A /**
893N/A * Invoked by handler thread when connection established.
893N/A */
893N/A @Override
1580N/A public void completed(int bytesTransferred, boolean canInvokeDirect) {
893N/A Throwable exc = null;
893N/A try {
893N/A begin();
893N/A afterConnect();
893N/A result.setResult(null);
893N/A } catch (Throwable x) {
893N/A // channel is closed or unable to finish connect
893N/A exc = x;
893N/A } finally {
893N/A end();
893N/A }
893N/A
893N/A // can't close channel while in begin/end block
893N/A if (exc != null) {
893N/A closeChannel();
893N/A result.setFailure(toIOException(exc));
893N/A }
893N/A
1580N/A if (canInvokeDirect) {
1580N/A Invoker.invokeUnchecked(result);
1580N/A } else {
1580N/A Invoker.invoke(result);
1580N/A }
893N/A }
893N/A
893N/A /**
893N/A * Invoked by handler thread when failed to establish connection.
893N/A */
893N/A @Override
893N/A public void failed(int error, IOException x) {
893N/A if (isOpen()) {
893N/A closeChannel();
893N/A result.setFailure(x);
893N/A } else {
893N/A result.setFailure(new AsynchronousCloseException());
893N/A }
1580N/A Invoker.invoke(result);
893N/A }
893N/A }
893N/A
893N/A @Override
1580N/A <A> Future<Void> implConnect(SocketAddress remote,
1580N/A A attachment,
1580N/A CompletionHandler<Void,? super A> handler)
893N/A {
893N/A if (!isOpen()) {
1580N/A Throwable exc = new ClosedChannelException();
1580N/A if (handler == null)
1580N/A return CompletedFuture.withFailure(exc);
1580N/A Invoker.invoke(this, handler, attachment, null, exc);
1580N/A return null;
893N/A }
893N/A
893N/A InetSocketAddress isa = Net.checkAddress(remote);
893N/A
893N/A // permission check
893N/A SecurityManager sm = System.getSecurityManager();
893N/A if (sm != null)
893N/A sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
893N/A
893N/A // check and update state
893N/A // ConnectEx requires the socket to be bound to a local address
893N/A IOException bindException = null;
893N/A synchronized (stateLock) {
893N/A if (state == ST_CONNECTED)
893N/A throw new AlreadyConnectedException();
893N/A if (state == ST_PENDING)
893N/A throw new ConnectionPendingException();
893N/A if (localAddress == null) {
893N/A try {
893N/A bind(new InetSocketAddress(0));
893N/A } catch (IOException x) {
893N/A bindException = x;
893N/A }
893N/A }
893N/A if (bindException == null)
893N/A state = ST_PENDING;
893N/A }
893N/A
893N/A // handle bind failure
893N/A if (bindException != null) {
893N/A try {
893N/A close();
893N/A } catch (IOException ignore) { }
1580N/A if (handler == null)
1580N/A return CompletedFuture.withFailure(bindException);
1580N/A Invoker.invoke(this, handler, attachment, null, bindException);
1580N/A return null;
893N/A }
893N/A
893N/A // setup task
893N/A PendingFuture<Void,A> result =
893N/A new PendingFuture<Void,A>(this, handler, attachment);
893N/A ConnectTask task = new ConnectTask<A>(isa, result);
893N/A result.setContext(task);
893N/A
1580N/A // initiate I/O
1580N/A if (Iocp.supportsThreadAgnosticIo()) {
1580N/A task.run();
1580N/A } else {
1580N/A Invoker.invokeOnThreadInThreadPool(this, task);
1580N/A }
893N/A return result;
893N/A }
893N/A
893N/A /**
893N/A * Implements the task to initiate a read and the handler to consume the
893N/A * result when the read completes.
893N/A */
893N/A private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
893N/A private final ByteBuffer[] bufs;
893N/A private final int numBufs;
893N/A private final boolean scatteringRead;
893N/A private final PendingFuture<V,A> result;
893N/A
893N/A // set by run method
893N/A private ByteBuffer[] shadow;
893N/A
893N/A ReadTask(ByteBuffer[] bufs,
893N/A boolean scatteringRead,
893N/A PendingFuture<V,A> result)
893N/A {
893N/A this.bufs = bufs;
893N/A this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
893N/A this.scatteringRead = scatteringRead;
893N/A this.result = result;
893N/A }
893N/A
893N/A /**
893N/A * Invoked prior to read to prepare the WSABUF array. Where necessary,
893N/A * it substitutes non-direct buffers with direct buffers.
893N/A */
893N/A void prepareBuffers() {
893N/A shadow = new ByteBuffer[numBufs];
893N/A long address = readBufferArray;
893N/A for (int i=0; i<numBufs; i++) {
893N/A ByteBuffer dst = bufs[i];
893N/A int pos = dst.position();
893N/A int lim = dst.limit();
893N/A assert (pos <= lim);
893N/A int rem = (pos <= lim ? lim - pos : 0);
893N/A long a;
893N/A if (!(dst instanceof DirectBuffer)) {
893N/A // substitute with direct buffer
893N/A ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
893N/A shadow[i] = bb;
893N/A a = ((DirectBuffer)bb).address();
893N/A } else {
893N/A shadow[i] = dst;
893N/A a = ((DirectBuffer)dst).address() + pos;
893N/A }
893N/A unsafe.putAddress(address + OFFSETOF_BUF, a);
893N/A unsafe.putInt(address + OFFSETOF_LEN, rem);
893N/A address += SIZEOF_WSABUF;
893N/A }
893N/A }
893N/A
893N/A /**
893N/A * Invoked after a read has completed to update the buffer positions
893N/A * and release any substituted buffers.
893N/A */
893N/A void updateBuffers(int bytesRead) {
893N/A for (int i=0; i<numBufs; i++) {
893N/A ByteBuffer nextBuffer = shadow[i];
893N/A int pos = nextBuffer.position();
893N/A int len = nextBuffer.remaining();
893N/A if (bytesRead >= len) {
893N/A bytesRead -= len;
893N/A int newPosition = pos + len;
893N/A try {
893N/A nextBuffer.position(newPosition);
893N/A } catch (IllegalArgumentException x) {
893N/A // position changed by another
893N/A }
893N/A } else { // Buffers not completely filled
893N/A if (bytesRead > 0) {
893N/A assert(pos + bytesRead < (long)Integer.MAX_VALUE);
893N/A int newPosition = pos + bytesRead;
893N/A try {
893N/A nextBuffer.position(newPosition);
893N/A } catch (IllegalArgumentException x) {
893N/A // position changed by another
893N/A }
893N/A }
893N/A break;
893N/A }
893N/A }
893N/A
893N/A // Put results from shadow into the slow buffers
893N/A for (int i=0; i<numBufs; i++) {
893N/A if (!(bufs[i] instanceof DirectBuffer)) {
893N/A shadow[i].flip();
893N/A try {
893N/A bufs[i].put(shadow[i]);
893N/A } catch (BufferOverflowException x) {
893N/A // position changed by another
893N/A }
893N/A }
893N/A }
893N/A }
893N/A
893N/A void releaseBuffers() {
893N/A for (int i=0; i<numBufs; i++) {
893N/A if (!(bufs[i] instanceof DirectBuffer)) {
893N/A Util.releaseTemporaryDirectBuffer(shadow[i]);
893N/A }
893N/A }
893N/A }
893N/A
893N/A @Override
893N/A @SuppressWarnings("unchecked")
893N/A public void run() {
893N/A long overlapped = 0L;
893N/A boolean prepared = false;
893N/A boolean pending = false;
893N/A
893N/A try {
893N/A begin();
893N/A
893N/A // substitute non-direct buffers
893N/A prepareBuffers();
893N/A prepared = true;
893N/A
893N/A // get an OVERLAPPED structure (from the cache or allocate)
893N/A overlapped = ioCache.add(result);
893N/A
1191N/A // initiate read
1191N/A int n = read0(handle, numBufs, readBufferArray, overlapped);
1191N/A if (n == IOStatus.UNAVAILABLE) {
1191N/A // I/O is pending
1191N/A pending = true;
1191N/A return;
1191N/A }
1191N/A if (n == IOStatus.EOF) {
1191N/A // input shutdown
1191N/A enableReading();
1191N/A if (scatteringRead) {
1191N/A result.setResult((V)Long.valueOf(-1L));
1191N/A } else {
1191N/A result.setResult((V)Integer.valueOf(-1));
893N/A }
1191N/A } else {
1191N/A throw new InternalError("Read completed immediately");
893N/A }
893N/A } catch (Throwable x) {
1191N/A // failed to initiate read
1191N/A // reset read flag before releasing waiters
893N/A enableReading();
893N/A if (x instanceof ClosedChannelException)
893N/A x = new AsynchronousCloseException();
893N/A if (!(x instanceof IOException))
893N/A x = new IOException(x);
893N/A result.setFailure(x);
893N/A } finally {
1191N/A // release resources if I/O not pending
1191N/A if (!pending) {
1191N/A if (overlapped != 0L)
1191N/A ioCache.remove(overlapped);
1191N/A if (prepared)
1191N/A releaseBuffers();
893N/A }
893N/A end();
893N/A }
893N/A
893N/A // invoke completion handler
1580N/A Invoker.invoke(result);
893N/A }
893N/A
893N/A /**
893N/A * Executed when the I/O has completed
893N/A */
893N/A @Override
893N/A @SuppressWarnings("unchecked")
1580N/A public void completed(int bytesTransferred, boolean canInvokeDirect) {
893N/A if (bytesTransferred == 0) {
893N/A bytesTransferred = -1; // EOF
893N/A } else {
893N/A updateBuffers(bytesTransferred);
893N/A }
893N/A
893N/A // return direct buffer to cache if substituted
893N/A releaseBuffers();
893N/A
893N/A // release waiters if not already released by timeout
893N/A synchronized (result) {
893N/A if (result.isDone())
893N/A return;
893N/A enableReading();
893N/A if (scatteringRead) {
893N/A result.setResult((V)Long.valueOf(bytesTransferred));
893N/A } else {
893N/A result.setResult((V)Integer.valueOf(bytesTransferred));
893N/A }
893N/A }
1580N/A if (canInvokeDirect) {
1580N/A Invoker.invokeUnchecked(result);
1580N/A } else {
1580N/A Invoker.invoke(result);
1580N/A }
893N/A }
893N/A
893N/A @Override
893N/A public void failed(int error, IOException x) {
893N/A // return direct buffer to cache if substituted
893N/A releaseBuffers();
893N/A
893N/A // release waiters if not already released by timeout
893N/A if (!isOpen())
893N/A x = new AsynchronousCloseException();
893N/A
893N/A synchronized (result) {
893N/A if (result.isDone())
893N/A return;
893N/A enableReading();
893N/A result.setFailure(x);
893N/A }
1580N/A Invoker.invoke(result);
893N/A }
893N/A
893N/A /**
893N/A * Invoked if timeout expires before it is cancelled
893N/A */
893N/A void timeout() {
893N/A // synchronize on result as the I/O could complete/fail
893N/A synchronized (result) {
893N/A if (result.isDone())
893N/A return;
893N/A
893N/A // kill further reading before releasing waiters
893N/A enableReading(true);
893N/A result.setFailure(new InterruptedByTimeoutException());
893N/A }
893N/A
893N/A // invoke handler without any locks
1580N/A Invoker.invoke(result);
893N/A }
893N/A }
893N/A
893N/A @Override
1580N/A <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
1580N/A ByteBuffer dst,
1580N/A ByteBuffer[] dsts,
893N/A long timeout,
893N/A TimeUnit unit,
893N/A A attachment,
893N/A CompletionHandler<V,? super A> handler)
893N/A {
893N/A // setup task
893N/A PendingFuture<V,A> result =
893N/A new PendingFuture<V,A>(this, handler, attachment);
1580N/A ByteBuffer[] bufs;
1580N/A if (isScatteringRead) {
1580N/A bufs = dsts;
1580N/A } else {
1580N/A bufs = new ByteBuffer[1];
1580N/A bufs[0] = dst;
1580N/A }
1580N/A final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
893N/A result.setContext(readTask);
893N/A
893N/A // schedule timeout
893N/A if (timeout > 0L) {
893N/A Future<?> timeoutTask = iocp.schedule(new Runnable() {
893N/A public void run() {
893N/A readTask.timeout();
893N/A }
893N/A }, timeout, unit);
893N/A result.setTimeoutTask(timeoutTask);
893N/A }
893N/A
1580N/A // initiate I/O
1580N/A if (Iocp.supportsThreadAgnosticIo()) {
1580N/A readTask.run();
1580N/A } else {
1580N/A Invoker.invokeOnThreadInThreadPool(this, readTask);
1580N/A }
893N/A return result;
893N/A }
893N/A
893N/A /**
893N/A * Implements the task to initiate a write and the handler to consume the
893N/A * result when the write completes.
893N/A */
893N/A private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
893N/A private final ByteBuffer[] bufs;
893N/A private final int numBufs;
893N/A private final boolean gatheringWrite;
893N/A private final PendingFuture<V,A> result;
893N/A
893N/A // set by run method
893N/A private ByteBuffer[] shadow;
893N/A
893N/A WriteTask(ByteBuffer[] bufs,
893N/A boolean gatheringWrite,
893N/A PendingFuture<V,A> result)
893N/A {
893N/A this.bufs = bufs;
893N/A this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
893N/A this.gatheringWrite = gatheringWrite;
893N/A this.result = result;
893N/A }
893N/A
893N/A /**
893N/A * Invoked prior to write to prepare the WSABUF array. Where necessary,
893N/A * it substitutes non-direct buffers with direct buffers.
893N/A */
893N/A void prepareBuffers() {
893N/A shadow = new ByteBuffer[numBufs];
893N/A long address = writeBufferArray;
893N/A for (int i=0; i<numBufs; i++) {
893N/A ByteBuffer src = bufs[i];
893N/A int pos = src.position();
893N/A int lim = src.limit();
893N/A assert (pos <= lim);
893N/A int rem = (pos <= lim ? lim - pos : 0);
893N/A long a;
893N/A if (!(src instanceof DirectBuffer)) {
893N/A // substitute with direct buffer
893N/A ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
893N/A bb.put(src);
893N/A bb.flip();
893N/A src.position(pos); // leave heap buffer untouched for now
893N/A shadow[i] = bb;
893N/A a = ((DirectBuffer)bb).address();
893N/A } else {
893N/A shadow[i] = src;
893N/A a = ((DirectBuffer)src).address() + pos;
893N/A }
893N/A unsafe.putAddress(address + OFFSETOF_BUF, a);
893N/A unsafe.putInt(address + OFFSETOF_LEN, rem);
893N/A address += SIZEOF_WSABUF;
893N/A }
893N/A }
893N/A
893N/A /**
893N/A * Invoked after a write has completed to update the buffer positions
893N/A * and release any substituted buffers.
893N/A */
893N/A void updateBuffers(int bytesWritten) {
893N/A // Notify the buffers how many bytes were taken
893N/A for (int i=0; i<numBufs; i++) {
893N/A ByteBuffer nextBuffer = bufs[i];
893N/A int pos = nextBuffer.position();
893N/A int lim = nextBuffer.limit();
893N/A int len = (pos <= lim ? lim - pos : lim);
893N/A if (bytesWritten >= len) {
893N/A bytesWritten -= len;
893N/A int newPosition = pos + len;
893N/A try {
893N/A nextBuffer.position(newPosition);
893N/A } catch (IllegalArgumentException x) {
893N/A // position changed by someone else
893N/A }
893N/A } else { // Buffers not completely filled
893N/A if (bytesWritten > 0) {
893N/A assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
893N/A int newPosition = pos + bytesWritten;
893N/A try {
893N/A nextBuffer.position(newPosition);
893N/A } catch (IllegalArgumentException x) {
893N/A // position changed by someone else
893N/A }
893N/A }
893N/A break;
893N/A }
893N/A }
893N/A }
893N/A
893N/A void releaseBuffers() {
893N/A for (int i=0; i<numBufs; i++) {
893N/A if (!(bufs[i] instanceof DirectBuffer)) {
893N/A Util.releaseTemporaryDirectBuffer(shadow[i]);
893N/A }
893N/A }
893N/A }
893N/A
893N/A @Override
1580N/A //@SuppressWarnings("unchecked")
893N/A public void run() {
893N/A long overlapped = 0L;
893N/A boolean prepared = false;
893N/A boolean pending = false;
893N/A boolean shutdown = false;
893N/A
893N/A try {
893N/A begin();
893N/A
893N/A // substitute non-direct buffers
893N/A prepareBuffers();
893N/A prepared = true;
893N/A
893N/A // get an OVERLAPPED structure (from the cache or allocate)
893N/A overlapped = ioCache.add(result);
1191N/A int n = write0(handle, numBufs, writeBufferArray, overlapped);
1191N/A if (n == IOStatus.UNAVAILABLE) {
1191N/A // I/O is pending
1191N/A pending = true;
1191N/A return;
893N/A }
1191N/A if (n == IOStatus.EOF) {
1191N/A // special case for shutdown output
1191N/A shutdown = true;
1191N/A throw new ClosedChannelException();
1191N/A }
1191N/A // write completed immediately
1191N/A throw new InternalError("Write completed immediately");
893N/A } catch (Throwable x) {
1191N/A // write failed. Enable writing before releasing waiters.
893N/A enableWriting();
893N/A if (!shutdown && (x instanceof ClosedChannelException))
893N/A x = new AsynchronousCloseException();
893N/A if (!(x instanceof IOException))
893N/A x = new IOException(x);
893N/A result.setFailure(x);
893N/A } finally {
1191N/A // release resources if I/O not pending
1191N/A if (!pending) {
1191N/A if (overlapped != 0L)
1191N/A ioCache.remove(overlapped);
1191N/A if (prepared)
1191N/A releaseBuffers();
893N/A }
893N/A end();
893N/A }
893N/A
893N/A // invoke completion handler
1580N/A Invoker.invoke(result);
893N/A }
893N/A
893N/A /**
893N/A * Executed when the I/O has completed
893N/A */
893N/A @Override
893N/A @SuppressWarnings("unchecked")
1580N/A public void completed(int bytesTransferred, boolean canInvokeDirect) {
893N/A updateBuffers(bytesTransferred);
893N/A
893N/A // return direct buffer to cache if substituted
893N/A releaseBuffers();
893N/A
893N/A // release waiters if not already released by timeout
893N/A synchronized (result) {
893N/A if (result.isDone())
893N/A return;
893N/A enableWriting();
893N/A if (gatheringWrite) {
893N/A result.setResult((V)Long.valueOf(bytesTransferred));
893N/A } else {
893N/A result.setResult((V)Integer.valueOf(bytesTransferred));
893N/A }
893N/A }
1580N/A if (canInvokeDirect) {
1580N/A Invoker.invokeUnchecked(result);
1580N/A } else {
1580N/A Invoker.invoke(result);
1580N/A }
893N/A }
893N/A
893N/A @Override
893N/A public void failed(int error, IOException x) {
893N/A // return direct buffer to cache if substituted
893N/A releaseBuffers();
893N/A
893N/A // release waiters if not already released by timeout
893N/A if (!isOpen())
893N/A x = new AsynchronousCloseException();
893N/A
893N/A synchronized (result) {
893N/A if (result.isDone())
893N/A return;
893N/A enableWriting();
893N/A result.setFailure(x);
893N/A }
1580N/A Invoker.invoke(result);
893N/A }
893N/A
893N/A /**
893N/A * Invoked if timeout expires before it is cancelled
893N/A */
893N/A void timeout() {
893N/A // synchronize on result as the I/O could complete/fail
893N/A synchronized (result) {
893N/A if (result.isDone())
893N/A return;
893N/A
893N/A // kill further writing before releasing waiters
893N/A enableWriting(true);
893N/A result.setFailure(new InterruptedByTimeoutException());
893N/A }
893N/A
893N/A // invoke handler without any locks
1580N/A Invoker.invoke(result);
893N/A }
893N/A }
893N/A
893N/A @Override
1580N/A <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
1580N/A ByteBuffer src,
1580N/A ByteBuffer[] srcs,
893N/A long timeout,
893N/A TimeUnit unit,
893N/A A attachment,
893N/A CompletionHandler<V,? super A> handler)
893N/A {
893N/A // setup task
893N/A PendingFuture<V,A> result =
893N/A new PendingFuture<V,A>(this, handler, attachment);
1580N/A ByteBuffer[] bufs;
1580N/A if (gatheringWrite) {
1580N/A bufs = srcs;
1580N/A } else {
1580N/A bufs = new ByteBuffer[1];
1580N/A bufs[0] = src;
1580N/A }
893N/A final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
893N/A result.setContext(writeTask);
893N/A
893N/A // schedule timeout
893N/A if (timeout > 0L) {
893N/A Future<?> timeoutTask = iocp.schedule(new Runnable() {
893N/A public void run() {
893N/A writeTask.timeout();
893N/A }
893N/A }, timeout, unit);
893N/A result.setTimeoutTask(timeoutTask);
893N/A }
893N/A
893N/A // initiate I/O (can only be done from thread in thread pool)
1580N/A // initiate I/O
1580N/A if (Iocp.supportsThreadAgnosticIo()) {
1580N/A writeTask.run();
1580N/A } else {
1580N/A Invoker.invokeOnThreadInThreadPool(this, writeTask);
1580N/A }
893N/A return result;
893N/A }
893N/A
893N/A // -- Native methods --
893N/A
893N/A private static native void initIDs();
893N/A
893N/A private static native int connect0(long socket, boolean preferIPv6,
893N/A InetAddress remote, int remotePort, long overlapped) throws IOException;
893N/A
893N/A private static native void updateConnectContext(long socket) throws IOException;
893N/A
893N/A private static native int read0(long socket, int count, long addres, long overlapped)
893N/A throws IOException;
893N/A
893N/A private static native int write0(long socket, int count, long address,
893N/A long overlapped) throws IOException;
893N/A
893N/A private static native void shutdown0(long socket, int how) throws IOException;
893N/A
893N/A private static native void closesocket0(long socket) throws IOException;
893N/A
893N/A static {
893N/A Util.load();
893N/A initIDs();
893N/A }
893N/A}