1817N/A/*
1817N/A * Copyright 2009 Google Inc. All Rights Reserved.
1817N/A * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
1817N/A *
1817N/A * This code is free software; you can redistribute it and/or modify it
1817N/A * under the terms of the GNU General Public License version 2 only, as
1817N/A * published by the Free Software Foundation.
1817N/A *
1817N/A * This code is distributed in the hope that it will be useful, but WITHOUT
1817N/A * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
1817N/A * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
1817N/A * version 2 for more details (a copy is included in the LICENSE file that
1817N/A * accompanied this code).
1817N/A *
1817N/A * You should have received a copy of the GNU General Public License version
1817N/A * 2 along with this work; if not, write to the Free Software Foundation,
1817N/A * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
1817N/A *
2362N/A * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
2362N/A * or visit www.oracle.com if you need additional information or have any
2362N/A * questions.
1817N/A */
1817N/A
1817N/Aimport java.net.InetSocketAddress;
1817N/Aimport java.net.SocketAddress;
1817N/Aimport java.nio.channels.SelectionKey;
1817N/Aimport java.nio.channels.Selector;
1817N/Aimport java.nio.channels.ServerSocketChannel;
1817N/Aimport java.nio.channels.SocketChannel;
1817N/Aimport java.util.ArrayList;
1817N/Aimport java.util.Iterator;
1817N/Aimport java.util.List;
1817N/A
1817N/A/**
1817N/A * Reproduces O(N^2) behavior of JDK6/7 select() call. This happens when
1817N/A * a selector has many unprocessed updates to its interest set (e.g. adding
1817N/A * OP_READ on a bunch of newly accepted sockets). The O(N^2) is triggered
1817N/A * by cancelling a number of selection keys (or just closing a few sockets).
1817N/A * In this case, select() will first go through the list of cancelled keys
1817N/A * and try to deregister them. That deregistration is O(N^2) over the list
1817N/A * of unprocessed updates to the interest set.
1817N/A *
1817N/A * <p> This O(N^2) behavior is a BUG in JVM and should be fixed.
1817N/A *
1817N/A * <p> The test first creates initCount connections, and adds them
1817N/A * to the server epoll set. It then creates massCount connections,
1817N/A * registers interest (causing updateList to be populated with massCount*2
1817N/A * elements), but does not add them to epoll set (that would've cleared
1817N/A * updateList). The test then closes initCount connections, thus populating
1817N/A * deregistration queue. The subsequent call to selectNow() will first process
1817N/A * deregistration queue, performing O(N^2) over updateList size,
1817N/A * equal to massCount * 2.
1817N/A *
1817N/A * <p> Note that connect rate is artificially slowed down to compensate
1817N/A * for what I believe is a Linux bug, where too high of a connection rate
1817N/A * ends up in SYN's being dropped and then slow retransmits.
1817N/A *
1817N/A * @author Igor Chernyshev
1817N/A */
1817N/Apublic class LotsOfCancels {
1817N/A
1817N/A static long testStartTime;
1817N/A
1817N/A public static void main(String[] args) throws Exception {
1817N/A // the final select should run in less than 1000ms.
1817N/A runTest(500, 2700, 1000);
1817N/A }
1817N/A
1817N/A static void log(String msg) {
1817N/A System.out.println(getLogPrefix() + msg);
1817N/A }
1817N/A
1817N/A static String getLogPrefix() {
1817N/A return durationMillis(testStartTime) + ": ";
1817N/A }
1817N/A
1817N/A /**
1817N/A * Returns the elapsed time since startNanos, in milliseconds.
1817N/A * @param startNanos the start time; this must be a value returned
1817N/A * by {@link System.nanoTime}
1817N/A */
1817N/A static long durationMillis(long startNanos) {
1817N/A return (System.nanoTime() - startNanos) / (1000L * 1000L);
1817N/A }
1817N/A
1817N/A static void runTest(int initCount, int massCount, int maxSelectTime)
1817N/A throws Exception {
1817N/A testStartTime = System.nanoTime();
1817N/A
1817N/A InetSocketAddress address = new InetSocketAddress("127.0.0.1", 7359);
1817N/A
1817N/A // Create server channel, add it to selector and run epoll_ctl.
1817N/A log("Setting up server");
1817N/A Selector serverSelector = Selector.open();
1817N/A ServerSocketChannel server = ServerSocketChannel.open();
1817N/A server.configureBlocking(false);
1817N/A server.socket().bind(address, 5000);
1817N/A server.register(serverSelector, SelectionKey.OP_ACCEPT);
1817N/A serverSelector.selectNow();
1817N/A
1817N/A log("Setting up client");
1817N/A ClientThread client = new ClientThread(address);
1817N/A client.start();
1817N/A Thread.sleep(100);
1817N/A
1817N/A // Set up initial set of client sockets.
1817N/A log("Starting initial client connections");
1817N/A client.connectClients(initCount);
1817N/A Thread.sleep(500); // Wait for client connections to arrive
1817N/A
1817N/A // Accept all initial client sockets, add to selector and run
1817N/A // epoll_ctl.
1817N/A log("Accepting initial connections");
1817N/A List<SocketChannel> serverChannels1 =
1817N/A acceptAndAddAll(serverSelector, server, initCount);
1817N/A if (serverChannels1.size() != initCount) {
1817N/A throw new Exception("Accepted " + serverChannels1.size() +
1817N/A " instead of " + initCount);
1817N/A }
1817N/A serverSelector.selectNow();
1817N/A
1817N/A // Set up mass set of client sockets.
1817N/A log("Requesting mass client connections");
1817N/A client.connectClients(massCount);
1817N/A Thread.sleep(500); // Wait for client connections to arrive
1817N/A
1817N/A // Accept all mass client sockets, add to selector and do NOT
1817N/A // run epoll_ctl.
1817N/A log("Accepting mass connections");
1817N/A List<SocketChannel> serverChannels2 =
1817N/A acceptAndAddAll(serverSelector, server, massCount);
1817N/A if (serverChannels2.size() != massCount) {
1817N/A throw new Exception("Accepted " + serverChannels2.size() +
1817N/A " instead of " + massCount);
1817N/A }
1817N/A
1817N/A // Close initial set of sockets.
1817N/A log("Closing initial connections");
1817N/A closeAll(serverChannels1);
1817N/A
1817N/A // Now get the timing of select() call.
1817N/A log("Running the final select call");
1817N/A long startTime = System.nanoTime();
1817N/A serverSelector.selectNow();
1817N/A long duration = durationMillis(startTime);
1817N/A log("Init count = " + initCount +
1817N/A ", mass count = " + massCount +
1817N/A ", duration = " + duration + "ms");
1817N/A
1817N/A if (duration > maxSelectTime) {
1817N/A System.out.println
1817N/A ("\n\n\n\n\nFAILURE: The final selectNow() took " +
1817N/A duration + "ms " +
1817N/A "- seems like O(N^2) bug is still here\n\n");
1817N/A System.exit(1);
1817N/A }
1817N/A }
1817N/A
1817N/A static List<SocketChannel> acceptAndAddAll(Selector selector,
1817N/A ServerSocketChannel server,
1817N/A int expected)
1817N/A throws Exception {
1817N/A int retryCount = 0;
1817N/A int acceptCount = 0;
1817N/A List<SocketChannel> channels = new ArrayList<SocketChannel>();
1817N/A while (channels.size() < expected) {
1817N/A SocketChannel channel = server.accept();
1817N/A if (channel == null) {
1817N/A log("accept() returned null " +
1817N/A "after accepting " + acceptCount + " more connections");
1817N/A acceptCount = 0;
1817N/A if (retryCount < 10) {
1817N/A // See if more new sockets got stacked behind.
1817N/A retryCount++;
1817N/A Thread.sleep(500);
1817N/A continue;
1817N/A }
1817N/A break;
1817N/A }
1817N/A retryCount = 0;
1817N/A acceptCount++;
1817N/A channel.configureBlocking(false);
1817N/A channel.register(selector, SelectionKey.OP_READ);
1817N/A channels.add(channel);
1817N/A }
1817N/A // Cause an additional updateList entry per channel.
1817N/A for (SocketChannel channel : channels) {
1817N/A channel.register(selector, SelectionKey.OP_WRITE);
1817N/A }
1817N/A return channels;
1817N/A }
1817N/A
1817N/A static void closeAll(List<SocketChannel> channels)
1817N/A throws Exception {
1817N/A for (SocketChannel channel : channels) {
1817N/A channel.close();
1817N/A }
1817N/A }
1817N/A
1817N/A static class ClientThread extends Thread {
1817N/A private final SocketAddress address;
1817N/A private final Selector selector;
1817N/A private int connectionsNeeded;
1817N/A private int totalCreated;
1817N/A
1817N/A ClientThread(SocketAddress address) throws Exception {
1817N/A this.address = address;
1817N/A selector = Selector.open();
1817N/A setDaemon(true);
1817N/A }
1817N/A
1817N/A void connectClients(int count) throws Exception {
1817N/A synchronized (this) {
1817N/A connectionsNeeded += count;
1817N/A }
1817N/A selector.wakeup();
1817N/A }
1817N/A
1817N/A @Override
1817N/A public void run() {
1817N/A try {
1817N/A handleClients();
1817N/A } catch (Throwable e) {
1817N/A e.printStackTrace();
1817N/A System.exit(1);
1817N/A }
1817N/A }
1817N/A
1817N/A private void handleClients() throws Exception {
1817N/A int selectCount = 0;
1817N/A while (true) {
1817N/A int createdCount = 0;
1817N/A synchronized (this) {
1817N/A if (connectionsNeeded > 0) {
1817N/A
1817N/A while (connectionsNeeded > 0 && createdCount < 20) {
1817N/A connectionsNeeded--;
1817N/A createdCount++;
1817N/A totalCreated++;
1817N/A
1817N/A SocketChannel channel = SocketChannel.open();
1817N/A channel.configureBlocking(false);
1817N/A channel.connect(address);
1817N/A if (!channel.finishConnect()) {
1817N/A channel.register(selector,
1817N/A SelectionKey.OP_CONNECT);
1817N/A }
1817N/A }
1817N/A
1817N/A log("Started total of " +
1817N/A totalCreated + " client connections");
1817N/A Thread.sleep(200);
1817N/A }
1817N/A }
1817N/A
1817N/A if (createdCount > 0) {
1817N/A selector.selectNow();
1817N/A } else {
1817N/A selectCount++;
1817N/A long startTime = System.nanoTime();
1817N/A selector.select();
1817N/A long duration = durationMillis(startTime);
1817N/A log("Exited clientSelector.select(), loop #"
1817N/A + selectCount + ", duration = " + duration + "ms");
1817N/A }
1817N/A
1817N/A int keyCount = -1;
1817N/A Iterator<SelectionKey> keys =
1817N/A selector.selectedKeys().iterator();
1817N/A while (keys.hasNext()) {
1817N/A SelectionKey key = keys.next();
1817N/A synchronized (key) {
1817N/A keyCount++;
1817N/A keys.remove();
1817N/A if (!key.isValid()) {
1817N/A log("Ignoring client key #" + keyCount);
1817N/A continue;
1817N/A }
1817N/A int readyOps = key.readyOps();
1817N/A if (readyOps == SelectionKey.OP_CONNECT) {
1817N/A key.interestOps(0);
1817N/A ((SocketChannel) key.channel()).finishConnect();
1817N/A } else {
1817N/A log("readyOps() on client key #" + keyCount +
1817N/A " returned " + readyOps);
1817N/A }
1817N/A }
1817N/A }
1817N/A }
1817N/A }
1817N/A }
1817N/A}