/*
 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
893N/A
893N/Apackage sun.nio.ch;
893N/A
893N/Aimport java.nio.channels.spi.AsynchronousChannelProvider;
893N/Aimport java.io.IOException;
893N/Aimport java.util.concurrent.ArrayBlockingQueue;
893N/Aimport java.util.concurrent.RejectedExecutionException;
893N/Aimport java.util.concurrent.atomic.AtomicInteger;
893N/Aimport static sun.nio.ch.EPoll.*;
893N/A
893N/A/**
893N/A * AsynchronousChannelGroup implementation based on the Linux epoll facility.
893N/A */
893N/A
893N/Afinal class EPollPort
893N/A extends Port
893N/A{
893N/A // maximum number of events to poll at a time
893N/A private static final int MAX_EPOLL_EVENTS = 512;
893N/A
893N/A // errors
893N/A private static final int ENOENT = 2;
893N/A
893N/A // epoll file descriptor
893N/A private final int epfd;
893N/A
893N/A // true if epoll closed
893N/A private boolean closed;
893N/A
893N/A // socket pair used for wakeup
893N/A private final int sp[];
893N/A
893N/A // number of wakeups pending
893N/A private final AtomicInteger wakeupCount = new AtomicInteger();
893N/A
893N/A // address of the poll array passed to epoll_wait
893N/A private final long address;
893N/A
893N/A // encapsulates an event for a channel
893N/A static class Event {
893N/A final PollableChannel channel;
893N/A final int events;
893N/A
893N/A Event(PollableChannel channel, int events) {
893N/A this.channel = channel;
893N/A this.events = events;
893N/A }
893N/A
893N/A PollableChannel channel() { return channel; }
893N/A int events() { return events; }
893N/A }
893N/A
893N/A // queue of events for cases that a polling thread dequeues more than one
893N/A // event
893N/A private final ArrayBlockingQueue<Event> queue;
893N/A private final Event NEED_TO_POLL = new Event(null, 0);
893N/A private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
893N/A
893N/A EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
893N/A throws IOException
893N/A {
893N/A super(provider, pool);
893N/A
893N/A // open epoll
893N/A this.epfd = epollCreate();
893N/A
893N/A // create socket pair for wakeup mechanism
893N/A int[] sv = new int[2];
893N/A try {
893N/A socketpair(sv);
893N/A // register one end with epoll
893N/A epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN);
893N/A } catch (IOException x) {
893N/A close0(epfd);
893N/A throw x;
893N/A }
893N/A this.sp = sv;
893N/A
893N/A // allocate the poll array
893N/A this.address = allocatePollArray(MAX_EPOLL_EVENTS);
893N/A
893N/A // create the queue and offer the special event to ensure that the first
893N/A // threads polls
893N/A this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
893N/A this.queue.offer(NEED_TO_POLL);
893N/A }
893N/A
893N/A EPollPort start() {
893N/A startThreads(new EventHandlerTask());
893N/A return this;
893N/A }
893N/A
893N/A /**
893N/A * Release all resources
893N/A */
893N/A private void implClose() {
893N/A synchronized (this) {
893N/A if (closed)
893N/A return;
893N/A closed = true;
893N/A }
893N/A freePollArray(address);
893N/A close0(sp[0]);
893N/A close0(sp[1]);
893N/A close0(epfd);
893N/A }
893N/A
893N/A private void wakeup() {
893N/A if (wakeupCount.incrementAndGet() == 1) {
893N/A // write byte to socketpair to force wakeup
893N/A try {
893N/A interrupt(sp[1]);
893N/A } catch (IOException x) {
893N/A throw new AssertionError(x);
893N/A }
893N/A }
893N/A }
893N/A
893N/A @Override
893N/A void executeOnHandlerTask(Runnable task) {
893N/A synchronized (this) {
893N/A if (closed)
893N/A throw new RejectedExecutionException();
893N/A offerTask(task);
893N/A wakeup();
893N/A }
893N/A }
893N/A
893N/A @Override
893N/A void shutdownHandlerTasks() {
893N/A /*
893N/A * If no tasks are running then just release resources; otherwise
893N/A * write to the one end of the socketpair to wakeup any polling threads.
893N/A */
893N/A int nThreads = threadCount();
893N/A if (nThreads == 0) {
893N/A implClose();
893N/A } else {
893N/A // send interrupt to each thread
893N/A while (nThreads-- > 0) {
893N/A wakeup();
893N/A }
893N/A }
893N/A }
893N/A
893N/A // invoke by clients to register a file descriptor
893N/A @Override
893N/A void startPoll(int fd, int events) {
893N/A // update events (or add to epoll on first usage)
893N/A int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
893N/A if (err == ENOENT)
893N/A err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
893N/A if (err != 0)
893N/A throw new AssertionError(); // should not happen
893N/A }
893N/A
893N/A /*
893N/A * Task to process events from epoll and dispatch to the channel's
893N/A * onEvent handler.
893N/A *
893N/A * Events are retreived from epoll in batch and offered to a BlockingQueue
893N/A * where they are consumed by handler threads. A special "NEED_TO_POLL"
893N/A * event is used to signal one consumer to re-poll when all events have
893N/A * been consumed.
893N/A */
893N/A private class EventHandlerTask implements Runnable {
893N/A private Event poll() throws IOException {
893N/A try {
893N/A for (;;) {
893N/A int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
893N/A /*
893N/A * 'n' events have been read. Here we map them to their
893N/A * corresponding channel in batch and queue n-1 so that
893N/A * they can be handled by other handler threads. The last
893N/A * event is handled by this thread (and so is not queued).
893N/A */
893N/A fdToChannelLock.readLock().lock();
893N/A try {
893N/A while (n-- > 0) {
893N/A long eventAddress = getEvent(address, n);
893N/A int fd = getDescriptor(eventAddress);
893N/A
893N/A // wakeup
893N/A if (fd == sp[0]) {
893N/A if (wakeupCount.decrementAndGet() == 0) {
893N/A // no more wakeups so drain pipe
893N/A drain1(sp[0]);
893N/A }
893N/A
893N/A // queue special event if there are more events
893N/A // to handle.
893N/A if (n > 0) {
893N/A queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
893N/A continue;
893N/A }
893N/A return EXECUTE_TASK_OR_SHUTDOWN;
893N/A }
893N/A
893N/A PollableChannel channel = fdToChannel.get(fd);
893N/A if (channel != null) {
893N/A int events = getEvents(eventAddress);
893N/A Event ev = new Event(channel, events);
893N/A
893N/A // n-1 events are queued; This thread handles
893N/A // the last one except for the wakeup
893N/A if (n > 0) {
893N/A queue.offer(ev);
893N/A } else {
893N/A return ev;
893N/A }
893N/A }
893N/A }
893N/A } finally {
893N/A fdToChannelLock.readLock().unlock();
893N/A }
893N/A }
893N/A } finally {
893N/A // to ensure that some thread will poll when all events have
893N/A // been consumed
893N/A queue.offer(NEED_TO_POLL);
893N/A }
893N/A }
893N/A
893N/A public void run() {
893N/A Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
893N/A Invoker.getGroupAndInvokeCount();
1580N/A final boolean isPooledThread = (myGroupAndInvokeCount != null);
893N/A boolean replaceMe = false;
893N/A Event ev;
893N/A try {
893N/A for (;;) {
893N/A // reset invoke count
1580N/A if (isPooledThread)
893N/A myGroupAndInvokeCount.resetInvokeCount();
893N/A
893N/A try {
893N/A replaceMe = false;
893N/A ev = queue.take();
893N/A
893N/A // no events and this thread has been "selected" to
893N/A // poll for more.
893N/A if (ev == NEED_TO_POLL) {
893N/A try {
893N/A ev = poll();
893N/A } catch (IOException x) {
893N/A x.printStackTrace();
893N/A return;
893N/A }
893N/A }
893N/A } catch (InterruptedException x) {
893N/A continue;
893N/A }
893N/A
893N/A // handle wakeup to execute task or shutdown
893N/A if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
893N/A Runnable task = pollTask();
893N/A if (task == null) {
893N/A // shutdown request
893N/A return;
893N/A }
893N/A // run task (may throw error/exception)
893N/A replaceMe = true;
893N/A task.run();
893N/A continue;
893N/A }
893N/A
893N/A // process event
893N/A try {
1580N/A ev.channel().onEvent(ev.events(), isPooledThread);
893N/A } catch (Error x) {
893N/A replaceMe = true; throw x;
893N/A } catch (RuntimeException x) {
893N/A replaceMe = true; throw x;
893N/A }
893N/A }
893N/A } finally {
893N/A // last handler to exit when shutdown releases resources
893N/A int remaining = threadExit(this, replaceMe);
893N/A if (remaining == 0 && isShutdown()) {
893N/A implClose();
893N/A }
893N/A }
893N/A }
893N/A }
893N/A
893N/A // -- Native methods --
893N/A
893N/A private static native void socketpair(int[] sv) throws IOException;
893N/A
893N/A private static native void interrupt(int fd) throws IOException;
893N/A
893N/A private static native void drain1(int fd) throws IOException;
893N/A
893N/A private static native void close0(int fd);
893N/A
893N/A static {
893N/A Util.load();
893N/A }
893N/A}