4700N/A/*
4700N/A * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
4700N/A * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4700N/A *
4700N/A * This code is free software; you can redistribute it and/or modify it
4700N/A * under the terms of the GNU General Public License version 2 only, as
4700N/A * published by the Free Software Foundation. Oracle designates this
4700N/A * particular file as subject to the "Classpath" exception as provided
4700N/A * by Oracle in the LICENSE file that accompanied this code.
4700N/A *
4700N/A * This code is distributed in the hope that it will be useful, but WITHOUT
4700N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
4700N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
4700N/A * version 2 for more details (a copy is included in the LICENSE file that
4700N/A * accompanied this code).
4700N/A *
4700N/A * You should have received a copy of the GNU General Public License version
4700N/A * 2 along with this work; if not, write to the Free Software Foundation,
4700N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
4700N/A *
4700N/A * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
4700N/A * or visit www.oracle.com if you need additional information or have any
4700N/A * questions.
4700N/A */
4700N/A
4700N/Apackage sun.nio.ch;
4700N/A
4700N/Aimport java.nio.channels.spi.AsynchronousChannelProvider;
4700N/Aimport java.io.IOException;
4700N/Aimport java.util.concurrent.ArrayBlockingQueue;
4700N/Aimport java.util.concurrent.RejectedExecutionException;
4700N/Aimport java.util.concurrent.atomic.AtomicInteger;
4700N/Aimport static sun.nio.ch.KQueue.*;
4700N/A
4700N/A/**
4700N/A * AsynchronousChannelGroup implementation based on the BSD kqueue facility.
4700N/A */
4700N/A
4700N/Afinal class KQueuePort
4700N/A extends Port
4700N/A{
4700N/A // maximum number of events to poll at a time
4700N/A private static final int MAX_KEVENTS_TO_POLL = 512;
4700N/A
4700N/A // kqueue file descriptor
4700N/A private final int kqfd;
4700N/A
4700N/A // true if kqueue closed
4700N/A private boolean closed;
4700N/A
4700N/A // socket pair used for wakeup
4700N/A private final int sp[];
4700N/A
4700N/A // number of wakeups pending
4700N/A private final AtomicInteger wakeupCount = new AtomicInteger();
4700N/A
4700N/A // address of the poll array passed to kqueue_wait
4700N/A private final long address;
4700N/A
4700N/A // encapsulates an event for a channel
4700N/A static class Event {
4700N/A final PollableChannel channel;
4700N/A final int events;
4700N/A
4700N/A Event(PollableChannel channel, int events) {
4700N/A this.channel = channel;
4700N/A this.events = events;
4700N/A }
4700N/A
4700N/A PollableChannel channel() { return channel; }
4700N/A int events() { return events; }
4700N/A }
4700N/A
4700N/A // queue of events for cases that a polling thread dequeues more than one
4700N/A // event
4700N/A private final ArrayBlockingQueue<Event> queue;
4700N/A private final Event NEED_TO_POLL = new Event(null, 0);
4700N/A private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
4700N/A
4700N/A KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
4700N/A throws IOException
4700N/A {
4700N/A super(provider, pool);
4700N/A
4700N/A // open kqueue
4700N/A this.kqfd = kqueue();
4700N/A
4700N/A // create socket pair for wakeup mechanism
4700N/A int[] sv = new int[2];
4700N/A try {
4700N/A socketpair(sv);
4700N/A
4700N/A // register one end with kqueue
4700N/A keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
4700N/A } catch (IOException x) {
4700N/A close0(kqfd);
4700N/A throw x;
4700N/A }
4700N/A this.sp = sv;
4700N/A
4700N/A // allocate the poll array
4700N/A this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
4700N/A
4700N/A // create the queue and offer the special event to ensure that the first
4700N/A // threads polls
4700N/A this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
4700N/A this.queue.offer(NEED_TO_POLL);
4700N/A }
4700N/A
4700N/A KQueuePort start() {
4700N/A startThreads(new EventHandlerTask());
4700N/A return this;
4700N/A }
4700N/A
4700N/A /**
4700N/A * Release all resources
4700N/A */
4700N/A private void implClose() {
4700N/A synchronized (this) {
4700N/A if (closed)
4700N/A return;
4700N/A closed = true;
4700N/A }
4700N/A freePollArray(address);
4700N/A close0(sp[0]);
4700N/A close0(sp[1]);
4700N/A close0(kqfd);
4700N/A }
4700N/A
4700N/A private void wakeup() {
4700N/A if (wakeupCount.incrementAndGet() == 1) {
4700N/A // write byte to socketpair to force wakeup
4700N/A try {
4700N/A interrupt(sp[1]);
4700N/A } catch (IOException x) {
4700N/A throw new AssertionError(x);
4700N/A }
4700N/A }
4700N/A }
4700N/A
4700N/A @Override
4700N/A void executeOnHandlerTask(Runnable task) {
4700N/A synchronized (this) {
4700N/A if (closed)
4700N/A throw new RejectedExecutionException();
4700N/A offerTask(task);
4700N/A wakeup();
4700N/A }
4700N/A }
4700N/A
4700N/A @Override
4700N/A void shutdownHandlerTasks() {
4700N/A /*
4700N/A * If no tasks are running then just release resources; otherwise
4700N/A * write to the one end of the socketpair to wakeup any polling threads.
4700N/A */
4700N/A int nThreads = threadCount();
4700N/A if (nThreads == 0) {
4700N/A implClose();
4700N/A } else {
4700N/A // send interrupt to each thread
4700N/A while (nThreads-- > 0) {
4700N/A wakeup();
4700N/A }
4700N/A }
4700N/A }
4700N/A
4700N/A // invoked by clients to register a file descriptor
4700N/A @Override
4700N/A void startPoll(int fd, int events) {
4700N/A // We use a separate filter for read and write events.
4700N/A // TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
4700N/A int err = 0;
4700N/A int flags = (EV_ADD|EV_ONESHOT);
4700N/A if ((events & Port.POLLIN) > 0)
4700N/A err = keventRegister(kqfd, fd, EVFILT_READ, flags);
4700N/A if (err == 0 && (events & Port.POLLOUT) > 0)
4700N/A err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
4700N/A if (err != 0)
4700N/A throw new InternalError("kevent failed: " + err); // should not happen
4700N/A }
4700N/A
4700N/A /*
4700N/A * Task to process events from kqueue and dispatch to the channel's
4700N/A * onEvent handler.
4700N/A *
4700N/A * Events are retreived from kqueue in batch and offered to a BlockingQueue
4700N/A * where they are consumed by handler threads. A special "NEED_TO_POLL"
4700N/A * event is used to signal one consumer to re-poll when all events have
4700N/A * been consumed.
4700N/A */
4700N/A private class EventHandlerTask implements Runnable {
4700N/A private Event poll() throws IOException {
4700N/A try {
4700N/A for (;;) {
4700N/A int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
4700N/A /*
4700N/A * 'n' events have been read. Here we map them to their
4700N/A * corresponding channel in batch and queue n-1 so that
4700N/A * they can be handled by other handler threads. The last
4700N/A * event is handled by this thread (and so is not queued).
4700N/A */
4700N/A fdToChannelLock.readLock().lock();
4700N/A try {
4700N/A while (n-- > 0) {
4700N/A long keventAddress = getEvent(address, n);
4700N/A int fd = getDescriptor(keventAddress);
4700N/A
4700N/A // wakeup
4700N/A if (fd == sp[0]) {
4700N/A if (wakeupCount.decrementAndGet() == 0) {
4700N/A // no more wakeups so drain pipe
4700N/A drain1(sp[0]);
4700N/A }
4700N/A
4700N/A // queue special event if there are more events
4700N/A // to handle.
4700N/A if (n > 0) {
4700N/A queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
4700N/A continue;
4700N/A }
4700N/A return EXECUTE_TASK_OR_SHUTDOWN;
4700N/A }
4700N/A
4700N/A PollableChannel channel = fdToChannel.get(fd);
4700N/A if (channel != null) {
4700N/A int filter = getFilter(keventAddress);
4700N/A int events = 0;
4700N/A if (filter == EVFILT_READ)
4700N/A events = Port.POLLIN;
4700N/A else if (filter == EVFILT_WRITE)
4700N/A events = Port.POLLOUT;
4700N/A
4700N/A Event ev = new Event(channel, events);
4700N/A
4700N/A // n-1 events are queued; This thread handles
4700N/A // the last one except for the wakeup
4700N/A if (n > 0) {
4700N/A queue.offer(ev);
4700N/A } else {
4700N/A return ev;
4700N/A }
4700N/A }
4700N/A }
4700N/A } finally {
4700N/A fdToChannelLock.readLock().unlock();
4700N/A }
4700N/A }
4700N/A } finally {
4700N/A // to ensure that some thread will poll when all events have
4700N/A // been consumed
4700N/A queue.offer(NEED_TO_POLL);
4700N/A }
4700N/A }
4700N/A
4700N/A public void run() {
4700N/A Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
4700N/A Invoker.getGroupAndInvokeCount();
4700N/A final boolean isPooledThread = (myGroupAndInvokeCount != null);
4700N/A boolean replaceMe = false;
4700N/A Event ev;
4700N/A try {
4700N/A for (;;) {
4700N/A // reset invoke count
4700N/A if (isPooledThread)
4700N/A myGroupAndInvokeCount.resetInvokeCount();
4700N/A
4700N/A try {
4700N/A replaceMe = false;
4700N/A ev = queue.take();
4700N/A
4700N/A // no events and this thread has been "selected" to
4700N/A // poll for more.
4700N/A if (ev == NEED_TO_POLL) {
4700N/A try {
4700N/A ev = poll();
4700N/A } catch (IOException x) {
4700N/A x.printStackTrace();
4700N/A return;
4700N/A }
4700N/A }
4700N/A } catch (InterruptedException x) {
4700N/A continue;
4700N/A }
4700N/A
4700N/A // handle wakeup to execute task or shutdown
4700N/A if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
4700N/A Runnable task = pollTask();
4700N/A if (task == null) {
4700N/A // shutdown request
4700N/A return;
4700N/A }
4700N/A // run task (may throw error/exception)
4700N/A replaceMe = true;
4700N/A task.run();
4700N/A continue;
4700N/A }
4700N/A
4700N/A // process event
4700N/A try {
4700N/A ev.channel().onEvent(ev.events(), isPooledThread);
4700N/A } catch (Error x) {
4700N/A replaceMe = true; throw x;
4700N/A } catch (RuntimeException x) {
4700N/A replaceMe = true; throw x;
4700N/A }
4700N/A }
4700N/A } finally {
4700N/A // last handler to exit when shutdown releases resources
4700N/A int remaining = threadExit(this, replaceMe);
4700N/A if (remaining == 0 && isShutdown()) {
4700N/A implClose();
4700N/A }
4700N/A }
4700N/A }
4700N/A }
4700N/A
4700N/A // -- Native methods --
4700N/A
4700N/A private static native void socketpair(int[] sv) throws IOException;
4700N/A
4700N/A private static native void interrupt(int fd) throws IOException;
4700N/A
4700N/A private static native void drain1(int fd) throws IOException;
4700N/A
4700N/A private static native void close0(int fd);
4700N/A
4700N/A static {
4700N/A Util.load();
4700N/A }
4700N/A}