AgentIndexRunner.java revision 1470
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * See LICENSE.txt included in this distribution for the specific
 * language governing permissions and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at LICENSE.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 */
340N/Apackage org.opensolaris.opengrok.management;
340N/A
import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import org.opensolaris.opengrok.configuration.RuntimeEnvironment;
import org.opensolaris.opengrok.history.HistoryGuru;
import org.opensolaris.opengrok.index.IndexChangedListener;
import org.opensolaris.opengrok.index.Indexer;
340N/A
340N/A/**
340N/A * AgentIndexRunner.
340N/A * @author Jan S Berg
340N/A */
1470N/Apublic final class AgentIndexRunner
1470N/A implements AgentIndexRunnerMBean, NotificationListener, MBeanRegistration,
1470N/A Runnable, IndexChangedListener, NotificationEmitter
1470N/A{
340N/A
1470N/A private static final Logger log =
1470N/A Logger.getLogger(AgentIndexRunner.class.getName());
340N/A private transient static AgentIndexRunner indexerInstance = null;
1238N/A private static final String NOTIFICATIONACTIONTYPE = "ogaaction";
1238N/A private static final String NOTIFICATIONEXCEPTIONTYPE = "ogaexception";
1238N/A private static final String NOTIFICATIONINFOSTRINGTYPE = "ogainfostring";
1238N/A private static final String NOTIFICATIONINFOLONGTYPE = "ogainfolong";
370N/A private boolean enabled;
340N/A private transient Thread indexThread = null;
340N/A private long lastIndexStart = 0;
340N/A private long lastIndexFinish = 0;
340N/A private long lastIndexUsedTime = 0;
340N/A private Exception lastException = null;
368N/A private final Set<NotificationHolder> notifListeners =
368N/A new HashSet<NotificationHolder>();
340N/A private static long sequenceNo = 0;
456N/A private final StringBuilder notifications = new StringBuilder();
1238N/A private static final int MAXMESSAGELENGTH = 50000;
340N/A
340N/A /**
340N/A * The only constructor is private, so other classes will only get an
340N/A * instance through the static factory method getInstance().
340N/A */
340N/A private AgentIndexRunner(boolean enabledParam) {
340N/A enabled = enabledParam;
340N/A }
340N/A
340N/A /**
340N/A * Static factory method to get an instance of AgentIndexRunner.
1463N/A * @param enabledParam If {@code true}, the initial instance should be
1463N/A * running or not.
1463N/A * @return a singleton
340N/A */
591N/A @SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel")
340N/A public static synchronized AgentIndexRunner getInstance(boolean enabledParam) {
340N/A if (indexerInstance == null) {
340N/A indexerInstance = new AgentIndexRunner(enabledParam);
340N/A }
340N/A return indexerInstance;
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public ObjectName preRegister(MBeanServer serverParam, ObjectName name) {
340N/A return name;
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public void postRegister(Boolean registrationDone) {
456N/A // not used
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public void preDeregister() {
456N/A // not used
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public void postDeregister() {
456N/A // not used
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public void run() {
340N/A try {
340N/A //Indexer ind = new Indexer();
340N/A log.info("Running...");
340N/A lastIndexStart = System.currentTimeMillis();
340N/A lastException = null;
1470N/A doNotify(NOTIFICATIONINFOLONGTYPE,
1470N/A "StartIndexing", Long.valueOf(lastIndexStart));
520N/A String configfile = Management.getInstance().getConfigurationFile();
340N/A if (configfile == null) {
1470N/A doNotify(NOTIFICATIONEXCEPTIONTYPE,
1470N/A "Missing Configuration file", "");
340N/A }
340N/A File cfgFile = new File(configfile);
340N/A if (cfgFile.exists()) {
1470N/A log.log(Level.INFO, "Running indexer with configuration ''{0}''",
1470N/A configfile);
1470N/A RuntimeEnvironment.readConfig(cfgFile);
340N/A
1470N/A int noThreads = Management.getInstance()
1470N/A .getNumberOfThreads().intValue();
1470N/A boolean update = Management.getInstance()
1470N/A .getUpdateIndexDatabase().booleanValue();
520N/A String[] sublist = Management.getInstance().getSubFiles();
520N/A log.info("Update source repositories");
520N/A HistoryGuru.getInstance().updateRepositories();
464N/A List<String> subFiles = Arrays.asList(sublist);
1470N/A log.log(Level.INFO, "Starting index, update {0} noThreads {1} subfiles {2}",
1470N/A new Object[]{ String.valueOf(update),
1470N/A String.valueOf(noThreads), String.valueOf(subFiles.size())});
1463N/A Indexer.doIndexerExecution(update, noThreads, subFiles, this);
340N/A log.info("Finished indexing");
340N/A lastIndexFinish = System.currentTimeMillis();
340N/A sendNotifications();
1470N/A doNotify(NOTIFICATIONINFOLONGTYPE,
1470N/A "FinishedIndexing", Long.valueOf(lastIndexFinish));
340N/A lastIndexUsedTime = lastIndexFinish - lastIndexStart;
520N/A String publishhost = Management.getInstance().getPublishServerURL();
639N/A if ((publishhost == null) || (publishhost.equals(""))) {
460N/A log.warning("No publishhost given, not sending updates");
460N/A } else {
1463N/A Indexer.sendToConfigHost(publishhost);
1470N/A doNotify(NOTIFICATIONINFOSTRINGTYPE,
1470N/A "Published index", publishhost);
340N/A }
340N/A
340N/A
340N/A } else {
1327N/A log.log(Level.WARNING, "Cannot Run indexing without proper configuration file ''{0}''", configfile);
1470N/A doNotify(NOTIFICATIONEXCEPTIONTYPE,
1470N/A "Configuration file not valid", configfile);
340N/A }
340N/A } catch (Exception e) {
1327N/A log.warning("Exception running indexing: " + e.getMessage());
1327N/A log.log(Level.FINE, "run", e);
340N/A lastException = e;
340N/A }
340N/A }
340N/A
340N/A /**
340N/A * Disables indexer
340N/A */
1054N/A @Override
340N/A public void disable() {
340N/A enabled = false;
340N/A }
340N/A
340N/A /**
340N/A * Enables the indexer
340N/A */
1054N/A @Override
340N/A public void enable() {
340N/A enabled = true;
340N/A }
340N/A
340N/A /**
340N/A * Handle timer notifications to the purgatory.
340N/A * Will start the purger if it is enabled and return immediately.
340N/A */
1054N/A @Override
340N/A public void handleNotification(Notification n, Object hb) {
340N/A if (n.getType().equals("timer.notification")) {
340N/A log.finer("Received timer notification");
522N/A if (enabled) {
522N/A index(false);
522N/A } else {
520N/A log.info("Indexing is disabled, doing nothing");
520N/A }
340N/A } else {
1470N/A log.log(Level.WARNING, "Received unknown notification type ''{0}''",
1470N/A n.getType());
340N/A }
340N/A }
340N/A
340N/A /**
1190N/A * The index method starts a thread that will
340N/A * start indexing part of the opengrok agent.
340N/A * @param waitForFinished if false the command returns immediately, if true
340N/A * it will return when the indexing is done.
340N/A */
1054N/A @Override
340N/A public void index(boolean waitForFinished) {
340N/A log.info("Starting indexing.");
340N/A /*
340N/A * Synchronize here to make sure that you never get more than one
340N/A * indexing thread trying to start at the same time.
340N/A */
340N/A synchronized (this) {
340N/A if (indexThread != null) {
340N/A if (indexThread.isAlive()) {
340N/A log.warning("Previous indexer is still alive, will not start another.");
340N/A return;
340N/A }
1185N/A log.fine("Previous indexer is no longer alive, starting a new one.");
340N/A }
340N/A indexThread = new Thread(this);
340N/A try {
340N/A indexThread.start();
340N/A if (!waitForFinished) {
340N/A return;
340N/A }
1327N/A log.fine("Waiting for indexer to finish ...");
340N/A indexThread.join();
340N/A log.fine("indexer finished.");
340N/A } catch (Exception e) {
1327N/A log.warning("Caught Exception while waiting for indexing to finish: "
1327N/A + e.getMessage());
1327N/A log.log(Level.FINE, "index", e);
340N/A }
340N/A return;
340N/A }
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
1054N/A public void fileAdd(String path, String analyzer) {
1327N/A log.log(Level.FINE, "Add ''{0}'' analyzer {1}", new Object[]{path, analyzer});
1054N/A }
1054N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
1054N/A public void fileRemove(String path) {
1327N/A log.log(Level.FINE, "File remove ''{0}''", path);
1054N/A }
1054N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
1054N/A public void fileUpdate(String path) {
1327N/A log.log(Level.FINE, "File updated ''{0}''", path);
1054N/A addFileAction("U:", path);
1054N/A }
1054N/A
1054N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public void fileAdded(String path, String analyzer) {
1327N/A log.log(Level.FINE, "Added ''{0}'' analyzer {1}", new Object[]{path, analyzer});
340N/A addFileAction("A:", path);
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public void fileRemoved(String path) {
1327N/A log.log(Level.FINE, "File removed ''{0}''", path);
340N/A addFileAction("R:", path);
340N/A }
340N/A
340N/A private void addFileAction(String type, String path) {
340N/A notifications.append('\n');
340N/A notifications.append(type);
340N/A notifications.append(path);
340N/A if (notifications.length() > MAXMESSAGELENGTH) {
340N/A sendNotifications();
340N/A }
340N/A }
340N/A
340N/A private void sendNotifications() {
340N/A if (notifications.length() > 0) {
340N/A doNotify(NOTIFICATIONACTIONTYPE, "FilesInfo", notifications.toString());
340N/A notifications.delete(0, notifications.length());
340N/A }
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public long lastIndexTimeFinished() {
340N/A return lastIndexFinish;
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public long lastIndexTimeStarted() {
340N/A return lastIndexStart;
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public long lastIndexTimeUsed() {
340N/A return lastIndexUsedTime;
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
340N/A public Exception getExceptions() {
340N/A return lastException;
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
1463N/A public void addNotificationListener(NotificationListener notiflistener,
1463N/A NotificationFilter notfilt, Object obj) throws IllegalArgumentException
1463N/A {
1327N/A log.log(Level.CONFIG, "Adds a notify listener, with obj {0}", obj.toString());
340N/A if (notiflistener == null) {
340N/A throw new IllegalArgumentException("Must have legal NotificationListener");
340N/A }
340N/A synchronized (notifListeners) {
340N/A notifListeners.add(new NotificationHolder(notiflistener, notfilt, obj));
340N/A }
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
1463N/A public void removeNotificationListener(NotificationListener notiflistener)
1463N/A throws ListenerNotFoundException
1463N/A {
1327N/A log.info("removes a notify listener, no obj");
340N/A boolean removed = false;
340N/A synchronized (notifListeners) {
1185N/A Iterator<NotificationHolder> it = notifListeners.iterator();
340N/A while (it.hasNext()) {
1185N/A NotificationHolder mnf = it.next();
340N/A if (mnf.getNL().equals(notiflistener)) {
340N/A it.remove();
340N/A removed = true;
340N/A }
340N/A }
340N/A }
340N/A if (!removed) {
460N/A throw new ListenerNotFoundException("Didn't remove the given NotificationListener");
340N/A }
340N/A }
340N/A
1463N/A /**
1463N/A * {@inheritDoc}
1463N/A */
1054N/A @Override
1463N/A public void removeNotificationListener(NotificationListener notiflistener,
1463N/A NotificationFilter filt, Object obj) throws ListenerNotFoundException
1463N/A {
1327N/A log.log(Level.CONFIG, "removes a notify listener obj {0}", obj);
340N/A boolean removed = false;
340N/A synchronized (notifListeners) {
1185N/A Iterator<NotificationHolder> it = notifListeners.iterator();
340N/A while (it.hasNext()) {
1185N/A NotificationHolder mnf = it.next();
1190N/A if (mnf.getNL().equals(notiflistener)
1190N/A && ((mnf.getFilter() == null) || mnf.getFilter().equals(filt))
460N/A && ((mnf.getFilter() == null) || mnf.getObj().equals(obj))) {
340N/A it.remove();
340N/A removed = true;
340N/A }
340N/A }
340N/A }
340N/A if (!removed) {
460N/A throw new ListenerNotFoundException("Didn't remove the given NotificationListener");
340N/A }
340N/A }
340N/A
340N/A /**
340N/A * Method that the subclass can override, but doesn't have to
1463N/A * @return MBeanNotificationInfo array of notification (and types) this
1463N/A * class can emitt.
340N/A */
1054N/A @Override
340N/A public MBeanNotificationInfo[] getNotificationInfo() {
340N/A MBeanNotificationInfo[] info = new MBeanNotificationInfo[1];
1463N/A String[] supptypes = { NOTIFICATIONACTIONTYPE, NOTIFICATIONINFOLONGTYPE,
1463N/A NOTIFICATIONINFOSTRINGTYPE };
340N/A String name = "AgentIndexRunner";
340N/A String descr = "OpenGrok Indexer Notifications";
340N/A MBeanNotificationInfo minfo = new MBeanNotificationInfo(supptypes, name,
340N/A descr);
340N/A info[0] = minfo;
340N/A return info;
340N/A }
340N/A
1463N/A @SuppressWarnings("boxing")
340N/A private void doNotify(String type, String msg, Object userdata) {
340N/A try {
1463N/A log.log(Level.CONFIG, "start notifying {0} listeners",
1463N/A notifListeners.size());
340N/A long ts = System.currentTimeMillis();
340N/A sequenceNo++;
340N/A Notification notif = new Notification(type, this, sequenceNo, ts, msg);
340N/A notif.setUserData(userdata);
340N/A synchronized (notifListeners) {
368N/A for (NotificationHolder nl : notifListeners) {
1054N/A log.log(Level.FINE, "having one with obj {0}", nl.getObj());
368N/A try {
368N/A if ((nl.getFilter() == null) ||
368N/A nl.getFilter().isNotificationEnabled(notif)) {
368N/A nl.getNL().handleNotification(notif, nl.getObj());
340N/A }
368N/A } catch (Exception exnot) {
1054N/A log.log(Level.WARNING, "Ex " + exnot, exnot);
340N/A }
340N/A }
340N/A }
340N/A } catch (Exception ex) {
1327N/A log.warning("Exception during notification sending: " + ex.getMessage());
1327N/A log.log(Level.FINE, "doNotify", ex);
340N/A }
340N/A }
1190N/A}