TaskScheduler.java revision ca669ae54f86dbeea277280690584d9f591c7571
325N/A/*
325N/A * CDDL HEADER START
325N/A *
325N/A * The contents of this file are subject to the terms of the
325N/A * Common Development and Distribution License, Version 1.0 only
325N/A * (the "License"). You may not use this file except in compliance
325N/A * with the License.
325N/A *
325N/A * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
325N/A * or http://forgerock.org/license/CDDLv1.0.html.
325N/A * See the License for the specific language governing permissions
325N/A * and limitations under the License.
325N/A *
325N/A * When distributing Covered Code, include this CDDL HEADER in each
325N/A * file and include the License file at legal-notices/CDDLv1_0.txt.
325N/A * If applicable, add the following below this CDDL HEADER, with the
325N/A * fields enclosed by brackets "[]" replaced with your own identifying
325N/A * information:
325N/A * Portions Copyright [yyyy] [name of copyright owner]
325N/A *
325N/A * CDDL HEADER END
325N/A *
325N/A *
325N/A * Copyright 2006-2010 Sun Microsystems, Inc.
325N/A * Portions Copyright 2013-2015 ForgeRock AS
325N/A */
325N/Apackage org.opends.server.backends.task;
325N/A
325N/Aimport static org.opends.messages.BackendMessages.*;
325N/Aimport static org.opends.server.config.ConfigConstants.*;
325N/Aimport static org.opends.server.util.ServerConstants.*;
325N/Aimport static org.opends.server.util.StaticUtils.*;
325N/A
325N/Aimport java.io.File;
325N/Aimport java.io.IOException;
325N/Aimport java.util.*;
325N/Aimport java.util.concurrent.TimeUnit;
325N/Aimport java.util.concurrent.locks.Lock;
325N/Aimport java.util.concurrent.locks.ReentrantLock;
325N/A
325N/Aimport org.forgerock.i18n.LocalizableMessage;
325N/Aimport org.opends.server.api.AlertGenerator;
325N/Aimport org.opends.server.api.DirectoryThread;
325N/Aimport org.opends.server.core.DirectoryServer;
325N/Aimport org.opends.server.core.SearchOperation;
325N/Aimport org.forgerock.i18n.slf4j.LocalizedLogger;
325N/Aimport org.opends.server.types.*;
325N/Aimport org.forgerock.opendj.ldap.ByteString;
325N/Aimport org.forgerock.opendj.ldap.ResultCode;
325N/Aimport org.opends.server.util.LDIFException;
325N/Aimport org.opends.server.util.LDIFReader;
325N/Aimport org.opends.server.util.LDIFWriter;
325N/Aimport org.opends.server.util.TimeThread;
325N/A
325N/A/**
325N/A * This class defines a task scheduler for the Directory Server that will
325N/A * control the execution of scheduled tasks and other administrative functions
325N/A * that need to occur on a regular basis.
325N/A */
325N/Apublic class TaskScheduler
325N/A extends DirectoryThread
325N/A implements AlertGenerator
325N/A{
325N/A private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
325N/A
325N/A /**
325N/A * The fully-qualified name of this class.
325N/A */
325N/A private static final String CLASS_NAME =
325N/A "org.opends.server.backends.task.TaskScheduler";
325N/A
325N/A
325N/A
325N/A /**
325N/A * The maximum length of time in milliseconds to sleep between iterations
325N/A * through the scheduler loop.
325N/A */
325N/A private static long MAX_SLEEP_TIME = 5000;
325N/A
325N/A
325N/A
325N/A // Indicates whether the scheduler is currently running.
325N/A private boolean isRunning;
325N/A
325N/A // Indicates whether a request has been received to stop the scheduler.
325N/A private boolean stopRequested;
325N/A
325N/A // The entry that serves as the immediate parent for recurring tasks.
325N/A private Entry recurringTaskParentEntry;
325N/A
325N/A // The entry that serves as the immediate parent for scheduled tasks.
325N/A private Entry scheduledTaskParentEntry;
325N/A
325N/A // The top-level entry at the root of the task tree.
325N/A private Entry taskRootEntry;
325N/A
325N/A // The set of recurring tasks defined in the server.
325N/A private HashMap<String,RecurringTask> recurringTasks;
325N/A
325N/A // The set of tasks associated with this scheduler.
325N/A private HashMap<String,Task> tasks;
325N/A
325N/A // The set of worker threads that are actively busy processing tasks.
325N/A private HashMap<String,TaskThread> activeThreads;
325N/A
325N/A // The thread ID for the next task thread to be created;
325N/A private int nextThreadID;
325N/A
325N/A // The set of worker threads that may be used to process tasks.
325N/A private LinkedList<TaskThread> idleThreads;
325N/A
325N/A // The lock used to provide threadsafe access to the scheduler.
325N/A private final ReentrantLock schedulerLock;
325N/A
325N/A // The task backend with which this scheduler is associated.
325N/A private TaskBackend taskBackend;
325N/A
325N/A // The thread being used to actually run the scheduler.
325N/A private Thread schedulerThread;
325N/A
325N/A // The set of recently-completed tasks that need to be retained.
325N/A private TreeSet<Task> completedTasks;
325N/A
325N/A // The set of tasks that have been scheduled but not yet arrived.
325N/A private TreeSet<Task> pendingTasks;
325N/A
325N/A // The set of tasks that are currently running.
325N/A private TreeSet<Task> runningTasks;
325N/A
325N/A
325N/A
/**
 * Builds a task scheduler bound to the given task backend. The scheduler
 * registers itself as an alert generator, loads any tasks found in the
 * backing file, and then asks every recurring task for its next iteration
 * so that it can be queued.
 *
 * @param  taskBackend  The task backend with which this scheduler is
 *                      associated.
 *
 * @throws  InitializationException  If a problem occurs while initializing
 *                                   the scheduler from the backing file.
 */
public TaskScheduler(TaskBackend taskBackend)
       throws InitializationException
{
  super("Task Scheduler Thread");

  this.taskBackend = taskBackend;
  schedulerLock    = new ReentrantLock();

  // Bookkeeping collections.
  tasks          = new HashMap<String,Task>();
  recurringTasks = new HashMap<String,RecurringTask>();
  activeThreads  = new HashMap<String,TaskThread>();
  idleThreads    = new LinkedList<TaskThread>();
  pendingTasks   = new TreeSet<Task>();
  runningTasks   = new TreeSet<Task>();
  completedTasks = new TreeSet<Task>();

  // Scalar state.
  nextThreadID    = 1;
  isRunning       = false;
  stopRequested   = false;
  schedulerThread = null;

  // Structural entries are filled in while reading the backing file.
  taskRootEntry            = null;
  scheduledTaskParentEntry = null;
  recurringTaskParentEntry = null;

  DirectoryServer.registerAlertGenerator(this);

  initializeTasksFromBackingFile();

  for (RecurringTask recurring : recurringTasks.values())
  {
    Task nextIteration;
    try
    {
      nextIteration = recurring.scheduleNextIteration(new GregorianCalendar());
    }
    catch (DirectoryException de)
    {
      logger.error(de.getMessageObject());
      continue;
    }

    if (nextIteration == null)
    {
      continue;
    }

    try
    {
      scheduleTask(nextIteration, false);
    }
    catch (DirectoryException de)
    {
      // An "already exists" result means the iteration was restored from the
      // backing file; anything else is logged and the loop carries on.
      boolean alreadyScheduled =
          de.getResultCode() == ResultCode.ENTRY_ALREADY_EXISTS;
      if (! alreadyScheduled)
      {
        logger.error(de.getMessageObject());
      }
    }
  }
}
325N/A
325N/A
325N/A
325N/A /**
325N/A * Adds a recurring task to the scheduler, optionally scheduling the first
325N/A * iteration for processing.
325N/A *
325N/A * @param recurringTask The recurring task to add to the scheduler.
325N/A * @param scheduleIteration Indicates whether to schedule an iteration of
325N/A * the recurring task.
325N/A *
325N/A * @throws DirectoryException If a problem occurs while trying to add the
325N/A * recurring task (e.g., there's already another
325N/A * recurring task defined with the same ID).
325N/A */
325N/A public void addRecurringTask(RecurringTask recurringTask,
325N/A boolean scheduleIteration)
325N/A throws DirectoryException
325N/A {
325N/A schedulerLock.lock();
325N/A
325N/A try
325N/A {
325N/A String id = recurringTask.getRecurringTaskID();
325N/A
325N/A if (recurringTasks.containsKey(id))
325N/A {
325N/A LocalizableMessage message =
325N/A ERR_TASKSCHED_DUPLICATE_RECURRING_ID.get(id);
325N/A throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message);
325N/A }
325N/A
325N/A Attribute attr = Attributes.create(ATTR_TASK_STATE,
325N/A TaskState.RECURRING.toString());
325N/A ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
325N/A attrList.add(attr);
325N/A Entry recurringTaskEntry = recurringTask.getRecurringTaskEntry();
325N/A recurringTaskEntry.putAttribute(attr.getAttributeType(), attrList);
325N/A
325N/A if (scheduleIteration)
325N/A {
325N/A Task task = recurringTask.scheduleNextIteration(
325N/A new GregorianCalendar());
325N/A if (task != null)
325N/A {
325N/A // If there is an existing task with the same id
325N/A // and it is in completed state, take its place.
325N/A Task t = tasks.get(task.getTaskID());
325N/A if ((t != null) && TaskState.isDone(t.getTaskState()))
325N/A {
325N/A removeCompletedTask(t.getTaskID());
325N/A }
325N/A
325N/A scheduleTask(task, false);
325N/A }
325N/A }
325N/A
325N/A recurringTasks.put(id, recurringTask);
325N/A writeState();
325N/A }
325N/A finally
325N/A {
325N/A schedulerLock.unlock();
325N/A }
325N/A }
325N/A
325N/A
325N/A
325N/A /**
325N/A * Removes the recurring task with the given ID.
325N/A *
325N/A * @param recurringTaskID The ID of the recurring task to remove.
325N/A *
325N/A * @return The recurring task that was removed, or <CODE>null</CODE> if there
325N/A * was no such recurring task.
325N/A *
325N/A * @throws DirectoryException If there is currently a pending or running
325N/A * iteration of the associated recurring task.
325N/A */
325N/A public RecurringTask removeRecurringTask(String recurringTaskID)
325N/A throws DirectoryException
325N/A {
325N/A schedulerLock.lock();
325N/A
325N/A try
325N/A {
325N/A RecurringTask recurringTask = recurringTasks.remove(recurringTaskID);
325N/A HashMap<String,Task> iterationsMap = new HashMap<String,Task>();
325N/A
325N/A for (Task t : tasks.values())
325N/A {
325N/A // Find any existing task iterations and try to cancel them.
325N/A if ((t.getRecurringTaskID() != null) &&
325N/A (t.getRecurringTaskID().equals(recurringTaskID)))
325N/A {
325N/A TaskState state = t.getTaskState();
325N/A if (!TaskState.isDone(state) && !TaskState.isCancelled(state))
325N/A {
325N/A cancelTask(t.getTaskID());
325N/A }
325N/A iterationsMap.put(t.getTaskID(), t);
325N/A }
325N/A }
325N/A
325N/A // Remove any completed task iterations.
325N/A for (Map.Entry<String,Task> iterationEntry : iterationsMap.entrySet())
325N/A {
325N/A if (TaskState.isDone(iterationEntry.getValue().getTaskState()))
325N/A {
325N/A removeCompletedTask(iterationEntry.getKey());
325N/A }
325N/A }
325N/A
325N/A writeState();
325N/A return recurringTask;
325N/A }
325N/A finally
325N/A {
325N/A schedulerLock.unlock();
325N/A }
325N/A }
325N/A
325N/A
325N/A
325N/A /**
325N/A * Schedules the provided task for execution. If the scheduler is active and
325N/A * the start time has arrived, then the task will begin execution immediately.
325N/A * Otherwise, it will be placed in the pending queue to be started at the
325N/A * appropriate time.
325N/A *
325N/A * @param task The task to be scheduled.
325N/A * @param writeState Indicates whether the current state information for
325N/A * the scheduler should be persisted to disk once the
325N/A * task is scheduled.
325N/A *
325N/A * @throws DirectoryException If a problem occurs while trying to schedule
325N/A * the task (e.g., there's already another task
325N/A * defined with the same ID).
325N/A */
325N/A public void scheduleTask(Task task, boolean writeState)
325N/A throws DirectoryException
325N/A {
325N/A schedulerLock.lock();
325N/A
325N/A
325N/A try
325N/A {
325N/A String id = task.getTaskID();
325N/A
325N/A if (tasks.containsKey(id))
325N/A {
325N/A LocalizableMessage message = ERR_TASKSCHED_DUPLICATE_TASK_ID.get(id);
325N/A throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message);
325N/A }
325N/A
325N/A for (String dependencyID : task.getDependencyIDs())
325N/A {
325N/A Task t = tasks.get(dependencyID);
325N/A if (t == null)
325N/A {
325N/A LocalizableMessage message = ERR_TASKSCHED_DEPENDENCY_MISSING.get(id, dependencyID);
325N/A throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
325N/A }
325N/A }
325N/A
325N/A tasks.put(id, task);
325N/A
325N/A TaskState state = shouldStart(task);
325N/A task.setTaskState(state);
325N/A
325N/A if (state == TaskState.RUNNING)
325N/A {
325N/A TaskThread taskThread;
325N/A if (idleThreads.isEmpty())
325N/A {
325N/A taskThread = new TaskThread(this, nextThreadID++);
325N/A taskThread.start();
325N/A }
325N/A else
325N/A {
325N/A taskThread = idleThreads.removeFirst();
325N/A }
325N/A
325N/A runningTasks.add(task);
325N/A activeThreads.put(task.getTaskID(), taskThread);
325N/A taskThread.setTask(task);
325N/A }
325N/A else if (TaskState.isDone(state))
325N/A {
325N/A if ((state == TaskState.CANCELED_BEFORE_STARTING) &&
325N/A task.isRecurring())
325N/A {
325N/A pendingTasks.add(task);
325N/A }
325N/A else
325N/A {
325N/A completedTasks.add(task);
325N/A }
325N/A }
325N/A else
325N/A {
325N/A pendingTasks.add(task);
325N/A }
325N/A
325N/A if (writeState)
325N/A {
325N/A writeState();
325N/A }
325N/A }
325N/A finally
325N/A {
325N/A schedulerLock.unlock();
325N/A }
325N/A }
325N/A
325N/A
325N/A
325N/A /**
325N/A * Attempts to cancel the task with the given task ID. This will only cancel
325N/A * the task if it has not yet started running. If it has started, then it
325N/A * will not be interrupted.
325N/A *
325N/A * @param taskID The task ID of the task to cancel.
325N/A *
325N/A * @return The requested task, which may or may not have actually been
325N/A * cancelled (the task state should make it possible to determine
325N/A * whether it was cancelled), or <CODE>null</CODE> if there is no
325N/A * such task.
325N/A */
325N/A public Task cancelTask(String taskID)
325N/A {
325N/A schedulerLock.lock();
325N/A
325N/A try
325N/A {
325N/A Task t = tasks.get(taskID);
325N/A if (t == null)
325N/A {
325N/A return null;
325N/A }
325N/A
325N/A if (TaskState.isPending(t.getTaskState()))
325N/A {
325N/A pendingTasks.remove(t);
325N/A t.setTaskState(TaskState.CANCELED_BEFORE_STARTING);
325N/A addCompletedTask(t);
325N/A writeState();
325N/A }
325N/A
325N/A return t;
325N/A }
325N/A finally
325N/A {
325N/A schedulerLock.unlock();
325N/A }
325N/A }
325N/A
325N/A
325N/A
325N/A /**
325N/A * Removes the specified pending task. It will be completely removed rather
325N/A * than moving it to the set of completed tasks.
325N/A *
325N/A * @param taskID The task ID of the pending task to remove.
325N/A *
325N/A * @return The task that was removed.
325N/A *
325N/A * @throws DirectoryException If the requested task is not in the pending
325N/A * queue.
325N/A */
325N/A public Task removePendingTask(String taskID)
325N/A throws DirectoryException
325N/A {
325N/A schedulerLock.lock();
325N/A
325N/A try
325N/A {
325N/A Task t = tasks.get(taskID);
325N/A if (t == null)
325N/A {
325N/A LocalizableMessage message = ERR_TASKSCHED_REMOVE_PENDING_NO_SUCH_TASK.get(taskID);
325N/A throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
325N/A }
325N/A
325N/A if (TaskState.isPending(t.getTaskState()))
325N/A {
325N/A tasks.remove(taskID);
325N/A pendingTasks.remove(t);
325N/A writeState();
325N/A return t;
325N/A }
325N/A else
325N/A {
325N/A LocalizableMessage message = ERR_TASKSCHED_REMOVE_PENDING_NOT_PENDING.get(taskID);
325N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
325N/A }
325N/A }
325N/A finally
325N/A {
325N/A schedulerLock.unlock();
325N/A }
325N/A }
325N/A
325N/A
325N/A
325N/A /**
325N/A * Removes the specified completed task.
325N/A *
325N/A * @param taskID The task ID of the completed task to remove.
325N/A *
325N/A * @return The task that was removed.
325N/A *
325N/A * @throws DirectoryException If the requested task could not be found.
325N/A */
325N/A public Task removeCompletedTask(String taskID)
325N/A throws DirectoryException
325N/A {
325N/A schedulerLock.lock();
325N/A
325N/A try
325N/A {
325N/A Iterator<Task> iterator = completedTasks.iterator();
325N/A while (iterator.hasNext())
325N/A {
325N/A Task t = iterator.next();
325N/A if (t.getTaskID().equals(taskID))
325N/A {
325N/A iterator.remove();
325N/A tasks.remove(taskID);
325N/A writeState();
325N/A return t;
325N/A }
325N/A }
325N/A
325N/A LocalizableMessage message = ERR_TASKSCHED_REMOVE_COMPLETED_NO_SUCH_TASK.get(taskID);
325N/A throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
325N/A }
325N/A finally
325N/A {
325N/A schedulerLock.unlock();
325N/A }
325N/A }
325N/A
325N/A
325N/A
325N/A /**
325N/A * Indicates that processing has completed on the provided task thread and
325N/A * that it is now available for processing other tasks. The thread may be
325N/A * immediately used for processing another task if appropriate.
325N/A *
325N/A * @param taskThread The thread that has completed processing on its
325N/A * previously-assigned task.
325N/A * @param completedTask The task for which processing has been completed.
325N/A * @param taskState The task state for this completed task.
325N/A *
325N/A * @return <CODE>true</CODE> if the thread should continue running and
325N/A * wait for the next task to process, or <CODE>false</CODE> if it
325N/A * should exit immediately.
325N/A */
325N/A public boolean threadDone(TaskThread taskThread, Task completedTask,
325N/A TaskState taskState)
325N/A {
325N/A schedulerLock.lock();
325N/A
325N/A try
325N/A {
325N/A completedTask.setCompletionTime(TimeThread.getTime());
325N/A completedTask.setTaskState(taskState);
325N/A addCompletedTask(completedTask);
325N/A
325N/A try
325N/A {
325N/A completedTask.sendNotificationEMailMessage();
325N/A }
325N/A catch (Exception e)
325N/A {
325N/A logger.traceException(e);
325N/A }
325N/A
325N/A String taskID = completedTask.getTaskID();
325N/A if (activeThreads.remove(taskID) == null)
325N/A {
325N/A return false;
325N/A }
325N/A
325N/A // See if the task is part of a recurring task.
325N/A // If so, then schedule the next iteration.
325N/A scheduleNextRecurringTaskIteration(completedTask,
325N/A new GregorianCalendar());
325N/A writeState();
325N/A
325N/A if (isRunning)
325N/A {
325N/A idleThreads.add(taskThread);
325N/A return true;
325N/A }
325N/A else
325N/A {
325N/A return false;
325N/A }
325N/A }
325N/A finally
325N/A {
325N/A schedulerLock.unlock();
325N/A }
325N/A }
325N/A
325N/A
325N/A
325N/A /**
325N/A * Check if a given task is a recurring task iteration and re-schedule it.
325N/A * @param completedTask The task for which processing has been completed.
325N/A * @param calendar The calendar date and time to schedule from.
325N/A */
325N/A protected void scheduleNextRecurringTaskIteration(Task completedTask,
325N/A GregorianCalendar calendar)
325N/A {
325N/A String recurringTaskID = completedTask.getRecurringTaskID();
325N/A if (recurringTaskID != null)
325N/A {
325N/A RecurringTask recurringTask = recurringTasks.get(recurringTaskID);
325N/A if (recurringTask != null)
325N/A {
325N/A Task newIteration = null;
325N/A try {
newIteration = recurringTask.scheduleNextIteration(calendar);
} catch (DirectoryException de) {
logger.error(de.getMessageObject());
}
if (newIteration != null)
{
try
{
// If there is an existing task with the same id
// and it is in completed state, take its place.
Task t = tasks.get(newIteration.getTaskID());
if ((t != null) && TaskState.isDone(t.getTaskState()))
{
removeCompletedTask(t.getTaskID());
}
scheduleTask(newIteration, false);
}
catch (DirectoryException de)
{
// This task might have been already scheduled from before
// and thus got initialized from backing file, otherwise
// log error and continue.
if (de.getResultCode() != ResultCode.ENTRY_ALREADY_EXISTS)
{
logger.traceException(de);
LocalizableMessage message =
ERR_TASKSCHED_ERROR_SCHEDULING_RECURRING_ITERATION.get(
recurringTaskID, de.getMessageObject());
logger.error(message);
DirectoryServer.sendAlertNotification(this,
ALERT_TYPE_CANNOT_SCHEDULE_RECURRING_ITERATION,
message);
}
}
}
}
}
}
/**
 * Adds the provided task to the set of completed tasks associated with the
 * scheduler and removes it from the set of running tasks.  It will be
 * automatically removed after the appropriate retention time has elapsed.
 * <p>
 * If the task has no completion time yet (value {@code -1}), the current
 * time is recorded before the task is inserted into the completed set.
 *
 * @param  completedTask  The task for which processing has completed.
 */
