LDAPv3PersistentSearch.java revision be34d3a7cf588cb751d61e9acc86f1ff328e0344
* Copyright 2015 ForgeRock AS.
package com.iplanet.services.ldap.event;
import static org.forgerock.openam.ldap.LDAPConstants.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.sun.identity.common.GeneralTaskRunnable;
import com.sun.identity.common.SystemTimerPool;
import com.sun.identity.idm.IdRepoListener;
import com.sun.identity.idm.IdType;
import com.sun.identity.shared.debug.Debug;
import org.forgerock.openam.utils.IOUtils;
import org.forgerock.opendj.ldap.Attribute;
import org.forgerock.opendj.ldap.Connection;
import org.forgerock.opendj.ldap.ConnectionFactory;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.DecodeException;
import org.forgerock.opendj.ldap.DecodeOptions;
import org.forgerock.opendj.ldap.ErrorResultException;
import org.forgerock.opendj.ldap.Filter;
import org.forgerock.opendj.ldap.FutureResult;
import org.forgerock.opendj.ldap.RootDSE;
import org.forgerock.opendj.ldap.SearchResultHandler;
import org.forgerock.opendj.ldap.SearchScope;
import org.forgerock.opendj.ldap.controls.Control;
import org.forgerock.opendj.ldap.controls.EntryChangeNotificationResponseControl;
import org.forgerock.opendj.ldap.controls.GenericControl;
import org.forgerock.opendj.ldap.controls.PersistentSearchChangeType;
import org.forgerock.opendj.ldap.controls.PersistentSearchRequestControl;
import org.forgerock.opendj.ldap.requests.Requests;
import org.forgerock.opendj.ldap.requests.SearchRequest;
import org.forgerock.opendj.ldap.responses.Result;
import org.forgerock.opendj.ldap.responses.SearchResultEntry;
import org.forgerock.opendj.ldap.responses.SearchResultReference;
public abstract class LDAPv3PersistentSearch<T, H> {
private static final Debug DEBUG = Debug.getInstance("PersistentSearch");
private static final boolean CHANGES_ONLY = true;
private static final boolean RETURN_CONTROLS = true;
private static final boolean IS_CRITICAL = true;
private static final List<String> AD_DEFAULT_ATTRIBUTES = Collections.unmodifiableList(Arrays.asList(
private final ConnectionFactory factory;
private final Map<T, H> listeners = new ConcurrentHashMap<>(1);
private final int retryInterval;
private final DN searchBaseDN;
private final Filter searchFilter;
private final SearchScope searchScope;
private final List<String> attributeNames;
private volatile boolean shutdown = false;
private volatile Connection conn;
private FutureResult<Result> futureResult;
private PersistentSearchMode mode;
private RetryTask retryTask;
private enum PersistentSearchMode {
public LDAPv3PersistentSearch(int retryInterval, DN searchBaseDN, Filter searchFilter,
SearchScope searchScope, ConnectionFactory factory, String... attributeNames) {
this.retryInterval = retryInterval;
this.searchBaseDN = searchBaseDN;
this.searchFilter = searchFilter;
this.searchScope = searchScope;
this.factory = factory;
this.attributeNames = Arrays.asList(attributeNames);
private void detectPersistentSearchMode(Connection conn) throws ErrorResultException {
RootDSE dse = RootDSE.readRootDSE(conn);
Collection<String> supportedControls = dse.getSupportedControls();
if (supportedControls.contains(PersistentSearchRequestControl.OID)) {
mode = PersistentSearchMode.STANDARD;
} else if (supportedControls.contains(AD_NOTIFICATION_OID)) {
mode = PersistentSearchMode.AD;
} else {
mode = PersistentSearchMode.NONE;
if (DEBUG.messageEnabled()) {
DEBUG.message("Persistent search mode detected: " + mode.name());
* Adds an {@link IdRepoListener} object, which needs to be notified about persistent search results.
* The caller must ensure that calls to addListener/removeListener/hasListeners invocations are synchronized
* correctly.
* @param idRepoListener The {@link IdRepoListener} instance that needs to be notified about changes.
* @param supportedTypes The supported {@link IdType}s for which events needs to be generated.
public void addListener(T idRepoListener, H supportedTypes) {
listeners.put(idRepoListener, supportedTypes);
* Removes an {@link IdRepoListener} if it was registered to persistent search notifications.
* The caller must ensure that calls to addListener/removeListener/hasListeners invocations are synchronized
* correctly.
* @param idRepoListener The {@link IdRepoListener} instance that needs to be notified about changes.
public void removeListener(T idRepoListener) {
* Checks if there are any registered listeners for this persistent search connection.
* The caller must ensure that calls to addListener/removeListener/hasListeners invocations are synchronized
* correctly.
public boolean hasListeners() {
return !listeners.isEmpty();
* Starts the persistent search connection against the directory. The caller must ensure that calls made to
* startPSearch and stopPsearch are properly synchronized.
public void startSearch() {
try {
conn = factory.getConnection();
} catch (ErrorResultException ere) {
DEBUG.error("An error occurred while trying to initiate persistent search connection", ere);
DEBUG.message("Restarting persistent search");
private void startSearch(Connection conn) throws ErrorResultException {
if (mode == null) {
Control control = null;
String[] attrs = null;
//mode shouldn't be null here, if something failed during the detection it should've resulted in an
//exception already.
switch (mode) {
case NONE: {
DEBUG.error("Persistent search is not supported by the directory, persistent search will be disabled");
case STANDARD: {
control = PersistentSearchRequestControl.newControl(IS_CRITICAL, CHANGES_ONLY, RETURN_CONTROLS,
List<String> attributes = new ArrayList<>(attributeNames);
attrs = attributes.toArray(new String[0]);
case AD: {
control = GenericControl.newControl(AD_NOTIFICATION_OID, true);
List<String> attributes = new ArrayList<>(attributeNames);
attrs = attributes.toArray(new String[0]);
SearchRequest searchRequest = Requests.newSearchRequest(searchBaseDN, searchScope, searchFilter, attrs);
if (DEBUG.messageEnabled()) {
DEBUG.message("Starting persistent search against baseDN: " + searchBaseDN
+ ", scope: " + searchScope.toString() + ", filter: " + searchFilter
+ ", attrs: " + Arrays.toString(attrs) + " against " + factory.toString());
//since psearch wasn't running until now, let's clear the caches to make sure that if something got into the
//cache, while PS was stopped, those gets cleared out and we start with a clean cache.
futureResult = conn.searchAsync(searchRequest, null, new PersistentSearchResultHandler());
* Stops the persistent search request, and terminates the LDAP connection. The caller must ensure that calls made
* to startPSearch and stopPsearch are properly synchronized.
public void stopSearch() {
if (DEBUG.messageEnabled()) {
DEBUG.message("Stopping persistent search against: " + factory.toString());
if (hasListeners()) {
throw new IllegalStateException("Persistent search has assigned listeners, unable to stop.");
shutdown = true;
if (futureResult != null) {
if (retryTask != null) {
private void restartSearch() {
DEBUG.message("Restarting persistent search connection against: {}", factory.toString());
//just to be really sure
if (!shutdown) {
//we shouldn't try to restart psearch if we are in shutdown mode.
retryTask = new RetryTask();
try {
// Schedules the task for the exact second without any non-zero milliseconds
new Date(System.currentTimeMillis() + retryInterval / 1000 * 1000));
} catch (IllegalMonitorStateException e) {
DEBUG.warning("PSearch was not restarted, application may be shutting down:", e);
protected abstract void clearCaches();
protected Map<T, H> getListeners() {
return Collections.unmodifiableMap(listeners);
protected interface SearchResultEntryHandler {
boolean handle(SearchResultEntry entry, String dn, DN previousDn, PersistentSearchChangeType type);
protected abstract SearchResultEntryHandler getSearchResultEntryHandler();
private class PersistentSearchResultHandler implements SearchResultHandler {
public boolean handleEntry(SearchResultEntry entry) {
if (DEBUG.messageEnabled()) {
DEBUG.message("Processing persistent search response: " + entry.toString());
String dn = entry.getName().toString();
DN previousDn = null;
PersistentSearchChangeType type = null;
switch (mode) {
case STANDARD: {
try {
EntryChangeNotificationResponseControl control = entry.getControl(
EntryChangeNotificationResponseControl.DECODER, new DecodeOptions());
if (control != null) {
PersistentSearchChangeType changeType = control.getChangeType();
if (changeType.equals(PersistentSearchChangeType.MODIFY_DN)) {
previousDn = control.getPreviousName();
type = changeType;
} catch (DecodeException de) {
DEBUG.warning("Unable to decode EntryChangeNotificationResponseControl", de);
case AD: {
boolean isDeleted = false;
Attribute attr = entry.getAttribute(AD_IS_DELETED_ATTR);
if (attr != null && attr.size() == 1) {
isDeleted = entry.parseAttribute(AD_IS_DELETED_ATTR).asBoolean(false);
if (isDeleted) {
type = PersistentSearchChangeType.DELETE;
} else {
String whenCreated = entry.parseAttribute(AD_WHEN_CREATED_ATTR).asString();
if (whenCreated == null) {
if (DEBUG.warningEnabled()) {
DEBUG.warning("Missing attribute " + AD_WHEN_CREATED_ATTR + " in persistent search response");
//advance to the next entry and ignore this one
return true;
String whenChanged = entry.parseAttribute(AD_WHEN_CHANGED_ATTR).asString();
if (whenChanged == null) {
if (DEBUG.warningEnabled()) {
DEBUG.warning("Missing attribute " + AD_WHEN_CHANGED_ATTR + " in persistent search response");
//advance to the next entry and ignore this one
return true;
if (whenCreated.equals(whenChanged)) {
type = PersistentSearchChangeType.ADD;
} else {
type = PersistentSearchChangeType.MODIFY;
throw new IllegalStateException("Persistent search mode has invalid value: " + mode);
return getSearchResultEntryHandler().handle(entry, dn, previousDn, type);
public boolean handleReference(SearchResultReference reference) {
//ignoring references
return true;
public void handleErrorResult(ErrorResultException error) {
if (!shutdown) {
DEBUG.error("An error occurred while executing persistent search", error);
DEBUG.message("Restarting persistent search. Some changes may have been missed in the interim.");
} else {
DEBUG.message("Persistence search has been cancelled",error);
public void handleResult(Result result) {
private class RetryTask extends GeneralTaskRunnable {
private long runPeriod;
private long lastLogged = 0;
public RetryTask() {
runPeriod = retryInterval;
public boolean addElement(Object key) {
return false;
public boolean removeElement(Object key) {
return false;
public boolean isEmpty() {
return true;
public long getRunPeriod() {
return runPeriod;
public void run() {
try {
conn = factory.getConnection();
//everything seems to work, let's disable retryTask and reset the debug limit
runPeriod = -1;
lastLogged = 0;
} catch (Exception ex) {
long now = System.currentTimeMillis();
if (now - lastLogged > 60000) {
DEBUG.error("Unable to start persistent search: " + ex.getMessage());
lastLogged = now;