ReplicationDomain.java revision a776a93d0afa206f307e9140a35497ee255840f2
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
*/
/**
* This class should be used as a base for Replication implementations.
* <p>
* It is intended that developers in need of a replication mechanism
* subclass this class with their own implementation.
* <p>
* The startup phase of the ReplicationDomain subclass,
* should read the list of replication servers from the configuration,
* instantiate a {@link ServerState} then start the publish service
* by calling
* {@link #startPublishService(Collection, int, long)}.
* At this point it can start calling the {@link #publish(UpdateMsg)}
* method if needed.
* <p>
* When the startup phase reaches the point where the subclass is ready
* to handle updates the Replication Domain implementation should call the
* {@link #startListenService()} method.
* At this point a Listener thread is created on the Replication Service
* and it can start receiving updates.
* <p>
* When updates are received the Replication Service calls the
* {@link #processUpdate(UpdateMsg)} method.
* ReplicationDomain implementation should implement the appropriate code
* for replaying the update on the local repository.
* When fully done the subclass must call the
* {@link #processUpdateDone(UpdateMsg, String)} method.
* This allows to process the update asynchronously if necessary.
*
* <p>
* To propagate changes to other replicas, a ReplicationDomain implementation
* must use the {@link #publish(UpdateMsg)} method.
* <p>
* If the Full Initialization process is needed then implementation
* for {@link #importBackend(InputStream)} and
* {@link #exportBackend(OutputStream)} must be
* provided.
* <p>
* Full Initialization of a replica can be triggered by LDAP clients
* by creating InitializeTasks or InitializeTargetTask.
* Full initialization can also be triggered from the ReplicationDomain
* implementation using methods {@link #initializeRemote(int)}
* or {@link #initializeFromRemote(int)}.
* <p>
* At shutdown time, the {@link #stopDomain()} method should be called to
* cleanly stop the replication service.
*/
public abstract class ReplicationDomain
{
/**
* Current status for this replicated domain.
*/
/**
* The tracer object for the debug logger.
*/
/**
* An identifier for the Replication Service.
* All Replication Domain using this identifier will be connected
* through the Replication Service.
*/
/**
* The identifier of this Replication Domain inside the
* Replication Service.
* Each Domain must use a unique ServerID.
*/
private final int serverID;
/**
* The ReplicationBroker that is used by this ReplicationDomain to
* connect to the ReplicationService.
*/
/**
* This Map is used to store all outgoing assured messages in order
* to be able to correlate all the coming back acks to the original
* operation.
*/
/**
* The context related to an import or export being processed
* Null when none is being processed.
*/
/**
* The Thread waiting for incoming update messages for this domain and pushing
* them to the global incoming update message queue for later processing by
* replay threads.
*/
private ListenerThread listenerThread;
/**
* A Map used to store all the ReplicationDomains created on this server.
*/
/**
* The Monitor in charge of replication monitoring.
*/
private ReplicationMonitor monitor;
/*
* Assured mode properties
*/
// Is assured mode enabled or not for this domain ?
private boolean assured = false;
// Assured sub mode (used when assured is true)
// Safe Data level (used when assuredMode is SAFE_DATA)
private byte assuredSdLevel = (byte)1;
// The timeout in ms that should be used, when waiting for assured acks
private long assuredTimeout = 2000;
// Group id
private byte groupId = (byte)1;
// Referrals urls to be published to other servers of the topology
// TODO: fill that with all currently opened urls if no urls configured
/**
* A set of counters used for Monitoring.
*/
/* Assured replication monitoring counters */
// Number of updates sent in Assured Mode, Safe Read
// Number of updates sent in Assured Mode, Safe Read, that have been
// successfully acknowledged
// Number of updates sent in Assured Mode, Safe Read, that have not been
// successfully acknowledged (either because of timeout, wrong status or error
// at replay)
private AtomicInteger assuredSrNotAcknowledgedUpdates =
new AtomicInteger(0);
// Number of updates sent in Assured Mode, Safe Read, that have not been
// successfully acknowledged because of timeout
// Number of updates sent in Assured Mode, Safe Read, that have not been
// successfully acknowledged because of wrong status
// Number of updates sent in Assured Mode, Safe Read, that have not been
// successfully acknowledged because of replay error
// Multiple values allowed: number of updates sent in Assured Mode, Safe Read,
// that have not been successfully acknowledged (either because of timeout,
// wrong status or error at replay) for a particular server (DS or RS). String
// format: <server id>:<number of failed updates>
// Number of updates received in Assured Mode, Safe Read request
// Number of updates received in Assured Mode, Safe Read request that we have
// acked without errors
// Number of updates received in Assured Mode, Safe Read request that we have
// acked with errors
// Number of updates sent in Assured Mode, Safe Data
// Number of updates sent in Assured Mode, Safe Data, that have been
// successfully acknowledged
// Number of updates sent in Assured Mode, Safe Data, that have not been
// successfully acknowledged because of timeout
// Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
// that have not been successfully acknowledged because of timeout for a
// particular RS. String format: <server id>:<number of failed updates>
/* Status related monitoring fields */
// Indicates the date when the status changed. This may be used to indicate
// the date the session with the current replication server started (when
// status is NORMAL for instance). All the above assured monitoring fields
// are also reset each time the status is changed
/**
* The state maintained by the Concrete Class.
*/
private final ServerState state;
/**
* The generator that will be used to generate {@link ChangeNumber}
* for this domain.
*/
private final ChangeNumberGenerator generator;
/**
 * Provides the generator this domain relies on to create new
 * {@link ChangeNumber} values.
 *
 * @return The {@link ChangeNumberGenerator} associated with this domain.
 */
public ChangeNumberGenerator getGenerator()
{
  return this.generator;
}
/**
* Creates a ReplicationDomain with the provided parameters.
*
* @param serviceID The identifier of the Replication Domain to which
* this object is participating.
* @param serverID The identifier of the server that is participating
* to the Replication Domain.
* This identifier should be different for each server that
* is participating to a given Replication Domain.
*/
{
this.state = new ServerState();
}
/**
* Creates a ReplicationDomain with the provided parameters.
* (for unit test purpose only)
*
* @param serviceID The identifier of the Replication Domain to which
* this object is participating.
* @param serverID The identifier of the server that is participating
* to the Replication Domain.
* This identifier should be different for each server that
* is participating to a given Replication Domain.
* @param serverState The serverState to use
*/
{
this.state = serverState;
}
/**
* Set the initial status of the domain and perform necessary initializations.
* This method will be called by the Broker each time the ReplicationBroker
* establish a new session to a Replication Server.
*
* Implementations may override this method when they need to perform
* additional computing after session establishment.
* The default implementation should be sufficient for ReplicationDomains
* that don't need to perform additional computing.
*
* @param initStatus The status to enter the state machine with.
* @param replicationServerState The ServerState of the ReplicationServer
* with which the session was established.
* @param generationID The current generationID of the
* ReplicationServer with which the session
* was established.
* @param session The ProtocolSession that is currently used.
*/
public void sessionInitiated(
long generationID,
{
// Sanity check: is it a valid initial status?
if (!isValidInitialStatus(initStatus))
{
} else
{
status = initStatus;
}
}
/**
* Processes an incoming ChangeStatusMsg. Compute new status according to
* given order. Then update domain for being compliant with new status
* definition.
* @param csMsg The received status message
*/
{
if (debugEnabled())
" received change status message:\n" + csMsg);
// Translate requested status to a state machine event
{
return;
}
// Set the new status to the requested one
}
/**
 * Called when first connection or disconnection detected.
 */
void toNotConnectedStatus()
{
// Go into not connected status
  // NOTE(review): as shown, this body contains only the comment above and
  // performs no status transition. A call feeding a "not connected" event
  // into the status state machine is presumably missing -- confirm.
}
/**
* Perform whatever actions are needed to apply properties for being
* compliant with new status. Must be called in synchronized section for
* status. The new status is already set in status variable.
*/
private void updateDomainForNewStatus()
{
switch (status)
{
case NOT_CONNECTED_STATUS:
break;
case NORMAL_STATUS:
break;
case DEGRADED_STATUS:
break;
case FULL_UPDATE_STATUS:
// Signal RS we just entered the full update status
break;
case BAD_GEN_ID_STATUS:
break;
default:
if (debugEnabled())
status);
}
}
/**
 * Reports the current status of this replicated domain.
 *
 * @return The {@link ServerStatus} this domain is currently in.
 */
