AgentIndexRunner.java revision 464
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * See LICENSE.txt included in this distribution for the specific
 * language governing permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at LICENSE.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
1364N/Apackage org.opensolaris.opengrok.management;
1364N/A
1364N/Aimport java.io.File;
1364N/Aimport java.util.Arrays;
1364N/Aimport java.util.HashSet;
1364N/Aimport java.util.Iterator;
1364N/Aimport java.util.List;
1364N/Aimport java.util.Set;
1364N/Aimport java.util.logging.Level;
1364N/Aimport java.util.logging.Logger;
1364N/Aimport javax.management.ListenerNotFoundException;
1364N/Aimport javax.management.MBeanNotificationInfo;
1364N/Aimport javax.management.MBeanRegistration;
1364N/Aimport javax.management.MBeanServer;
1383N/Aimport javax.management.Notification;
1364N/Aimport javax.management.NotificationEmitter;
1364N/Aimport javax.management.NotificationFilter;
1364N/Aimport javax.management.NotificationListener;
1364N/Aimport javax.management.ObjectName;
1364N/Aimport org.opensolaris.opengrok.configuration.RuntimeEnvironment;
1364N/Aimport org.opensolaris.opengrok.index.IndexChangedListener;
1364N/Aimport org.opensolaris.opengrok.index.Indexer;
1364N/A
1364N/A/**
1364N/A * AgentIndexRunner.
1383N/A * @author Jan S Berg
1364N/A */
1364N/Apublic final class AgentIndexRunner implements AgentIndexRunnerMBean, NotificationListener,
1364N/A MBeanRegistration, Runnable, IndexChangedListener, NotificationEmitter {
1364N/A
1364N/A private transient static AgentIndexRunner indexerInstance = null;
1364N/A private final static String NOTIFICATIONACTIONTYPE = "ogaaction";
1383N/A private final static String NOTIFICATIONEXCEPTIONTYPE = "ogaexception";
1383N/A private final static String NOTIFICATIONINFOSTRINGTYPE = "ogainfostring";
1383N/A private final static String NOTIFICATIONINFOLONGTYPE = "ogainfolong";
1383N/A private boolean enabled;
1364N/A private transient Thread indexThread = null;
1364N/A private final static Logger log = Logger.getLogger("org.opensolaris.opengrok");
1364N/A private final Management jagmgt;
1364N/A private RuntimeEnvironment env = null;
1364N/A private long lastIndexStart = 0;
1364N/A private long lastIndexFinish = 0;
1364N/A private long lastIndexUsedTime = 0;
1364N/A private Exception lastException = null;
1364N/A private final Set<NotificationHolder> notifListeners =
1364N/A new HashSet<NotificationHolder>();
1364N/A private static long sequenceNo = 0;
1364N/A private final StringBuilder notifications = new StringBuilder();
1364N/A private final static int MAXMESSAGELENGTH = 50000;
1364N/A
1364N/A /**
1364N/A * The only constructor is private, so other classes will only get an
1364N/A * instance through the static factory method getInstance().
1364N/A */
1383N/A private AgentIndexRunner(boolean enabledParam) {
1364N/A enabled = enabledParam;
1364N/A jagmgt = Management.getInstance();
1364N/A }
1364N/A
1364N/A /**
1364N/A * Static factory method to get an instance of AgentIndexRunner.
1364N/A * @param enabledParam if true, the initial instance of the purgatory will
1370N/A * have purging enabled.
1364N/A */
1364N/A public static synchronized AgentIndexRunner getInstance(boolean enabledParam) {
1370N/A if (indexerInstance == null) {
1364N/A indexerInstance = new AgentIndexRunner(enabledParam);
1364N/A }
1370N/A return indexerInstance;
1364N/A }
1364N/A
1364N/A public ObjectName preRegister(MBeanServer serverParam, ObjectName name) {
1370N/A return name;
1364N/A }
1364N/A
1383N/A public void postRegister(Boolean registrationDone) {
1364N/A // not used
1364N/A }
1383N/A
1383N/A public void preDeregister() {
1364N/A // not used
1370N/A }
1364N/A
1364N/A public void postDeregister() {
1364N/A // not used
1364N/A }
1383N/A
1364N/A public void run() {
1364N/A try {
1383N/A //Indexer ind = new Indexer();
1364N/A log.info("Running...");
1383N/A lastIndexStart = System.currentTimeMillis();
1383N/A lastException = null;
1370N/A doNotify(NOTIFICATIONINFOLONGTYPE, "StartIndexing", Long.valueOf(lastIndexStart));
1364N/A String configfile = jagmgt.getInstance().getConfigurationFile();
1364N/A if (configfile == null) {
1364N/A doNotify(NOTIFICATIONEXCEPTIONTYPE, "Missing Configuration file", "");
1364N/A }
1370N/A File cfgFile = new File(configfile);
1364N/A if (cfgFile.exists()) {
1364N/A env = RuntimeEnvironment.getInstance();
1364N/A log.info("Running indexer with configuration " + configfile);
1383N/A env.readConfiguration(cfgFile);
1383N/A
1364N/A Indexer index = Indexer.getInstance();
1383N/A int noThreads = jagmgt.getInstance().getNumberOfThreads().intValue();
1383N/A boolean update = jagmgt.getInstance().getUpdateIndexDatabase().booleanValue();
1364N/A String[] sublist = jagmgt.getInstance().getSubFiles();
1370N/A List<String> subFiles = Arrays.asList(sublist);
1364N/A log.info("Starting index, update " + update + " noThreads " + noThreads + " subfiles " + subFiles.size());
1364N/A index.doIndexerExecution(update, noThreads, subFiles, this);
1370N/A log.info("Finished indexing");
1364N/A lastIndexFinish = System.currentTimeMillis();
1364N/A sendNotifications();
1364N/A doNotify(NOTIFICATIONINFOLONGTYPE, "FinishedIndexing", Long.valueOf(lastIndexFinish));
1364N/A lastIndexUsedTime = lastIndexFinish - lastIndexStart;
1383N/A String publishhost = jagmgt.getInstance().getPublishServerURL();
1383N/A if (publishhost == null) {
1383N/A log.warning("No publishhost given, not sending updates");
1383N/A } else {
1383N/A index.sendToConfigHost(env, publishhost);
1383N/A doNotify(NOTIFICATIONINFOSTRINGTYPE, "Published index", publishhost);
1383N/A }
1383N/A
1383N/A
1383N/A } else {
1383N/A log.warning("Cannot Run indexing without proper configuration file " + configfile);
1383N/A doNotify(NOTIFICATIONEXCEPTIONTYPE, "Configuration file not valid", configfile);
1383N/A }
1383N/A } catch (Exception e) {
1383N/A log.log(Level.SEVERE,
1383N/A "Exception running indexing ", e);
1383N/A lastException = e;
1383N/A }
1383N/A }
1383N/A
1383N/A /**
1383N/A * Disables indexer
1364N/A */
1364N/A public void disable() {
1364N/A enabled = false;
1364N/A }
1364N/A
1364N/A /**
1364N/A * Enables the indexer
1364N/A */
1364N/A public void enable() {
1364N/A enabled = true;
1364N/A }
1364N/A
1364N/A /**
1364N/A * Handle timer notifications to the purgatory.
1364N/A * Will start the purger if it is enabled and return immediately.
1364N/A */
1364N/A public void handleNotification(Notification n, Object hb) {
1364N/A if (n.getType().equals("timer.notification")) {
1364N/A log.finer("Received timer notification");
1364N/A } else {
1364N/A log.warning("Received unknown notification type: " +
1364N/A n.getType());
1364N/A return;
1364N/A }
1364N/A if (!enabled) {
1364N/A log.finer("Indexing is disabled, doing nothing");
1364N/A return;
1364N/A }
1364N/A index(false); // just start the purger and return (do not delay timer)
1364N/A }
1364N/A
1364N/A /**
1364N/A * The index method starts a thread that will
1364N/A * start indexing part of the opengrok agent.
1364N/A * @param waitForFinished if false the command returns immediately, if true
1364N/A * it will return when the indexing is done.
1364N/A */
1364N/A public void index(boolean waitForFinished) {
1364N/A log.info("Starting indexing.");
1364N/A /*
1364N/A * Synchronize here to make sure that you never get more than one
1364N/A * indexing thread trying to start at the same time.
1364N/A */
1364N/A synchronized (this) {
1364N/A if (indexThread != null) {
1364N/A if (indexThread.isAlive()) {
1364N/A log.warning("Previous indexer is still alive, will not start another.");
1364N/A return;
1364N/A } else {
1364N/A log.fine("Previous indexer is no longer alive, starting a new one.");
1364N/A }
1364N/A }
1364N/A indexThread = new Thread(this);
1364N/A try {
1364N/A indexThread.start();
1364N/A if (!waitForFinished) {
1364N/A return;
1364N/A }
1364N/A log.fine("Waiting for indexer to finish...");
1364N/A indexThread.join();
1364N/A log.fine("indexer finished.");
1364N/A } catch (Exception e) {
1364N/A log.log(Level.SEVERE,
1364N/A "Caught Exception while waiting for indexing to finish.", e);
1364N/A }
1364N/A return;
1364N/A }
1364N/A }
1364N/A
1364N/A public void fileAdded(String path, String analyzer) {
1364N/A log.info("Added " + path + " analyzer " + analyzer);
1364N/A addFileAction("A:", path);
1364N/A }
1364N/A
1364N/A public void fileRemoved(String path) {
1364N/A log.info("File removed " + path);
1364N/A addFileAction("R:", path);
1364N/A }
1364N/A
1364N/A public void fileUpdated(String path) {
1364N/A log.info("File updated " + path);
1364N/A addFileAction("U:", path);
1364N/A }
1364N/A
1364N/A private void addFileAction(String type, String path) {
1364N/A notifications.append('\n');
1364N/A notifications.append(type);
1364N/A notifications.append(path);
1364N/A if (notifications.length() > MAXMESSAGELENGTH) {
1364N/A sendNotifications();
1364N/A }
1364N/A }
1364N/A
1364N/A private void sendNotifications() {
1364N/A if (notifications.length() > 0) {
1364N/A doNotify(NOTIFICATIONACTIONTYPE, "FilesInfo", notifications.toString());
1364N/A notifications.delete(0, notifications.length());
1364N/A }
1364N/A }
1364N/A
1364N/A public long lastIndexTimeFinished() {
1364N/A return lastIndexFinish;
1364N/A }
1364N/A
1364N/A public long lastIndexTimeStarted() {
1364N/A return lastIndexStart;
1364N/A }
1364N/A
1364N/A public long lastIndexTimeUsed() {
1364N/A return lastIndexUsedTime;
1364N/A }
1364N/A
1364N/A public Exception getExceptions() {
1364N/A return lastException;
1364N/A }
1364N/A
1364N/A public void addNotificationListener(NotificationListener notiflistener, NotificationFilter notfilt, Object obj) throws IllegalArgumentException {
1364N/A log.info("Adds a notiflistner, with obj " + obj.toString());
1364N/A if (notiflistener == null) {
1364N/A throw new IllegalArgumentException("Must have legal NotificationListener");
1364N/A }
1364N/A synchronized (notifListeners) {
1364N/A notifListeners.add(new NotificationHolder(notiflistener, notfilt, obj));
1364N/A }
1364N/A }
1364N/A
1364N/A public void removeNotificationListener(NotificationListener notiflistener) throws ListenerNotFoundException {
1364N/A log.info("removes a notiflistener, no obj");
1364N/A boolean removed = false;
1364N/A synchronized (notifListeners) {
1364N/A Iterator it = notifListeners.iterator();
1364N/A while (it.hasNext()) {
1364N/A NotificationHolder mnf = (NotificationHolder) it.next();
949N/A if (mnf.getNL().equals(notiflistener)) {
949N/A it.remove();
949N/A removed = true;
1294N/A }
1186N/A }
949N/A }
949N/A if (!removed) {
1186N/A throw new ListenerNotFoundException("Didn't remove the given NotificationListener");
949N/A }
949N/A }
949N/A
1186N/A public void removeNotificationListener(NotificationListener notiflistener, NotificationFilter filt, Object obj) throws ListenerNotFoundException {
1186N/A log.info("removes a notiflistener obj " + obj);
1186N/A boolean removed = false;
1186N/A synchronized (notifListeners) {
1186N/A Iterator it = notifListeners.iterator();
1186N/A while (it.hasNext()) {
1186N/A NotificationHolder mnf = (NotificationHolder) it.next();
1186N/A if (mnf.getNL().equals(notiflistener)
1186N/A && ((mnf.getFilter() == null) || mnf.getFilter().equals(filt))
1186N/A && ((mnf.getFilter() == null) || mnf.getObj().equals(obj))) {
1186N/A it.remove();
1186N/A removed = true;
1186N/A }
1186N/A }
1186N/A }
1186N/A if (!removed) {
1186N/A throw new ListenerNotFoundException("Didn't remove the given NotificationListener");
1186N/A }
1186N/A }
1186N/A
1186N/A /**
1186N/A * Method that the subclass can override, but doesn't have to
1186N/A * @return MBeanNotificationInfo array of notification (and types) this class can emitt.
1186N/A */
1186N/A public MBeanNotificationInfo[] getNotificationInfo() {
1186N/A MBeanNotificationInfo[] info = new MBeanNotificationInfo[1];
1186N/A String[] supptypes = {NOTIFICATIONACTIONTYPE, NOTIFICATIONINFOLONGTYPE, NOTIFICATIONINFOSTRINGTYPE};
1186N/A String name = "AgentIndexRunner";
949N/A String descr = "OpenGrok Indexer Notifications";
949N/A MBeanNotificationInfo minfo = new MBeanNotificationInfo(supptypes, name,
1186N/A descr);
1186N/A info[0] = minfo;
1186N/A return info;
1186N/A }
1186N/A
1186N/A private void doNotify(String type, String msg, Object userdata) {
1186N/A try {
1186N/A log.info("start notifying " + notifListeners.size() + " listeners");
1186N/A //String str = "JET Finished";
1186N/A //String obj = JAGConstants.jetFinishedNType;
1186N/A long ts = System.currentTimeMillis();
1254N/A sequenceNo++;
1186N/A Notification notif = new Notification(type, this, sequenceNo, ts, msg);
1186N/A notif.setUserData(userdata);
1186N/A synchronized (notifListeners) {
1186N/A for (NotificationHolder nl : notifListeners) {
1186N/A log.fine("having one with obj " + nl.getObj());
1186N/A try {
1186N/A if ((nl.getFilter() == null) ||
1186N/A nl.getFilter().isNotificationEnabled(notif)) {
1186N/A nl.getNL().handleNotification(notif, nl.getObj());
1294N/A }
1186N/A } catch (Exception exnot) {
1186N/A log.log(Level.INFO, "Ex " + exnot, exnot);
1186N/A }
1186N/A }
1186N/A }
1186N/A } catch (Exception ex) {
1186N/A log.log(Level.SEVERE,
1186N/A "Exception during notification sending: " + ex.getMessage(),
1186N/A ex);
1186N/A }
1186N/A }
1186N/A}
949N/A