revision adb077e59a0318a46a2079cccd8fd4a05841130c
* Copyright (c) 2012-2013 ForgeRock AS. All Rights Reserved
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
* You can obtain a copy of the License at
* See the License for the specific language governing
* permission and limitations under the License.
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* your own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
package org.forgerock.openidm.sync.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.forgerock.json.fluent.JsonValue;
import org.forgerock.json.resource.ActionRequest;
import org.forgerock.json.resource.BadRequestException;
import org.forgerock.json.resource.ConflictException;
import org.forgerock.json.resource.CreateRequest;
import org.forgerock.json.resource.DeleteRequest;
import org.forgerock.json.resource.NotFoundException;
import org.forgerock.json.resource.PatchRequest;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResultHandler;
import org.forgerock.json.resource.ReadRequest;
import org.forgerock.json.resource.RequestHandler;
import org.forgerock.json.resource.Requests;
import org.forgerock.json.resource.Resource;
import org.forgerock.json.resource.ResultHandler;
import org.forgerock.json.resource.RouterContext;
import org.forgerock.json.resource.ServerContext;
import org.forgerock.json.resource.UpdateRequest;
import org.forgerock.openidm.config.enhanced.EnhancedConfig;
import org.forgerock.openidm.config.enhanced.JSONEnhancedConfig;
import org.forgerock.openidm.core.IdentityServer;
import org.forgerock.openidm.util.ResourceUtil;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Reconciliation service implementation
* @author aegloff
@Component(name = ReconciliationService.PID, immediate = true, policy = ConfigurationPolicy.OPTIONAL)
@Property(name = "service.description", value = "Reconciliation Service"),
@Property(name = "service.vendor", value = "ForgeRock AS"),
@Property(name = "openidm.router.prefix", value = "/recon/*")
public class ReconciliationService
implements RequestHandler, Reconcile {
final static Logger logger = LoggerFactory.getLogger(ReconciliationService.class);
public static final String PID = "org.forgerock.openidm.recon";
public enum ReconAction {
recon, reconByQuery, reconById;
* Convenience helper that checks if a given string
* is contained in this enum
* @param action the stringified action to check
* @return true if it is contained in this enum, false if not
public static boolean isReconAction(String action) {
try {
return true;
} catch (IllegalArgumentException ex) {
return false;
final EnhancedConfig enhancedConfig = new JSONEnhancedConfig();
cardinality = ReferenceCardinality.OPTIONAL_UNARY,
policy = ReferencePolicy.DYNAMIC
Mappings mappings;
* The thread pool for executing full reconciliation runs.
ExecutorService fullReconExecutor;
* Map from reconciliation ID to the run itself
* In historical start order, oldest first.
Map<String, ReconciliationContext> reconRuns =
Collections.synchronizedMap(new LinkedHashMap<String, ReconciliationContext>());
* The approximate max number of runs in COMPLETED state to keep in the recon runs list
private int maxCompletedRuns;
* Get the the list of all reconciliations, or details of one specific recon instance
* {@inheritDoc}
public void handleRead(ServerContext context, ReadRequest request, ResultHandler<Resource> handler) {
try {
String localId = getLocalId(request.getResourceName());
if (localId == null || "".equals(localId)) {
List<Map> runList = new ArrayList<Map>();
for (ReconciliationContext entry : reconRuns.values()) {
Map<String, Object> result = new LinkedHashMap<String, Object>();
result.put("reconciliations", runList);
handler.handleResult(new Resource(localId, null, new JsonValue(result)));
} else {
// First try and get it from in memory
if (reconRuns.containsKey(localId)) {
handler.handleResult(new Resource(localId, null, new JsonValue(reconRuns.get(localId).getSummary())));
} else {
// Next, if not in memory, try and get it from audit log
QueryRequest auditQuery = Requests.newQueryRequest("audit/recon");
auditQuery.setAdditionalQueryParameter("reconId", localId);
auditQuery.setAdditionalQueryParameter("entryType", "summary");
ServerContext routerContext = context.asContext(RouterContext.class);
Collection<Resource> queryResult = new ArrayList<Resource>();
routerContext.getConnection().query(routerContext, auditQuery, queryResult);
for (Resource resource : queryResult) {
handler.handleResult(new Resource(
new JsonValue(resource.getContent().get("messageDetail").asMap())));
} catch (Throwable t) {
* {@inheritDoc}
public void handleCreate(ServerContext context, CreateRequest request, ResultHandler<Resource> handler) {
* {@inheritDoc}
public void handleDelete(ServerContext context, DeleteRequest request, ResultHandler<Resource> handler) {
* {@inheritDoc}
public void handlePatch(ServerContext context, PatchRequest request, ResultHandler<Resource> handler) {
* {@inheritDoc}
public void handleQuery(ServerContext context, QueryRequest request, QueryResultHandler handler) {
* {@inheritDoc}
public void handleUpdate(ServerContext context, UpdateRequest request, ResultHandler<Resource> handler) {
public void handleAction(ServerContext context, ActionRequest request, ResultHandler<JsonValue> handler) {
try {
if (request.getAction() == null) {
throw new BadRequestException("Action parameter is not present or value is null");
Map<String, Object> result = new LinkedHashMap<String, Object>();
String id = getLocalId(request.getResourceName());
JsonValue paramsVal = new JsonValue(request.getAdditionalActionParameters());
if (id == null || "".equals(id)) {
// operation on collection
if (ReconciliationService.ReconAction.isReconAction(request.getAction())) {
try {
JsonValue mapping = paramsVal.get("mapping").required();
logger.debug("Reconciliation action of mapping {}", mapping);
Boolean waitForCompletion = Boolean.FALSE;
JsonValue waitParam = paramsVal.get("waitForCompletion").defaultTo(Boolean.FALSE);
if (waitParam.isBoolean()) {
waitForCompletion = waitParam.asBoolean();
} else {
waitForCompletion = Boolean.parseBoolean(waitParam.asString());
result.put("_id", reconcile(ReconAction.valueOf(request.getAction()),
mapping, waitForCompletion, paramsVal));
} catch (SynchronizationException se) {
throw new ConflictException(se);
} else {
throw new BadRequestException("Action " + request.getAction() + " on reconciliation not supported " + request.getAdditionalActionParameters());
} else {
// operation on individual resource
ReconciliationContext foundRun = reconRuns.get(id);
if (foundRun == null) {
throw new NotFoundException("Reconciliation with id " + id + " not found." );
if ("cancel".equalsIgnoreCase(request.getAction())) {
result.put("_id", foundRun.getReconId());
result.put("action", request.getAction());
result.put("status", "SUCCESS");
} else {
throw new BadRequestException("Action " + request.getAction() + " on recon run " + id + " not supported " + request.getAdditionalActionParameters());
handler.handleResult(new JsonValue(result));
} catch (Throwable t) {
finally {
* {@inheritDoc}
public String reconcile(ReconAction reconAction, final JsonValue mapping, Boolean synchronous, JsonValue reconParams)
throws SynchronizationException {
final ReconciliationContext reconContext = newReconContext(reconAction, mapping, reconParams);
if (Boolean.TRUE.equals(synchronous)) {
} else {
final ServerContext threadContext = ObjectSetContext.get();
Runnable command = new Runnable() {
public void run() {
try {
} catch (SynchronizationException ex) {"Reconciliation reported exception", ex);
} catch (Exception ex) {
logger.warn("Reconciliation failed with unexpected exception", ex);
finally {
return reconContext.getReconId();
* Allocates a new reconciliation run's context, including its identifier
* Separate from the actual execution so that the execution can happen asynchronously,
* whilst we hand back the identifier to the caller.
* @param reconAction the recon action
* @param mapping the mapping configuration
* @param reconParams
* @return a new reconciliation context
private ReconciliationContext newReconContext(ReconAction reconAction, JsonValue mapping, JsonValue reconParams)
throws SynchronizationException {
ReconciliationContext reconContext = null;
if (mappings == null) {
throw new SynchronizationException("Unknown mapping type, no mappings configured");
ServerContext context = ObjectSetContext.get();
ObjectMapping objMapping = null;
if (mapping.isString()) {
objMapping = mappings.getMapping(mapping.asString());
} else if (mapping.isMap()) {
// FIXME: Entire mapping configs defined in scheduled jobs?! Not a good idea! –PB
objMapping = mappings.createMapping(mapping);
} else {
throw new SynchronizationException("Unknown mapping type");
try {
reconContext = new ReconciliationContext(reconAction, objMapping, context, reconParams, this);
} catch (BadRequestException ex) {
throw new SynchronizationException("Failure in initializing reconciliation: "
+ ex.getMessage(), ex);
return reconContext;
* Start a full reconciliation run
* @param reconContext a new reconciliation context. Do not re-use these contexts for more than one call to reconcile.
* @throws SynchronizationException
private void reconcile(ReconciliationContext reconContext) throws SynchronizationException {
try {
reconContext.getObjectMapping().recon(reconContext); // throws SynchronizationException
} catch (SynchronizationException ex) {
if (reconContext.isCanceled()) {
} else {
throw ex;
} catch (RuntimeException ex) {
throw ex;
* Add a reconciliation run to the cached list of reconcliation runs.
* May clean out old entries of completed reconciliation runs.
* @param reconContext the reconciliation run specific context
private void addReconRun(ReconciliationContext reconContext) {
// Clean out run history if needed
// Since it only checks for completed runs when a new run is started this
// only provides for approximate adherence to maxCompleteRuns
synchronized(reconRuns) {
if (reconRuns.size() > maxCompletedRuns) {
int completedCount = 0;
// Since oldest runs are first in the list, inspect backwards
ListIterator<String> iter = new ArrayList<String>(reconRuns.keySet())
while (iter.hasPrevious()) {
String key = iter.previous();
ReconciliationContext aRun = reconRuns.get(key);
if (aRun.getStage().isComplete()) {
if (completedCount > maxCompletedRuns) {
reconRuns.put(reconContext.getReconId(), reconContext);
// TODO: replace with common utility to handle ID, this is temporary
private String getLocalId(String id) {
String localId = null;
if (id != null) {
if (id.startsWith("/")) {
id = id.replaceFirst("/", "");
int lastSlashPos = id.lastIndexOf("/");
if (lastSlashPos > -1) {
localId = id.substring(0, id.lastIndexOf("/"));
} else {
localId = id;
logger.trace("Full id: {} Extracted local id: {}", id, localId);
return localId;
// TODO: replace with common utility to handle ID, this is temporary
private String getObjectField(String id) {
String type = null;
if (id != null) {
int slashPos = id.indexOf("/");
if (slashPos > -1) {
type = id.substring(slashPos+1, id.length());
logger.trace("Full id: {} Extracted type: {}", id, type);
return type;
void activate(ComponentContext compContext) {
logger.debug("Activating Service with configuration {}", compContext.getProperties());
JsonValue config = null;
try {
// Until we have a recon service config, allow overrides via (unsupported) properties
String maxCompletedStr =
IdentityServer.getInstance().getProperty("openidm.recon.maxcompletedruns", "100");
maxCompletedRuns = Integer.parseInt(maxCompletedStr);
int maxConcurrentFullRecons = 10; // TODO: make configurable
fullReconExecutor = Executors.newFixedThreadPool(maxConcurrentFullRecons);
config = enhancedConfig.getConfigurationAsJson(compContext);
} catch (RuntimeException ex) {
logger.warn("Configuration invalid and could not be parsed, can not start reconciliation service: "
+ ex.getMessage(), ex);
throw ex;
}"Reconciliation service started.");
/* Currently rely on deactivate/activate to be called by DS if config changes instead
void modified(ComponentContext compContext) {"Configuration of service changed.");
void deactivate(ComponentContext compContext) {
logger.debug("Deactivating Service {}", compContext);"Reconciliation service stopped.");
* Accessor to router
* @return handle to router accessor
ServerContext getRouter() {
return ObjectSetContext.get();