public ServerStatus getStatus()
{
  return this.status;
}
/**
 * Returns the identifier of the Replication Service this domain belongs to.
 *
 * @return The service identifier of this domain.
 */
public String getServiceID()
{
  return this.serviceID;
}
/**
 * Returns the identifier of this domain inside the Replication Service.
 *
 * @return The server identifier of this domain.
 */
public int getServerId()
{
  return this.serverID;
}
/**
 * Indicates whether assured replication is enabled for this domain.
 *
 * @return {@code true} when assured replication is enabled.
 */
public boolean isAssured()
{
  return this.assured;
}
/**
 * Returns the assured replication mode configured for this domain.
 *
 * @return The {@link AssuredMode} of this domain.
 */
public AssuredMode getAssuredMode()
{
  return this.assuredMode;
}
/**
 * Returns the safe data level used when assured mode is safe data.
 *
 * @return The assured safe data level of this domain.
 */
public byte getAssuredSdLevel()
{
  return this.assuredSdLevel;
}
/**
 * Returns how long, in milliseconds, to wait for assured acknowledgments.
 *
 * @return The assured timeout of this domain, in milliseconds.
 */
public long getAssuredTimeout()
{
  return this.assuredTimeout;
}
/**
 * Returns the replication group id this domain belongs to.
 *
 * @return The group id of this domain.
 */
public byte getGroupId()
{
  return this.groupId;
}
/**
* Gets the referrals URLs this domain publishes.
* @return The referrals URLs this domain publishes.
*/
{
return refUrls;
}
/**
* Gets the info for Replicas in the topology (except us).
* @return The info for Replicas in the topology (except us)
*/
{
}
/**
* Gets the States of all the Replicas currently in the
* Topology.
* When this method is called, a Monitoring message will be sent
* to the Replication Server to which this domain is currently connected
* so that it computes a table containing information about
* all Directory Servers in the topology.
* This computation involves communication with all the servers
* currently connected.
*
* @return The States of all Replicas in the topology (except us)
*/
{
return broker.getReplicaStates();
}
/**
* Gets the info for RSs in the topology (except the one we are connected
* to).
* @return The info for RSs in the topology (except the one we are connected
* to)
*/
{
}
/**
 * Returns the server ID of the Replication Server this domain is currently
 * connected to, as reported by the broker.
 *
 * @return The server ID of the currently connected Replication Server.
 */
public int getRsServerId()
{
  return broker.getRsServerId();
}
/**
 * Increment the number of processed updates.
 */
private void incProcessedUpdates()
{
  // NOTE(review): as shown, the body is empty, so the counter read by
  // getNumProcessedUpdates() is never incremented here. Confirm whether the
  // increment statement (guarded against a null counter) is missing.
}
/**
 * Returns how many updates have been replayed by replication.
 *
 * @return The replayed-update count, or 0 when the counter does not exist.
 */
int getNumProcessedUpdates()
{
  return (numProcessedUpdates == null) ? 0 : numProcessedUpdates.get();
}
/**
 * Returns how many updates have been received by replication.
 *
 * @return The received-update count, or 0 when the counter does not exist.
 */
int getNumRcvdUpdates()
{
  return (numRcvdUpdates == null) ? 0 : numRcvdUpdates.get();
}
/**
 * Returns how many updates have been sent by replication.
 *
 * @return The sent-update count, or 0 when the counter does not exist.
 */
int getNumSentUpdates()
{
  return (numSentUpdates == null) ? 0 : numSentUpdates.get();
}
/**
* Set the list of Referrals that should be returned when an
* operation needs to be redirected to this server.
*
* @param referralsUrl The list of referrals.
*/
{
}
/**
 * Changes how long, in milliseconds, to wait for assured acknowledgments.
 *
 * @param assuredTimeout The new assured timeout, in milliseconds.
 */
public void setAssuredTimeout(long assuredTimeout)
{
  this.assuredTimeout = assuredTimeout;
}
/**
 * Sets the replication group id of this domain.
 *
 * @param groupId The group id this domain should use.
 */
public void setGroupId(byte groupId)
{
  // The body was previously empty, so the value passed in was silently
  // dropped and getGroupId() kept returning the initial value.
  this.groupId = groupId;
}
/**
 * Changes the safe data level used when assured mode is safe data.
 *
 * @param assuredSdLevel The new assured safe data level.
 */
public void setAssuredSdLevel(byte assuredSdLevel)
{
  this.assuredSdLevel = assuredSdLevel;
}
/**
* Sets the assured replication mode.
*
* @param dataMode The assured replication mode.
*/
{
this.assuredMode = dataMode;
}
/**
 * Enables or disables assured replication for this domain.
 *
 * @param assured A boolean indicating if assured replication should be used.
 */