public void addCompletedTask(Task completedTask)
{
  // The scheduler lock is reentrant, so even if we already hold it, we can
  // acquire it again.
  schedulerLock.lock();

  try
  {
    // Take the task out of the running set before touching any of its
    // fields, so that the lookup sees the task as it was when inserted.
    runningTasks.remove(completedTask);

    // If the task never ran set its completion time here explicitly so that
    // it can be correctly evaluated for retention later.  This is done
    // before the insertion below because completedTasks is a sorted set:
    // changing a value the ordering may depend on after insertion would
    // leave the element misplaced.
    // NOTE(review): assumes Task ordering may involve the completion time;
    // the reordering is harmless if it does not.
    if (completedTask.getCompletionTime() == -1)
    {
      completedTask.setCompletionTime(TimeThread.getTime());
    }

    completedTasks.add(completedTask);
  }
  finally
  {
    schedulerLock.unlock();
  }
}
/**
 * Stops the scheduler so that it will not start any scheduled tasks.  It will
 * not attempt to interrupt any tasks that are already running.  Note that
 * once the scheduler has been stopped, it cannot be restarted and it will be
 * necessary to restart the task backend to start a new scheduler instance.
 * <p>
 * The scheduler thread (if it was ever started) is interrupted and joined,
 * all task bookkeeping is cleared, and every idle worker thread is told to
 * exit.
 */
