TaskScannerJob.java revision 21dcdac963f79c098a5ea1a2c5c5e109429c9786
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Portions copyright 2012-2015 ForgeRock AS.
*/
/**
* Runs a task scan as described by a {@link TaskScannerContext}.
* Resource queries and reads are issued through connections obtained from the supplied
* {@link ConnectionFactory}, using the request context provided by the TaskScannerContext.
*/
public class TaskScannerJob {
// Source of the connections used for resource queries and reads
private ConnectionFactory connectionFactory;
// Settings and state for this scan (scan ID, script name, thread count, field pointers, cancel/inactive flags)
private TaskScannerContext taskScannerContext;
throws ExecutionException {
// Keep both collaborators for use by the scan methods below
this.connectionFactory = connectionFactory;
this.taskScannerContext = context;
}
/**
* Starts the task associated with a task scanner event.
* When the TaskScannerContext reports that the caller waits for completion, the scan runs in the
* calling thread; otherwise a new thread is launched for the whole scan.
*
* @return the task scan ID held by the TaskScannerContext
* @throws ExecutionException rethrown unchanged if raised while running in the calling thread
*/
if (taskScannerContext.getWaitForCompletion()) {
try {
} catch (ExecutionException ex) {
throw ex;
} finally {
}
} else {
// Launch a new thread for the whole taskscan process
public void run() {
try {
} finally {
}
}
};
// Presumably no reference to the launched thread needs to be kept: it is started and left to run on its own
}
return taskScannerContext.getTaskScanID();
}
/**
* Performs the task associated with the task scanner event.
* Runs the query and executes the script across each resulting object.
*
* @param executor ExecutorService in which to invoke this task.
* @throws ExecutionException if the task scan fails (TODO confirm the exact conditions)
*/
throws ExecutionException {
new Object[] { taskScannerContext.getTaskScanID(), taskScannerContext.getInvokerName(), taskScannerContext.getScriptName() });
try {
// Fetch every candidate object up front
results = fetchAllObjects();
} catch (ResourceException e1) {
}
if (maxRecords == null) {
} else {
}
// TODO jump out early if it's empty?
// Split and prune the result set according to our max and whether we're synchronous or not
List<JsonValue> resultSets = splitResultsOverThreads(results, taskScannerContext.getNumberOfThreads(), maxRecords);
public void run() {
try {
}
}
};
}
try {
} catch (InterruptedException e) {
// Mark it interrupted
}
// Don't mark the job as completed if it has been deactivated
if (!taskScannerContext.isInactive()) {
}
});
}
/**
* Splits the query results into separate result sets to be processed by the worker threads.
*
* @param results the full set of objects returned by the query
* @param numberOfThreads number of threads the results are split over (presumably one result set per thread; TODO confirm)
* @param max presumably the maximum number of records to process; the caller passes its maxRecords value (TODO confirm null handling)
* @return the list of result sets
*/
private List<JsonValue> splitResultsOverThreads(JsonValue results, int numberOfThreads, Integer max) {
for (int i = 0; i < numberOfThreads; i++) {
}
int i = 0;
break;
}
i++;
}
}
return jsonSets;
}
throws ExecutionException {
if (taskScannerContext.isCanceled()) {
break; // Stop early: the job has been cancelled
}
// Check whether this object already has a STARTED time
// Skip it if startTime + interval has not yet passed
if (expirationDate.isAfterNow()) {
logger.debug("Object already started and has not expired. Started at: {}. Timeout: {}. Expires at: {}",
new Object[] {
continue;
}
}
try {
} catch (ResourceException e) {
// Surface resource failures to the caller as an ExecutionException, keeping the cause
throw new ExecutionException("Error during claim and execution phase", e);
}
}
}
/**
* Flatten a list of parameters and perform a query to fetch all objects from storage.
*
* @return JsonValue containing a list of all the retrieved objects
* @throws ResourceException if the query fails
*/
}
/**
* Performs a query on a resource and returns the result set.
* The query is issued on a connection from the connection factory, using the TaskScannerContext's request context.
*
* @param resourceID the identifier of the resource to query
* @param params parameters to supply to the query
* @return the set of results from the performed query
* @throws ResourceException if the query fails
*/
connectionFactory.getConnection().query(taskScannerContext.getContext(), request, new QueryResourceHandler() {
return true;
}
});
return queryResults;
}
/**
* Performs a read on a resource and returns the result.
* The read is issued on a connection from the connection factory, using the TaskScannerContext's request context.
*
* @param resourceID the identifier of the resource to read
* @return the content of the response to the performed read
* @throws ResourceException if the read fails
*/
readResults = connectionFactory.getConnection().read(taskScannerContext.getContext(), Requests.newReadRequest(resourceID)).getContent();
return readResults;
}
/**
* Adds an object to a JsonValue and performs an update
* @param resourceID the resource identifier that the updated value belongs to
* @param value value to perform the update with
* @param path presumably the pointer to the field within value that receives obj (a caller in this class passes the context's completed field)
* @param obj object to add to the field
* @return the updated JsonValue
* @throws ResourceException if the update fails
*/
private JsonValue updateValueWithObject(String resourceID, JsonValue value, JsonPointer path, Object obj) throws ResourceException {
}
/**
* Performs an update on a given resource with a supplied JsonValue
* @param resourceID the resource identifier to perform the update on
* @param value the object to update with
* @return the updated object
* @throws ResourceException if the update fails
*/
}
/**
* Constructs a full object ID from the supplied resourceID and the JsonValue
* @param resourceID resource ID that the value originates from
* @param value JsonValue to create the full ID with
* @return string indicating the full ID
*/
}
/**
* Constructs a full object ID from the supplied resourceID and the objectID
* @param resourceID resource ID that the object originates from
* @param objectID ID of some object
* @return string indicating the full ID
*/
}
/**
* Fetches an updated copy of some specified object from the given resource
* @param resourceID the resource identifier to fetch an object from
* @param value the value to retrieve an updated copy of
* @return the updated value
* @throws JsonValueException declared by the method; presumably raised when the supplied value cannot be interpreted (TODO confirm)
* @throws ResourceException if the object cannot be fetched
*/
throws JsonValueException, ResourceException {
}
/**
* Retrieves a specified object from a resource
* @param resourceID the resource identifier to fetch the object from
* @param id the identifier of the object to fetch
* @return the object retrieved from the resource
* @throws ResourceException if the object cannot be retrieved
*/
}
throws ExecutionException, ResourceException {
// claimedTask is set once a claim attempt completes without a precondition failure;
// retryClaimTask is set when a failed attempt still leaves the task eligible for another try.
boolean claimedTask = false;
boolean retryClaimTask = false;
do {
try {
retryClaimTask = false;
claimedTask = true;
} catch (PreconditionFailedException ex) {
// If the object changed since we queried, get the latest
// and check if it's still in a state we want to process the task.
String currentStartDateStr = (_input.get(startField) == null) ? null : _input.get(startField).asString();
String currentCompletedDateStr = (_input.get(completedField) == null) ? null : _input.get(completedField).asString();
// Retry only while the task is still uncompleted and its start date is either unset
// or unchanged from the value we expected.
if (currentCompletedDateStr == null && (currentStartDateStr == null || currentStartDateStr.equals(expectedStartDateStr))) {
retryClaimTask = true;
} else {
// Someone else managed to update the started field first and
// claimed the task. Do not execute it here this run.
}
}
// Only proceed when this run actually claimed the task
if (claimedTask) {
}
}
/**
* Performs the individual executions of the supplied script
*
* Passes <b>"input"</b> and <b>"objectID"</b> to the script.<br>
* <b>"objectID"</b> contains the full ID of the supplied object (including resource identifier).
* Useful for performing updates.<br>
* <b>"input"</b> contains the supplied object
*
* @param input value to input to the script
* @throws ExecutionException if the script execution fails (TODO confirm the exact conditions)
* @throws ResourceException if updating the object fails
*/
throws ExecutionException, ResourceException {
try {
// Record the current time in the context's completed field and keep the updated object
_input = updateValueWithObject(resourceID, _input, taskScannerContext.getCompletedField(), DATE_UTIL.now());
} else {
}
} catch (ScriptException se) {
}
}
}
/**
* Flattens JsonValue into a one-level-deep object
* @param original original JsonValue object
* @return flattened JsonValue
*/
}
/**
* Flattens JsonValue into a one-level-deep object.
* This variant also takes the name of the parent object, for use with nested objects.
* @param parent name of the parent object (for nested objects)
* @param original original JsonValue object
* @return flattened JsonValue
*/
} else {
}
}
return flattened;
}
/**
* Adds all objects from one JsonValue to another (performs a merge).
* Any values contained in both objects will be overwritten with the values in <b>from</b>
* <br><br>
* <i><b>NOTE:</b> this should be a part of JsonValue itself (so we can support merging two JsonValue objects)</i>
* @param to JsonValue that will have objects added to it
* @param from JsonValue that will be used as reference for updating
*/
}
}
/**
* Ensures that a JsonPointer exists within a supplied object so that an object can be placed in that field
* @param ptr JsonPointer to ensure exists at each level
* @param obj object to ensure the JsonPointer exists within
*/
}
}
}
}