public void setAssured(boolean assured)
{
  // The body was previously empty, so isAssured() never reflected the
  // requested setting.
  this.assured = assured;
}
/**
* Receives an update message from the replicationServer.
* also responsible for updating the list of pending changes
* @return the received message - null if none
*/
{
{
try
{
{
// The server is in the shutdown process
return null;
}
if (debugEnabled())
if (!(msg instanceof HeartbeatMsg))
{
}
else if (msg instanceof InitializeRequestMsg)
{
// Another server requests us to provide entries
// for a total update
}
else if (msg instanceof InitializeTargetMsg)
{
// Another server is exporting its entries to us
try
{
// This must be done while we are still holding the
// broker lock because we are now going to receive a
// bunch of entries from the remote server and we
// want the import thread to catch them and
// not the ListenerThread.
}
catch(DirectoryException de)
{
// Returns an error message to notify the sender
de.getMessageObject());
}
}
{
{
// This is an error termination for the 2 following cases :
// - either during an export
// - or before an import really started
// For example, when we publish a request and the
// replicationServer did not find any import source.
}
else
{
/*
* Log error message
*/
errorMsg.getDetails()));
}
}
else if (msg instanceof ChangeStatusMsg)
{
}
{
}
}
catch (SocketTimeoutException e)
{
// just retry
}
// Test if we have received and export request message and
// if that's the case handle it now.
// This must be done outside of the portion of code protected
// by the broker lock so that we keep receiveing update
// when we are doing and export and so that a possible
// closure of the socket happening when we are publishing the
// entries to the remote can be handled by the other
// replay thread when they call this method and therefore the
// broker.receive() method.
{
// Do this work in a thread to allow replay thread continue working
}
}
{
}
return update;
}
/**
* Updates the passed monitoring list of errors received for assured messages
* (safe data or safe read, depending of the passed list to update) for a
* particular server in the list. This increments the counter of error for the
* passed server, or creates an initial value of 1 error for it if the server
* is not yet present in the map.
* @param errorList
* @param sid
*/
{
synchronized (errorsByServer)
{
if (serverErrCount == null)
{
// Server not present in list, create an entry with an
// initial number of errors set to 1
} else
{
// Server already present in list, just increment number of
// errors for the server
val++;
}
}
}
/**
* Do the necessary processing when an AckMsg is received.
*
* @param ack The AckMsg that was received.
*/
{
// Remove the message for pending ack list (this may already make the thread
// that is waiting for the ack be aware of its reception)
synchronized (waitingAckMsgs)
{
}
// Signal waiting thread ack has been received
{
synchronized (update)
{
}
// Analyze status of embedded in the ack to see if everything went well
{
// Some problems detected: message not correclty reached every requested
// servers. Log problem
// Increment assured replication monitoring counters
switch (updateAssuredMode)
{
case SAFE_READ_MODE:
if (hasTimeout)
if (hasReplayErrors)
if (hasWrongStatus)
{
{
}
}
break;
case SAFE_DATA_MODE:
// The only possible cause of ack error in safe data mode is timeout
if (hasTimeout) // So should always be the case
{
{
}
}
break;
default:
// Should not happen
}
} else
{
// Update has been acknowledged without errors
// Increment assured replication monitoring counters
switch (updateAssuredMode)
{
case SAFE_READ_MODE:
break;
case SAFE_DATA_MODE:
break;
default:
// Should not happen
}
}
}
}
/**
* Retrieves a replication domain based on the baseDn.
*
* @param serviceID The identifier of the domain to retrieve.
*
* @return The domain retrieved.
*
* @throws DirectoryException When an error occurred or no domain
* match the provided baseDn.
*/
throws DirectoryException
{
if (replicationDomain == null)
{
}
return replicationDomain;
}
/*
* After this point the code is related to the Total Update.
*/
/**
* This thread is launched when we want to export data to another server that
* has requested to be initialized with the data of our backend.
*/
private class ExportThread extends DirectoryThread
{
// Id of server that will receive updates
private int target;
/**
 * Creates the thread that exports this server's data to another server.
 *
 * @param i Id of the server that will receive the exported updates.
 */
public ExportThread(int i)
{
  super("Export thread " + serverID);
  target = i;
}
/**
* Run method for this class.
*/
public void run()
{
if (debugEnabled())
{
}
try
{
} catch (DirectoryException de)
{
// An error message has been sent to the peer
// Nothing more to do locally
}
if (debugEnabled())
{
}
}
}
/**
* This class contain the context related to an import or export
* launched on the domain.
*/
protected class IEContext
{
// The private task that initiated the operation.
// The destination in the case of an export
// The source in the case of an import
// The total entry count expected to be processed
long entryCount = 0;
// The count for the entry not yet processed
long entryLeftCount = 0;
// The exception raised when any
// A boolean indicating if the context is related to an
// import or an export.
boolean importInProgress;
/**
 * Builds a context describing either an import or an export.
 *
 * @param importInProgress true when this context describes an import,
 *                         false when it describes an export.
 */
public IEContext(boolean importInProgress)
{
  this.importInProgress = importInProgress;
}
/**
* @param total Total number of entries to be processed.
* @param left Remaining number of entries to be processed.
* @throws DirectoryException if an error occurred.
*/
throws DirectoryException
{
entryCount = total;
if (initializeTask != null)
{
if (initializeTask instanceof InitializeTask)
{
}
else if (initializeTask instanceof InitializeTargetTask)
{
}
}
}
/**
* Update the counters of the task for each entry processed during
* an import or export.
* @throws DirectoryException if an error occurred.
*/
public void updateCounters()
throws DirectoryException
{
if (initializeTask != null)
{
if (initializeTask instanceof InitializeTask)
{
}
else if (initializeTask instanceof InitializeTargetTask)
{
}
}
}
/**
* Update the counters of the task for each entry processed during
* an import or export.
*
* @param entriesDone The number of entries that were processed
* since the last time this method was called.
*
* @throws DirectoryException if an error occurred.
*/
public void updateCounters(int entriesDone)
throws DirectoryException
{
if (initializeTask != null)
{
if (initializeTask instanceof InitializeTask)
{
}
else if (initializeTask instanceof InitializeTargetTask)
{
}
}
}
/**
* {@inheritDoc}
*/
{
}
/**
 * Returns the server id recorded as the destination of an export.
 *
 * @return The export target server id.
 */
public int getExportTarget()
{
  return exportTarget;
}
/**
 * Returns the server id recorded as the source of an import.
 *
 * @return The import source server id.
 */
public int getImportSource()
{
  return importSource;
}
/**
 * Gets the exception raised during this import or export, if any.
 *
 * @return The recorded {@link DirectoryException}; presumably null when no
 *         error was raised (field initialization is not visible -- confirm).
 */
public DirectoryException getException()
{
return exception;
}
/**
*/
{
}
}
/**
* Verifies that the given string represents a valid source
* from which this server can be initialized.
*
* @param targetString The string representing the source
* @return The source as a integer value
* @throws DirectoryException if the string is not valid
*/
throws DirectoryException
{
int target = 0;
{
return RoutableMsg.ALL_SERVERS;
}
// So should be a serverID
try
{
if (target >= 0)
{
// FIXME Could we check now that it is a know server in the domain ?
}
return target;
}
catch(Exception e)
{
cause = e;
}
throw new DirectoryException(
else
throw new DirectoryException(
}
/**
* Initializes a remote server from this server.
* <p>
* The {@link #exportBackend(OutputStream)} will therefore be called
* on this server, and the {@link #importBackend(InputStream)}
* will be called on the remote server.
* <p>
* The InputStream and OutputStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param target The server-id of the server that should be initialized.
* The target can be discovered using the
* {@link #getReplicasList()} method.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
throws DirectoryException
{
{
// Check for the status of all remote servers to check if they
// are all finished with the import.
boolean done = true;
do
{
done = true;
{
{
done = false;
try
{
} catch (InterruptedException e)
{
// just loop again.
}
break;
}
}
}
while (!done);
}
}
/**
* Process the initialization of some other server or servers in the topology
* specified by the target argument when this initialization specifying the
* server that requests the initialization.
*
* @param target The target that should be initialized.
* @param target2 The server that initiated the export.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
*
* @exception DirectoryException When an error occurs.
*/
{
boolean contextAcquired=false;
acquireIEContext(false);
contextAcquired = true;
{
}
// The number of entries to be exported is the number of entries under
// the base DN entry and the base entry itself.
long entryCount = this.countEntries();
// Send start message to the peer
try
{
// Notify the peer of the success
}
catch(DirectoryException de)
{
// Notify the peer of the failure
de.getMessageObject());
if (contextAcquired)
throw(de);
}
}
/**
 * Returns the ServerState that the concrete subclass maintains for this
 * domain.
 *
 * @return The ServerState of this domain.
 */
public ServerState getServerState()
{
  return this.state;
}
private synchronized void acquireIEContext(boolean importInProgress)
throws DirectoryException
{
{
// Rejects 2 simultaneous exports
message);
}
}
/**
 * Releases the import/export context; presumably the counterpart of
 * acquireIEContext(boolean) -- confirm.
 */