public void stopScheduler()
{
  stopRequested = true;

  // The scheduler thread is only recorded once run() has started; if the
  // scheduler was never started there is nothing to interrupt or wait for.
  Thread runner = schedulerThread;
  if (runner != null)
  {
    try
    {
      runner.interrupt();
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }

    try
    {
      runner.join();
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }
  }

  // Worker threads may still be calling back into the scheduler, so the
  // shared collections are only touched while holding the scheduler lock.
  // The idle threads are copied so that they can be interrupted without
  // holding the lock.
  List<TaskThread> idleSnapshot;
  schedulerLock.lock();
  try
  {
    pendingTasks.clear();
    runningTasks.clear();
    completedTasks.clear();
    tasks.clear();

    idleSnapshot = new ArrayList<TaskThread>(idleThreads);
  }
  finally
  {
    schedulerLock.unlock();
  }

  for (TaskThread thread : idleSnapshot)
  {
    LocalizableMessage message = INFO_TASKBE_INTERRUPTED_BY_SHUTDOWN.get();
    thread.interruptTask(TaskState.STOPPED_BY_SHUTDOWN, message, true);
  }
}
/**
 * Asks every task thread that is currently processing a task to interrupt
 * it. The scheduler itself keeps running. Failures while interrupting or
 * joining an individual thread are only traced.
 *
 * @param  interruptState   The state that should be assigned to the tasks if
 *                          they are successfully interrupted.
 * @param  interruptReason  A message indicating the reason that the tasks
 *                          are to be interrupted.
 * @param  waitForStop      Indicates whether this method should wait until
 *                          all active tasks have stopped before returning.
 */
