0N/A/*
2362N/A * Copyright (c) 1999, 2001, Oracle and/or its affiliates. All rights reserved.
0N/A *
0N/A * Redistribution and use in source and binary forms, with or without
0N/A * modification, are permitted provided that the following conditions
0N/A * are met:
0N/A *
0N/A * - Redistributions of source code must retain the above copyright
0N/A * notice, this list of conditions and the following disclaimer.
0N/A *
0N/A * - Redistributions in binary form must reproduce the above copyright
0N/A * notice, this list of conditions and the following disclaimer in the
0N/A * documentation and/or other materials provided with the distribution.
0N/A *
2362N/A * - Neither the name of Oracle nor the names of its
0N/A * contributors may be used to endorse or promote products derived
0N/A * from this software without specific prior written permission.
0N/A *
0N/A * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
0N/A * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
0N/A * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
0N/A * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
0N/A * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
0N/A * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
0N/A * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
0N/A * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
0N/A * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
0N/A * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
0N/A * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0N/A */
0N/A
4378N/A/*
4378N/A * This source code is provided to illustrate the usage of a given feature
4378N/A * or technique and has been deliberately simplified. Additional steps
4378N/A * required for a production-quality application, such as security checks,
4378N/A * input validation and proper error handling, might not be present in
4378N/A * this sample code.
4378N/A */
4378N/A
4378N/A
0N/Aimport java.io.*;
0N/Aimport java.net.*;
0N/Aimport java.lang.Byte;
0N/A
0N/A/**
0N/A * Simple Java "server" using the Poller class
0N/A * to multiplex on incoming connections. Note
0N/A * that handoff of events, via linked Q is not
0N/A * actually be a performance booster here, since
0N/A * the processing of events is cheaper than
0N/A * the overhead in scheduling/executing them.
0N/A * Although this demo does allow for concurrency
0N/A * in handling connections, it uses a rather
0N/A * primitive "gang scheduling" policy to keep
0N/A * the code simpler.
0N/A */
0N/A
0N/Apublic class PollingServer
0N/A{
0N/A public final static int MAXCONN = 10000;
0N/A public final static int PORTNUM = 4444;
0N/A public final static int BYTESPEROP = 10;
0N/A
0N/A /**
0N/A * This synchronization object protects access to certain
0N/A * data (bytesRead,eventsToProcess) by concurrent Consumer threads.
0N/A */
0N/A private final static Object eventSync = new Object();
0N/A
0N/A private static InputStream[] instr = new InputStream[MAXCONN];
0N/A private static int[] mapping = new int[65535];
0N/A private static LinkedQueue linkedQ = new LinkedQueue();
0N/A private static int bytesRead = 0;
0N/A private static int bytesToRead;
0N/A private static int eventsToProcess=0;
0N/A
0N/A public PollingServer(int concurrency) {
0N/A Socket[] sockArr = new Socket[MAXCONN];
0N/A long timestart, timestop;
0N/A short[] revents = new short[MAXCONN];
0N/A int[] fds = new int[MAXCONN];
0N/A int bytes;
0N/A Poller Mux;
0N/A int serverFd;
0N/A int totalConn=0;
0N/A int connects=0;
0N/A
0N/A System.out.println ("Serv: Initializing port " + PORTNUM);
0N/A try {
0N/A
0N/A ServerSocket skMain = new ServerSocket (PORTNUM);
0N/A /*
0N/A * Create the Poller object Mux, allow for up to MAXCONN
0N/A * sockets/filedescriptors to be polled.
0N/A */
0N/A Mux = new Poller(MAXCONN);
0N/A serverFd = Mux.add(skMain, Poller.POLLIN);
0N/A
0N/A Socket ctrlSock = skMain.accept();
0N/A
0N/A BufferedReader ctrlReader =
0N/A new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
0N/A String ctrlString = ctrlReader.readLine();
0N/A bytesToRead = Integer.valueOf(ctrlString).intValue();
0N/A ctrlString = ctrlReader.readLine();
0N/A totalConn = Integer.valueOf(ctrlString).intValue();
0N/A
0N/A System.out.println("Receiving " + bytesToRead + " bytes from " +
0N/A totalConn + " client connections");
0N/A
0N/A timestart = System.currentTimeMillis();
0N/A
0N/A /*
0N/A * Start the consumer threads to read data.
0N/A */
0N/A for (int consumerThread = 0;
0N/A consumerThread < concurrency; consumerThread++ ) {
0N/A new Consumer(consumerThread).start();
0N/A }
0N/A
0N/A /*
0N/A * Take connections, read Data
0N/A */
0N/A int numEvents=0;
0N/A
0N/A while ( bytesRead < bytesToRead ) {
0N/A
0N/A int loopWaits=0;
0N/A while (eventsToProcess > 0) {
0N/A synchronized (eventSync) {
0N/A loopWaits++;
0N/A if (eventsToProcess <= 0) break;
0N/A try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
0N/A }
0N/A }
0N/A if (loopWaits > 1)
0N/A System.out.println("Done waiting...loops = " + loopWaits +
0N/A " events " + numEvents +
0N/A " bytes read : " + bytesRead );
0N/A
0N/A if (bytesRead >= bytesToRead) break; // may be done!
0N/A
0N/A /*
0N/A * Wait for events
0N/A */
0N/A numEvents = Mux.waitMultiple(100, fds, revents);
0N/A synchronized (eventSync) {
0N/A eventsToProcess = numEvents;
0N/A }
0N/A /*
0N/A * Process all the events we got from Mux.waitMultiple
0N/A */
0N/A int cnt = 0;
0N/A while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
0N/A int fd = fds[cnt];
0N/A
0N/A if (revents[cnt] == Poller.POLLIN) {
0N/A if (fd == serverFd) {
0N/A /*
0N/A * New connection coming in on the ServerSocket
0N/A * Add the socket to the Mux, keep track of mapping
0N/A * the fdval returned by Mux.add to the connection.
0N/A */
0N/A sockArr[connects] = skMain.accept();
0N/A instr[connects] = sockArr[connects].getInputStream();
0N/A int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
0N/A mapping[fdval] = connects;
0N/A synchronized(eventSync) {
0N/A eventsToProcess--; // just processed this one!
0N/A }
0N/A connects++;
0N/A } else {
0N/A /*
0N/A * We've got data from this client connection.
0N/A * Put it on the queue for the consumer threads to process.
0N/A */
0N/A linkedQ.put(new Integer(fd));
0N/A }
0N/A } else {
0N/A System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
0N/A }
0N/A cnt++;
0N/A }
0N/A }
0N/A timestop = System.currentTimeMillis();
0N/A System.out.println("Time for all reads (" + totalConn +
0N/A " sockets) : " + (timestop-timestart));
0N/A
0N/A // Tell the client it can now go away
0N/A byte[] buff = new byte[BYTESPEROP];
0N/A ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
0N/A
0N/A // Tell the cunsumer threads they can exit.
0N/A for (int cThread = 0; cThread < concurrency; cThread++ ) {
0N/A linkedQ.put(new Integer(-1));
0N/A }
0N/A } catch (Exception exc) { exc.printStackTrace(); }
0N/A }
0N/A
0N/A /*
0N/A * main ... just check if a concurrency was specified
0N/A */
0N/A public static void main (String args[])
0N/A {
0N/A int concurrency;
0N/A
0N/A if (args.length == 1)
0N/A concurrency = java.lang.Integer.valueOf(args[0]).intValue();
0N/A else
0N/A concurrency = Poller.getNumCPUs() + 1;
0N/A PollingServer server = new PollingServer(concurrency);
0N/A }
0N/A
0N/A /*
0N/A * This class is for handling the Client data.
0N/A * The PollingServer spawns off a number of these based upon
0N/A * the number of CPUs (or concurrency argument).
0N/A * Each just loops grabbing events off the queue and
0N/A * processing them.
0N/A */
0N/A class Consumer extends Thread {
0N/A private int threadNumber;
0N/A public Consumer(int i) { threadNumber = i; }
0N/A
0N/A public void run() {
0N/A byte[] buff = new byte[BYTESPEROP];
0N/A int bytes = 0;
0N/A
0N/A InputStream instream;
0N/A while (bytesRead < bytesToRead) {
0N/A try {
0N/A Integer Fd = (Integer) linkedQ.take();
0N/A int fd = Fd.intValue();
0N/A if (fd == -1) break; /* got told we could exit */
0N/A
0N/A /*
0N/A * We have to map the fd value returned from waitMultiple
0N/A * to the actual input stream associated with that fd.
0N/A * Take a look at how the Mux.add() was done to see how
0N/A * we stored that.
0N/A */
0N/A int map = mapping[fd];
0N/A instream = instr[map];
0N/A bytes = instream.read(buff,0,BYTESPEROP);
0N/A } catch (Exception e) { System.out.println(e.toString()); }
0N/A
0N/A if (bytes > 0) {
0N/A /*
0N/A * Any real server would do some synchronized and some
0N/A * unsynchronized work on behalf of the client, and
0N/A * most likely send some data back...but this is a
0N/A * gross oversimplification.
0N/A */
0N/A synchronized(eventSync) {
0N/A bytesRead += bytes;
0N/A eventsToProcess--;
0N/A if (eventsToProcess <= 0) {
0N/A eventSync.notify();
0N/A }
0N/A }
0N/A }
0N/A }
0N/A }
0N/A }
0N/A}