private synchronized void releaseIEContext()
{
  // NOTE(review): the body is empty as shown, so nothing is released here.
  // Confirm whether the statement clearing the current context was lost.
}
/**
* on going.
* @param errorMsg The error message received.
*/
{
// FIXME TBD Treat the case where the error happens while entries
// are being exported
if (debugEnabled())
" abandonImportExport:" + this.serverID +
" serviceID: " + this.serviceID +
" Error Msg received: " + errorMsg);
{
errorMsg.getDetails()));
{
// Update the task that initiated the import
}
}
}
/**
* Receives bytes related to an entry in the context of an import to
* initialize the domain (called by ReplLDIFInputStream).
*
* @return The bytes. Null when the Done or Err message has been received
*/
protected byte[] receiveEntryBytes()
{
while (true)
{
try
{
if (debugEnabled())
" sid:" + serverID +
" base DN:" + serviceID +
" Import EntryBytes received " + msg);
{
// The server is in the shutdown process
return null;
}
{
return entryBytes;
}
{
// This is the normal termination of the import
// No error is stored and the import is ended
// by returning null
return null;
}
{
// This is an error termination during the import
// The error is stored and the import is ended
// by returning null
errorMsg.getDetails()));
return null;
}
else
{
// Other messages received during an import are trashed
}
}
catch(Exception e)
{
// TODO: i18n
e.getLocalizedMessage())));
}
}
}
/**
 * Count the number of entries in the provided byte[].
 * This is based on the hypothesis that the entries are separated
 * by a "\n\n" String.
 *
 * @param entryBytes The bytes in which entry separators are counted.
 * @return The number of entries in the provided byte[].
 */
private int countEntryLimits(byte[] entryBytes)
{
  // NOTE(review): as shown, this body has no return statement and cannot
  // compile. It presumably should delegate to the positional overload that
  // follows, over the whole array -- confirm and restore.
}
/**
* Count the number of entries in the provided byte[].
* This is based on the hypothesis that the entries are separated
* by a "\n\n" String.
*
* @param entryBytes
* @return The number of entries in the provided byte[].
*/
{
int entryCount = 0;
int count = 0;
{
{
entryCount++;
count++;
}
count++;
}
return entryCount;
}
/**
* Exports an entry in LDIF format.
*
* @param lDIFEntry The entry to be exported in byte[] form.
* @param pos The starting Position in the array.
* @param length Number of array elements to be copied.
*
* @throws IOException when an error occurred.
*/
{
// If an error was raised - like receiving an ErrorMsg
// we just let down the export.
{
throw ioe;
}
try
{
}
catch (DirectoryException de)
{
}
}
/**
 * Initializes this domain from another source server.
 * <p>
 * When this method is called, a request for initialization will
 * be sent to the source server asking for initialization.
 * <p>
 * The {@link #exportBackend(OutputStream)} will therefore be called
 * on the source server, and the {@link #importBackend(InputStream)}
 * will be called on this server.
 * <p>
 * The InputStream and OutputStream given as a parameter to those
 * methods will be connected through the replication protocol.
 *
 * @param source The server-id of the source from which to initialize.
 * The source can be discovered using the
 * {@link #getReplicasList()} method.
 *
 * @throws DirectoryException If it was not possible to publish the
 * Initialization message to the Topology.
 */
public void initializeFromRemote(int source)
throws DirectoryException
{
  // NOTE(review): the body is empty as shown, so no initialization request
  // is sent, contrary to the description above. It presumably should forward
  // to the task-aware initializeFromRemote variant -- confirm.
}
/**
* Initializes a remote server from this server.
* <p>
* The {@link #exportBackend(OutputStream)} will therefore be called
* on this server, and the {@link #importBackend(InputStream)}
* will be called on the remote server.
* <p>
* The InputStream and OutputStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param target The server-id of the server that should be initialized.
* The target can be discovered using the
* {@link #getReplicasList()} method.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
{
}
/**
* Initializes this domain from another source server.
* <p>
* When this method is called, a request for initialization will
* be sent to the source server asking for initialization.
* <p>
* The {@link #exportBackend(OutputStream)} will therefore be called
* on the source server, and the {@link #importBackend(InputStream)}
* will be called on this server.
* <p>
* The InputStream and OutputStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param source The server-id of the source from which to initialize.
* The source can be discovered using the
* {@link #getReplicasList()} method.
* @param initTask The task that launched the initialization
* and should be updated of its progress.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
throws DirectoryException
{
if (debugEnabled())
if (!broker.isConnected())
{
if (initTask instanceof InitializeTask)
{
new DirectoryException(
getServiceID())));
}
return;
}
acquireIEContext(true);
// Publish Init request msg
// .. we expect to receive entries or err after that
}
/**
* Initializes the domain's backend with received entries.
* @param initializeMessage The message that initiated the import.
* @exception DirectoryException Thrown when an error occurs.
*/
throws DirectoryException
{
// Go into full update status
{
// The import responds to a request we did so the IEContext
// is already acquired
}
else
{
acquireIEContext(true);
}
try
{
importBackend(new ReplInputStream(this));
}
catch (DirectoryException e)
{
de = e;
}
finally
{
// Update the task that initiated the import
{
}
}
// Sends up the root error.
{
throw de;
}
}
/**
* Sets the status to a new value depending on the passed status machine
* event.
* @param event The event that may make the status be changed
*/
{
{
return;
}
{
// Reset status date
lastStatusChangeDate = new Date();
// Reset monitoring counters if reconnection
// Store new status
if (debugEnabled())
" new status is: " + status);
// Perform whatever actions are needed to apply properties for being
// compliant with new status
}
}
/**
 * Returns a boolean indicating if an import or export is currently
 * processed.
 *
 * @return true if an import or export is in progress, false otherwise.
 */
