/*
 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
430N/A
430N/Apackage sun.nio.ch;
430N/A
430N/Aimport java.nio.channels.*;
430N/Aimport java.nio.channels.spi.AsynchronousChannelProvider;
430N/Aimport java.io.Closeable;
430N/Aimport java.io.IOException;
430N/Aimport java.io.FileDescriptor;
430N/Aimport java.util.*;
430N/Aimport java.util.concurrent.*;
430N/Aimport java.util.concurrent.locks.ReadWriteLock;
430N/Aimport java.util.concurrent.locks.ReentrantReadWriteLock;
430N/Aimport java.security.AccessController;
430N/Aimport sun.security.action.GetPropertyAction;
430N/Aimport sun.misc.Unsafe;
430N/A
430N/A/**
430N/A * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
430N/A * completion port.
430N/A */
430N/A
430N/Aclass Iocp extends AsynchronousChannelGroupImpl {
430N/A private static final Unsafe unsafe = Unsafe.getUnsafe();
430N/A private static final long INVALID_HANDLE_VALUE = -1L;
430N/A private static final boolean supportsThreadAgnosticIo;
430N/A
430N/A // maps completion key to channel
430N/A private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
430N/A private final Map<Integer,OverlappedChannel> keyToChannel =
430N/A new HashMap<Integer,OverlappedChannel>();
430N/A private int nextCompletionKey;
430N/A
430N/A // handle to completion port
430N/A private final long port;
430N/A
430N/A // true if port has been closed
430N/A private boolean closed;
430N/A
430N/A // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
430N/A // relate to I/O operations where the completion notification was not
430N/A // received in a timely manner after the channel is closed.
430N/A private final Set<Long> staleIoSet = new HashSet<Long>();
430N/A
430N/A Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
430N/A throws IOException
430N/A {
430N/A super(provider, pool);
430N/A this.port =
430N/A createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
430N/A this.nextCompletionKey = 1;
430N/A }
430N/A
430N/A Iocp start() {
430N/A startThreads(new EventHandlerTask());
430N/A return this;
430N/A }
430N/A
430N/A /*
430N/A * Channels implements this interface support overlapped I/O and can be
430N/A * associated with a completion port.
430N/A */
430N/A static interface OverlappedChannel extends Closeable {
430N/A /**
430N/A * Returns a reference to the pending I/O result.
430N/A */
430N/A <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
430N/A }
430N/A
430N/A /**
430N/A * Indicates if this operating system supports thread agnostic I/O.
430N/A */
430N/A static boolean supportsThreadAgnosticIo() {
430N/A return supportsThreadAgnosticIo;
430N/A }
430N/A
430N/A // release all resources
430N/A void implClose() {
430N/A synchronized (this) {
430N/A if (closed)
430N/A return;
430N/A closed = true;
430N/A }
430N/A close0(port);
430N/A synchronized (staleIoSet) {
430N/A for (Long ov: staleIoSet) {
430N/A unsafe.freeMemory(ov);
430N/A }
430N/A staleIoSet.clear();
430N/A }
430N/A }
430N/A
430N/A @Override
430N/A boolean isEmpty() {
430N/A keyToChannelLock.writeLock().lock();
430N/A try {
430N/A return keyToChannel.isEmpty();
430N/A } finally {
430N/A keyToChannelLock.writeLock().unlock();
430N/A }
430N/A }
430N/A
430N/A @Override
430N/A final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
430N/A throws IOException
430N/A {
430N/A int key = associate(new OverlappedChannel() {
430N/A public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
430N/A return null;
430N/A }
430N/A public void close() throws IOException {
430N/A channel.close();
430N/A }
430N/A }, 0L);
430N/A return Integer.valueOf(key);
430N/A }
430N/A
430N/A @Override
430N/A final void detachForeignChannel(Object key) {
430N/A disassociate((Integer)key);
}
@Override
void closeAllChannels() {
/**
* On Windows the close operation will close the socket/file handle
* and then wait until all outstanding I/O operations have aborted.
* This is necessary as each channel's cache of OVERLAPPED structures
* can only be freed once all I/O operations have completed. As I/O
* completion requires a lookup of the keyToChannel then we must close
* the channels when not holding the write lock.
*/
final int MAX_BATCH_SIZE = 32;
OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
int count;
do {
// grab a batch of up to 32 channels
keyToChannelLock.writeLock().lock();
count = 0;
try {
for (Integer key: keyToChannel.keySet()) {
channels[count++] = keyToChannel.get(key);
if (count >= MAX_BATCH_SIZE)
break;
}
} finally {
keyToChannelLock.writeLock().unlock();
}
// close them
for (int i=0; i<count; i++) {
try {
channels[i].close();
} catch (IOException ignore) { }
}
} while (count > 0);
}
private void wakeup() {
try {
postQueuedCompletionStatus(port, 0);
} catch (IOException e) {
// should not happen
throw new AssertionError(e);
}
}
@Override
void executeOnHandlerTask(Runnable task) {
synchronized (this) {
if (closed)
throw new RejectedExecutionException();
offerTask(task);
wakeup();
}
}
@Override
void shutdownHandlerTasks() {
// shutdown all handler threads
int nThreads = threadCount();
while (nThreads-- > 0) {
wakeup();
}
}
/**
* Associate the given handle with this group
*/
int associate(OverlappedChannel ch, long handle) throws IOException {
keyToChannelLock.writeLock().lock();
// generate a completion key (if not shutdown)
int key;
try {
if (isShutdown())
throw new ShutdownChannelGroupException();
// generate unique key
do {
key = nextCompletionKey++;
} while ((key == 0) || keyToChannel.containsKey(key));
// associate with I/O completion port
if (handle != 0L) {
createIoCompletionPort(handle, port, key, 0);
}
// setup mapping
keyToChannel.put(key, ch);
} finally {
keyToChannelLock.writeLock().unlock();
}
return key;
}
/**
* Disassociate channel from the group.
*/
void disassociate(int key) {
boolean checkForShutdown = false;
keyToChannelLock.writeLock().lock();
try {
keyToChannel.remove(key);
// last key to be removed so check if group is shutdown
if (keyToChannel.isEmpty())
checkForShutdown = true;
} finally {
keyToChannelLock.writeLock().unlock();
}
// continue shutdown
if (checkForShutdown && isShutdown()) {
try {
shutdownNow();
} catch (IOException ignore) { }
}
}
/**
* Invoked when a channel associated with this port is closed before
* notifications for all outstanding I/O operations have been received.
*/
void makeStale(Long overlapped) {
synchronized (staleIoSet) {
staleIoSet.add(overlapped);
}
}
/**
* Checks if the given OVERLAPPED is stale and if so, releases it.
*/
private void checkIfStale(long ov) {
synchronized (staleIoSet) {
boolean removed = staleIoSet.remove(ov);
if (removed) {
unsafe.freeMemory(ov);
}
}
}
/**
* The handler for consuming the result of an asynchronous I/O operation.
*/
static interface ResultHandler {
/**
* Invoked if the I/O operation completes successfully.
*/
public void completed(int bytesTransferred, boolean canInvokeDirect);
/**
* Invoked if the I/O operation fails.
*/
public void failed(int error, IOException ioe);
}
// Creates IOException for the given I/O error.
private static IOException translateErrorToIOException(int error) {
String msg = getErrorMessage(error);
if (msg == null)
msg = "Unknown error: 0x0" + Integer.toHexString(error);
return new IOException(msg);
}
/**
* Long-running task servicing system-wide or per-file completion port
*/
private class EventHandlerTask implements Runnable {
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
boolean canInvokeDirect = (myGroupAndInvokeCount != null);
CompletionStatus ioResult = new CompletionStatus();
boolean replaceMe = false;
try {
for (;;) {
// reset invoke count
if (myGroupAndInvokeCount != null)
myGroupAndInvokeCount.resetInvokeCount();
// wait for I/O completion event
// A error here is fatal (thread will not be replaced)
replaceMe = false;
try {
getQueuedCompletionStatus(port, ioResult);
} catch (IOException x) {
// should not happen
x.printStackTrace();
return;
}
// handle wakeup to execute task or shutdown
if (ioResult.completionKey() == 0 &&
ioResult.overlapped() == 0L)
{
Runnable task = pollTask();
if (task == null) {
// shutdown request
return;
}
// run task
// (if error/exception then replace thread)
replaceMe = true;
task.run();
continue;
}
// map key to channel
OverlappedChannel ch = null;
keyToChannelLock.readLock().lock();
try {
ch = keyToChannel.get(ioResult.completionKey());
if (ch == null) {
checkIfStale(ioResult.overlapped());
continue;
}
} finally {
keyToChannelLock.readLock().unlock();
}
// lookup I/O request
PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
if (result == null) {
// we get here if the OVERLAPPED structure is associated
// with an I/O operation on a channel that was closed
// but the I/O operation event wasn't read in a timely
// manner. Alternatively, it may be related to a
// tryLock operation as the OVERLAPPED structures for
// these operations are not in the I/O cache.
checkIfStale(ioResult.overlapped());
continue;
}
// synchronize on result in case I/O completed immediately
// and was handled by initiator
synchronized (result) {
if (result.isDone()) {
continue;
}
// not handled by initiator
}
// invoke I/O result handler
int error = ioResult.error();
ResultHandler rh = (ResultHandler)result.getContext();
replaceMe = true; // (if error/exception then replace thread)
if (error == 0) {
rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
} else {
rh.failed(error, translateErrorToIOException(error));
}
}
} finally {
// last thread to exit when shutdown releases resources
int remaining = threadExit(this, replaceMe);
if (remaining == 0 && isShutdown()) {
implClose();
}
}
}
}
/**
* Container for data returned by GetQueuedCompletionStatus
*/
private static class CompletionStatus {
private int error;
private int bytesTransferred;
private int completionKey;
private long overlapped;
private CompletionStatus() { }
int error() { return error; }
int bytesTransferred() { return bytesTransferred; }
int completionKey() { return completionKey; }
long overlapped() { return overlapped; }
}
// -- native methods --
private static native void initIDs();
private static native long createIoCompletionPort(long handle,
long existingPort, int completionKey, int concurrency) throws IOException;
private static native void close0(long handle);
private static native void getQueuedCompletionStatus(long completionPort,
CompletionStatus status) throws IOException;
private static native void postQueuedCompletionStatus(long completionPort,
int completionKey) throws IOException;
private static native String getErrorMessage(int error);
static {
Util.load();
initIDs();
// thread agnostic I/O on Vista/2008 or newer
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String vers[] = osversion.split("\\.");
supportsThreadAgnosticIo = Integer.parseInt(vers[0]) >= 6;
}
}