7212N/A/*
7212N/A * CDDL HEADER START
7212N/A *
7212N/A * The contents of this file are subject to the terms of the
7212N/A * Common Development and Distribution License, Version 1.0 only
7212N/A * (the "License"). You may not use this file except in compliance
7212N/A * with the License.
7212N/A *
7212N/A * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
7212N/A * or http://forgerock.org/license/CDDLv1.0.html.
7212N/A * See the License for the specific language governing permissions
7212N/A * and limitations under the License.
7212N/A *
7212N/A * When distributing Covered Code, include this CDDL HEADER in each
7212N/A * file and include the License file at legal-notices/CDDLv1_0.txt.
7212N/A * If applicable, add the following below this CDDL HEADER, with the
7212N/A * fields enclosed by brackets "[]" replaced with your own identifying
7212N/A * information:
7212N/A * Portions Copyright [yyyy] [name of copyright owner]
7212N/A *
7212N/A * CDDL HEADER END
7212N/A *
7212N/A *
7212N/A * Copyright 2014 ForgeRock AS.
7212N/A */
7212N/Apackage org.opends.server.backends;
7212N/A
7237N/Aimport java.text.SimpleDateFormat;
7251N/Aimport java.util.Collection;
7251N/Aimport java.util.Collections;
7251N/Aimport java.util.Date;
7251N/Aimport java.util.HashSet;
7251N/Aimport java.util.Iterator;
7251N/Aimport java.util.LinkedHashMap;
7251N/Aimport java.util.List;
7251N/Aimport java.util.Map;
7251N/Aimport java.util.Set;
7251N/Aimport java.util.TimeZone;
7257N/Aimport java.util.concurrent.ConcurrentLinkedQueue;
7257N/Aimport java.util.concurrent.ConcurrentSkipListMap;
7257N/Aimport java.util.concurrent.atomic.AtomicReference;
7237N/A
7237N/Aimport org.opends.messages.Category;
7237N/Aimport org.opends.messages.Message;
7237N/Aimport org.opends.messages.Severity;
7237N/Aimport org.opends.server.admin.Configuration;
7237N/Aimport org.opends.server.api.Backend;
7237N/Aimport org.opends.server.config.ConfigConstants;
7237N/Aimport org.opends.server.config.ConfigException;
7237N/Aimport org.opends.server.controls.EntryChangelogNotificationControl;
7237N/Aimport org.opends.server.controls.ExternalChangelogRequestControl;
7251N/Aimport org.opends.server.core.AddOperation;
7251N/Aimport org.opends.server.core.DeleteOperation;
7251N/Aimport org.opends.server.core.DirectoryServer;
7251N/Aimport org.opends.server.core.ModifyDNOperation;
7251N/Aimport org.opends.server.core.ModifyOperation;
7251N/Aimport org.opends.server.core.PersistentSearch;
7251N/Aimport org.opends.server.core.SearchOperation;
7237N/Aimport org.opends.server.loggers.debug.DebugTracer;
7237N/Aimport org.opends.server.replication.common.CSN;
7237N/Aimport org.opends.server.replication.common.MultiDomainServerState;
7244N/Aimport org.opends.server.replication.common.ServerState;
7237N/Aimport org.opends.server.replication.protocol.AddMsg;
7237N/Aimport org.opends.server.replication.protocol.DeleteMsg;
7237N/Aimport org.opends.server.replication.protocol.LDAPUpdateMsg;
7237N/Aimport org.opends.server.replication.protocol.ModifyCommonMsg;
7237N/Aimport org.opends.server.replication.protocol.ModifyDNMsg;
7237N/Aimport org.opends.server.replication.protocol.UpdateMsg;
7237N/Aimport org.opends.server.replication.server.ReplicationServer;
7244N/Aimport org.opends.server.replication.server.ReplicationServerDomain;
7237N/Aimport org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
7237N/Aimport org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
7237N/Aimport org.opends.server.replication.server.changelog.api.ChangelogDB;
7237N/Aimport org.opends.server.replication.server.changelog.api.ChangelogException;
7237N/Aimport org.opends.server.replication.server.changelog.api.DBCursor;
7237N/Aimport org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
7237N/Aimport org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
7237N/Aimport org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
7237N/Aimport org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
7251N/Aimport org.opends.server.types.Attribute;
7251N/Aimport org.opends.server.types.AttributeType;
7251N/Aimport org.opends.server.types.AttributeValue;
7251N/Aimport org.opends.server.types.Attributes;
7251N/Aimport org.opends.server.types.BackupConfig;
7251N/Aimport org.opends.server.types.BackupDirectory;
7251N/Aimport org.opends.server.types.ByteString;
7251N/Aimport org.opends.server.types.CanceledOperationException;
7251N/Aimport org.opends.server.types.ConditionResult;
7251N/Aimport org.opends.server.types.Control;
7251N/Aimport org.opends.server.types.DN;
7251N/Aimport org.opends.server.types.DebugLogLevel;
7251N/Aimport org.opends.server.types.DirectoryConfig;
7251N/Aimport org.opends.server.types.DirectoryException;
7251N/Aimport org.opends.server.types.Entry;
7251N/Aimport org.opends.server.types.FilterType;
7251N/Aimport org.opends.server.types.IndexType;
7251N/Aimport org.opends.server.types.InitializationException;
7251N/Aimport org.opends.server.types.LDIFExportConfig;
7251N/Aimport org.opends.server.types.LDIFImportConfig;
7251N/Aimport org.opends.server.types.LDIFImportResult;
7251N/Aimport org.opends.server.types.Modification;
7251N/Aimport org.opends.server.types.ModificationType;
7251N/Aimport org.opends.server.types.ObjectClass;
7251N/Aimport org.opends.server.types.Privilege;
7251N/Aimport org.opends.server.types.RDN;
7251N/Aimport org.opends.server.types.RawAttribute;
7251N/Aimport org.opends.server.types.RestoreConfig;
7251N/Aimport org.opends.server.types.ResultCode;
7251N/Aimport org.opends.server.types.SearchFilter;
7251N/Aimport org.opends.server.types.SearchScope;
7251N/Aimport org.opends.server.types.WritabilityMode;
7237N/Aimport org.opends.server.util.StaticUtils;
7237N/A
7257N/Aimport com.forgerock.opendj.util.Pair;
7257N/A
7251N/Aimport static org.opends.messages.BackendMessages.*;
7251N/Aimport static org.opends.messages.ReplicationMessages.*;
7251N/Aimport static org.opends.server.config.ConfigConstants.*;
7251N/Aimport static org.opends.server.loggers.ErrorLogger.*;
7251N/Aimport static org.opends.server.loggers.debug.DebugLogger.*;
7251N/Aimport static org.opends.server.replication.plugin.MultimasterReplication.*;
7263N/Aimport static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
7251N/Aimport static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
7251N/Aimport static org.opends.server.util.LDIFWriter.*;
7251N/Aimport static org.opends.server.util.ServerConstants.*;
7251N/Aimport static org.opends.server.util.StaticUtils.*;
7251N/A
/**
 * A backend that provides access to the changelog, i.e. the "cn=changelog"
 * suffix. It is a read-only backend that is created by a
 * {@code ReplicationServer} and is not configurable.
 * <p>
 * There are two modes to search the changelog:
 * <ul>
 * <li>Cookie mode: when an "ECL Cookie Exchange Control" is provided with the
 * request. The cookie provided in the control is used to retrieve entries from
 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
 * the entries.</li>
 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
 * with the request. The entries are retrieved using the ChangeNumberIndexDB and
 * their attributes are set with the information from the ReplicaDBs. The
 * <code>changeNumber</code> attribute value is set from the content of
 * ChangeNumberIndexDB.</li>
 * </ul>
 * <h3>Searches flow</h3>
 * <p>
 * Here is the flow of searches within the changelog backend APIs:
 * <ul>
 * <li>Normal searches only go through:
 * <ol>
 * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
 * </ol>
 * </li>
 * <li>Persistent searches with <code>changesOnly=false</code> go through:
 * <ol>
 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
 * (once, single threaded),</li>
 * <li>
 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
 * <li>{@code ChangelogBackend#notify*EntryAdded()} (multiple times, multi
 * threaded)</li>
 * </ol>
 * </li>
 * <li>Persistent searches with <code>changesOnly=true</code> go through:
 * <ol>
 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
 * (once, single threaded)</li>
 * <li>
 * {@code ChangelogBackend#notify*EntryAdded()} (multiple times, multi
 * threaded)</li>
 * </ol>
 * </li>
 * </ul>
 *
 * @see ReplicationServer
 */