public boolean ieRunning()
{
  // NOTE(review): as shown, this body has no return statement and cannot
  // compile. It presumably should report whether an import/export context
  // is currently held (see acquireIEContext / releaseIEContext) -- confirm.
}
/**
* Check the value of the Replication Servers generation ID.
*
* @param generationID The expected value of the generation ID.
*
* @throws DirectoryException When the generation ID of the Replication
* Servers is not the expected value.
*/
private void checkGenerationID(long generationID)
throws DirectoryException
{
boolean allset = true;
for (int i = 0; i< 10; i++)
{
allset = true;
{
{
try
{
} catch (InterruptedException e)
{
}
allset = false;
break;
}
}
if (allset)
{
break;
}
}
if (!allset)
{
throw new DirectoryException(
}
}
/**
* Reset the Replication Log.
* Calling this method will remove all the Replication information that
* was kept on all the Replication Servers currently connected in the
* topology.
* <p>
* The visible steps reset the generation ID to -1 through
* {@code resetGenerationId}, then verify it with {@code checkGenerationID(-1)}.
* NOTE(review): the reconnect call, the wait-loop condition and the final
* check are not visible in this copy; only their comments and the
* {@code count} counter remain.
*
* @throws DirectoryException If this ReplicationDomain is not currently
*                            connected to a Replication Server or it
*                            was not possible to contact it.
*/
public void resetReplicationLog() throws DirectoryException
{
// Reset the Generation ID to -1 to clean the ReplicationServers.
resetGenerationId((long)-1);
// check that at least one ReplicationServer did change its generation-id
checkGenerationID(-1);
// Reconnect to the Replication Server so that it adopts our
// GenerationID.
// wait for the domain to reconnect.
int count = 0;
{
try
{
} catch (InterruptedException e)
{
// NOTE(review): interruption is silently ignored; consider restoring
// the interrupt flag.
}
}
// check that at least one ReplicationServer did change its generation-id
}
/**
* Reset the generationId of this domain in the whole topology.
* A message is sent to the Replication Servers for them to reset
* their change dbs.
* <p>
* {@code generationIdNewValue} may be {@code null}. In that case the
* verification step checks against {@link #getGenerationID()}.
* NOTE(review): the method signature and the bodies of the reset branches
* are not visible in this copy.
*
* @param generationIdNewValue The new value of the generation Id, or
*                             {@code null}.
* @throws DirectoryException When this domain is not connected (see
*                            {@code isConnected()}) or when the
*                            generation ID check fails.
*/
throws DirectoryException
{
if (debugEnabled())
+ "resetGenerationId" + generationIdNewValue);
// A reset message cannot be delivered without a connection.
if (!isConnected())
{
throw new DirectoryException(
}
if (generationIdNewValue == null)
{
}
else
{
}
// check that at least one ReplicationServer did change its generation-id
if (generationIdNewValue == null)
{
checkGenerationID(this.getGenerationID());
}
else
{
}
}
/*
******** End of The total Update code *********
*/
/*
******* Start of Monitoring Code **********
*/
/**
* Get the maximum receive window size.
*
* @return The maximum receive window size.
*/
int getMaxRcvWindow()
{
return broker.getMaxRcvWindow();
else
return 0;
}
/**
* Get the current receive window size.
*
* @return The current receive window size.
*/
int getCurrentRcvWindow()
{
return broker.getCurrentRcvWindow();
else
return 0;
}
/**
* Get the maximum send window size.
*
* @return The maximum send window size.
*/
int getMaxSendWindow()
{
return broker.getMaxSendWindow();
else
return 0;
}
/**
* Get the current send window size.
*
* @return The current send window size.
*/
int getCurrentSendWindow()
{
return broker.getCurrentSendWindow();
else
return 0;
}
/**
* Get the number of times the replication connection was lost.
* @return The number of times the replication connection was lost.
*/
int getNumLostConnections()
{
return broker.getNumLostConnections();
else
return 0;
}
/**
* Determine whether the connection to the replication server is encrypted.
* @return true if the connection is encrypted, false otherwise.
*/
boolean isSessionEncrypted()
{
return broker.isSessionEncrypted();
else
return false;
}
/**
* This method is called when the ReplicationDomain has completed the
* processing of a received update synchronously.
* In such cases {@code processUpdateDone()} is called and the state
* is updated automatically.
* <p>
* NOTE(review): the method signature and the call it makes are not
* visible in this copy; only the explanatory comment remains.
*
* @param msg The UpdateMessage that was processed.
*/
{
// Warning: in synchronous mode, there is no way to tell that the replay of
// an update went wrong. Just pass null to processUpdateDone so that, if
// assured replication is used, the ack is sent without the replay-error
// flag set.
}
/**
* Check if the domain is connected to a ReplicationServer.
*
* @return true if the server is connected, false if not.
*/
public boolean isConnected()
{
return broker.isConnected();
else
return false;
}
/**
* Check if the domain has a connection error.
* A Connection error happens when the broker could not be created
* or when the broker could not find any ReplicationServer to connect to.
*
* @return true if the domain has a connection error.
*/
public boolean hasConnectionError()
{
return broker.hasConnectionError();
else
return true;
}
/**
* Get the name of the replicationServer to which this domain is currently
* connected.
*
* @return the name of the replicationServer to which this domain
* is currently connected.
*/
public String getReplicationServer()
{
return broker.getReplicationServer();
else
return "Not connected";
}
/**
 * Returns how many updates have been sent in assured safe read mode.
 *
 * @return The count of updates sent in assured safe read mode.
 */
public int getAssuredSrSentUpdates()
{
  final int sentCount = this.assuredSrSentUpdates.get();
  return sentCount;
}
/**
 * Returns how many updates sent in assured safe read mode were
 * acknowledged with no error.
 *
 * @return The count of assured safe read updates acknowledged without
 *         errors.
 */
public int getAssuredSrAcknowledgedUpdates()
{
  final int ackedCount = this.assuredSrAcknowledgedUpdates.get();
  return ackedCount;
}
/**
 * Returns how many updates sent in assured safe read mode did not get
 * an acknowledgment.
 *
 * @return The count of assured safe read updates left unacknowledged.
 */
public int getAssuredSrNotAcknowledgedUpdates()
{
  final int notAckedCount = this.assuredSrNotAcknowledgedUpdates.get();
  return notAckedCount;
}
/**
 * Returns how many updates sent in assured safe read mode went
 * unacknowledged because of a timeout.
 *
 * @return The count of assured safe read updates that timed out.
 */
public int getAssuredSrTimeoutUpdates()
{
  final int timedOut = this.assuredSrTimeoutUpdates.get();
  return timedOut;
}
/**
 * Returns how many updates sent in assured safe read mode went
 * unacknowledged because of a wrong status error.
 *
 * @return The count of assured safe read updates rejected for wrong
 *         status.
 */
public int getAssuredSrWrongStatusUpdates()
{
  final int wrongStatus = this.assuredSrWrongStatusUpdates.get();
  return wrongStatus;
}
/**
 * Returns how many updates sent in assured safe read mode went
 * unacknowledged because of a replay error.
 *
 * @return The count of assured safe read updates that hit a replay error.
 */
public int getAssuredSrReplayErrorUpdates()
{
  final int replayErrors = this.assuredSrReplayErrorUpdates.get();
  return replayErrors;
}
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged, per server.
* <p>
* The result is a snapshot built while holding the monitor of
* {@code assuredSrServerNotAcknowledgedUpdates}.
* NOTE(review): the method signature, the declaration of {@code snapshot}
* and the copy loop are not visible in this copy.
*
* @return The number of updates sent in assured safe read mode that have not
*         been acknowledged, per server.
*/
{
// Clone a snapshot with synchronized section to have a consistent view in
// monitoring
synchronized(assuredSrServerNotAcknowledgedUpdates)
{
{
}
}
return snapshot;
}
/**
 * Returns how many updates were received with an assured safe read mode
 * request.
 *
 * @return The count of received assured safe read updates.
 */
public int getAssuredSrReceivedUpdates()
{
  final int receivedCount = this.assuredSrReceivedUpdates.get();
  return receivedCount;
}
/**
 * Returns how many received assured safe read updates were acknowledged
 * without a replay error.
 *
 * @return The count of received assured safe read updates acked without
 *         error.
 */
public int getAssuredSrReceivedUpdatesAcked()
{
  final int ackedCount = assuredSrReceivedUpdatesAcked.get();
  return ackedCount;
}
/**
 * Returns how many received assured safe read updates were not
 * acknowledged because of a replay error.
 *
 * @return The count of received assured safe read updates not acked.
 */
public int getAssuredSrReceivedUpdatesNotAcked()
{
  final int notAckedCount = assuredSrReceivedUpdatesNotAcked.get();
  return notAckedCount;
}
/**
 * Returns how many updates have been sent in assured safe data mode.
 *
 * @return The count of updates sent in assured safe data mode.
 */
public int getAssuredSdSentUpdates()
{
  final int sentCount = this.assuredSdSentUpdates.get();
  return sentCount;
}
/**
 * Returns how many updates sent in assured safe data mode were
 * acknowledged with no error.
 *
 * @return The count of assured safe data updates acknowledged without
 *         errors.
 */
public int getAssuredSdAcknowledgedUpdates()
{
  final int ackedCount = this.assuredSdAcknowledgedUpdates.get();
  return ackedCount;
}
/**
 * Returns how many updates sent in assured safe data mode went
 * unacknowledged because of a timeout.
 *
 * @return The count of assured safe data updates that timed out.
 */
public int getAssuredSdTimeoutUpdates()
{
  final int timedOut = this.assuredSdTimeoutUpdates.get();
  return timedOut;
}
/**
* Gets the number of updates sent in assured safe data mode that have not
* been acknowledged due to timeout error, per server.
* <p>
* The result is a snapshot built while holding the monitor of
* {@code assuredSdServerTimeoutUpdates}.
* NOTE(review): the method signature, the declaration of {@code snapshot}
* and the copy loop are not visible in this copy.
*
* @return The number of updates sent in assured safe data mode that have not
*         been acknowledged due to timeout error, per server.
*/
{
// Clone a snapshot with synchronized section to have a consistent view in
// monitoring
synchronized(assuredSdServerTimeoutUpdates)
{
{
}
}
return snapshot;
}
/**
 * Returns the moment at which the status was last changed.
 *
 * @return The date of the last status change.
 */
public Date getLastStatusChangeDate()
{
  final Date changedAt = this.lastStatusChangeDate;
  return changedAt;
}
/**
* Resets the values of the monitoring counters.
* <p>
* NOTE(review): the body is empty in this copy, so as shown nothing is
* reset; the counter re-initialisation statements appear to be missing.
* Restore them from the full file.
*/
private void resetMonitoringCounters()
{
}
/*
********** End of Monitoring Code **************
*/
/**
* Start the publish mechanism of the Replication Service.
* After this method has been called, the publish service can be used
* by calling the {@link #publish(UpdateMsg)} method.
* <p>
* The visible body creates the {@code ReplicationBroker} stored in
* {@code broker} and a {@code ReplicationMonitor} stored in
* {@code monitor}.
* NOTE(review): part of the parameter list, the condition guarding the
* inner block, and some broker constructor arguments are not visible in
* this copy.
*
* @param replicationServers The replication servers that should be used.
* @param window The window size of this replication domain.
* @param heartbeatInterval The heartbeatInterval that should be used
*                          to check the availability of the replication
*                          servers.
* @param changetimeHeartbeatInterval The interval used to send change
*                                    time heartbeat to the replication
*                                    server.
*
* @throws ConfigException If the DirectoryServer configuration was
*                         incorrect.
*/
public void startPublishService(
long heartbeatInterval, long changetimeHeartbeatInterval)
throws ConfigException
{
{
/*
* create the broker object used to publish and receive changes
*/
broker = new ReplicationBroker(
new ReplSessionSecurity(),
getGroupId(),
/*
* Create a replication monitor object responsible for publishing
* monitoring information below cn=monitor.
*/
monitor = new ReplicationMonitor(this);
}
}
/**
* Start the publish mechanism of the Replication Service.
* After this method has been called, the publish service can be used
* by calling the {@link #publish(UpdateMsg)} method.
* <p>
* This variant passes 0 as the change time heartbeat interval, which
* the inline comment describes as disabling it. It creates the
* {@code ReplicationBroker} stored in {@code broker} and a
* {@code ReplicationMonitor} stored in {@code monitor}.
* NOTE(review): part of the parameter list, the condition guarding the
* inner block, and some broker constructor arguments are not visible in
* this copy.
*
* @param replicationServers The replication servers that should be used.
* @param window The window size of this replication domain.
* @param heartbeatInterval The heartbeatInterval that should be used
*                          to check the availability of the replication
*                          servers.
* @throws ConfigException If the DirectoryServer configuration was
*                         incorrect.
*/
public void startPublishService(
long heartbeatInterval)
throws ConfigException
{
{
/*
* create the broker object used to publish and receive changes
*/
broker = new ReplicationBroker(
new ReplSessionSecurity(),
getGroupId(),
0); // change time heartbeat is disabled
/*
* Create a replication monitor object responsible for publishing
* monitoring information below cn=monitor.
*/
monitor = new ReplicationMonitor(this);
}
}
/**
* Starts the receiver side of the Replication Service.
* <p>
* After this method has been called, the Replication Service will start
* calling {@link #processUpdate(UpdateMsg)}.
* <p>
* This method must be called exactly once, and only after
* {@link #startPublishService(Collection, int, long)}.
* <p>
* NOTE(review): only the creation of the {@code ListenerThread} is visible
* in this copy; the statement that starts it is not shown.
*/
public void startListenService()
{
//
// Create the listener thread
listenerThread = new ListenerThread(this);
}
/**
* Temporarily disable the Replication Service.
* The Replication Service can be enabled again using
* {@link #enableService()}.
* <p>
* It can be useful to disable the Replication Service when the
* repository where the replicated information is stored becomes
* temporarily unavailable and replicated updates can therefore not
* be replayed during a while.
* <p>
* NOTE(review): the statements that stop the listener thread, the guarded
* block in between, and the wait/join on the listener thread are not
* visible in this copy; the last {@code if} has no statement after it.
*/
public void disableService()
{
// Stop the listener thread
if (listenerThread != null)
{
}
{
}
// Wait for the listener thread to stop
if (listenerThread != null)
}
/**
* Restart the Replication service after a {@link #disableService()}.
* <p>
* The Replication Service will restart from the point indicated by the
* {@link ServerState} that was given as a parameter to
* {@link #startPublishService(Collection, int, long)}
* at startup time.
* If some data have changed in the repository while the Replication
* Service was disabled, this {@link ServerState} should therefore be
* updated by the Replication Domain subclass before calling this method.
* <p>
* NOTE(review): only the creation of a new {@code ListenerThread} is
* visible in this copy; reconnection and thread start statements are not
* shown.
*/
public void enableService()
{
// Create the listener thread
listenerThread = new ListenerThread(this);
}
/**
* Definitively stops the Replication Service.
* <p>
* NOTE(review): the body is empty in this copy; the shutdown statements
* appear to be missing. Restore them from the full file.
*/
public void stopDomain()
{
}
/**
* Change the ReplicationDomain parameters.
* <p>
* The visible body forwards the new configuration to
* {@code broker.changeConfig(...)} and acts on its boolean result.
* NOTE(review): the {@code replicationServers} parameter, the guard around
* the broker call, the call arguments and the action taken on a
* {@code true} result are not visible in this copy.
*
* @param replicationServers The new list of Replication Servers that this
*                           domain should now use.
* @param windowSize The window size that this domain should use.
* @param heartbeatInterval The heartbeatInterval that this domain should
*                          use.
* @param groupId The new group id to use
*/
public void changeConfig(
int windowSize,
long heartbeatInterval,
byte groupId)
{
{
if (broker.changeConfig(
{
}
}
}
/**
* This method should trigger an export of the replicated data
* to the provided outputStream.
* When finished, the outputStream should be flushed and closed.
* <p>
* NOTE(review): the first line of this abstract declaration (modifiers,
* name and parameter) is not visible in this copy.
*
* @param output The OutputStream where the export should
*               be produced.
* @throws DirectoryException When needed.
*/
throws DirectoryException;
/**
* This method should trigger an import of the replicated data.
* <p>
* NOTE(review): the first line of this abstract declaration (modifiers,
* name and parameter) is not visible in this copy.
*
* @param input The InputStream from which
*              the import should be reading entries.
*
* @throws DirectoryException When needed.
*/
throws DirectoryException;
/**
 * Counts the objects held in the replicated domain.
 * <p>
 * The resulting value is used for reporting.
 *
 * @return The number of objects in the replication domain.
 *
 * @throws DirectoryException When the count cannot be obtained.
 */
public abstract long countEntries()
    throws DirectoryException;
/**
* This method should handle the processing of {@link UpdateMsg} received
* from remote replication entities.
* <p>
* This method will be called by a single thread and should therefore
* not be blocking.
*
* @param updateMsg The {@link UpdateMsg} that was received.
*
* @return A boolean indicating if the processing is completed at return
* time.
* If <code> true </code> is returned, no further
* processing is necessary.
*
* If <code> false </code> is returned, the subclass should
* call the method
* {@link #processUpdateDone(UpdateMsg, String)}
* and update the ServerState
* when this processing is complete.
*
*/
/**
* This method must be called after each call to
* {@link #processUpdate(UpdateMsg)} when the processing of the update is
* completed.
* <p>
* It is useful for implementations needing to process the update in an
* asynchronous way or using several threads, but must be called even
* by implementations doing it in a synchronous, single-threaded way.
* <p>
* The visible logic sends an ack only when the broker's protocol version
* is high enough. When {@code replayErrorMsg} is non-null, the ack is
* flagged with {@code setHasReplayError(true)}.
* NOTE(review): the method signature, most branch conditions, the ack
* construction and the ack/counter statements are not visible in this copy.
*
* @param msg The UpdateMsg whose processing was completed.
* @param replayErrorMsg if not null, an error occurred during the replay of
*        this update, and this is the matching human-readable message
*        describing the problem.
*/
{
// Send an ack if it was requested and the group id is the same as the RS
// one. Only Safe Read mode makes sense in DS for returning an ack.
{
// Assured feature is supported starting from replication protocol V2
if (broker.getProtocolVersion() >=
{
{
{
// Send the ack
if (replayErrorMsg != null)
{
// Mark the error in the ack
// -> replay error occurred
ackMsg.setHasReplayError(true);
// -> replay error occurred in our server
}
if (replayErrorMsg != null)
{
} else
{
}
}
{
} else
{
// In safe data mode, an assured update that reaches a DS requires no
// ack from the destination DS. Safe data mode is based on RS acks only.
}
}
}
}
/**
* Prepare a message if it is to be sent in assured mode.
* If the assured mode is enabled, this method should be called before the
* publish(UpdateMsg msg) method. It configures the update accordingly
* before it is sent and prepares the mechanism that will block until the
* matching ack is received. To wait for the ack after the publish call, use
* the waitForAckIfAssuredEnabled() method.
* The expected typical usage in a service inheriting from this class is
* the following sequence:
* <pre>
* UpdateMsg msg = xxx;
* prepareWaitForAckIfAssuredEnabled(msg);
* publish(msg);
* waitForAckIfAssuredEnabled(msg);
* </pre>
*
* Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
* no effect if assured replication is disabled.
* Note: this mechanism should not be used with the publish(byte[] msg)
* version, which already uses these methods internally.
* <p>
* NOTE(review): the method signature, the assured/group-id condition, the
* assured-mode setter and the insertion into {@code waitingAckMsgs} are
* not visible in this copy.
*
* @param msg The update message to be sent soon.
*/
{
/*
* If assured is configured, set the message accordingly to request an ack
* in the right assured mode.
* No ack is requested for a RS with a different group id. Assured
* replication is supported for the same locality (i.e. a topology working
* in the same geographical location). If we are connected to a RS which
* is not in our locality, there is no need to ask for an ack.
*/
{
msg.setAssured(true);
// Add the assured message to the list of updates that are
// waiting for acks
synchronized (waitingAckMsgs)
{
}
}
}
/**
* Wait for the processing of an assured message after it has been sent, if
* assured replication is configured; otherwise, do nothing.
* The prepareWaitForAckIfAssuredEnabled method should have been called
* before; see its comment for the full picture.
* <p>
* The visible logic waits while holding the monitor of {@code msg}. On
* timeout it takes the {@code waitingAckMsgs} lock and, per assured mode,
* updates per-RS error counters keyed by {@code broker.getRsServerId()}. An
* {@code InterruptedException} during the wait breaks out of the wait loop.
* NOTE(review): the method signature, the loop header, the wait call, the
* timeout test, the counter increments and the statements that log the
* messages are not visible in this copy.
*
* @param msg The UpdateMsg for which we are waiting for an ack.
* @throws TimeoutException When the configured timeout occurs waiting for the
*                          ack.
*/
throws TimeoutException
{
// If assured mode configured, wait for acknowledgement for the just sent
// message
{
// Increment assured replication monitoring counters
switch (assuredMode)
{
case SAFE_READ_MODE:
break;
case SAFE_DATA_MODE:
break;
default:
// Should not happen
}
} else
{
// Not assured or bad group id, return immediately
return;
}
// Wait for the ack to be received, timing out if necessary
synchronized (msg)
{
{
try
{
// WARNING: this timeout may be difficult to optimize: too low, it
// may use too much CPU, too high, it may penalize performance...
} catch (InterruptedException e)
{
// NOTE(review): the interrupt flag is not restored before leaving the
// wait loop; consider Thread.currentThread().interrupt().
if (debugEnabled())
{
"serviceID: " + serviceID);
}
break;
}
// Timeout ?
{
// Timeout occurred: make sure the ack is not being received and, if so,
// remove the update from the wait list, log the timeout error and
// also update assured monitoring counters
synchronized (waitingAckMsgs)
{
}
{
// No luck, this is a real timeout
// Increment assured replication monitoring counters
switch (msg.getAssuredMode())
{
case SAFE_READ_MODE:
// Increment number of errors for our RS
broker.getRsServerId());
break;
case SAFE_DATA_MODE:
// Increment number of errors for our RS
broker.getRsServerId());
break;
default:
// Should not happen
}
assuredTimeout + " ms.");
} else
{
// Ack received just before timeout limit: we can exit
break;
}
}
}
}
}
/**
* Publish an {@link UpdateMsg} to the Replication Service.
* <p>
* The Replication Service will handle the delivery of this {@link UpdateMsg}
* to all the participants of this Replication Domain.
* These members will receive this {@link UpdateMsg} through a call
* of the {@link #processUpdate(UpdateMsg)} method.
* <p>
* NOTE(review): the method signature and the publishing statement are not
* visible in this copy.
*
* @param msg The UpdateMsg that should be pushed.
*/
{
// Publish the update
}
/**
* Publish information to the Replication Service.
* <p>
* NOTE(review): the original summary said "(not assured mode)", but the
* inline comments below describe preparing for and waiting on an assured
* ack when assured replication is configured. Confirm which is current.
* A {@code TimeoutException} while waiting for the ack is caught and not
* propagated. The method signature, the message construction, the
* prepare/publish/wait calls and any handling inside the catch are not
* visible in this copy.
*
* @param msg The byte array containing the information that should
*            be sent to the remote entities.
*/
{
synchronized (this)
{
// If assured replication is configured, this will prepare blocking
// mechanism. If assured replication is disabled, this returns
// immediately
}
try
{
// If assured replication is enabled, this will wait for the matching
// ack or time out. If assured replication is disabled, this returns
// immediately
} catch (TimeoutException ex)
{
// This exception may only be raised if assured replication is
// enabled
}
}
/**
 * Provides the generation ID this ReplicationDomain should use.
 * <p>
 * Callers may invoke it at any point once the ReplicationDomain has been
 * started.
 *
 * @return The GenerationID.
 */
public abstract long
    getGenerationID();
/**
* Subclasses should use this method to add additional monitoring
* information to the ReplicationDomain.
* <p>
* NOTE(review): the method signature and body (default return value) are
* not visible in this copy.
*
* @return Additional monitoring attributes that will be added to the
*         ReplicationDomain monitoring entry.
*/
{
}
/**
* Returns a boolean indicating if a total update import is currently
* in Progress.
*
* @return A boolean indicating if a total update import is currently
* in Progress.
*/
public boolean importInProgress()
{
return false;
else
return ieContext.importInProgress;
}
/**
* Returns a boolean indicating if a total update export is currently
* in Progress.
*
* @return A boolean indicating if a total update export is currently
* in Progress.
*/
public boolean exportInProgress()
{
return false;
else
return !ieContext.importInProgress;
}
/**
* Returns the number of entries still to be processed when a total update
* is in progress.
*
* @return The number of entries still to be processed when a total update
* is in progress.
*/
long getLeftEntryCount()
{
return ieContext.entryLeftCount;
else
return 0;
}
/**
* Returns the total number of entries to be processed when a total update
* is in progress.
*
* @return The total number of entries to be processed when a total update
* is in progress.
*/
long getTotalEntryCount()
{
return ieContext.entryCount;
else
return 0;
}
/**
* Add an attribute to the list of attributes to include in the ECL.
* <p>
* NOTE(review): the method signature and body are not visible in this
* copy; as shown, the body is empty.
*
* @param attribute The attribute to add.
*/
{
}
/**
* Get the list of attributes to include in the ECL.
* <p>
* Returns the {@code eClIncludes} field itself, not a copy.
* NOTE(review): the method signature is not visible in this copy.
*
* @return The list of attributes.
*/
{
return eClIncludes;
}
/**
* Set the list of attributes to include in the ECL.
* <p>
* Both {@code cfgEclIncludes} and {@code eClIncludes} are set to the same
* reference. NOTE(review): if {@code eClIncludes} is later mutated in
* place, {@code cfgEclIncludes} changes with it; confirm this aliasing is
* intended. The method signature is not visible in this copy.
*
* @param eclIncludes The list of attributes.
*/
{
this.cfgEclIncludes = eclIncludes;
this.eClIncludes = eclIncludes;
}
}