PollingServer.java revision 4378
1575N/A/*
2362N/A * Copyright (c) 1999, 2001, Oracle and/or its affiliates. All rights reserved.
1575N/A *
1575N/A * Redistribution and use in source and binary forms, with or without
1575N/A * modification, are permitted provided that the following conditions
1575N/A * are met:
1575N/A *
1575N/A * - Redistributions of source code must retain the above copyright
1575N/A * notice, this list of conditions and the following disclaimer.
1575N/A *
1575N/A * - Redistributions in binary form must reproduce the above copyright
1575N/A * notice, this list of conditions and the following disclaimer in the
1575N/A * documentation and/or other materials provided with the distribution.
1575N/A *
1575N/A * - Neither the name of Oracle nor the names of its
1575N/A * contributors may be used to endorse or promote products derived
1575N/A * from this software without specific prior written permission.
1575N/A *
2362N/A * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
2362N/A * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
2362N/A * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
1575N/A * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
1575N/A * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
1575N/A * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
1575N/A * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
1575N/A * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
1575N/A * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
1575N/A * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
1575N/A * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3471N/A */
1575N/A
1575N/A/*
1575N/A * This source code is provided to illustrate the usage of a given feature
1575N/A * or technique and has been deliberately simplified. Additional steps
1575N/A * required for a production-quality application, such as security checks,
1575N/A * input validation and proper error handling, might not be present in
1575N/A * this sample code.
1575N/A */
1575N/A
1575N/A
1575N/Aimport java.io.*;
1575N/Aimport java.net.*;
1575N/Aimport java.lang.Byte;
1575N/A
1575N/A/**
1575N/A * Simple Java "server" using the Poller class
1575N/A * to multiplex on incoming connections. Note
1575N/A * that handoff of events, via linked Q is not
1575N/A * actually be a performance booster here, since
1575N/A * the processing of events is cheaper than
1575N/A * the overhead in scheduling/executing them.
1575N/A * Although this demo does allow for concurrency
1575N/A * in handling connections, it uses a rather
1575N/A * primitive "gang scheduling" policy to keep
1575N/A * the code simpler.
1575N/A */
1575N/A
1575N/Apublic class PollingServer
1575N/A{
1575N/A public final static int MAXCONN = 10000;
1575N/A public final static int PORTNUM = 4444;
1575N/A public final static int BYTESPEROP = 10;
1575N/A
1575N/A /**
1575N/A * This synchronization object protects access to certain
1575N/A * data (bytesRead,eventsToProcess) by concurrent Consumer threads.
1575N/A */
1575N/A private final static Object eventSync = new Object();
1575N/A
1575N/A private static InputStream[] instr = new InputStream[MAXCONN];
1575N/A private static int[] mapping = new int[65535];
1575N/A private static LinkedQueue linkedQ = new LinkedQueue();
1575N/A private static int bytesRead = 0;
1575N/A private static int bytesToRead;
1575N/A private static int eventsToProcess=0;
1575N/A
1575N/A public PollingServer(int concurrency) {
1575N/A Socket[] sockArr = new Socket[MAXCONN];
1575N/A long timestart, timestop;
1575N/A short[] revents = new short[MAXCONN];
1575N/A int[] fds = new int[MAXCONN];
1575N/A int bytes;
3471N/A Poller Mux;
1575N/A int serverFd;
3471N/A int totalConn=0;
1575N/A int connects=0;
1575N/A
1575N/A System.out.println ("Serv: Initializing port " + PORTNUM);
3471N/A try {
1575N/A
3471N/A ServerSocket skMain = new ServerSocket (PORTNUM);
1575N/A /*
3471N/A * Create the Poller object Mux, allow for up to MAXCONN
1575N/A * sockets/filedescriptors to be polled.
3471N/A */
1575N/A Mux = new Poller(MAXCONN);
1575N/A serverFd = Mux.add(skMain, Poller.POLLIN);
1575N/A
1575N/A Socket ctrlSock = skMain.accept();
1575N/A
1575N/A BufferedReader ctrlReader =
3471N/A new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
1575N/A String ctrlString = ctrlReader.readLine();
1575N/A bytesToRead = Integer.valueOf(ctrlString).intValue();
1575N/A ctrlString = ctrlReader.readLine();
1575N/A totalConn = Integer.valueOf(ctrlString).intValue();
1575N/A
1575N/A System.out.println("Receiving " + bytesToRead + " bytes from " +
3471N/A totalConn + " client connections");
1575N/A
1575N/A timestart = System.currentTimeMillis();
1575N/A
1575N/A /*
1575N/A * Start the consumer threads to read data.
1575N/A */
1575N/A for (int consumerThread = 0;
3471N/A consumerThread < concurrency; consumerThread++ ) {
1575N/A new Consumer(consumerThread).start();
1575N/A }
1575N/A
1575N/A /*
1575N/A * Take connections, read Data
1575N/A */
1575N/A int numEvents=0;
1575N/A
1575N/A while ( bytesRead < bytesToRead ) {
1575N/A
1575N/A int loopWaits=0;
1575N/A while (eventsToProcess > 0) {
3471N/A synchronized (eventSync) {
3471N/A loopWaits++;
1575N/A if (eventsToProcess <= 0) break;
1575N/A try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
1575N/A }
1575N/A }
1575N/A if (loopWaits > 1)
1575N/A System.out.println("Done waiting...loops = " + loopWaits +
1575N/A " events " + numEvents +
1575N/A " bytes read : " + bytesRead );
1575N/A
1575N/A if (bytesRead >= bytesToRead) break; // may be done!
1575N/A
1575N/A /*
1575N/A * Wait for events
1575N/A */
1575N/A numEvents = Mux.waitMultiple(100, fds, revents);
1575N/A synchronized (eventSync) {
1575N/A eventsToProcess = numEvents;
1575N/A }
1575N/A /*
1575N/A * Process all the events we got from Mux.waitMultiple
1575N/A */
1575N/A int cnt = 0;
1575N/A while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
1575N/A int fd = fds[cnt];
1575N/A
1575N/A if (revents[cnt] == Poller.POLLIN) {
1575N/A if (fd == serverFd) {
1575N/A /*
1575N/A * New connection coming in on the ServerSocket
1575N/A * Add the socket to the Mux, keep track of mapping
1575N/A * the fdval returned by Mux.add to the connection.
1575N/A */
1575N/A sockArr[connects] = skMain.accept();
1575N/A instr[connects] = sockArr[connects].getInputStream();
1575N/A int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
1575N/A mapping[fdval] = connects;
1575N/A synchronized(eventSync) {
1575N/A eventsToProcess--; // just processed this one!
1575N/A }
1575N/A connects++;
1575N/A } else {
1575N/A /*
1575N/A * We've got data from this client connection.
1575N/A * Put it on the queue for the consumer threads to process.
1575N/A */
1575N/A linkedQ.put(new Integer(fd));
1575N/A }
1575N/A } else {
1575N/A System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
1575N/A }
1575N/A cnt++;
1575N/A }
1575N/A }
1575N/A timestop = System.currentTimeMillis();
1575N/A System.out.println("Time for all reads (" + totalConn +
1575N/A " sockets) : " + (timestop-timestart));
1575N/A
1575N/A // Tell the client it can now go away
1575N/A byte[] buff = new byte[BYTESPEROP];
1575N/A ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
1575N/A
1575N/A // Tell the cunsumer threads they can exit.
1575N/A for (int cThread = 0; cThread < concurrency; cThread++ ) {
1575N/A linkedQ.put(new Integer(-1));
1575N/A }
1575N/A } catch (Exception exc) { exc.printStackTrace(); }
1575N/A }
1575N/A
1575N/A /*
1575N/A * main ... just check if a concurrency was specified
1575N/A */
1575N/A public static void main (String args[])
1575N/A {
1575N/A int concurrency;
1575N/A
1575N/A if (args.length == 1)
1575N/A concurrency = java.lang.Integer.valueOf(args[0]).intValue();
1575N/A else
1575N/A concurrency = Poller.getNumCPUs() + 1;
1575N/A PollingServer server = new PollingServer(concurrency);
1575N/A }
1575N/A
1575N/A /*
1575N/A * This class is for handling the Client data.
1575N/A * The PollingServer spawns off a number of these based upon
1575N/A * the number of CPUs (or concurrency argument).
1575N/A * Each just loops grabbing events off the queue and
1575N/A * processing them.
1575N/A */
3471N/A class Consumer extends Thread {
1575N/A private int threadNumber;
1575N/A public Consumer(int i) { threadNumber = i; }
1575N/A
1575N/A public void run() {
1575N/A byte[] buff = new byte[BYTESPEROP];
1575N/A int bytes = 0;
1575N/A
1575N/A InputStream instream;
1575N/A while (bytesRead < bytesToRead) {
1575N/A try {
1575N/A Integer Fd = (Integer) linkedQ.take();
1575N/A int fd = Fd.intValue();
1575N/A if (fd == -1) break; /* got told we could exit */
1575N/A
1575N/A /*
1575N/A * We have to map the fd value returned from waitMultiple
1575N/A * to the actual input stream associated with that fd.
1575N/A * Take a look at how the Mux.add() was done to see how
1575N/A * we stored that.
1575N/A */
1575N/A int map = mapping[fd];
1575N/A instream = instr[map];
1575N/A bytes = instream.read(buff,0,BYTESPEROP);
1575N/A } catch (Exception e) { System.out.println(e.toString()); }
1575N/A
1575N/A if (bytes > 0) {
1575N/A /*
1575N/A * Any real server would do some synchronized and some
1575N/A * unsynchronized work on behalf of the client, and
1575N/A * most likely send some data back...but this is a
1575N/A * gross oversimplification.
1575N/A */
1575N/A synchronized(eventSync) {
1575N/A bytesRead += bytes;
1575N/A eventsToProcess--;
1575N/A if (eventsToProcess <= 0) {
1575N/A eventSync.notify();
1575N/A }
1575N/A }
1575N/A }
1575N/A }
1575N/A }
1575N/A }
1575N/A}
1575N/A