/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
*/
/**
* A multi-threaded implementation of Selector for Windows.
*
* @author Konstantin Kladko
* @author Mark Reinhold
*/
// Initial capacity of the poll array
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
// The global native poll array holds file descriptors and event masks
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
// Number of helper threads needed for select. We need one thread for
// each additional set of MAX_SELECTABLE_FDS - 1 channels.
// A list of helper threads for select.
//Pipe used as a wakeup object.
// File descriptors corresponding to source and sink
// Lock for close cleanup
// Maps file descriptors to their indices in pollArray
}
}
return null;
}
}
// class for fdMap entries
private final static class MapEntry {
}
}
// SubSelector for the main thread
// Lock for interrupt triggering and clearing
private volatile boolean interruptTriggered = false;
super(sp);
// Disable the Nagle algorithm so that the wakeup is more immediate
}
if (channelArray == null)
throw new ClosedSelectorException();
if (interruptTriggered) {
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
}
// Main thread is out of poll(). Wakeup others and wait for them
} finally {
end();
}
// Done with poll(). Update the selected keys and record how many changed.
int updated = updateSelectedKeys();
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
return updated;
}
// Helper threads wait on this lock for the next poll.
private final class StartLock {
// A variable which distinguishes the current run of doSelect from the
// previous one. Incrementing runsCounter and notifying threads will
// trigger another round of poll.
private long runsCounter;
// Begins a new round of polling: advances the run counter, then wakes
// every thread that is currently waiting on this lock's monitor.
private synchronized void startThreads() {
    runsCounter += 1; // identifies the next run
    this.notifyAll(); // release all waiting threads
}
// This function is called by a helper thread to wait for the
// next round of poll(). It also checks, if this thread became
// redundant. If yes, it returns true, notifying the thread
// that it should exit.
while (true) {
try {
} catch (InterruptedException e) {
}
}
return true; // will cause run() to exit.
} else {
return false; // will cause run() to poll.
}
}
}
}
// Main thread waits on this lock, until all helper threads are done
// with poll().
private final class FinishLock {
// Number of helper threads, that did not finish yet.
private int threadsToFinish;
// IOException which occurred during the last run.
// Called before polling.
private void reset() {
}
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
// if finished first, wakeup others
wakeup();
}
notify(); // notify the main thread
}
// The main thread invokes this function on finishLock to wait
// for helper threads to finish poll().
private synchronized void waitForHelperThreads() {
// no helper threads finished yet. Wake them up.
wakeup();
}
while (threadsToFinish != 0) {
try {
finishLock.wait();
} catch (InterruptedException e) {
// Interrupted - set interrupted state.
}
}
}
// sets IOException for this run
exception = e;
}
// Checks if there was any exception during the last run.
// If yes, throws it
return;
" during the execution of select(): \n");
}
}
private final class SubSelector {
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
private SubSelector() {
}
}
}
// poll for helper threads
}
int numKeysUpdated = 0;
false);
false);
true);
return numKeysUpdated;
}
/**
* Note, clearedCount is used to determine if the readyOps have
* been reset in this select operation. updateCount is used to
* tell if a key has been counted as updated in this select
* operation.
*
* me.updateCount <= me.clearedCount <= updateCount
*/
boolean isExceptFds)
{
int numKeysUpdated = 0;
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
continue;
// The descriptor may be in the exceptfds set because there is
// OOB data queued to the socket. If there is OOB data then it
// is discarded and the key is not added to the selected set.
if (isExceptFds &&
{
continue;
}
}
} else { // The readyOps have been set; now add
}
}
} else { // Key is not in selected set yet
}
} else { // The readyOps have been set; now add
}
}
}
}
return numKeysUpdated;
}
}
// Represents a helper thread used for select.
private volatile boolean zombie;
// Creates a new helper thread.
// i is stored as this thread's index and is also passed to the
// SubSelector that is created for this thread.
private SelectThread(int i) {
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
}
// Sets this helper thread's zombie flag. The flag is declared volatile,
// so no lock is taken here.
void makeZombie() {
    this.zombie = true;
}
// Returns the current value of this helper thread's zombie flag.
boolean isZombie() {
    return this.zombie;
}
// Thread body. Loops forever, one iteration per round of poll, and
// returns only when startLock.waitForStart(this) reports true.
public void run() {
while (true) { // poll loop
// Wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this))
return;
// call poll()
try {
} catch (IOException e) {
// Save this exception and let other threads finish.
}
// Notify the main thread that this thread has finished, and
// wake up the others if this thread is the first to finish.
}
}
}
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
// More threads needed. Start more threads.
}
// Some threads become redundant. Remove them from the threads List.
}
}
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
}
// Returns the Windows wakeup socket to a non-signaled state.
// Under interruptLock, clears interruptTriggered if it is set; when no
// interrupt is pending there is nothing to do.
private void resetWakeupSocket() {
    synchronized (interruptLock) {
        if (interruptTriggered) {
            interruptTriggered = false;
        }
    }
}
// We increment this counter on each call to updateSelectedKeys()
// each entry in SubSelector.fdsMap has a memorized value of
// updateCount. When we increment numKeysUpdated we set updateCount
// for the corresponding entry to its current value. This is used to
// avoid counting the same key more than once - the same key can
// appear in readfds and writefds.
// Update ops of the corresponding Channels. Add the ready keys to the
// ready queue.
// Advances updateCount once per call, before any key is counted, and
// returns the number of keys counted as updated during this call.
private int updateSelectedKeys() {
updateCount++;
int numKeysUpdated = 0;
for (SelectThread t: threads) {
}
return numKeysUpdated;
}
synchronized (closeLock) {
if (channelArray != null) {
if (pollWrapper != null) {
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
deregister(channelArray[i]);
}
}
pollWrapper.free();
pollWrapper = null;
selectedKeys = null;
channelArray = null;
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
}
}
}
}
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
growIfNeeded();
}
}
private void growIfNeeded() {
channelArray = temp;
}
threadsCount++;
}
}
assert (i >= 0);
synchronized (closeLock) {
if (i != totalChannels - 1) {
// Copy end one over it
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper, i);
}
}
threadsCount--; // The last thread has become redundant.
}
}
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
// make sure this sk has not been removed yet
if (index == -1)
throw new CancelledKeyException();
}
}
synchronized (interruptLock) {
if (!interruptTriggered) {
interruptTriggered = true;
}
}
return this;
}
static {
}
}