public void interruptRunningTasks(TaskState interruptState,
                                  LocalizableMessage interruptReason,
                                  boolean waitForStop)
{
  // Snapshot the busy threads under the lock; the rest of the work is done
  // without holding it.
  final List<TaskThread> busyThreads;
  schedulerLock.lock();
  try
  {
    busyThreads = new ArrayList<TaskThread>(activeThreads.values());
  }
  finally
  {
    schedulerLock.unlock();
  }

  // Ask each of them to stop processing.
  for (TaskThread worker : busyThreads)
  {
    try
    {
      worker.interruptTask(interruptState, interruptReason, true);
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }
  }

  if (! waitForStop)
  {
    return;
  }

  // The caller wants to block until every one of those threads has ended.
  for (TaskThread worker : busyThreads)
  {
    try
    {
      worker.join();
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }
  }
}
/**
 * Operates in a loop, launching tasks at the appropriate time and performing
 * any necessary periodic cleanup.
 * <p>
 * Each pass, performed while holding the scheduler lock, evaluates every
 * pending task with {@code shouldStart}, hands tasks that are ready to a
 * worker thread, purges completed tasks older than the backend's retention
 * time, and writes the state to disk if anything changed.  The loop then
 * sleeps for at most {@code MAX_SLEEP_TIME} milliseconds, or less if a
 * pending task is due sooner, until a stop has been requested.
 */
@Override
public void run()
{
  isRunning = true;
  schedulerThread = currentThread();

  try
  {
    while (! stopRequested)
    {
      schedulerLock.lock();

      boolean stateChanged = false;
      long sleepTime = MAX_SLEEP_TIME;

      try
      {
        // If there are any pending tasks that need to be started, then do so
        // now.  The loop walks a snapshot of the pending set because
        // shouldStart() may cancel the task, which removes it from
        // pendingTasks; doing that while iterating the live set would make
        // its iterator throw ConcurrentModificationException and terminate
        // the scheduler thread.
        for (Task t : new ArrayList<Task>(pendingTasks))
        {
          TaskState state = shouldStart(t);

          if (state == TaskState.RUNNING)
          {
            TaskThread taskThread;
            if (idleThreads.isEmpty())
            {
              taskThread = new TaskThread(this, nextThreadID++);
              taskThread.start();
            }
            else
            {
              taskThread = idleThreads.removeFirst();
            }

            // Take the task out of the pending set before handing it to the
            // worker thread.
            pendingTasks.remove(t);
            runningTasks.add(t);
            activeThreads.put(t.getTaskID(), taskThread);
            taskThread.setTask(t);
            stateChanged = true;
          }
          else if (state == TaskState.WAITING_ON_START_TIME)
          {
            // If we're waiting for the start time to arrive, then see if that
            // will come before the next sleep time is up.
            long waitTime = t.getScheduledStartTime() - TimeThread.getTime();
            sleepTime = Math.min(sleepTime, waitTime);
          }
          // Recurring task iteration has to spawn the next one
          // even if the current iteration has been canceled.
          else if ((state == TaskState.CANCELED_BEFORE_STARTING) &&
                   t.isRecurring())
          {
            if (t.getScheduledStartTime() > TimeThread.getTime())
            {
              // If we're waiting for the start time to arrive,
              // then see if that will come before the next
              // sleep time is up.
              long waitTime =
                  t.getScheduledStartTime() - TimeThread.getTime();
              sleepTime = Math.min(sleepTime, waitTime);
            }
            else
            {
              // NOTE(review): unlike the RUNNING branch above, the task is
              // not removed from pendingTasks here, so it is evaluated again
              // on the next pass -- confirm whether that is intended.
              TaskThread taskThread;
              if (idleThreads.isEmpty())
              {
                taskThread = new TaskThread(this, nextThreadID++);
                taskThread.start();
              }
              else
              {
                taskThread = idleThreads.removeFirst();
              }
              runningTasks.add(t);
              activeThreads.put(t.getTaskID(), taskThread);
              taskThread.setTask(t);
            }
          }

          if (state != t.getTaskState())
          {
            t.setTaskState(state);
            stateChanged = true;
          }
        }

        // Clean up any completed tasks that have been around long enough.
        long retentionTimeMillis =
            TimeUnit.SECONDS.toMillis(taskBackend.getRetentionTime());
        long oldestRetainedCompletionTime =
            TimeThread.getTime() - retentionTimeMillis;

        Iterator<Task> iterator = completedTasks.iterator();
        while (iterator.hasNext())
        {
          Task t = iterator.next();
          if (t.getCompletionTime() < oldestRetainedCompletionTime)
          {
            iterator.remove();
            tasks.remove(t.getTaskID());
            stateChanged = true;
          }
        }

        // If anything changed, then make sure that the on-disk state gets
        // updated.
        if (stateChanged)
        {
          writeState();
        }
      }
      finally
      {
        schedulerLock.unlock();
      }

      try
      {
        // A due task can yield a zero or negative wait; skip sleeping then.
        if (sleepTime > 0)
        {
          Thread.sleep(sleepTime);
        }
      }
      catch (InterruptedException ie)
      {
        // Deliberately ignored: stopScheduler() interrupts this thread to
        // cut the sleep short, and the loop condition re-checks
        // stopRequested right away.
      }
    }
  }
  finally
  {
    isRunning = false;
  }
}
/**
 * Works out which state a task should be in right now, based on its current
 * state, whether the scheduler is running, its scheduled start time and the
 * outcome of its dependencies. The caller must hold the scheduler lock.
 * <p>
 * Note that this method is not free of side effects: when a dependency has
 * finished unsuccessfully, the task's failed-dependency action may cause the
 * task to be cancelled through {@code cancelTask} or to be marked
 * {@code DISABLED}.
 *
 * @param  task  The task for which to make the determination.
 *
 * @return  The task state that should be used for the task.  It should be
 *          RUNNING if the task should be started, or some other state if not.
 */
private TaskState shouldStart(Task task)
{
  // A task that has already finished keeps its state. The RECURRING state
  // is treated as if no state had been set.
  final TaskState current = task.getTaskState();
  if (current != TaskState.RECURRING
      && current != null
      && TaskState.isDone(current))
  {
    return current;
  }

  if (! isRunning)
  {
    return TaskState.UNSCHEDULED;
  }

  if (task.getScheduledStartTime() > TimeThread.getTime())
  {
    return TaskState.WAITING_ON_START_TIME;
  }

  final LinkedList<String> dependencyIDs = task.getDependencyIDs();
  if (dependencyIDs == null)
  {
    return TaskState.RUNNING;
  }

  for (String dependencyID : dependencyIDs)
  {
    final Task dependency = tasks.get(dependencyID);
    if (dependency == null)
    {
      // Unknown dependencies do not hold the task back.
      continue;
    }

    final TaskState dependencyState = dependency.getTaskState();
    if (! TaskState.isDone(dependencyState))
    {
      return TaskState.WAITING_ON_DEPENDENCY;
    }

    if (TaskState.isSuccessful(dependencyState))
    {
      continue;
    }

    // The dependency finished unsuccessfully: apply the configured action.
    switch (task.getFailedDependencyAction())
    {
      case CANCEL:
        cancelTask(task.getTaskID());
        return task.getTaskState();
      case DISABLE:
        task.setTaskState(TaskState.DISABLED);
        return task.getTaskState();
      default:
        break;
    }
  }

  return TaskState.RUNNING;
}
/**
 * Populates the scheduler with information read from the task backing file.
 * If no backing file is found, then create a new one.  The caller must
 * already hold the scheduler lock or otherwise ensure that this is a
 * threadsafe operation.
 * <p>
 * Entries are classified by DN: the task root and the two parent entries
 * are remembered as structural entries, children of the scheduled-tasks
 * parent are decoded as tasks, and children of the recurring-tasks parent
 * are decoded as recurring tasks.  Problems with an individual entry are
 * logged and the entry is skipped.  The LDIF reader is always closed before
 * this method returns, including on failure.
 *
 * @throws  InitializationException  If a fatal error occurs while attempting
 *                                   to perform the initialization.
 */
