/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/**
* AsynchronousChannelGroup implementation based on the Linux epoll facility.
*
* A socket pair is kept alongside the epoll file descriptor and is used as
* the wakeup mechanism for threads that are polling.
*/
final class EPollPort
extends Port
{
// maximum number of events to poll at a time
// errors
// epoll file descriptor (assigned from epollCreate())
private final int epfd;
// true if epoll closed; read and written while synchronized on this port
private boolean closed;
// socket pair used for wakeup
private final int sp[];
// number of wakeups pending
// address of the poll array passed to epoll_wait
private final long address;
// encapsulates an event for a channel
static class Event {
// NOTE(review): presumably the epoll event bits reported for the
// channel -- confirm against the code that creates Event instances
final int events;
}
}
// queue of events for cases that a polling thread dequeues more than one
// event
throws IOException
{
// open epoll
this.epfd = epollCreate();
// create socket pair for wakeup mechanism
int[] sv = new int[2];
try {
socketpair(sv);
// register one end with epoll
} catch (IOException x) {
throw x;
}
// allocate the poll array
// create the queue and offer the special event to ensure that the first
// thread polls
}
startThreads(new EventHandlerTask());
return this;
}
/**
* Release all resources.
*
* Safe to call more than once: only the first call flips the closed flag,
* later calls find it already set and do nothing.
*/
private void implClose() {
    synchronized (this) {
        // the flag is only inspected and updated while holding the monitor
        if (!closed) {
            closed = true;
        }
    }
}
/**
* Forces a wakeup via the socketpair.
*
* An IOException raised while doing so is not propagated as a checked
* exception; it is rethrown as an AssertionError with the original
* exception as its cause.
*/
private void wakeup() {
// write byte to socketpair to force wakeup
// NOTE(review): the try body below is empty in this view, so no byte is
// actually written here -- confirm the write has not been lost
try {
} catch (IOException x) {
throw new AssertionError(x);
}
}
}
synchronized (this) {
if (closed)
throw new RejectedExecutionException();
wakeup();
}
}
/**
* Shuts down the handler tasks.
*
* With no handler threads there is nobody to notify, so resources are
* released directly. Otherwise one wakeup is issued per handler thread.
*/
void shutdownHandlerTasks() {
    final int handlerCount = threadCount();

    // nothing is running: release resources right away
    if (handlerCount == 0) {
        implClose();
        return;
    }

    // one wakeup per handler thread
    for (int sent = 0; sent < handlerCount; sent++) {
        wakeup();
    }
}
// invoked by clients to register a file descriptor
// update events (or add to epoll on first usage)
if (err != 0)
throw new AssertionError(); // should not happen
}
/*
* Task to process events from epoll and dispatch to the channel's
* onEvent handler.
*
* Events are retrieved from epoll in batch and offered to a BlockingQueue
* where they are consumed by handler threads. A special "NEED_TO_POLL"
* event is used to signal one consumer to re-poll when all events have
* been consumed.
*/
try {
for (;;) {
/*
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
* they can be handled by other handler threads. The last
* event is handled by this thread (and so is not queued).
*/
try {
while (n-- > 0) {
// wakeup
// no more wakeups so drain pipe
}
// queue special event if there are more events
// to handle.
if (n > 0) {
continue;
}
return EXECUTE_TASK_OR_SHUTDOWN;
}
// n-1 events are queued; This thread handles
// the last one except for the wakeup
if (n > 0) {
} else {
return ev;
}
}
}
} finally {
}
}
} finally {
// to ensure that some thread will poll when all events have
// been consumed
}
}
public void run() {
boolean replaceMe = false;
try {
for (;;) {
// reset invoke count
if (isPooledThread)
try {
replaceMe = false;
// no events and this thread has been "selected" to
// poll for more.
if (ev == NEED_TO_POLL) {
try {
} catch (IOException x) {
x.printStackTrace();
return;
}
}
} catch (InterruptedException x) {
continue;
}
// handle wakeup to execute task or shutdown
if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
// shutdown request
return;
}
replaceMe = true;
continue;
}
// process event
try {
} catch (Error x) {
replaceMe = true; throw x;
} catch (RuntimeException x) {
replaceMe = true; throw x;
}
}
} finally {
// last handler to exit when shutdown releases resources
implClose();
}
}
}
}
// -- Native methods --
static {
}
}