7237N/Apublic class ChangelogBackend extends Backend<Configuration>
7212N/A{
7218N/A private static final DebugTracer TRACER = getTracer();
7218N/A
7237N/A /** The id of this backend. */
7237N/A public static final String BACKEND_ID = "changelog";
7237N/A
7237N/A private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L;
7237N/A
7237N/A private static final String CHANGE_NUMBER_ATTR = "changeNumber";
7237N/A private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
7257N/A private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
7237N/A
7237N/A /** The set of objectclasses that will be used in root entry. */
7237N/A private static final Map<ObjectClass, String>
7237N/A CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
7237N/A
7237N/A static
7237N/A {
7237N/A CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
7237N/A CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container");
7237N/A }
7237N/A
7237N/A /** The set of objectclasses that will be used in ECL entries. */
7237N/A private static final Map<ObjectClass, String>
7237N/A CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
7237N/A
7237N/A static
7237N/A {
7237N/A CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
7237N/A CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY);
7237N/A }
7237N/A
7237N/A /** The attribute type for the "creatorsName" attribute. */
7237N/A private static final AttributeType CREATORS_NAME_TYPE =
7237N/A DirectoryConfig.getAttributeType(OP_ATTR_CREATORS_NAME_LC, true);
7237N/A
7237N/A /** The attribute type for the "modifiersName" attribute. */
7237N/A private static final AttributeType MODIFIERS_NAME_TYPE =
7237N/A DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
7237N/A
7244N/A /** The base DN for the external change log. */
7244N/A public static final DN CHANGELOG_BASE_DN;
7244N/A
7244N/A static
7244N/A {
7244N/A try
7244N/A {
7244N/A CHANGELOG_BASE_DN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
7244N/A }
7244N/A catch (DirectoryException e)
7244N/A {
7244N/A throw new RuntimeException(e);
7244N/A }
7244N/A }
7218N/A
7218N/A /** The set of base DNs for this backend. */
7218N/A private DN[] baseDNs;
7218N/A /** The set of supported controls for this backend. */
7237N/A private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
7251N/A /** Whether the base changelog entry has subordinates. */
7251N/A private Boolean baseEntryHasSubordinates;
7237N/A
7237N/A /** The replication server on which the changelog is read. */
7237N/A private final ReplicationServer replicationServer;
7237N/A private final ECLEnabledDomainPredicate domainPredicate;
7237N/A
7257N/A /** The set of cookie-based persistent searches registered with this backend. */
7257N/A private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches =
7257N/A new ConcurrentLinkedQueue<PersistentSearch>();
7257N/A /**
7257N/A * The set of change number-based persistent searches registered with this
7257N/A * backend.
7257N/A */
7257N/A private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
7257N/A new ConcurrentLinkedQueue<PersistentSearch>();
7257N/A
/**
 * Builds the changelog backend on top of the provided replication server.
 * <p>
 * The backend takes {@link #BACKEND_ID} as its id, has its writability mode
 * disabled and is flagged as a private backend.
 *
 * @param replicationServer
 *          The replication server on which the changes are read.
 * @param domainPredicate
 *          Returns whether a domain is enabled for the external changelog.
 */