private void initializeTasksFromBackingFile()
        throws InitializationException
{
  String backingFilePath = taskBackend.getTaskBackingFile();
  LDIFReader ldifReader = null;

  try
  {
    File backingFile = getFileForPath(backingFilePath);
    if (! backingFile.exists())
    {
      createNewTaskBackingFile();
      return;
    }

    LDIFImportConfig importConfig = new LDIFImportConfig(backingFilePath);
    ldifReader = new LDIFReader(importConfig);

    taskRootEntry            = null;
    recurringTaskParentEntry = null;
    scheduledTaskParentEntry = null;

    while (true)
    {
      Entry entry;

      try
      {
        entry = ldifReader.readEntry();
      }
      catch (LDIFException le)
      {
        logger.traceException(le);

        if (le.canContinueReading())
        {
          logger.error(ERR_TASKSCHED_CANNOT_PARSE_ENTRY_RECOVERABLE,
              backingFilePath, le.getLineNumber(), le.getMessage());
          continue;
        }
        else
        {
          // The reader is closed by the finally block below.
          LocalizableMessage message = ERR_TASKSCHED_CANNOT_PARSE_ENTRY_FATAL.get(
              backingFilePath, le.getLineNumber(), le.getMessage());
          throw new InitializationException(message);
        }
      }

      // A null entry marks the end of the LDIF.
      if (entry == null)
      {
        break;
      }

      DN entryDN = entry.getName();
      if (entryDN.equals(taskBackend.getTaskRootDN()))
      {
        taskRootEntry = entry;
      }
      else if (entryDN.equals(taskBackend.getRecurringTasksParentDN()))
      {
        recurringTaskParentEntry = entry;
      }
      else if (entryDN.equals(taskBackend.getScheduledTasksParentDN()))
      {
        scheduledTaskParentEntry = entry;
      }
      else
      {
        DN parentDN = entryDN.getParentDNInSuffix();
        if (parentDN == null)
        {
          logger.error(ERR_TASKSCHED_ENTRY_HAS_NO_PARENT, entryDN, taskBackend.getTaskRootDN());
        }
        else if (parentDN.equals(taskBackend.getScheduledTasksParentDN()))
        {
          try
          {
            Task task = entryToScheduledTask(entry, null);
            if (TaskState.isDone(task.getTaskState()))
            {
              // Finished tasks are only retained, never rescheduled.
              String id = task.getTaskID();
              if (tasks.containsKey(id))
              {
                logger.warn(WARN_TASKSCHED_DUPLICATE_TASK_ID, id);
              }
              else
              {
                completedTasks.add(task);
                tasks.put(id, task);
              }
            }
            else
            {
              scheduleTask(task, false);
            }
          }
          catch (DirectoryException de)
          {
            logger.traceException(de);
            logger.error(ERR_TASKSCHED_CANNOT_SCHEDULE_TASK_FROM_ENTRY, entryDN, de.getMessageObject());
          }
        }
        else if (parentDN.equals(taskBackend.getRecurringTasksParentDN()))
        {
          try
          {
            RecurringTask recurringTask = entryToRecurringTask(entry);
            addRecurringTask(recurringTask, false);
          }
          catch (DirectoryException de)
          {
            logger.traceException(de);
            logger.error(ERR_TASKSCHED_CANNOT_SCHEDULE_RECURRING_TASK_FROM_ENTRY, entryDN, de.getMessageObject());
          }
        }
        else
        {
          logger.error(ERR_TASKSCHED_INVALID_TASK_ENTRY_DN, entryDN, backingFilePath);
        }
      }
    }
  }
  catch (IOException ioe)
  {
    logger.traceException(ioe);

    LocalizableMessage message = ERR_TASKSCHED_ERROR_READING_TASK_BACKING_FILE.get(
        backingFilePath, stackTraceToSingleLineString(ioe));
    throw new InitializationException(message, ioe);
  }
  finally
  {
    // Release the reader on every exit path; a failure to close is not
    // worth failing the initialization for, so it is only traced.
    if (ldifReader != null)
    {
      try
      {
        ldifReader.close();
      }
      catch (Exception e)
      {
        logger.traceException(e);
      }
    }
  }
}
/**
 * Creates a new task backing file that contains only the basic structure but
 * no scheduled or recurring task entries.  The caller must already hold the
 * scheduler lock or otherwise ensure that this is a threadsafe operation.
 * <p>
 * The three structural entries (task root, scheduled-tasks parent and
 * recurring-tasks parent) are created, remembered by the scheduler and
 * written to the file.  The LDIF writer is closed on every exit path.
 *
 * @throws  InitializationException  If a problem occurs while attempting to
 *                                   create the backing file.
 */
private void createNewTaskBackingFile()
        throws InitializationException
{
  String backingFile = taskBackend.getTaskBackingFile();
  LDIFExportConfig exportConfig =
       new LDIFExportConfig(backingFile, ExistingFileBehavior.OVERWRITE);

  LDIFWriter writer = null;
  try
  {
    writer = new LDIFWriter(exportConfig);

    // First, write a header to the top of the file to indicate that it should
    // not be manually edited.
    writer.writeComment(INFO_TASKBE_BACKING_FILE_HEADER.get(), 80);

    // Next, create the required hierarchical entries and add them to the
    // LDIF.
    taskRootEntry = createEntry(taskBackend.getTaskRootDN());
    writer.writeEntry(taskRootEntry);

    scheduledTaskParentEntry =
         createEntry(taskBackend.getScheduledTasksParentDN());
    writer.writeEntry(scheduledTaskParentEntry);

    recurringTaskParentEntry =
         createEntry(taskBackend.getRecurringTasksParentDN());
    writer.writeEntry(recurringTaskParentEntry);

    // Close the file and we're done.  The close is performed here, and not
    // only in the finally block, so that a failure to close the file is
    // still reported to the caller.
    LDIFWriter toClose = writer;
    writer = null;
    toClose.close();
  }
  catch (IOException ioe)
  {
    logger.traceException(ioe);

    LocalizableMessage message = ERR_TASKSCHED_CANNOT_CREATE_BACKING_FILE.get(
        backingFile, stackTraceToSingleLineString(ioe));
    throw new InitializationException(message, ioe);
  }
  catch (LDIFException le)
  {
    logger.traceException(le);

    LocalizableMessage message = ERR_TASKSCHED_CANNOT_CREATE_BACKING_FILE.get(
        backingFile, le.getMessage());
    throw new InitializationException(message, le);
  }
  finally
  {
    // Only reached with a non-null writer when something failed before the
    // explicit close above; release the file handle without masking the
    // original failure.
    if (writer != null)
    {
      try
      {
        writer.close();
      }
      catch (Exception e)
      {
        logger.traceException(e);
      }
    }
  }
}
/**
 * Writes state information about all tasks and recurring tasks to disk.
 * <p>
 * The state is first written to a ".tmp" file next to the backing file.  Once
 * that file has been written and closed, any existing ".save" file is
 * removed, the current backing file is renamed to ".save", and the ".tmp"
 * file is renamed into place.  The scheduler lock is held for the whole
 * operation.
 * <p>
 * This method does not throw: every failure is logged and reported through
 * an administrative alert instead.
 */
