/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
* Portions Copyright 2013 ForgeRock AS
*/
/**
* This class defines a task scheduler for the Directory Server that will
* control the execution of scheduled tasks and other administrative functions
* that need to occur on a regular basis.
*/
public class TaskScheduler
extends DirectoryThread
implements AlertGenerator
{
/**
* The tracer object for the debug logger.
*/
/**
* The fully-qualified name of this class.
*/
"org.opends.server.backends.task.TaskScheduler";
/**
* The maximum length of time in milliseconds to sleep between iterations
* through the scheduler loop.
*/
// Indicates whether the scheduler is currently running.
private boolean isRunning;
// Indicates whether a request has been received to stop the scheduler.
private boolean stopRequested;
// The entry that serves as the immediate parent for recurring tasks.
// The entry that serves as the immediate parent for scheduled tasks.
// The top-level entry at the root of the task tree.
// The set of recurring tasks defined in the server.
// The set of tasks associated with this scheduler.
// The set of worker threads that are actively busy processing tasks.
// The thread ID for the next task thread to be created;
private int nextThreadID;
// The set of worker threads that may be used to process tasks.
// The lock used to provide threadsafe access to the scheduler.
// The task backend with which this scheduler is associated.
// The thread being used to actually run the scheduler.
// The set of recently-completed tasks that need to be retained.
// The set of tasks that have been scheduled but not yet arrived.
// The set of tasks that are currently running.
/**
* Creates a new task scheduler that will be used to ensure that tasks are
* invoked at the appropriate times.
*
* @param taskBackend The task backend with which this scheduler is
* associated.
*
* @throws InitializationException If a problem occurs while initializing
* the scheduler from the backing file.
*/
throws InitializationException
{
super("Task Scheduler Thread");
this.taskBackend = taskBackend;
schedulerLock = new ReentrantLock();
isRunning = false;
stopRequested = false;
nextThreadID = 1;
try {
} catch (DirectoryException de) {
}
try {
scheduleTask(task, false);
} catch (DirectoryException de) {
// This task might have been already scheduled from before
// and thus got initialized from backing file, otherwise
// log error and continue.
}
}
}
}
}
/**
* Adds a recurring task to the scheduler, optionally scheduling the first
* iteration for processing.
*
* @param recurringTask The recurring task to add to the scheduler.
* @param scheduleIteration Indicates whether to schedule an iteration of
* the recurring task.
*
* @throws DirectoryException If a problem occurs while trying to add the
* recurring task (e.g., there's already another
* recurring task defined with the same ID).
*/
boolean scheduleIteration)
throws DirectoryException
{
try
{
{
}
if (scheduleIteration)
{
new GregorianCalendar());
{
// If there is an existing task with the same id
// and it is in completed state, take its place.
{
removeCompletedTask(t.getTaskID());
}
scheduleTask(task, false);
}
}
writeState();
}
finally
{
}
}
/**
* Removes the recurring task with the given ID.
*
* @param recurringTaskID The ID of the recurring task to remove.
*
* @return The recurring task that was removed, or <CODE>null</CODE> if there
* was no such recurring task.
*
* @throws DirectoryException If there is currently a pending or running
* iteration of the associated recurring task.
*/
throws DirectoryException
{
try
{
{
// Find any existing task iterations and try to cancel them.
if ((t.getRecurringTaskID() != null) &&
{
{
cancelTask(t.getTaskID());
}
}
}
// Remove any completed task iterations.
{
{
}
}
writeState();
return recurringTask;
}
finally
{
}
}
/**
* Schedules the provided task for execution. If the scheduler is active and
* the start time has arrived, then the task will begin execution immediately.
* Otherwise, it will be placed in the pending queue to be started at the
* appropriate time.
*
* @param task The task to be scheduled.
* @param writeState Indicates whether the current state information for
* the scheduler should be persisted to disk once the
* task is scheduled.
*
* @throws DirectoryException If a problem occurs while trying to schedule
* the task (e.g., there's already another task
* defined with the same ID).
*/
throws DirectoryException
{
try
{
{
}
{
if (t == null)
{
}
}
{
if (idleThreads.isEmpty())
{
taskThread.start();
}
else
{
}
}
{
task.isRecurring())
{
}
else
{
}
}
else
{
}
if (writeState)
{
writeState();
}
}
finally
{
}
}
/**
* Attempts to cancel the task with the given task ID. This will only cancel
* the task if it has not yet started running. If it has started, then it
* will not be interrupted.
*
* @param taskID The task ID of the task to cancel.
*
* @return The requested task, which may or may not have actually been
* cancelled (the task state should make it possible to determine
* whether it was cancelled), or <CODE>null</CODE> if there is no
* such task.
*/
{
try
{
if (t == null)
{
return null;
}
{
pendingTasks.remove(t);
addCompletedTask(t);
writeState();
}
return t;
}
finally
{
}
}
/**
* Removes the specified pending task. It will be completely removed rather
* than moving it to the set of completed tasks.
*
* @param taskID The task ID of the pending task to remove.
*
* @return The task that was removed.
*
* @throws DirectoryException If the requested task is not in the pending
* queue.
*/
throws DirectoryException
{
try
{
if (t == null)
{
}
{
pendingTasks.remove(t);
writeState();
return t;
}
else
{
}
}
finally
{
}
}
/**
* Removes the specified completed task.
*
* @param taskID The task ID of the completed task to remove.
*
* @return The task that was removed.
*
* @throws DirectoryException If the requested task could not be found.
*/
throws DirectoryException
{
try
{
{
{
writeState();
return t;
}
}
}
finally
{
}
}
/**
* Indicates that processing has completed on the provided task thread and
* that it is now available for processing other tasks. The thread may be
* immediately used for processing another task if appropriate.
*
* @param taskThread The thread that has completed processing on its
* previously-assigned task.
* @param completedTask The task for which processing has been completed.
* @param taskState The task state for this completed task.
*
* @return <CODE>true</CODE> if the thread should continue running and
* wait for the next task to process, or <CODE>false</CODE> if it
* should exit immediately.
*/
{
try
{
try
{
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
{
return false;
}
// See if the task is part of a recurring task.
// If so, then schedule the next iteration.
new GregorianCalendar());
writeState();
if (isRunning)
{
return true;
}
else
{
return false;
}
}
finally
{
}
}
/**
* Check if a given task is a recurring task iteration and re-schedule it.
* @param completedTask The task for which processing has been completed.
* @param calendar The calendar date and time to schedule from.
*/
{
if (recurringTaskID != null)
{
if (recurringTask != null)
{
try {
} catch (DirectoryException de) {
}
if (newIteration != null)
{
try
{
// If there is an existing task with the same id
// and it is in completed state, take its place.
{
removeCompletedTask(t.getTaskID());
}
scheduleTask(newIteration, false);
}
catch (DirectoryException de)
{
// This task might have been already scheduled from before
// and thus got initialized from backing file, otherwise
// log error and continue.
{
if (debugEnabled())
{
}
message);
}
}
}
}
}
}
/**
* Adds the provided task to the set of completed tasks associated with the
* scheduler. It will be automatically removed after the appropriate
* retention time has elapsed.
*
* @param completedTask The task for which processing has completed.
*/
{
// The scheduler lock is reentrant, so even if we already hold it, we can
// acquire it again.
try
{
// If the task never ran set its completion
// time here explicitly so that it can be
// correctly evaluated for retention later.
{
}
}
finally
{
}
}
/**
* Stops the scheduler so that it will not start any scheduled tasks. It will
* not attempt to interrupt any tasks that are already running. Note that
* once the scheduler has been stopped, it cannot be restarted and it will be
* necessary to restart the task backend to start a new scheduler instance.
*/
public void stopScheduler()
{
stopRequested = true;
try
{
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
try
{
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
{
}
}
/**
* Attempts to interrupt any tasks that are actively running. This will not
* make any attempt to stop the scheduler.
*
* @param interruptState The state that should be assigned to the tasks if
* they are successfully interrupted.
* @param interruptReason A message indicating the reason that the tasks
* are to be interrupted.
* @param waitForStop Indicates whether this method should wait until
* all active tasks have stopped before returning.
*/
boolean waitForStop)
{
// Grab a copy of the running threads so that we can operate on them without
// holding the lock.
try
{
}
finally
{
}
// Iterate through all the task threads and request that they stop
// processing.
for (TaskThread t : threadList)
{
try
{
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
}
// If we should actually wait for all the task threads to stop, then do so.
if (waitForStop)
{
for (TaskThread t : threadList)
{
try
{
t.join();
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
}
}
}
/**
* Operates in a loop, launching tasks at the appropriate time and performing
* any necessary periodic cleanup.
*/
public void run()
{
isRunning = true;
try
{
while (! stopRequested)
{
boolean writeState = false;
long sleepTime = MAX_SLEEP_TIME;
try
{
// If there are any pending tasks that need to be started, then do so
// now.
{
{
if (idleThreads.isEmpty())
{
taskThread.start();
}
else
{
}
runningTasks.add(t);
taskThread.setTask(t);
writeState = true;
}
{
// If we're waiting for the start time to arrive, then see if that
// will come before the next sleep time is up.
}
// Recurring task iteration has to spawn the next one
// even if the current iteration has been canceled.
t.isRecurring())
{
// If we're waiting for the start time to arrive,
// then see if that will come before the next
// sleep time is up.
long waitTime =
} else {
if (idleThreads.isEmpty()) {
taskThread.start();
} else {
}
runningTasks.add(t);
taskThread.setTask(t);
}
}
if (state != t.getTaskState())
{
t.setTaskState(state);
writeState = true;
}
}
// Clean up any completed tasks that have been around long enough.
long retentionTimeMillis =
long oldestRetainedCompletionTime =
{
if (t.getCompletionTime() < oldestRetainedCompletionTime)
{
writeState = true;
}
}
// If anything changed, then make sure that the on-disk state gets
// updated.
if (writeState)
{
writeState();
}
}
finally
{
}
try
{
if (sleepTime > 0)
{
}
} catch (InterruptedException ie) {}
}
}
finally
{
isRunning = false;
}
}
/**
* Determines whether the specified task should start running. This is based
* on the start time, the set of dependencies, and whether or not the
* scheduler is active. Note that the caller to this method must hold the
* scheduler lock.
*
* @param task The task for which to make the determination.
*
* @return The task state that should be used for the task. It should be
* RUNNING if the task should be started, or some other state if not.
*/
{
// If the task has finished we don't want to restart it
// Reset task state if recurring.
}
{
return state;
}
if (! isRunning)
{
return TaskState.UNSCHEDULED;
}
{
return TaskState.WAITING_ON_START_TIME;
}
if (dependencyIDs != null)
{
{
if (t != null)
{
{
return TaskState.WAITING_ON_DEPENDENCY;
}
{
switch (action)
{
case CANCEL:
return task.getTaskState();
case DISABLE:
return task.getTaskState();
default:
break;
}
}
}
}
}
}
/**
* Populates the scheduler with information read from the task backing file.
* If no backing file is found, then create a new one. The caller must
* already hold the scheduler lock or otherwise ensure that this is a
* threadsafe operation.
*
* @throws InitializationException If a fatal error occurs while attempting
* to perform the initialization.
*/
private void initializeTasksFromBackingFile()
throws InitializationException
{
try
{
if (! backingFile.exists())
{
return;
}
while (true)
{
try
{
}
catch (LDIFException le)
{
if (debugEnabled())
{
}
if (le.canContinueReading())
{
continue;
}
else
{
try
{
ldifReader.close();
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
throw new InitializationException(message);
}
}
{
break;
}
{
}
{
}
{
}
else
{
{
}
{
try
{
{
{
}
else
{
}
}
else
{
scheduleTask(task, false);
}
}
catch (DirectoryException de)
{
if (debugEnabled())
{
}
}
}
{
try
{
addRecurringTask(recurringTask, false);
}
catch (DirectoryException de)
{
if (debugEnabled())
{
}
}
}
else
{
}
}
}
ldifReader.close();
}
catch (IOException ioe)
{
if (debugEnabled())
{
}
}
}
/**
* Creates a new task backing file that contains only the basic structure but
* no scheduled or recurring task entries. The caller must already hold the
* scheduler lock or otherwise ensure that this is a threadsafe operation.
*
* @throws InitializationException If a problem occurs while attempting to
* create the backing file.
*/
private void createNewTaskBackingFile()
throws InitializationException
{
try
{
// First, write a header to the top of the file to indicate that it should
// not be manually edited.
// Next, create the required hierarchical entries and add them to the
// LDIF.
// Close the file and we're done.
}
catch (IOException ioe)
{
if (debugEnabled())
{
}
}
catch (LDIFException le)
{
if (debugEnabled())
{
}
}
}
/**
* Writes state information about all tasks and recurring tasks to disk.
*/
public void writeState()
{
try
{
// First, write a header to the top of the file to indicate that it should
// not be manually edited.
// Next, write the structural entries to the top of the LDIF.
// Iterate through all the recurring tasks and write them to LDIF.
{
}
// Iterate through all the scheduled tasks and write them to LDIF.
{
}
// Close the file.
// See if there is a ".save" file. If so, then delete it.
try
{
{
}
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
// If there is an existing backing file, then rename it to ".save".
try
{
if (backingFile.exists())
{
}
}
catch (Exception e)
{
if (debugEnabled())
{
}
message);
}
// Rename the ".tmp" file into place.
try
{
}
catch (Exception e)
{
if (debugEnabled())
{
}
message);
}
}
catch (IOException ioe)
{
if (debugEnabled())
{
}
}
catch (LDIFException le)
{
if (debugEnabled())
{
}
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
finally
{
}
}
/**
* Retrieves the total number of entries in the task backend.
*
* @return The total number of entries in the task backend.
*/
public long getEntryCount()
{
try
{
}
finally
{
}
}
/**
* Retrieves the number of scheduled tasks in the task backend.
*
* @return The total number of entries in the task backend.
*/
public long getScheduledTaskCount()
{
try
{
}
finally
{
}
}
/**
* Retrieves the number of recurring tasks in the task backend.
*
* @return The total number of entries in the task backend.
*/
public long getRecurringTaskCount()
{
try
{
return recurringTasks.size();
}
finally
{
}
}
/**
* Retrieves the task backend with which this scheduler is associated.
*
* @return The task backend with which this scheduler is associated.
*/
{
return taskBackend;
}
/**
* Retrieves the root entry that is the common ancestor for all entries in the
* task backend.
*
* @return The root entry that is the common ancestor for all entries in the
* task backend.
*/
{
return taskRootEntry.duplicate(true);
}
/**
* Retrieves the entry that is the immediate parent for all scheduled task
* entries in the task backend.
*
* @return The entry that is the immediate parent for all scheduled task
* entries in the task backend.
*/
{
return scheduledTaskParentEntry.duplicate(true);
}
/**
* Retrieves the entry that is the immediate parent for all recurring task
* entries in the task backend.
*
* @return The entry that is the immediate parent for all recurring task
* entries in the task backend.
*/
{
return recurringTaskParentEntry.duplicate(true);
}
/**
* Retrieves the scheduled task with the given task ID.
*
* @param taskID The task ID for the scheduled task to retrieve.
*
* @return The requested scheduled task, or <CODE>null</CODE> if there is no
* such task.
*/
{
try
{
}
finally
{
}
}
/**
* Retrieves the scheduled task created from the specified entry.
*
* @param taskEntryDN The DN of the task configuration entry associated
* with the task to retrieve.
*
* @return The requested scheduled task, or <CODE>null</CODE> if there is no
* such task.
*/
{
try
{
{
{
return t;
}
}
return null;
}
finally
{
}
}
/**
* Indicates whether the current thread already holds a lock on the scheduler.
*
* @return {@code true} if the current thread holds the scheduler lock, or
* {@code false} if not.
*/
boolean holdsSchedulerLock()
{
return schedulerLock.isHeldByCurrentThread();
}
/**
* Attempts to acquire a write lock on the specified entry, trying as many
* times as necessary until the lock has been acquired.
*
* @param entryDN The DN of the entry for which to acquire the write lock.
*
* @return The write lock that has been acquired for the entry.
*/
{
{
}
return lock;
}
/**
* Attempts to acquire a read lock on the specified entry, trying up to five
* times before failing.
*
* @param entryDN The DN of the entry for which to acquire the read lock.
*
* @return The read lock that has been acquired for the entry.
*
* @throws DirectoryException If the read lock cannot be acquired.
*/
throws DirectoryException
{
{
}
else
{
return lock;
}
}
/**
* Releases the lock held on the specified entry.
*
* @param entryDN The DN of the entry for which the lock is held.
* @param lock The lock held on the entry.
*/
{
}
/**
* Retrieves the scheduled task entry with the provided DN. The caller should
* hold a read lock on the target entry.
*
* @param scheduledTaskEntryDN The entry DN that indicates which scheduled
* task entry to retrieve.
*
* @return The scheduled task entry with the provided DN, or
* <CODE>null</CODE> if no scheduled task has the provided DN.
*/
{
try
{
{
{
}
}
return null;
}
finally
{
}
}
/**
* Compares the filter in the provided search operation against each of the
* task entries, returning any that match. Note that only the search filter
* will be used -- the base and scope will be ignored, so the caller must
* ensure that they are correct for scheduled tasks.
*
* @param searchOperation The search operation to use when performing the
* search.
*
* @return <CODE>true</CODE> if processing should continue on the search
* operation, or <CODE>false</CODE> if it should not for some reason
* (e.g., a size or time limit was reached).
*
* @throws DirectoryException If a problem occurs while processing the
* search operation against the scheduled tasks.
*/
throws DirectoryException
{
try
{
{
try
{
if (filter.matchesEntry(e))
{
{
return false;
}
}
}
finally
{
}
}
return true;
}
finally
{
}
}
/**
* Retrieves the recurring task with the given recurring task ID.
*
* @param recurringTaskID The recurring task ID for the recurring task to
* retrieve.
*
* @return The requested recurring task, or <CODE>null</CODE> if there is no
* such recurring task.
*/
{
try
{
}
finally
{
}
}
/**
* Retrieves the recurring task with the given recurring task ID.
*
* @param recurringTaskEntryDN The recurring task ID for the recurring task
* to retrieve.
*
* @return The requested recurring task, or <CODE>null</CODE> if there is no
* such recurring task.
*/
{
try
{
{
{
return rt;
}
}
return null;
}
finally
{
}
}
/**
* Retrieves the recurring task entry with the provided DN. The caller should
* hold a read lock on the target entry.
*
* @param recurringTaskEntryDN The entry DN that indicates which recurring
* task entry to retrieve.
*
* @return The recurring task entry with the provided DN, or
* <CODE>null</CODE> if no recurring task has the provided DN.
*/
{
try
{
{
{
return recurringTaskEntry.duplicate(true);
}
}
return null;
}
finally
{
}
}
/**
* Compares the filter in the provided search operation against each of the
* recurring task entries, returning any that match. Note that only the
* search filter will be used -- the base and scope will be ignored, so the
* caller must ensure that they are correct for recurring tasks.
*
* @param searchOperation The search operation to use when performing the
* search.
*
* @return <CODE>true</CODE> if processing should continue on the search
* operation, or <CODE>false</CODE> if it should not for some reason
* (e.g., a size or time limit was reached).
*
* @throws DirectoryException If a problem occurs while processing the
* search operation against the recurring tasks.
*/
throws DirectoryException
{
try
{
{
try
{
if (filter.matchesEntry(e))
{
{
return false;
}
}
}
finally
{
}
}
return true;
}
finally
{
}
}
/**
* Decodes the contents of the provided entry as a scheduled task. The
* resulting task will not actually be scheduled for processing.
*
* @param entry The entry to decode as a scheduled task.
* @param operation The operation used to create this task in the server, or
* {@code null} if the operation is not available.
*
* @return The scheduled task decoded from the provided entry.
*
* @throws DirectoryException If the provided entry cannot be decoded as a
* scheduled task.
*/
throws DirectoryException
{
// Get the name of the class that implements the task logic.
{
}
{
}
{
}
{
}
{
}
// Try to load the specified class.
try
{
}
catch (Exception e)
{
if (debugEnabled())
{
}
message);
}
// Instantiate the class as a task.
try
{
}
catch (Exception e)
{
if (debugEnabled())
{
}
message);
}
// Perform the necessary internal and external initialization for the task.
try
{
}
catch (InitializationException ie)
{
if (debugEnabled())
{
}
message);
}
catch (Exception e)
{
message);
}
{
}
// Avoid task specific initialization for completed tasks.
}
return task;
}
/**
* Decodes the contents of the provided entry as a recurring task. The
* resulting recurring task will not actually be added to the scheduler.
*
* @param entry The entry to decode as a recurring task.
*
* @return The recurring task decoded from the provided entry.
*
* @throws DirectoryException If the provided entry cannot be decoded as a
* recurring task.
*/
throws DirectoryException
{
return new RecurringTask(this, entry);
}
/**
* Retrieves the DN of the configuration entry with which this alert generator
* is associated.
*
* @return The DN of the configuration entry with which this alert generator
* is associated.
*/
{
return taskBackend.getConfigEntryDN();
}
/**
* Retrieves the fully-qualified name of the Java class for this alert
* generator implementation.
*
* @return The fully-qualified name of the Java class for this alert
* generator implementation.
*/
{
return CLASS_NAME;
}
/**
* Retrieves information about the set of alerts that this generator may
* produce. The map returned should be between the notification type for a
* particular notification and the human-readable description for that
* notification. This alert generator must not generate any alerts with types
* that are not contained in this list.
*
* @return Information about the set of alerts that this generator may
* produce.
*/
{
return alerts;
}
}