public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate)
{
  // collaborators first, then the settings inherited from Backend
  this.replicationServer = replicationServer;
  this.domainPredicate = domainPredicate;

  setBackendID(BACKEND_ID);
  setWritabilityMode(WritabilityMode.DISABLED);
  setPrivateBackend(true);
}
7212N/A
7244N/A private ChangelogDB getChangelogDB()
7244N/A {
7244N/A return replicationServer.getChangelogDB();
7244N/A }
7244N/A
7244N/A /**
7244N/A * Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
7244N/A *
7244N/A * @return the ChangelogBackend configured for "cn=changelog" in this directory server
7244N/A * @deprecated instead inject the required object where needed
7244N/A */
7244N/A @Deprecated
7244N/A public static ChangelogBackend getInstance()
7244N/A {
7244N/A return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
7244N/A }
7244N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public void configureBackend(final Configuration config) throws ConfigException
7212N/A {
7237N/A throw new UnsupportedOperationException("The changelog backend is not configurable");
7237N/A }
7218N/A
7237N/A /** {@inheritDoc} */
7237N/A @Override
7237N/A public void initializeBackend() throws InitializationException
7237N/A {
7244N/A baseDNs = new DN[] { CHANGELOG_BASE_DN };
7218N/A
7218N/A try
7218N/A {
7244N/A DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
7218N/A }
7237N/A catch (final DirectoryException e)
7218N/A {
7218N/A throw new InitializationException(
7251N/A ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e);
7218N/A }
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public void finalizeBackend()
7212N/A {
7244N/A super.finalizeBackend();
7244N/A
7218N/A try
7218N/A {
7244N/A DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
7218N/A }
7237N/A catch (final DirectoryException e)
7218N/A {
7218N/A if (debugEnabled())
7218N/A {
7218N/A TRACER.debugCaught(DebugLogLevel.ERROR, e);
7218N/A }
7218N/A }
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public DN[] getBaseDNs()
7212N/A {
7218N/A return baseDNs;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public void preloadEntryCache() throws UnsupportedOperationException
7212N/A {
7237N/A throw new UnsupportedOperationException("Operation not supported.");
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public boolean isLocal()
7212N/A {
7237N/A return true;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public boolean isIndexed(final AttributeType attributeType, final IndexType indexType)
7212N/A {
7237N/A return true;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public Entry getEntry(final DN entryDN) throws DirectoryException
7212N/A {
7218N/A if (entryDN == null)
7218N/A {
7218N/A throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
7218N/A ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID()));
7218N/A }
7212N/A throw new RuntimeException("Not implemented");
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7251N/A public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException
7237N/A {
7251N/A if (CHANGELOG_BASE_DN.equals(entryDN))
7237N/A {
7251N/A final Boolean hasSubs = baseChangelogHasSubordinates();
7251N/A if (hasSubs == null)
7251N/A {
7251N/A return ConditionResult.UNDEFINED;
7251N/A }
7251N/A return hasSubs ? ConditionResult.TRUE : ConditionResult.FALSE;
7237N/A }
7251N/A return ConditionResult.FALSE;
7237N/A }
7237N/A
7251N/A private Boolean baseChangelogHasSubordinates() throws DirectoryException
7212N/A {
7251N/A if (baseEntryHasSubordinates == null)
7237N/A {
7251N/A // compute its value
7251N/A try
7251N/A {
7251N/A final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
7253N/A final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
7263N/A new MultiDomainServerState(), GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, getExcludedBaseDNs());
7251N/A try
7251N/A {
7251N/A baseEntryHasSubordinates = cursor.next();
7251N/A }
7251N/A finally
7251N/A {
7251N/A close(cursor);
7251N/A }
7251N/A }
7251N/A catch (ChangelogException e)
7251N/A {
7251N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get(
7251N/A "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e)));
7251N/A }
7237N/A }
7251N/A return baseEntryHasSubordinates;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
7212N/A {
7251N/A return -1;
7244N/A }
7244N/A
7244N/A /**
7257N/A * Notifies persistent searches of this backend that a new cookie entry was added to it.
7244N/A * <p>
7251N/A * Note: This method correspond to the "persistent search" phase.
7251N/A * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
7257N/A * <p>
7257N/A * This method must only be called after the provided data have been persisted to disk.
7257N/A *
7257N/A * @param baseDN
7257N/A * the baseDN of the newly added entry.
7257N/A * @param updateMsg
7257N/A * the update message of the newly added entry
7257N/A * @throws ChangelogException
7257N/A * If a problem occurs while notifying of the newly added entry.
7257N/A */
7257N/A public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
7257N/A {
7257N/A if (!(updateMsg instanceof LDAPUpdateMsg))
7257N/A {
7257N/A return;
7257N/A }
7257N/A
7257N/A try
7257N/A {
7257N/A for (PersistentSearch pSearch : cookieBasedPersistentSearches)
7257N/A {
7257N/A final SearchOperation searchOp = pSearch.getSearchOperation();
7257N/A final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
7257N/A entrySender.persistentSearchSendEntry(baseDN, updateMsg);
7257N/A }
7257N/A }
7257N/A catch (DirectoryException e)
7257N/A {
7257N/A throw new ChangelogException(e.getMessageObject(), e);
7257N/A }
7257N/A }
7257N/A
7257N/A /**
7257N/A * Notifies persistent searches of this backend that a new change number entry was added to it.
7257N/A * <p>
7257N/A * Note: This method correspond to the "persistent search" phase.
7257N/A * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
7257N/A * <p>
7257N/A * This method must only be called after the provided data have been persisted to disk.
7244N/A *
7244N/A * @param baseDN
7244N/A * the baseDN of the newly added entry.
7244N/A * @param changeNumber
7244N/A * the change number of the newly added entry. It will be greater
7244N/A * than zero for entries added to the change number index and less
7244N/A * than or equal to zero for entries added to any replica DB
7244N/A * @param cookieString
7244N/A * a string representing the cookie of the newly added entry.
7244N/A * This is only meaningful for entries added to the change number index
7244N/A * @param updateMsg
7244N/A * the update message of the newly added entry
7244N/A * @throws ChangelogException
7244N/A * If a problem occurs while notifying of the newly added entry.
7244N/A */
7257N/A public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
7244N/A throws ChangelogException
7244N/A {
7257N/A if (!(updateMsg instanceof LDAPUpdateMsg))
7244N/A {
7244N/A return;
7244N/A }
7244N/A
7244N/A try
7244N/A {
7257N/A // changeNumber entry can be shared with multiple persistent searches
7257N/A final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
7257N/A for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
7244N/A {
7257N/A final SearchOperation searchOp = pSearch.getSearchOperation();
7257N/A final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
7257N/A entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
7244N/A }
7244N/A }
7244N/A catch (DirectoryException e)
7244N/A {
7244N/A throw new ChangelogException(e.getMessageObject(), e);
7244N/A }
7244N/A }
7244N/A
7244N/A private boolean isCookieBased(final SearchOperation searchOp)
7244N/A {
7244N/A for (Control c : searchOp.getRequestControls())
7244N/A {
7244N/A if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
7244N/A {
7244N/A return true;
7244N/A }
7244N/A }
7244N/A return false;
7244N/A }
7244N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public void addEntry(Entry entry, AddOperation addOperation)
7212N/A throws DirectoryException, CanceledOperationException
7212N/A {
7218N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7218N/A ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getDN()), getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
7212N/A throws DirectoryException, CanceledOperationException
7212N/A {
7218N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7218N/A ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public void replaceEntry(Entry oldEntry, Entry newEntry,
7212N/A ModifyOperation modifyOperation) throws DirectoryException,
7212N/A CanceledOperationException
7212N/A {
7218N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7218N/A ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getDN()), getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public void renameEntry(DN currentDN, Entry entry,
7212N/A ModifyDNOperation modifyDNOperation) throws DirectoryException,
7212N/A CanceledOperationException
7212N/A {
7218N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7218N/A ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public void search(final SearchOperation searchOperation) throws DirectoryException
7212N/A {
7243N/A checkChangelogReadPrivilege(searchOperation);
7243N/A
7243N/A final SearchParams params = buildSearchParameters(searchOperation);
7237N/A
7237N/A optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
7237N/A try
7237N/A {
7251N/A initialSearch(params, searchOperation);
7237N/A }
7237N/A catch (ChangelogException e)
7237N/A {
7237N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get(
7237N/A searchOperation.getBaseDN().toString(),
7237N/A searchOperation.getFilter().toString(),
7237N/A stackTraceToSingleLineString(e)));
7237N/A }
7212N/A }
7212N/A
7243N/A private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException
7243N/A {
7251N/A final SearchParams params = new SearchParams(getExcludedBaseDNs());
7243N/A final ExternalChangelogRequestControl eclRequestControl =
7243N/A searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
7247N/A if (eclRequestControl != null)
7243N/A {
7244N/A params.cookie = eclRequestControl.getCookie();
7243N/A }
7243N/A return params;
7243N/A }
7243N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public Set<String> getSupportedControls()
7212N/A {
7218N/A return supportedControls;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public Set<String> getSupportedFeatures()
7212N/A {
7218N/A return Collections.emptySet();
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public boolean supportsLDIFExport()
7212N/A {
7218N/A return false;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public void exportLDIF(final LDIFExportConfig exportConfig)
7212N/A throws DirectoryException
7212N/A {
7218N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7218N/A ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public boolean supportsLDIFImport()
7212N/A {
7218N/A return false;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public LDIFImportResult importLDIF(LDIFImportConfig importConfig)
7212N/A throws DirectoryException
7212N/A {
7218N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7218N/A ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public boolean supportsBackup()
7212N/A {
7237N/A return false;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason)
7212N/A {
7237N/A return false;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public void createBackup(BackupConfig backupConfig) throws DirectoryException
7212N/A {
7218N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7218N/A ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
7212N/A {
7237N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7237N/A ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public boolean supportsRestore()
7212N/A {
7237N/A return false;
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7237N/A public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
7212N/A {
7237N/A throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
7237N/A ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
7212N/A }
7212N/A
7212N/A /** {@inheritDoc} */
7212N/A @Override
7212N/A public long getEntryCount()
7212N/A {
7237N/A try
7237N/A {
7244N/A return numSubordinates(CHANGELOG_BASE_DN, true) + 1;
7237N/A }
7237N/A catch (DirectoryException e)
7237N/A {
7237N/A if (debugEnabled())
7237N/A {
7237N/A TRACER.debugCaught(DebugLogLevel.ERROR, e);
7237N/A }
7237N/A return -1;
7237N/A }
7237N/A }
7237N/A
7237N/A /**
7237N/A * Represent the search parameters specific to the changelog.
7237N/A *
7237N/A * This class should be visible for tests.
7237N/A */
7237N/A static class SearchParams
7237N/A {
7251N/A private final Set<DN> excludedBaseDNs;
7237N/A private long lowestChangeNumber = -1;
7237N/A private long highestChangeNumber = -1;
7237N/A private CSN csn = new CSN(0, 0, 0);
7244N/A private MultiDomainServerState cookie;
7237N/A
7237N/A /**
7237N/A * Creates search parameters.
7237N/A */
7237N/A SearchParams()
7237N/A {
7251N/A this(Collections.<DN> emptySet());
7237N/A }
7237N/A
7237N/A /**
7237N/A * Creates search parameters with provided id and excluded domain DNs.
7237N/A *
7237N/A * @param excludedBaseDNs
7237N/A * Set of DNs to exclude from search.
7237N/A */
7251N/A SearchParams(final Set<DN> excludedBaseDNs)
7237N/A {
7237N/A this.excludedBaseDNs = excludedBaseDNs;
7237N/A }
7237N/A
7237N/A /**
7247N/A * Returns whether this search is cookie based.
7247N/A *
7247N/A * @return true if this search is cookie-based, false if this search is
7247N/A * change number-based.
7247N/A */
7247N/A private boolean isCookieBasedSearch()
7247N/A {
7247N/A return cookie != null;
7247N/A }
7247N/A
7247N/A /**
7237N/A * Indicates if provided change number is compatible with last change
7237N/A * number.
7237N/A *
7237N/A * @param changeNumber
7237N/A * The change number to test.
7237N/A * @return {@code true} if and only if the provided change number is in the
7237N/A * range of the last change number.
7237N/A */
7237N/A boolean changeNumberIsInRange(long changeNumber)
7237N/A {
7237N/A return highestChangeNumber == -1 || changeNumber <= highestChangeNumber;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Returns the lowest change number to retrieve (inclusive).
7237N/A *
7237N/A * @return the lowest change number
7237N/A */
7237N/A long getLowestChangeNumber()
7237N/A {
7237N/A return lowestChangeNumber;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Returns the highest change number to retrieve (inclusive).
7237N/A *
7237N/A * @return the highest change number
7237N/A */
7237N/A long getHighestChangeNumber()
7237N/A {
7237N/A return highestChangeNumber;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Returns the CSN to retrieve.
7237N/A *
7237N/A * @return the CSN, which may be the default CSN with zero values.
7237N/A */
7237N/A CSN getCSN()
7237N/A {
7237N/A return csn;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Returns the set of DNs to exclude from the search.
7237N/A *
7237N/A * @return the DNs corresponding to domains to exclude from the search.
7237N/A */
7251N/A Set<DN> getExcludedBaseDNs()
7237N/A {
7251N/A return excludedBaseDNs;
7237N/A }
7251N/A }
7237N/A
7251N/A /**
7251N/A * Returns the set of DNs to exclude from the search.
7251N/A *
7251N/A * @return the DNs corresponding to domains to exclude from the search.
7251N/A * @throws DirectoryException
7251N/A * If a DN can't be decoded.
7251N/A */
7251N/A private static Set<DN> getExcludedBaseDNs() throws DirectoryException
7251N/A {
7251N/A final Set<DN> excludedDNs = new HashSet<DN>();
7251N/A for (String dn : getExcludedChangelogDomains())
7251N/A {
7251N/A excludedDNs.add(DN.decode(dn));
7251N/A }
7251N/A return excludedDNs;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Optimize the search parameters by analyzing the DN and filter.
7237N/A * Populate the provided SearchParams with optimizations found.
7237N/A *
7237N/A * @param params the search parameters that are specific to external changelog
7237N/A * @param baseDN the provided search baseDN.
7237N/A * @param userFilter the provided search filter.
7237N/A * @throws DirectoryException when an exception occurs.
7237N/A */
7237N/A void optimizeSearchParameters(final SearchParams params, final DN baseDN, final SearchFilter userFilter)
7237N/A throws DirectoryException
7237N/A {
7237N/A SearchFilter equalityFilter = null;
7237N/A switch (baseDN.getNumComponents())
7237N/A {
7237N/A case 1:
7237N/A // "cn=changelog" : use user-provided search filter.
7237N/A break;
7237N/A case 2:
7237N/A // It is probably "changeNumber=xxx,cn=changelog", use equality filter
7237N/A // But it also could be "<service-id>,cn=changelog" so need to check on attribute
7237N/A equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR);
7237N/A break;
7237N/A default:
7237N/A // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter
7237N/A equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN");
7237N/A break;
7237N/A }
7237N/A
7237N/A final SearchParams optimized = optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter);
7237N/A params.lowestChangeNumber = optimized.lowestChangeNumber;
7237N/A params.highestChangeNumber = optimized.highestChangeNumber;
7237N/A params.csn = optimized.csn;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Build a search filter from given DN and attribute.
7237N/A *
7237N/A * @return the search filter or {@code null} if attribute is not present in
7237N/A * the provided DN
7237N/A */
7237N/A private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr)
7237N/A {
7237N/A final RDN rdn = baseDN.getRDN();
7237N/A AttributeType attrType = DirectoryServer.getAttributeType(lowerCaseAttr);
7237N/A if (attrType == null)
7237N/A {
7237N/A attrType = DirectoryServer.getDefaultAttributeType(upperCaseAttr);
7237N/A }
7237N/A final AttributeValue attrValue = rdn.getAttributeValue(attrType);
7237N/A if (attrValue != null)
7237N/A {
7237N/A return SearchFilter.createEqualityFilter(attrType, attrValue);
7237N/A }
7237N/A return null;
7237N/A }
7237N/A
7237N/A private SearchParams optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException
7237N/A {
7237N/A final SearchParams params = new SearchParams();
7237N/A if (filter == null)
7237N/A {
7237N/A return params;
7237N/A }
7237N/A
7237N/A if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR))
7237N/A {
7237N/A params.lowestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
7237N/A }
7237N/A else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR))
7237N/A {
7237N/A params.highestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
7237N/A }
7237N/A else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR))
7237N/A {
7237N/A final long number = decodeChangeNumber(filter.getAssertionValue());
7237N/A params.lowestChangeNumber = number;
7237N/A params.highestChangeNumber = number;
7237N/A }
7237N/A else if (matches(filter, FilterType.EQUALITY, "replicationcsn"))
7237N/A {
7237N/A // == exact CSN
7237N/A params.csn = new CSN(filter.getAssertionValue().toString());
7237N/A }
7237N/A else if (filter.getFilterType() == FilterType.AND)
7237N/A {
7237N/A // TODO: it looks like it could be generalized to N components, not only two
7237N/A final Collection<SearchFilter> components = filter.getFilterComponents();
7237N/A final SearchFilter filters[] = components.toArray(new SearchFilter[0]);
7237N/A long last1 = -1;
7237N/A long first1 = -1;
7237N/A long last2 = -1;
7237N/A long first2 = -1;
7237N/A if (filters.length > 0)
7237N/A {
7237N/A SearchParams msg1 = optimizeSearchUsingFilter(filters[0]);
7237N/A last1 = msg1.highestChangeNumber;
7237N/A first1 = msg1.lowestChangeNumber;
7237N/A }
7237N/A if (filters.length > 1)
7237N/A {
7237N/A SearchParams msg2 = optimizeSearchUsingFilter(filters[1]);
7237N/A last2 = msg2.highestChangeNumber;
7237N/A first2 = msg2.lowestChangeNumber;
7237N/A }
7237N/A if (last1 == -1)
7237N/A {
7237N/A params.highestChangeNumber = last2;
7237N/A }
7237N/A else if (last2 == -1)
7237N/A {
7237N/A params.highestChangeNumber = last1;
7237N/A }
7237N/A else
7237N/A {
7237N/A params.highestChangeNumber = Math.min(last1, last2);
7237N/A }
7237N/A
7237N/A params.lowestChangeNumber = Math.max(first1, first2);
7237N/A }
7237N/A return params;
7237N/A }
7237N/A
7237N/A private static long decodeChangeNumber(final AttributeValue assertionValue)
7237N/A throws DirectoryException
7237N/A {
7237N/A try
7237N/A {
7237N/A return Long.decode(assertionValue.getNormalizedValue().toString());
7237N/A }
7237N/A catch (NumberFormatException e)
7237N/A {
7237N/A throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX,
7237N/A Message.raw("Could not convert value '%s' to long", assertionValue.getNormalizedValue().toString()));
7237N/A }
7237N/A }
7237N/A
7237N/A private boolean matches(SearchFilter filter, FilterType filterType, String primaryName)
7237N/A {
7237N/A return filter.getFilterType() == filterType
7237N/A && filter.getAttributeType() != null
7237N/A && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
7237N/A }
7237N/A
7251N/A /**
7251N/A * Runs the "initial search" phase (as opposed to a "persistent search" phase).
7251N/A * The "initial search" phase is the only search run by normal searches,
7251N/A * but it is also run by persistent searches with <code>changesOnly=false</code>.
7251N/A * Persistent searches with <code>changesOnly=true</code> never execute this code.
7251N/A * <p>
7251N/A * Note: this method is executed only once per persistent search, single threaded.
7251N/A */
7251N/A private void initialSearch(final SearchParams searchParams, final SearchOperation searchOperation)
7237N/A throws DirectoryException, ChangelogException
7237N/A {
7247N/A if (searchParams.isCookieBasedSearch())
7237N/A {
7251N/A initialSearchFromCookie(searchParams, searchOperation);
7247N/A }
7247N/A else
7247N/A {
7251N/A initialSearchFromChangeNumber(searchParams, searchOperation);
7237N/A }
7237N/A }
7237N/A
7237N/A /**
7237N/A * Search the changelog when a cookie control is provided.
7237N/A */
7251N/A private void initialSearchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
7237N/A throws DirectoryException, ChangelogException
7237N/A {
7237N/A validateProvidedCookie(searchParams);
7237N/A
7257N/A final CookieEntrySender entrySender;
7251N/A if (isPersistentSearch(searchOperation))
7251N/A {
7257N/A entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
7257N/A }
7257N/A else
7257N/A {
7266N/A entrySender = new CookieEntrySender(searchOperation, SearchPhase.INITIAL);
7257N/A }
7270N/A entrySender.setCookie(searchParams.cookie);
7257N/A
7257N/A if (!sendBaseChangelogEntry(searchOperation))
7257N/A { // only return the base entry: stop here
7257N/A return;
7244N/A }
7244N/A
7237N/A ECLMultiDomainDBCursor replicaUpdatesCursor = null;
7237N/A try
7237N/A {
7244N/A final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
7237N/A final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
7270N/A searchParams.cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
7237N/A replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
7237N/A
7270N/A final boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
7257N/A if (continueSearch)
7237N/A {
7257N/A entrySender.transitioningToPersistentSearchPhase();
7270N/A sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
7237N/A }
7237N/A }
7237N/A finally
7237N/A {
7257N/A entrySender.finalizeInitialSearch();
7237N/A StaticUtils.close(replicaUpdatesCursor);
7237N/A }
7237N/A }
7237N/A
7257N/A private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
7270N/A final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
7257N/A {
7257N/A boolean continueSearch = true;
7257N/A while (continueSearch && replicaUpdatesCursor.next())
7257N/A {
7257N/A final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
7257N/A final DN domainBaseDN = replicaUpdatesCursor.getData();
7270N/A continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
7257N/A }
7257N/A return continueSearch;
7257N/A }
7257N/A
7244N/A private boolean isPersistentSearch(SearchOperation op)
7244N/A {
7244N/A for (PersistentSearch pSearch : getPersistentSearches())
7244N/A {
7244N/A if (op == pSearch.getSearchOperation())
7244N/A {
7244N/A return true;
7244N/A }
7244N/A }
7244N/A return false;
7244N/A }
7244N/A
7244N/A /** {@inheritDoc} */
7244N/A @Override
7270N/A public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
7244N/A {
7270N/A validatePersistentSearch(pSearch);
7270N/A initializeEntrySender(pSearch);
7257N/A
7257N/A if (isCookieBased(pSearch.getSearchOperation()))
7257N/A {
7257N/A cookieBasedPersistentSearches.add(pSearch);
7257N/A }
7257N/A else
7244N/A {
7257N/A changeNumberBasedPersistentSearches.add(pSearch);
7244N/A }
7257N/A super.registerPersistentSearch(pSearch);
7257N/A }
7251N/A
7270N/A private void validatePersistentSearch(final PersistentSearch pSearch) throws DirectoryException
7257N/A {
7270N/A // Validation must be done during registration for changes only persistent searches.
7270N/A // Otherwise, when there is an initial search phase,
7270N/A // validation is performed by the search() method.
7270N/A if (pSearch.isChangesOnly())
7270N/A {
7270N/A final SearchOperation searchOperation = pSearch.getSearchOperation();
7270N/A checkChangelogReadPrivilege(searchOperation);
7270N/A final SearchParams params = buildSearchParameters(searchOperation);
7270N/A // next line also validates some search parameters
7270N/A optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
7270N/A validateProvidedCookie(params);
7270N/A }
7270N/A }
7270N/A
7270N/A private void initializeEntrySender(PersistentSearch pSearch)
7270N/A {
7270N/A final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
7270N/A
7257N/A final SearchOperation searchOp = pSearch.getSearchOperation();
7257N/A if (isCookieBased(searchOp))
7257N/A {
7270N/A final CookieEntrySender entrySender = new CookieEntrySender(searchOp, startPhase);
7270N/A searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, entrySender);
7257N/A if (pSearch.isChangesOnly())
7257N/A {
7257N/A // this changesOnly persistent search will not go through #initialSearch()
7257N/A // so we must initialize the cookie here
7270N/A entrySender.setCookie(getNewestCookie(searchOp));
7257N/A }
7257N/A }
7257N/A else
7257N/A {
7270N/A searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, startPhase));
7257N/A }
7244N/A }
7244N/A
7244N/A private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
7244N/A {
7244N/A if (!isCookieBased(searchOp))
7244N/A {
7244N/A return null;
7244N/A }
7244N/A
7244N/A final MultiDomainServerState cookie = new MultiDomainServerState();
7244N/A for (final Iterator<ReplicationServerDomain> it =
7244N/A replicationServer.getDomainIterator(); it.hasNext();)
7244N/A {
7244N/A final DN baseDN = it.next().getBaseDN();
7244N/A final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
7244N/A cookie.update(baseDN, state);
7244N/A }
7244N/A return cookie;
7244N/A }
7244N/A
7237N/A /**
7237N/A * Validates the cookie contained in search parameters by checking its content
7237N/A * with the actual replication server state.
7237N/A *
7237N/A * @throws DirectoryException
7237N/A * If the state is not valid
7237N/A */
7237N/A private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
7237N/A {
7257N/A final MultiDomainServerState cookie = searchParams.cookie;
7257N/A if (cookie != null && !cookie.isEmpty())
7237N/A {
7257N/A replicationServer.validateCookie(cookie, searchParams.getExcludedBaseDNs());
7237N/A }
7237N/A }
7237N/A
7237N/A /**
7237N/A * Search the changelog using change number(s).
7237N/A */
7251N/A private void initialSearchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
7237N/A throws ChangelogException, DirectoryException
7237N/A {
7251N/A // "initial search" phase must return the base entry immediately
7251N/A sendBaseChangelogEntry(searchOperation);
7244N/A
7257N/A final ChangeNumberEntrySender entrySender;
7257N/A if (isPersistentSearch(searchOperation))
7257N/A {
7257N/A entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
7257N/A }
7257N/A else
7257N/A {
7266N/A entrySender = new ChangeNumberEntrySender(searchOperation, SearchPhase.INITIAL);
7257N/A }
7257N/A
7237N/A DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
7257N/A final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<MultiDomainDBCursor>();
7244N/A try
7244N/A {
7244N/A cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
7263N/A MultiDomainServerState cookie = new MultiDomainServerState();
7257N/A final boolean continueSearch =
7263N/A sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie);
7257N/A if (continueSearch)
7237N/A {
7257N/A entrySender.transitioningToPersistentSearchPhase();
7263N/A sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie);
7237N/A }
7237N/A }
7244N/A finally
7244N/A {
7257N/A entrySender.finalizeInitialSearch();
7257N/A StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get());
7237N/A }
7237N/A }
7237N/A
7257N/A private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
7257N/A final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor,
7263N/A AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor, MultiDomainServerState cookie)
7263N/A throws ChangelogException, DirectoryException
7237N/A {
7257N/A boolean continueSearch = true;
7257N/A while (continueSearch && cnIndexDBCursor.next())
7257N/A {
7257N/A // Handle the current cnIndex record
7257N/A final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
7257N/A if (replicaUpdatesCursor.get() == null)
7257N/A {
7257N/A replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
7263N/A initializeCookieForChangeNumberMode(cookie, cnIndexRecord);
7263N/A }
7263N/A else
7263N/A {
7263N/A cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
7257N/A }
7257N/A continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
7257N/A if (continueSearch)
7257N/A {
7257N/A final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
7257N/A if (updateMsg != null)
7257N/A {
7263N/A continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
7257N/A replicaUpdatesCursor.get().next();
7257N/A }
7257N/A }
7257N/A }
7257N/A return continueSearch;
7237N/A }
7237N/A
7263N/A /** Initialize the provided cookie from the provided change number index record. */
7263N/A private void initializeCookieForChangeNumberMode(
7263N/A MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
7263N/A {
7263N/A ECLMultiDomainDBCursor eclCursor = null;
7263N/A try
7263N/A {
7263N/A cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
7263N/A MultiDomainDBCursor cursor =
7263N/A getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie,
7263N/A LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
7263N/A eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
7263N/A eclCursor.next();
7263N/A cookie.update(eclCursor.toCookie());
7263N/A }
7263N/A finally
7263N/A {
7263N/A close(eclCursor);
7263N/A }
7263N/A }
7263N/A
7244N/A private MultiDomainDBCursor initializeReplicaUpdatesCursor(
7237N/A final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
7237N/A {
7237N/A final MultiDomainServerState state = new MultiDomainServerState();
7237N/A state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
7237N/A
7237N/A // No need for ECLMultiDomainDBCursor in this case
7237N/A // as updateMsg will be matched with cnIndexRecord
7237N/A final MultiDomainDBCursor replicaUpdatesCursor =
7263N/A getChangelogDB().getReplicationDomainDB().getCursorFrom(state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
7237N/A replicaUpdatesCursor.next();
7237N/A return replicaUpdatesCursor;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Returns the replica update message corresponding to the provided
7237N/A * cnIndexRecord.
7237N/A *
7237N/A * @return the update message, which may be {@code null} if the update message
7237N/A * could not be found because it was purged or because corresponding
7237N/A * baseDN was removed from the changelog
7237N/A * @throws DirectoryException
7237N/A * If inconsistency is detected between the available update
7237N/A * messages and the provided cnIndexRecord
7237N/A */
7237N/A private UpdateMsg findReplicaUpdateMessage(
7237N/A final ChangeNumberIndexRecord cnIndexRecord,
7237N/A final MultiDomainDBCursor replicaUpdatesCursor)
7237N/A throws DirectoryException, ChangelogException
7237N/A {
7237N/A while (true)
7237N/A {
7237N/A final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
7237N/A final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN());
7237N/A if (compareIndexWithUpdateMsg < 0) {
7237N/A // Either update message has been purged or baseDN has been removed from changelogDB,
7237N/A // ignore current index record and go to the next one
7237N/A return null;
7237N/A }
7237N/A else if (compareIndexWithUpdateMsg == 0)
7237N/A {
7237N/A // Found the matching update message
7237N/A return updateMsg;
7237N/A }
7237N/A // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet
7237N/A if (!replicaUpdatesCursor.next())
7237N/A {
7237N/A // Should never happen, as it means some messages have disappeared
7237N/A // TODO : put the correct I18N message
7237N/A throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
7237N/A Message.raw("Could not find replica update message matching index record. " +
7237N/A "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist."));
7237N/A }
7237N/A }
7237N/A }
7237N/A
7237N/A /** Returns a cursor on CNIndexDB for the provided first change number. */
7244N/A private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
7237N/A final long firstChangeNumber) throws ChangelogException
7237N/A {
7244N/A final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
7237N/A long changeNumberToUse = firstChangeNumber;
7237N/A if (changeNumberToUse <= 1)
7237N/A {
7237N/A final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
7237N/A changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber();
7237N/A }
7237N/A return cnIndexDB.getCursorFrom(changeNumberToUse);
7237N/A }
7237N/A
7237N/A /**
7237N/A * Creates a changelog entry.
7237N/A */
7257N/A private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
7257N/A final UpdateMsg msg) throws DirectoryException
7237N/A {
7237N/A if (msg instanceof AddMsg)
7237N/A {
7237N/A return createAddMsg(baseDN, changeNumber, cookie, msg);
7237N/A }
7237N/A else if (msg instanceof ModifyCommonMsg)
7237N/A {
7237N/A return createModifyMsg(baseDN, changeNumber, cookie, msg);
7237N/A }
7237N/A else if (msg instanceof DeleteMsg)
7237N/A {
7237N/A final DeleteMsg delMsg = (DeleteMsg) msg;
7237N/A return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName());
7237N/A }
7237N/A throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
7237N/A Message.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN.toString(),
7237N/A msg.getClass().toString()));
7237N/A }
7237N/A
7237N/A /**
7237N/A * Creates an entry from an add message.
7237N/A * <p>
7237N/A * Map addMsg to an LDIF string for the 'changes' attribute, and pull out
7237N/A * change initiators name if available which is contained in the creatorsName
7237N/A * attribute.
7237N/A */
7257N/A private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
7237N/A throws DirectoryException
7237N/A {
7237N/A final AddMsg addMsg = (AddMsg) msg;
7237N/A String changeInitiatorsName = null;
7237N/A String ldifChanges = null;
7237N/A try
7237N/A {
7237N/A final StringBuilder builder = new StringBuilder(256);
7237N/A for (Attribute attr : addMsg.getAttributes())
7237N/A {
7237N/A if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty())
7237N/A {
7237N/A // This attribute is not multi-valued.
7237N/A changeInitiatorsName = attr.iterator().next().toString();
7237N/A }
7237N/A final String attrName = attr.getNameWithOptions();
7237N/A for (AttributeValue value : attr)
7237N/A {
7237N/A builder.append(attrName);
7237N/A appendLDIFSeparatorAndValue(builder, value.getValue());
7237N/A builder.append('\n');
7237N/A }
7237N/A }
7237N/A ldifChanges = builder.toString();
7237N/A }
7237N/A catch (Exception e)
7237N/A {
7237N/A logEncodingMessageError("add", addMsg.getDN(), e);
7237N/A }
7237N/A
7237N/A return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName);
7237N/A }
7237N/A
7237N/A /**
7237N/A * Creates an entry from a modify message.
7237N/A * <p>
7237N/A * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull
7237N/A * out change initiators name if available which is contained in the
7237N/A * modifiersName attribute.
7237N/A */
7257N/A private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
7257N/A final UpdateMsg msg) throws DirectoryException
7237N/A {
7237N/A final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
7237N/A String changeInitiatorsName = null;
7237N/A String ldifChanges = null;
7237N/A try
7237N/A {
7237N/A final StringBuilder builder = new StringBuilder(128);
7237N/A for (Modification mod : modifyMsg.getMods())
7237N/A {
7237N/A final Attribute attr = mod.getAttribute();
7237N/A if (mod.getModificationType() == ModificationType.REPLACE
7237N/A && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE)
7237N/A && !attr.isEmpty())
7237N/A {
7237N/A // This attribute is not multi-valued.
7237N/A changeInitiatorsName = attr.iterator().next().toString();
7237N/A }
7237N/A final String attrName = attr.getNameWithOptions();
7237N/A builder.append(mod.getModificationType().getLDIFName());
7237N/A builder.append(": ");
7237N/A builder.append(attrName);
7237N/A builder.append('\n');
7237N/A
7237N/A for (AttributeValue value : attr)
7237N/A {
7237N/A builder.append(attrName);
7237N/A appendLDIFSeparatorAndValue(builder, value.getValue());
7237N/A builder.append('\n');
7237N/A }
7237N/A builder.append("-\n");
7237N/A }
7237N/A ldifChanges = builder.toString();
7237N/A }
7237N/A catch (Exception e)
7237N/A {
7237N/A logEncodingMessageError("modify", modifyMsg.getDN(), e);
7237N/A }
7237N/A
7237N/A final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
7237N/A final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges,
7237N/A isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName);
7237N/A
7237N/A if (isModifyDNMsg)
7237N/A {
7237N/A final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
7237N/A addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
7237N/A if (modDNMsg.getNewSuperior() != null)
7237N/A {
7237N/A addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
7237N/A }
7237N/A addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn()));
7237N/A }
7237N/A return entry;
7237N/A }
7237N/A
7237N/A /**
7237N/A * Log an encoding message error.
7237N/A *
7237N/A * @param messageType
7237N/A * String identifying type of message. Should be "add" or "modify".
7237N/A * @param entryDN
7237N/A * DN of original entry
7237N/A */
7257N/A private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
7237N/A {
7237N/A TRACER.debugCaught(DebugLogLevel.ERROR, exception);
7237N/A logError(Message.raw(Category.SYNC, Severity.MILD_ERROR,
7237N/A "An exception was encountered while trying to encode a replication " + messageType + " message for entry \""
7237N/A + entryDN + "\" into an External Change Log entry: " + exception.getMessage()));
7237N/A }
7237N/A
7243N/A private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException
7243N/A {
7243N/A if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp))
7243N/A {
7243N/A throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS,
7243N/A NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
7243N/A }
7243N/A }
7243N/A
7237N/A /**
7237N/A * Create a changelog entry from a set of provided information. This is the part of
7237N/A * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
7237N/A */
7237N/A private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie,
7237N/A final LDAPUpdateMsg msg, final String ldifChanges, final String changeType,
7237N/A final String changeInitiatorsName) throws DirectoryException
7237N/A {
7237N/A final CSN csn = msg.getCSN();
7237N/A String dnString;
7244N/A if (changeNumber > 0)
7237N/A {
7257N/A // change number mode
7244N/A dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
7237N/A }
7237N/A else
7237N/A {
7244N/A // Cookie mode
7244N/A dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
7237N/A }
7237N/A
7237N/A final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
7237N/A final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
7237N/A
7237N/A // Operational standard attributes
7237N/A addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC,
7237N/A ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs);
7237N/A addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs);
7237N/A addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs);
7237N/A addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
7237N/A
7237N/A // REQUIRED attributes
7244N/A if (changeNumber > 0)
7237N/A {
7237N/A addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
7237N/A }
7237N/A SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
7237N/A dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
7237N/A final String format = dateFormat.format(new Date(csn.getTime()));
7237N/A addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs);
7237N/A addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs);
7237N/A addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs);
7237N/A
7237N/A // NON REQUESTED attributes
7237N/A addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs);
7237N/A addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()),
7237N/A userAttrs, opAttrs);
7237N/A
7237N/A if (ldifChanges != null)
7237N/A {
7237N/A addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs);
7237N/A }
7237N/A if (changeInitiatorsName != null)
7237N/A {
7237N/A addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs);
7237N/A }
7237N/A
7237N/A final String targetUUID = msg.getEntryUUID();
7237N/A if (targetUUID != null)
7237N/A {
7237N/A addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
7237N/A }
7244N/A final String cookie2 = cookie != null ? cookie : "";
7244N/A addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs);
7237N/A
7237N/A final List<RawAttribute> includedAttributes = msg.getEclIncludes();
7237N/A if (includedAttributes != null && !includedAttributes.isEmpty())
7237N/A {
7237N/A final StringBuilder builder = new StringBuilder(256);
7237N/A for (final RawAttribute includedAttribute : includedAttributes)
7237N/A {
7237N/A final String name = includedAttribute.getAttributeType();
7237N/A for (final ByteString value : includedAttribute.getValues())
7237N/A {
7237N/A builder.append(name);
7237N/A appendLDIFSeparatorAndValue(builder, value);
7237N/A builder.append('\n');
7237N/A }
7237N/A }
7237N/A final String includedAttributesLDIF = builder.toString();
7237N/A addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs);
7237N/A }
7237N/A
7237N/A return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
7237N/A }
7237N/A
7244N/A /**
7251N/A * Sends the entry if it matches the base, scope and filter of the current search operation.
7251N/A * It will also send the base changelog entry if it needs to be sent and was not sent before.
7251N/A *
7251N/A * @return {@code true} if search should continue, {@code false} otherwise
7244N/A */
7257N/A private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie)
7257N/A throws DirectoryException
7244N/A {
7251N/A if (matchBaseAndScopeAndFilter(searchOp, entry))
7251N/A {
7251N/A return searchOp.returnEntry(entry, getControls(cookie));
7251N/A }
7251N/A // maybe the next entry will match?
7251N/A return true;
7251N/A }
7244N/A
7251N/A /** Indicates if the provided entry matches the filter, base and scope. */
7257N/A private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
7251N/A {
7251N/A return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
7251N/A && searchOp.getFilter().matchesEntry(entry);
7251N/A }
7244N/A
7257N/A private static List<Control> getControls(String cookie)
7251N/A {
7251N/A if (cookie != null)
7244N/A {
7257N/A final Control c = new EntryChangelogNotificationControl(true, cookie);
7257N/A return Collections.singletonList(c);
7244N/A }
7251N/A return Collections.emptyList();
7251N/A }
7244N/A
7251N/A /**
7251N/A * Create and returns the base changelog entry to the underlying search operation.
7251N/A *
7251N/A * @return {@code true} if search should continue, {@code false} otherwise
7251N/A */
7251N/A private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException
7251N/A {
7251N/A final DN baseDN = searchOp.getBaseDN();
7251N/A final SearchFilter filter = searchOp.getFilter();
7251N/A final SearchScope scope = searchOp.getScope();
7251N/A
7251N/A if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
7244N/A {
7251N/A final Entry entry = buildBaseChangelogEntry();
7251N/A if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
7244N/A {
7251N/A // Abandon, size limit reached.
7244N/A return false;
7244N/A }
7244N/A }
7251N/A return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
7251N/A || !scope.equals(SearchScope.BASE_OBJECT);
7251N/A }
7244N/A
7251N/A private Entry buildBaseChangelogEntry() throws DirectoryException
7251N/A {
7251N/A final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates());
7244N/A
7251N/A final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
7251N/A final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
7244N/A
7251N/A // We never return the numSubordinates attribute for the base changelog entry
7251N/A // and there is a very good reason for that:
7251N/A // - Either we compute it before sending the entries,
7251N/A // -- then we risk returning more entries if new entries come in after we computed numSubordinates
7251N/A // -- or we risk returning less entries if purge kicks in after we computed numSubordinates
7251N/A // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError
7244N/A
7251N/A addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
7251N/A addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
7251N/A ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
7251N/A addAttributeByUppercaseName("hassubordinates", "hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs);
7251N/A addAttributeByUppercaseName("entrydn", "entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
7251N/A return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
7244N/A }
7244N/A
7237N/A private static void addAttribute(final Entry e, final String attrType, final String attrValue)
7237N/A {
7237N/A e.addAttribute(Attributes.create(attrType, attrValue), null);
7237N/A }
7237N/A
7237N/A private static void addAttributeByType(String attrNameLowercase,
7237N/A String attrNameUppercase, String attrValue,
7237N/A Map<AttributeType, List<Attribute>> userAttrs,
7237N/A Map<AttributeType, List<Attribute>> operationalAttrs)
7237N/A {
7237N/A addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
7237N/A }
7237N/A
7244N/A private static void addAttributeByUppercaseName(String attrNameLowercase,
7237N/A String attrNameUppercase, String attrValue,
7237N/A Map<AttributeType, List<Attribute>> userAttrs,
7237N/A Map<AttributeType, List<Attribute>> operationalAttrs)
7237N/A {
7237N/A addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false);
7237N/A }
7237N/A
7237N/A private static void addAttribute(final String attrNameLowercase,
7237N/A final String attrNameUppercase, final String attrValue,
7237N/A final Map<AttributeType, List<Attribute>> userAttrs,
7237N/A final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType)
7237N/A {
7237N/A AttributeType attrType = DirectoryServer.getAttributeType(attrNameLowercase);
7237N/A if (attrType == null)
7237N/A {
7237N/A attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
7237N/A }
7244N/A final Attribute a = addByType
7244N/A ? Attributes.create(attrType, attrValue)
7244N/A : Attributes.create(attrNameUppercase, attrValue);
7237N/A final List<Attribute> attrList = Collections.singletonList(a);
7237N/A if (attrType.isOperational())
7237N/A {
7237N/A operationalAttrs.put(attrType, attrList);
7237N/A }
7237N/A else
7237N/A {
7237N/A userAttrs.put(attrType, attrList);
7237N/A }
7212N/A }
7212N/A
7257N/A /**
7257N/A * Describes the current search phase.
7257N/A */
7257N/A private enum SearchPhase
7257N/A {
7257N/A /**
7257N/A * "Initial search" phase. The "initial search" phase is running
7257N/A * concurrently. All update notifications are ignored.
7257N/A */
7257N/A INITIAL,
7257N/A /**
7257N/A * Transitioning from the "initial search" phase to the "persistent search"
7257N/A * phase. "Initial search" phase has finished reading from the DB. It now
7257N/A * verifies if any more updates have been persisted to the DB since stopping
7257N/A * and send them. All update notifications are blocked.
7257N/A */
7257N/A TRANSITIONING,
7257N/A /**
7257N/A * "Persistent search" phase. "Initial search" phase has completed. All
7257N/A * update notifications are published.
7257N/A */
7257N/A PERSISTENT;
7257N/A }
7257N/A
7257N/A /**
7257N/A * Contains data to ensure that the same change is not sent twice to clients
7257N/A * because of race conditions between the "initial search" phase and the
7257N/A * "persistent search" phase.
7257N/A */
7257N/A private static class SendEntryData<K extends Comparable<K>>
7257N/A {
7257N/A private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<SearchPhase>(SearchPhase.INITIAL);
7257N/A private final Object transitioningLock = new Object();
7257N/A private volatile K lastKeySentByInitialSearch;
7257N/A
7266N/A private SendEntryData(SearchPhase startPhase)
7266N/A {
7266N/A searchPhase.set(startPhase);
7266N/A }
7266N/A
7257N/A private void finalizeInitialSearch()
7257N/A {
7257N/A searchPhase.set(SearchPhase.PERSISTENT);
7257N/A synchronized (transitioningLock)
7257N/A { // initial search phase has completed, release all persistent searches
7257N/A transitioningLock.notifyAll();
7257N/A }
7257N/A }
7257N/A
7257N/A public void transitioningToPersistentSearchPhase()
7257N/A {
7257N/A searchPhase.set(SearchPhase.TRANSITIONING);
7257N/A }
7257N/A
7257N/A private void initialSearchSendsEntry(final K key)
7257N/A {
7257N/A lastKeySentByInitialSearch = key;
7257N/A }
7257N/A
7257N/A private boolean persistentSearchCanSendEntry(K key)
7257N/A {
7257N/A final SearchPhase stateValue = searchPhase.get();
7257N/A switch (stateValue)
7257N/A {
7257N/A case INITIAL:
7257N/A return false;
7257N/A case TRANSITIONING:
7257N/A synchronized (transitioningLock)
7257N/A {
7257N/A while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
7257N/A {
7257N/A // "initial search" phase is over, and is now verifying whether new
7257N/A // changes have been published to the DB.
7257N/A // Wait for this check to complete
7257N/A try
7257N/A {
7257N/A transitioningLock.wait();
7257N/A }
7257N/A catch (InterruptedException e)
7257N/A {
7257N/A Thread.currentThread().interrupt();
7257N/A // Shutdown must have been called. Stop sending entries.
7257N/A return false;
7257N/A }
7257N/A }
7257N/A }
7257N/A return key.compareTo(lastKeySentByInitialSearch) > 0;
7257N/A case PERSISTENT:
7257N/A return true;
7257N/A default:
7257N/A throw new RuntimeException("Not implemented for " + stateValue);
7257N/A }
7257N/A }
7257N/A }
7257N/A
7257N/A /** Sends entries to clients for change number searches. */
7257N/A private static class ChangeNumberEntrySender
7257N/A {
7257N/A private final SearchOperation searchOp;
7266N/A private final SendEntryData<Long> sendEntryData;
7257N/A
7266N/A private ChangeNumberEntrySender(SearchOperation searchOp, SearchPhase startPhase)
7257N/A {
7257N/A this.searchOp = searchOp;
7266N/A this.sendEntryData = new SendEntryData<Long>(startPhase);
7257N/A }
7257N/A
7257N/A private void finalizeInitialSearch()
7257N/A {
7257N/A sendEntryData.finalizeInitialSearch();
7257N/A }
7257N/A
7270N/A private void transitioningToPersistentSearchPhase()
7257N/A {
7257N/A sendEntryData.transitioningToPersistentSearchPhase();
7257N/A }
7257N/A
7257N/A /**
7257N/A * @return {@code true} if search should continue, {@code false} otherwise
7257N/A */
7263N/A private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg,
7263N/A MultiDomainServerState cookie) throws DirectoryException
7257N/A {
7257N/A final DN baseDN = cnIndexRecord.getBaseDN();
7257N/A sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
7263N/A final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
7257N/A return sendEntryIfMatches(searchOp, entry, null);
7257N/A }
7257N/A
7257N/A private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
7257N/A {
7257N/A if (sendEntryData.persistentSearchCanSendEntry(changeNumber))
7257N/A {
7257N/A sendEntryIfMatches(searchOp, entry, null);
7257N/A }
7257N/A }
7257N/A }
7257N/A
7257N/A /** Sends entries to clients for cookie-based searches. */
7257N/A private static class CookieEntrySender {
7257N/A private final SearchOperation searchOp;
7266N/A private final SearchPhase startPhase;
7270N/A private MultiDomainServerState cookie;
7257N/A private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
7257N/A new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
7257N/A
7266N/A private CookieEntrySender(SearchOperation searchOp, SearchPhase startPhase)
7257N/A {
7257N/A this.searchOp = searchOp;
7266N/A this.startPhase = startPhase;
7257N/A }
7257N/A
7270N/A private void setCookie(MultiDomainServerState cookie)
7270N/A {
7270N/A this.cookie = cookie;
7270N/A }
7270N/A
7270N/A private void finalizeInitialSearch()
7257N/A {
7257N/A for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
7257N/A {
7257N/A sendEntryData.finalizeInitialSearch();
7257N/A }
7257N/A }
7257N/A
7270N/A private void transitioningToPersistentSearchPhase()
7257N/A {
7257N/A for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
7257N/A {
7257N/A sendEntryData.transitioningToPersistentSearchPhase();
7257N/A }
7257N/A }
7257N/A
7257N/A private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
7257N/A {
7257N/A final Pair<DN, Integer> replicaId = Pair.of(baseDN, csn.getServerId());
7257N/A SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
7257N/A if (data == null)
7257N/A {
7266N/A final SendEntryData<CSN> newData = new SendEntryData<CSN>(startPhase);
7257N/A data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
7257N/A return data == null ? newData : data;
7257N/A }
7257N/A return data;
7257N/A }
7257N/A
7270N/A private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
7257N/A {
7257N/A final CSN csn = updateMsg.getCSN();
7257N/A final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
7257N/A sendEntryData.initialSearchSendsEntry(csn);
7270N/A final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
7257N/A final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
7257N/A return sendEntryIfMatches(searchOp, entry, cookieString);
7257N/A }
7257N/A
7257N/A private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
7257N/A throws DirectoryException
7257N/A {
7257N/A final CSN csn = updateMsg.getCSN();
7257N/A final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
7257N/A if (sendEntryData.persistentSearchCanSendEntry(csn))
7257N/A {
7257N/A // multi threaded case: wait for the "initial search" phase to set the cookie
7270N/A final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
7257N/A final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
7257N/A // FIXME JNR use this instead of previous line:
7257N/A // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
7257N/A sendEntryIfMatches(searchOp, cookieEntry, cookieString);
7257N/A }
7257N/A }
7257N/A
7270N/A private String updateCookie(DN baseDN, final CSN csn)
7257N/A {
7257N/A synchronized (cookie)
7257N/A { // forbid concurrent updates to the cookie
7257N/A cookie.update(baseDN, csn);
7257N/A return cookie.toString();
7257N/A }
7257N/A }
7257N/A }
7212N/A}