public void writeState()
{
  String backingFilePath = taskBackend.getTaskBackingFile();
  String tmpFilePath     = backingFilePath + ".tmp";
  LDIFExportConfig exportConfig =
       new LDIFExportConfig(tmpFilePath, ExistingFileBehavior.OVERWRITE);

  schedulerLock.lock();

  try
  {
    LDIFWriter writer = new LDIFWriter(exportConfig);
    try
    {
      // First, write a header to the top of the file to indicate that it
      // should not be manually edited.
      writer.writeComment(INFO_TASKBE_BACKING_FILE_HEADER.get(), 80);

      // Next, write the structural entries to the top of the LDIF.
      writer.writeEntry(taskRootEntry);
      writer.writeEntry(scheduledTaskParentEntry);
      writer.writeEntry(recurringTaskParentEntry);

      // Iterate through all the recurring tasks and write them to LDIF.
      for (RecurringTask recurringTask : recurringTasks.values())
      {
        writer.writeEntry(recurringTask.getRecurringTaskEntry());
      }

      // Iterate through all the scheduled tasks and write them to LDIF.
      for (Task task : tasks.values())
      {
        writer.writeEntry(task.getTaskEntry());
      }
    }
    finally
    {
      // Close the file on every path so that a failed write does not leak
      // the handle on the ".tmp" file.
      writer.close();
    }

    // See if there is a ".save" file.  If so, then delete it.  This is best
    // effort only: if the file cannot be removed, the rename below reports
    // the problem on platforms where rename does not replace the target.
    File saveFile = getFileForPath(backingFilePath + ".save");
    try
    {
      if (saveFile.exists())
      {
        saveFile.delete();
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }

    // If there is an existing backing file, then rename it to ".save".
    // File.renameTo signals failure through its return value rather than an
    // exception, so both outcomes have to be checked.
    File backingFile = getFileForPath(backingFilePath);
    String saveFailure = null;
    try
    {
      if (backingFile.exists() && !backingFile.renameTo(saveFile))
      {
        saveFailure = "the rename operation reported failure";
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
      saveFailure = stackTraceToSingleLineString(e);
    }

    if (saveFailure != null)
    {
      LocalizableMessage message =
          WARN_TASKSCHED_CANNOT_RENAME_CURRENT_BACKING_FILE.get(
              backingFilePath, saveFile.getAbsolutePath(), saveFailure);
      logger.warn(message);
      DirectoryServer.sendAlertNotification(
           this, ALERT_TYPE_CANNOT_RENAME_CURRENT_TASK_FILE, message);
    }

    // Rename the ".tmp" file into place, again checking the return value.
    File tmpFile = getFileForPath(tmpFilePath);
    String installFailure = null;
    try
    {
      if (!tmpFile.renameTo(backingFile))
      {
        installFailure = "the rename operation reported failure";
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
      installFailure = stackTraceToSingleLineString(e);
    }

    if (installFailure != null)
    {
      LocalizableMessage message =
          ERR_TASKSCHED_CANNOT_RENAME_NEW_BACKING_FILE.get(
              tmpFilePath, backingFilePath, installFailure);
      logger.error(message);
      DirectoryServer.sendAlertNotification(
           this, ALERT_TYPE_CANNOT_RENAME_NEW_TASK_FILE, message);
    }
  }
  catch (IOException ioe)
  {
    logger.traceException(ioe);
    LocalizableMessage message =
        ERR_TASKSCHED_CANNOT_WRITE_BACKING_FILE.get(tmpFilePath,
            stackTraceToSingleLineString(ioe));
    logger.error(message);
    DirectoryServer.sendAlertNotification(this,
        ALERT_TYPE_CANNOT_WRITE_TASK_FILE, message);
  }
  catch (LDIFException le)
  {
    logger.traceException(le);
    LocalizableMessage message =
        ERR_TASKSCHED_CANNOT_WRITE_BACKING_FILE.get(tmpFilePath, le
            .getMessage());
    logger.error(message);
    DirectoryServer.sendAlertNotification(this,
        ALERT_TYPE_CANNOT_WRITE_TASK_FILE, message);
  }
  catch (Exception e)
  {
    logger.traceException(e);
    LocalizableMessage message =
        ERR_TASKSCHED_CANNOT_WRITE_BACKING_FILE.get(tmpFilePath,
            stackTraceToSingleLineString(e));
    logger.error(message);
    DirectoryServer.sendAlertNotification(this,
        ALERT_TYPE_CANNOT_WRITE_TASK_FILE, message);
  }
  finally
  {
    schedulerLock.unlock();
  }
}
/**
 * Reports how many entries the task backend currently holds: every scheduled
 * task, every recurring task, plus three more entries.
 *
 * @return  The total number of entries in the task backend.
 */
public long getEntryCount()
{
  final long entryCount;

  schedulerLock.lock();
  try
  {
    // Computed under the scheduler lock so both map sizes are read together.
    entryCount = tasks.size() + recurringTasks.size() + 3;
  }
  finally
  {
    schedulerLock.unlock();
  }

  return entryCount;
}
/**
 * Reports how many scheduled tasks are currently known to this scheduler.
 *
 * @return  The number of scheduled tasks in the task backend.
 */
public long getScheduledTaskCount()
{
  final long scheduledCount;

  schedulerLock.lock();
  try
  {
    scheduledCount = tasks.size();
  }
  finally
  {
    schedulerLock.unlock();
  }

  return scheduledCount;
}
/**
 * Reports how many recurring tasks are currently known to this scheduler.
 *
 * @return  The number of recurring tasks in the task backend.
 */
public long getRecurringTaskCount()
{
  final long recurringCount;

  schedulerLock.lock();
  try
  {
    recurringCount = recurringTasks.size();
  }
  finally
  {
    schedulerLock.unlock();
  }

  return recurringCount;
}
/**
 * Provides access to the task backend that this scheduler works for.
 *
 * @return  The task backend with which this scheduler is associated.
 */
public TaskBackend getTaskBackend()
{
  return this.taskBackend;
}
/**
 * Provides the root entry of the task backend, i.e. the common ancestor of
 * all other entries in it.  The value handed back is a duplicate of the entry
 * held by the scheduler, not the held instance itself.
 *
 * @return  A duplicate of the root entry that is the common ancestor for all
 *          entries in the task backend.
 */
public Entry getTaskRootEntry()
{
  final Entry rootCopy = taskRootEntry.duplicate(true);
  return rootCopy;
}
/**
 * Provides the entry that sits immediately above all scheduled task entries
 * in the task backend.  The value handed back is a duplicate of the entry
 * held by the scheduler, not the held instance itself.
 *
 * @return  A duplicate of the entry that is the immediate parent for all
 *          scheduled task entries in the task backend.
 */
public Entry getScheduledTaskParentEntry()
{
  final Entry parentCopy = scheduledTaskParentEntry.duplicate(true);
  return parentCopy;
}
/**
 * Provides the entry that sits immediately above all recurring task entries
 * in the task backend.  The value handed back is a duplicate of the entry
 * held by the scheduler, not the held instance itself.
 *
 * @return  A duplicate of the entry that is the immediate parent for all
 *          recurring task entries in the task backend.
 */
public Entry getRecurringTaskParentEntry()
{
  final Entry parentCopy = recurringTaskParentEntry.duplicate(true);
  return parentCopy;
}
/**
 * Looks up a scheduled task by its task ID.  The lookup is performed while
 * holding the scheduler lock.
 *
 * @param  taskID  The task ID for the scheduled task to retrieve.
 *
 * @return  The requested scheduled task, or {@code null} if there is no
 *          such task.
 */
public Task getScheduledTask(String taskID)
{
  final Task found;

  schedulerLock.lock();
  try
  {
    found = tasks.get(taskID);
  }
  finally
  {
    schedulerLock.unlock();
  }

  return found;
}
/**
 * Looks up the scheduled task whose task entry has the given DN.  All
 * scheduled tasks are examined in turn while holding the scheduler lock, and
 * the first one whose entry name equals the DN is returned.
 *
 * @param  taskEntryDN  The DN of the task configuration entry associated
 *                      with the task to retrieve.
 *
 * @return  The requested scheduled task, or {@code null} if there is no
 *          such task.
 */
public Task getScheduledTask(DN taskEntryDN)
{
  Task match = null;

  schedulerLock.lock();
  try
  {
    for (Task candidate : tasks.values())
    {
      if (taskEntryDN.equals(candidate.getTaskEntry().getName()))
      {
        match = candidate;
        break;
      }
    }
  }
  finally
  {
    schedulerLock.unlock();
  }

  return match;
}
/**
 * Tells whether the calling thread is the current owner of the scheduler
 * lock.
 *
 * @return  {@code true} if the current thread holds the scheduler lock, or
 *          {@code false} if not.
 */
boolean holdsSchedulerLock()
{
  final boolean heldByCaller = schedulerLock.isHeldByCurrentThread();
  return heldByCaller;
}
/**
 * Obtains a write lock on the specified entry.  The lock manager is asked
 * again and again until it hands back a lock, so this method only returns
 * once the lock has actually been acquired.
 *
 * @param  entryDN  The DN of the entry for which to acquire the write lock.
 *
 * @return  The write lock that has been acquired for the entry.
 */
Lock writeLockEntry(DN entryDN)
{
  Lock acquired;
  do
  {
    acquired = LockManager.lockWrite(entryDN);
  }
  while (acquired == null);

  return acquired;
}
/**
 * Obtains a read lock on the specified entry.  The lock manager is asked
 * once; if it does not hand back a lock, the request fails with a
 * {@code BUSY} result.
 *
 * @param  entryDN  The DN of the entry for which to acquire the read lock.
 *
 * @return  The read lock that has been acquired for the entry.
 *
 * @throws  DirectoryException  If the read lock cannot be acquired.
 */
Lock readLockEntry(DN entryDN)
     throws DirectoryException
{
  final Lock acquired = LockManager.lockRead(entryDN);
  if (acquired != null)
  {
    return acquired;
  }

  throw new DirectoryException(ResultCode.BUSY,
      ERR_BACKEND_CANNOT_LOCK_ENTRY.get(entryDN));
}
/**
 * Gives back a lock previously obtained on the specified entry by handing
 * it to the lock manager for release.
 *
 * @param  entryDN  The DN of the entry for which the lock is held.
 * @param  lock     The lock held on the entry.
 */
void unlockEntry(DN entryDN, Lock lock)
{
  LockManager.unlock(entryDN, lock);
}
/**
 * Looks up the scheduled task entry with the provided DN and hands back a
 * duplicate of it.  The caller should hold a read lock on the target entry.
 * The scheduled tasks are examined while holding the scheduler lock.
 *
 * @param  scheduledTaskEntryDN  The entry DN that indicates which scheduled
 *                               task entry to retrieve.
 *
 * @return  A duplicate of the scheduled task entry with the provided DN, or
 *          {@code null} if no scheduled task has the provided DN.
 */
public Entry getScheduledTaskEntry(DN scheduledTaskEntryDN)
{
  Entry result = null;

  schedulerLock.lock();
  try
  {
    for (Task scheduled : tasks.values())
    {
      final Entry candidate = scheduled.getTaskEntry();
      if (scheduledTaskEntryDN.equals(candidate.getName()))
      {
        result = candidate.duplicate(true);
        break;
      }
    }
  }
  finally
  {
    schedulerLock.unlock();
  }

  return result;
}
/**
 * Evaluates the filter of the provided search operation against every
 * scheduled task entry and returns each matching entry through the search
 * operation.  Only the search filter is used -- the base and scope are
 * ignored, so the caller must ensure that they are correct for scheduled
 * tasks.
 * <p>
 * The scheduler lock is held for the whole iteration, and each task entry is
 * read-locked while a duplicate of it is being evaluated and returned.
 *
 * @param  searchOperation  The search operation to use when performing the
 *                          search.
 *
 * @return  {@code true} if processing should continue on the search
 *          operation, or {@code false} if it should not for some reason
 *          (e.g., a size or time limit was reached).
 *
 * @throws  DirectoryException  If a problem occurs while processing the
 *                              search operation against the scheduled tasks.
 */
public boolean searchScheduledTasks(SearchOperation searchOperation)
       throws DirectoryException
{
  final SearchFilter searchFilter = searchOperation.getFilter();
  boolean keepGoing = true;

  schedulerLock.lock();
  try
  {
    for (Task scheduled : tasks.values())
    {
      final DN entryDN = scheduled.getTaskEntryDN();
      final Lock entryLock = readLockEntry(entryDN);
      try
      {
        final Entry copy = scheduled.getTaskEntry().duplicate(true);
        if (searchFilter.matchesEntry(copy))
        {
          // The search operation decides whether more entries are wanted.
          keepGoing = searchOperation.returnEntry(copy, null);
        }
      }
      finally
      {
        unlockEntry(entryDN, entryLock);
      }

      if (!keepGoing)
      {
        break;
      }
    }
  }
  finally
  {
    schedulerLock.unlock();
  }

  return keepGoing;
}
/**
 * Looks up a recurring task by its recurring task ID.  The lookup is
 * performed while holding the scheduler lock.
 *
 * @param  recurringTaskID  The recurring task ID for the recurring task to
 *                          retrieve.
 *
 * @return  The requested recurring task, or {@code null} if there is no
 *          such recurring task.
 */
public RecurringTask getRecurringTask(String recurringTaskID)
{
  final RecurringTask found;

  schedulerLock.lock();
  try
  {
    found = recurringTasks.get(recurringTaskID);
  }
  finally
  {
    schedulerLock.unlock();
  }

  return found;
}
/**
 * Looks up the recurring task whose recurring task entry has the given DN.
 * All recurring tasks are examined in turn while holding the scheduler lock,
 * and the first one whose entry name equals the DN is returned.
 *
 * @param  recurringTaskEntryDN  The DN of the recurring task entry
 *                               associated with the recurring task to
 *                               retrieve.
 *
 * @return  The requested recurring task, or {@code null} if there is no
 *          such recurring task.
 */
public RecurringTask getRecurringTask(DN recurringTaskEntryDN)
{
  RecurringTask match = null;

  schedulerLock.lock();
  try
  {
    for (RecurringTask candidate : recurringTasks.values())
    {
      if (recurringTaskEntryDN.equals(
               candidate.getRecurringTaskEntry().getName()))
      {
        match = candidate;
        break;
      }
    }
  }
  finally
  {
    schedulerLock.unlock();
  }

  return match;
}
/**
 * Looks up the recurring task entry with the provided DN and hands back a
 * duplicate of it.  The caller should hold a read lock on the target entry.
 * The recurring tasks are examined while holding the scheduler lock.
 *
 * @param  recurringTaskEntryDN  The entry DN that indicates which recurring
 *                               task entry to retrieve.
 *
 * @return  A duplicate of the recurring task entry with the provided DN, or
 *          {@code null} if no recurring task has the provided DN.
 */
public Entry getRecurringTaskEntry(DN recurringTaskEntryDN)
{
  Entry result = null;

  schedulerLock.lock();
  try
  {
    for (RecurringTask recurring : recurringTasks.values())
    {
      final Entry candidate = recurring.getRecurringTaskEntry();
      if (recurringTaskEntryDN.equals(candidate.getName()))
      {
        result = candidate.duplicate(true);
        break;
      }
    }
  }
  finally
  {
    schedulerLock.unlock();
  }

  return result;
}
/**
 * Evaluates the filter of the provided search operation against every
 * recurring task entry and returns each matching entry through the search
 * operation.  Only the search filter is used -- the base and scope are
 * ignored, so the caller must ensure that they are correct for recurring
 * tasks.
 * <p>
 * The scheduler lock is held for the whole iteration, and each recurring
 * task entry is read-locked while a duplicate of it is being evaluated and
 * returned.
 *
 * @param  searchOperation  The search operation to use when performing the
 *                          search.
 *
 * @return  {@code true} if processing should continue on the search
 *          operation, or {@code false} if it should not for some reason
 *          (e.g., a size or time limit was reached).
 *
 * @throws  DirectoryException  If a problem occurs while processing the
 *                              search operation against the recurring tasks.
 */
public boolean searchRecurringTasks(SearchOperation searchOperation)
       throws DirectoryException
{
  final SearchFilter searchFilter = searchOperation.getFilter();
  boolean keepGoing = true;

  schedulerLock.lock();
  try
  {
    for (RecurringTask recurring : recurringTasks.values())
    {
      final DN entryDN = recurring.getRecurringTaskEntryDN();
      final Lock entryLock = readLockEntry(entryDN);
      try
      {
        final Entry copy = recurring.getRecurringTaskEntry().duplicate(true);
        if (searchFilter.matchesEntry(copy))
        {
          // The search operation decides whether more entries are wanted.
          keepGoing = searchOperation.returnEntry(copy, null);
        }
      }
      finally
      {
        unlockEntry(entryDN, entryLock);
      }

      if (!keepGoing)
      {
        break;
      }
    }
  }
  finally
  {
    schedulerLock.unlock();
  }

  return keepGoing;
}
/**
 * Decodes the contents of the provided entry as a scheduled task.  The
 * resulting task will not actually be scheduled for processing.
 * <p>
 * The entry must carry exactly one value of the task class attribute.  That
 * class is loaded, instantiated as a {@link Task} and initialized from the
 * entry.  Task-specific initialization is skipped for tasks whose state is
 * already done, and tasks that are not done must use a class contained in
 * the server's set of allowed tasks.
 *
 * @param  entry      The entry to decode as a scheduled task.
 * @param  operation  The operation used to create this task in the server, or
 *                    {@code null} if the operation is not available.  It is
 *                    only attached to the task for the duration of the
 *                    task-specific initialization.
 *
 * @return  The scheduled task decoded from the provided entry.
 *
 * @throws  DirectoryException  If the provided entry cannot be decoded as a
 *                              scheduled task.
 */
public Task entryToScheduledTask(Entry entry, Operation operation)
       throws DirectoryException
{
  // Get the name of the class that implements the task logic.
  AttributeType attrType =
       DirectoryServer.getAttributeType(ATTR_TASK_CLASS.toLowerCase());
  if (attrType == null)
  {
    attrType = DirectoryServer.getDefaultAttributeType(ATTR_TASK_CLASS);
  }

  // All four checks below concern the task class attribute, so that is the
  // attribute named in their messages.
  List<Attribute> attrList = entry.getAttribute(attrType);
  if ((attrList == null) || attrList.isEmpty())
  {
    LocalizableMessage message =
        ERR_TASKSCHED_NO_CLASS_ATTRIBUTE.get(ATTR_TASK_CLASS);
    throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message);
  }

  if (attrList.size() > 1)
  {
    LocalizableMessage message =
        ERR_TASKSCHED_MULTIPLE_CLASS_TYPES.get(ATTR_TASK_CLASS);
    throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message);
  }

  Attribute attr = attrList.get(0);
  if (attr.isEmpty())
  {
    LocalizableMessage message =
        ERR_TASKSCHED_NO_CLASS_VALUES.get(ATTR_TASK_CLASS);
    throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message);
  }

  Iterator<ByteString> iterator = attr.iterator();
  ByteString value = iterator.next();
  if (iterator.hasNext())
  {
    LocalizableMessage message =
        ERR_TASKSCHED_MULTIPLE_CLASS_VALUES.get(ATTR_TASK_CLASS);
    throw new DirectoryException(ResultCode.OBJECTCLASS_VIOLATION, message);
  }

  // Try to load the specified class.
  String taskClassName = value.toString();
  Class<?> taskClass;
  try
  {
    taskClass = DirectoryServer.loadClass(taskClassName);
  }
  catch (Exception e)
  {
    logger.traceException(e);

    LocalizableMessage message = ERR_TASKSCHED_CANNOT_LOAD_CLASS.
        get(taskClassName, ATTR_TASK_CLASS, stackTraceToSingleLineString(e));
    throw new DirectoryException(
        DirectoryServer.getServerErrorResultCode(), message, e);
  }

  // Instantiate the class as a task.
  Task task;
  try
  {
    task = (Task) taskClass.newInstance();
  }
  catch (Exception e)
  {
    logger.traceException(e);

    LocalizableMessage message =
        ERR_TASKSCHED_CANNOT_INSTANTIATE_CLASS_AS_TASK.get(
            taskClassName, Task.class.getName());
    throw new DirectoryException(
        DirectoryServer.getServerErrorResultCode(), message, e);
  }

  // Perform the necessary internal and external initialization for the task.
  try
  {
    task.initializeTaskInternal(this, entry);
  }
  catch (InitializationException ie)
  {
    logger.traceException(ie);

    LocalizableMessage message = ERR_TASKSCHED_CANNOT_INITIALIZE_INTERNAL.get(
        taskClassName, ie.getMessage());
    throw new DirectoryException(
        DirectoryServer.getServerErrorResultCode(), message, ie);
  }
  catch (Exception e)
  {
    logger.traceException(e);

    LocalizableMessage message = ERR_TASKSCHED_CANNOT_INITIALIZE_INTERNAL.get(
        taskClassName, stackTraceToSingleLineString(e));
    throw new DirectoryException(
        DirectoryServer.getServerErrorResultCode(), message, e);
  }

  final boolean alreadyDone = TaskState.isDone(task.getTaskState());
  if (!alreadyDone &&
      !DirectoryServer.getAllowedTasks().contains(taskClassName))
  {
    LocalizableMessage message =
        ERR_TASKSCHED_NOT_ALLOWED_TASK.get(taskClassName);
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }

  task.setOperation(operation);
  try
  {
    // Avoid task specific initialization for completed tasks.
    if (!alreadyDone)
    {
      task.initializeTask();
    }
  }
  finally
  {
    // Detach the operation even when initialization fails, so the task never
    // keeps a reference to it beyond this call.
    task.setOperation(null);
  }

  return task;
}
/**
 * Builds a recurring task from the contents of the provided entry.  The
 * resulting recurring task will not actually be added to the scheduler.
 *
 * @param  entry  The entry to decode as a recurring task.
 *
 * @return  The recurring task decoded from the provided entry.
 *
 * @throws  DirectoryException  If the provided entry cannot be decoded as a
 *                              recurring task.
 */
public RecurringTask entryToRecurringTask(Entry entry)
       throws DirectoryException
{
  final RecurringTask decoded = new RecurringTask(this, entry);
  return decoded;
}
/**
 * Provides the DN of the configuration entry with which this alert generator
 * is associated, which is the configuration entry DN of the task backend.
 *
 * @return  The DN of the configuration entry with which this alert generator
 *          is associated.
 */
@Override
public DN getComponentEntryDN()
{
  final DN configEntryDN = taskBackend.getConfigEntryDN();
  return configEntryDN;
}
/**
 * Provides the fully-qualified name of the Java class for this alert
 * generator implementation.
 *
 * @return  The fully-qualified name of the Java class for this alert
 *          generator implementation.
 */
@Override
public String getClassName()
{
  final String implementationClassName = CLASS_NAME;
  return implementationClassName;
}
/**
 * Describes the set of alerts that this generator may produce.  The returned
 * map associates each notification type with the human-readable description
 * for that notification.  This alert generator must not generate any alerts
 * with types that are not contained in this map.
 *
 * @return  Information about the set of alerts that this generator may
 *          produce.
 */
@Override
public LinkedHashMap<String,String> getAlerts()
{
  // Each row is {notification type, description}; the row order is the
  // insertion order of the resulting map.
  final String[][] typeAndDescription =
  {
    { ALERT_TYPE_CANNOT_SCHEDULE_RECURRING_ITERATION,
      ALERT_DESCRIPTION_CANNOT_SCHEDULE_RECURRING_ITERATION },
    { ALERT_TYPE_CANNOT_RENAME_CURRENT_TASK_FILE,
      ALERT_DESCRIPTION_CANNOT_RENAME_CURRENT_TASK_FILE },
    { ALERT_TYPE_CANNOT_RENAME_NEW_TASK_FILE,
      ALERT_DESCRIPTION_CANNOT_RENAME_NEW_TASK_FILE },
    { ALERT_TYPE_CANNOT_WRITE_TASK_FILE,
      ALERT_DESCRIPTION_CANNOT_WRITE_TASK_FILE }
  };

  final LinkedHashMap<String,String> supportedAlerts =
       new LinkedHashMap<String,String>();
  for (String[] row : typeAndDescription)
  {
    supportedAlerts.put(row[0], row[1]);
  }

  return supportedAlerts;
}
}