/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
* Portions Copyright 2010-2013 ForgeRock AS
*/
/**
* This class defines a server handler, which handles all interaction with a
* peer replication server.
*/
{
// This is a string identifying the operation, provided by the client part
// of the ECL, used to help interpretation of messages logged.
// Iterator on the draftCN database.
boolean draftCompat = false;
/**
* Specifies the last draft change number (seqnum) requested.
*/
/**
* Specifies whether the draft change number (seqnum) db has been read until
* its end.
*/
public boolean isEndOfDraftCNReached = false;
/**
* Specifies whether the current search has been requested to be persistent
* or not.
*/
public short isPersistent;
/**
* Specifies the current search phase : INIT or PERSISTENT.
*/
/**
* Specifies the cookie contained in the request, specifying where
* to start serving the ECL.
*/
/**
* Specifies the value of the cookie before the change currently processed
* is returned. It is updated with the change number of the change
* currently processed (thus becoming the "current" cookie just
* before the change is returned).
*/
new MultiDomainServerState();
/**
* Specifies the excluded DNs (like cn=admin, ...).
*/
//HashSet<String> excludedServiceIDs = new HashSet<String>();
/**
* Eligible changeNumber - only changes older or equal to eligibleCN
* are published in the ECL.
*/
/**
* Provides a string representation of this object.
* @return the string representation.
*/
{
return this.getClass().getCanonicalName() +
"[" +
"[draftCompat=" + draftCompat +
"] [persistent=" + isPersistent +
"] [lastDraftCN=" + lastDraftCN +
"] [isEndOfDraftCNReached=" + isEndOfDraftCNReached +
"] [searchPhase=" + searchPhase +
"] [startCookie=" + startCookie +
"] [previousCookie=" + previousCookie +
"]]";
}
/**
* Class that manages the 'by domain' state variables for the search being
* currently processed on the ECL.
* For example :
* if search on 'cn=changelog' is being processed when 2 replicated domains
* dc=us and dc=europe are configured, then there will be 2 DomainContext
* used, one for dc=us, and one for dc=europe.
*/
private class DomainContext
{
// supposed eligible for the ECL.
// the changes for this domain
long domainLatestTrimDate;
/**
* {@inheritDoc}
*/
{
}
/**
* Provide a string representation of this object for debugging purposes.
* @param buffer Append to this buffer.
*/
{
.append("]]");
}
/**
* Get the next message eligible regarding
* the crossDomain eligible CN. Put it in the context table.
* @param opid The operation id.
*/
{
if (debugEnabled())
+ "ctxt=" + toString());
try
{
// Before getting a new message from the domain, evaluate in priority
// a message that has not been published to the ECL because it was
// not eligible
if (nextNonEligibleMsg != null)
{
boolean hasBecomeEligible =
<= eligibleCN.getTime());
if (debugEnabled())
+ " stored nonEligibleMsg " + nextNonEligibleMsg
+ " has now become eligible regarding "
+ " the eligibleCN ("+ eligibleCN
+ " ):" + hasBecomeEligible);
if (hasBecomeEligible)
{
// it is now eligible
}
// else the oldest is still not eligible - let's wait next
}
else
{
// Here comes a new message !!!
// non blocking
do {
// when the replication changelog is trimmed, the last (latest) chg
// is left in the db (whatever its age), and we don't want this chg
// to be returned in the external changelog.
// So let's check if the chg time is older than the trim date
if (debugEnabled())
+ " got new message : "
// in non blocking mode, return null when no more msg
{
<= eligibleCN.getTime());
if (debugEnabled())
+ "] eligibleCN=[" + eligibleCN
+ dumpState());
if (isEligible)
{
}
else
{
}
}
}
}
catch(Exception e)
{
}
}
/**
* Unregister the handler from the DomainContext ReplicationDomain.
* @return Whether the handler has been unregistered with success.
*/
private boolean unRegisterHandler()
{
}
/**
* Stops the DomainContext handler.
*/
private void stopServer()
{
}
}
// The global list of contexts by domain for the search currently processed.
{
}
}
/**
* Starts this handler based on a start message received from remote server.
* @param inECLStartMsg The start msg provided by the remote server.
* @return Whether the remote server requires encryption or not.
* @throws DirectoryException When a problem occurs.
*/
throws DirectoryException
{
try
{
.getVersion()));
{
// We support connection from a V1 RS
// Only V2 protocol has the group id in repl server start message
}
}
catch(Exception e)
{
}
return inECLStartMsg.getSSLEncryption();
}
/**
* Sends a start message to the remote ECL server.
*
* @return The StartMsg sent.
* @throws IOException
* When an exception occurs.
*/
{
// Before V4 protocol, we sent a ReplServerStartMsg
{
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
}
else
{
// Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
}
return startMsg;
}
/**
* Creates a new handler object to a remote replication server.
* @param session The session with the remote RS.
* @param queueSize The queue size to manage updates to that RS.
* @param replicationServerURL The hosting local RS URL.
* @param replicationServerId The hosting local RS serverId.
* @param replicationServer The hosting local RS object.
* @param rcvWindowSize The receiving window size.
*/
public ECLServerHandler(
int queueSize,
int replicationServerId,
int rcvWindowSize)
{
try
{
}
catch(DirectoryException de)
{
// no chance to have a bad domain set here
}
}
/**
* Creates a new handler object to a remote replication server.
* @param replicationServerURL The hosting local RS URL.
* @param replicationServerId The hosting local RS serverId.
* @param replicationServer The hosting local RS object.
* @param startECLSessionMsg the start parameters.
* @throws DirectoryException when an error occurs.
*/
public ECLServerHandler(
int replicationServerId,
throws DirectoryException
{
// queueSize is hard coded to 1 else super class hangs for some reason
replicationServer, 0);
try
{
}
catch(DirectoryException de)
{
// no chance to have a bad domain set here
}
this.initialize(startECLSessionMsg);
}
/**
* Starts the handler from a remote start message received from
* the remote server.
* @param inECLStartMsg The provided ReplServerStart message received.
*/
{
try
{
// Process start from remote
boolean sessionInitiatorSSLEncryption =
// lock with timeout
if (replicationServerDomain != null)
{
lockDomain(true);
}
localGenerationId = -1;
// send start to remote
// log
// until here session is encrypted then it depends on the negotiation
// The session initiator decides whether to use SSL.
// wait and process StartSessionMsg from remote RS
if (inStartECLSessionMsg == null)
{
// client wants to properly close the connection (client sent a
// StopMsg)
return;
}
// initialization
}
catch(DirectoryException de)
{
}
catch(Exception e)
{
}
finally
{
if ((replicationServerDomain != null) &&
{
}
}
}
/**
* Wait receiving the StartSessionMsg from the remote DS and process it.
* @return the startSessionMsg received
* @throws DirectoryException
* @throws IOException
* @throws ClassNotFoundException
* @throws DataFormatException
* @throws NotSupportedOldVersionPDUException
*/
{
{
// client wants to stop handshake (was just for handshake phase one for RS
// choice). Return null to make the session be terminated.
return null;
}
else if (!(msg instanceof StartECLSessionMsg))
{
+ " received.");
return null;
}
else
{
// Process StartSessionMsg sent by remote DS
return (StartECLSessionMsg) msg;
}
}
/**
* Initialize the handler from a provided cookie value.
* @param crossDomainStartState The provided cookie value.
* @throws DirectoryException When an error is raised.
*/
throws DirectoryException
{
}
/**
* Initialize the handler from a provided draft first change number.
* @param startDraftCN The provided draft first change number.
* @throws DirectoryException When an error is raised.
*/
throws DirectoryException
{
try
{
draftCompat = true;
// Any possible optimization on draft CN in the request filter ?
if (startDraftCN <= 1)
{
// Request filter DOES NOT contain any firstDraftCN
// So we'll generate from the beginning of what we have stored here.
// Get starting state from first DraftCN from DraftCNdb
{
// DraftCNdb IS EMPTY hence start from what we have in the changelog db.
isEndOfDraftCNReached = true;
}
else
{
// DraftCNdb IS NOT EMPTY hence start from
// the generalizedServerState related to the start of the draftDb
// And get an iterator to traverse the draftCNDb
}
}
else
{
// Request filter DOES contain a startDraftCN
// Read the draftCNDb to see whether it contains startDraftCN
if (crossDomainStartState != null)
{
// startDraftCN (from the request filter) is present in the draftCnDb
// Get an iterator to traverse the draftCNDb
}
else
{
// startDraftCN provided in the request IS NOT in the DraftCNDb
// Get the draftLimits (from the eligibleCN got at the beginning of
// the operation) in order to have the first and possible last
// DraftCN.
// If the startDraftCN provided is lower than the first Draft CN in
// the DB, let's use the lower limit.
{
if (crossDomainStartState != null)
{
// startDraftCN (from the request filter) is present in the
// draftCnDb.
// Get an iterator to traverse it.
}
else
{
// This shouldn't happen
// Let's start from what we have in the changelog db.
isEndOfDraftCNReached = true;
}
}
{
// startDraftCN is between first and potential last and has never
// been returned yet
{
// db is empty
isEndOfDraftCNReached = true;
}
else
{
}
// TODO:ECL ... ok we'll start from the end of the draftCNDb BUT ...
// this may be very long. Work on perf improvement here.
}
else
{
// startDraftCN is > the potential last DraftCN
}
}
}
this.draftCompat = true;
}
catch(DirectoryException de)
{
if (draftCNDbIter != null)
throw(de);
}
catch(Exception e)
{
if (draftCNDbIter != null)
throw new DirectoryException(
}
}
/**
* Initialize the context for each domain.
* @param providedCookie the provided generalized state
* @param allowUnknownDomains Provides all changes for domains not included
* in the provided cookie.
* @throws DirectoryException When an error occurs.
*/
boolean allowUnknownDomains)
throws DirectoryException
{
/*
This map is initialized from the providedCookie.
Below, it will be traversed and each domain configured with ECL will be
checked and removed from the map.
At the end, normally the map should be empty.
Depending on allowUnknownDomains provided flag, a non empty map will
be considered as an error when allowUnknownDomains is false.
*/
// Parse the provided cookie and overwrite startState from it.
try
{
// Creates the table that will contain the real-time info for each
// and every domain.
{
{
// process a domain
// skip the 'unreal' changelog domain
if (rsd == this.replicationServerDomain)
continue;
// skip the excluded domains
{
// this is an excluded domain
if (allowUnknownDomains)
continue;
}
// skip unused domains
continue;
// Creates the new domain context
newDomainCtxt.active = true;
// Assign the start state for the domain
if (isPersistent ==
{
}
else
{
// let's take the start state for this domain from the provided
// cookie
{
// when there is no cookie provided in the request,
// let's start traversing this domain from the beginning of
// what we have in the replication changelog
{
}
}
else
{
// when there is a cookie provided in the request,
{
continue;
}
{
/*
when the provided startState is older than the replication
changelogdb startState, it means that the replication
changelog db has been trimmed and the cookie is not valid
anymore.
*/
boolean cookieTooOld = false;
{
if ((providedChange != null)
{
cookieTooOld=true;
}
}
if (cookieTooOld)
{
// the provided start
}
}
}
// Set the stop state for the domain from the eligibleCN
}
// Creates an unconnected SH for the domain
// set initial state
// set serviceID and domain
// register the unconnected into the domain
// store the new context
}
}
{
// If there are domains missing in the provided cookie,
// the request is rejected and a full resync is required.
}
/*
When it is valid to have the provided cookie containing unknown domains
(allowUnknownDomains is true), 2 cases must be considered :
- if the cookie contains a domain that is replicated but where
ECL is disabled, then this case is considered above
- if the cookie contains a domain that is even not replicated
then this case need to be considered here in another loop.
*/
if (!startStatesFromProvidedCookie.isEmpty())
{
if (allowUnknownDomains)
// the domain provided in the cookie is not replicated
}
// Now do the final checking
if (!startStatesFromProvidedCookie.isEmpty())
{
/*
After reading all the known domains from the provided cookie, there
is one (or several) domain that are not currently configured.
This domain has probably been removed or replication disabled on it.
The request is rejected and full resync is required.
*/
}
}
// the next record from the DraftCNdb should be the one
// Initializes each and every domain with the next(first) eligible message
// from the domain.
domainCtxt.active = false;
}
}
catch(DirectoryException de)
{
throw de;
}
catch(Exception e)
{
// FIXME:ECL do not publish internal exception plumb to the client
throw new DirectoryException(
e),
e);
}
if (debugEnabled())
}
/**
* Registers this handler into its related domain and notifies the domain.
*/
private void registerIntoDomain()
{
if (replicationServerDomain!=null)
}
/**
* Shutdown this handler.
*/
public void shutdown()
{
if (debugEnabled())
if (this.draftCNDbIter != null)
{
}
if (!domainCtxt.unRegisterHandler()) {
this + " shutdown() - error when unregistering handler "
+ domainCtxt.mh));
}
}
super.shutdown();
domainCtxts = null;
}
/**
* Request to shutdown the associated writer.
*/
protected void shutdownWriter()
{
shutdownWriter = true;
{
}
}
/**
* {@inheritDoc}
*/
{
return "Connected External Changelog Server " + str +
}
/**
* Retrieves a set of attributes containing monitor data that should be
* returned to the client if the corresponding monitor entry is requested.
*
* @return A set of attributes containing monitor data that should be
* returned to the client if the corresponding monitor entry is
* requested.
*/
{
// Get the generic ones
// Add the specific RS ones
serverURL));
// TODO:ECL No monitoring exist for ECL.
return attributes;
}
/**
* {@inheritDoc}
*/
{
localString = "External changelog Server ";
if (this.serverId != 0)
+ " " + this.getOperationId();
else
return localString;
}
/**
* Gets the status of the connected DS.
* @return The status of the connected DS.
*/
{
// There is no other status possible for the ECL Server Handler to
// be normally connected.
return ServerStatus.NORMAL_STATUS;
}
/**
* {@inheritDoc}
* <p>
* This handler unconditionally reports itself as a data server.
*/
public boolean isDataServer()
{
return true;
}
/**
* Initialize the handler.
* @param startECLSessionMsg The provided starting state.
* @throws DirectoryException when a problem occurs.
*/
throws DirectoryException
{
try
{
}
catch(Exception e)
{
throw new DirectoryException(
}
if (startECLSessionMsg.getECLRequestType()==
{
}
else if (startECLSessionMsg.getECLRequestType()==
{
}
{
try
{
// Disable timeout for next communications
}
catch(Exception e) { /* do nothing */ }
// sendWindow MUST be created before starting the writer
// create reader
{
// create writer
}
// Resume the writer
// TODO:ECL Potential race condition if writer not yet resumed here
}
{
}
/* TODO: From replication changenumber
//--
if (startCLMsg.getStartMode()==2)
{
if (CLSearchFromProvidedExactCN(startCLMsg.getChangeNumber()))
return;
}
//--
if (startCLMsg.getStartMode()==4)
{
// to get the CL first and last
initializeCLDomCtxts(null); // from start
ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN();
try
{
// to get the CL first and last
// last rely on the crossDomainEligibleCN thus must have been
// computed before
int[] limits = computeCLLimits(crossDomainEligibleCN);
// Send the response
CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
session.publish(msg);
}
catch(Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
try
{
session.publish(
new ErrorMsg(
replicationServerDomain.getReplicationServer().getServerId(),
serverId,
Message.raw(Category.SYNC, Severity.INFORMATION,
"Exception raised: " + e.getMessage())));
}
catch(IOException ioe)
{
// FIXME: close conn ?
}
}
return;
}
*/
// Store into domain
if (debugEnabled())
" initialized: " +
}
/**
* Select the next update that must be sent to the server managed by this
* ServerHandler.
*
* @return the next update that must be sent to the server managed by this
* ServerHandler.
* @exception DirectoryException when an error occurs.
*/
throws DirectoryException
{
boolean interrupted = true;
// TODO:ECL We should refactor so that a SH always have a session
return msg;
boolean acquired = false;
do
{
try
{
interrupted = false;
} catch (InterruptedException e)
{
// loop until not interrupted
}
{
}
return msg;
}
/**
* Get the next message - non blocking - null when none.
* This method is currently not used but we don't want to keep the mother
* class method since it makes no sense for ECLServerHandler.
* @param synchronous - not used
* @return the next message
*/
{
try
{
}
catch(DirectoryException de)
{
}
return msg;
}
/**
* Returns the next update message for the External Changelog (ECL).
* @return the ECL update message, null when there aren't anymore.
* @throws DirectoryException when an error occurs.
*/
throws DirectoryException
{
if (debugEnabled())
" getNextECLUpdate starts: " + dumpState());
try
{
// getMessage:
// get the oldest msg:
// after:
// if stopState of domain is covered then domain is out candidate
// until no domain candidate mean generalized stopState
// has been reached
// else
// get one msg from that domain
// no (null) msg returned: should not happen since we go to a state
// Persistent:
// ----------
// step 1&2: same as non persistent
//
// step 3: reinit all domain are candidate
// take the oldest
// if one domain has no msg, still is candidate
int iDom;
boolean continueLooping = true;
{
// Step 1 & 2
if (searchPhase == INIT_PHASE)
{
// Default is not to loop, with one exception
continueLooping = false;
// iDom == -1 means that there is no oldest change to process
if (iDom == -1)
{
// signals end of phase 1 to the caller
return null;
}
// Build the ECLUpdateMsg to be returned
oldestChange = new ECLUpdateMsg(
null, // cookie will be set later
0); // draftChangeNumber may be set later
if (draftCompat)
{
// either retrieve a draftCN from the draftCNDb
// or assign a new draftCN and store in the db
// We also need to check if the draftCNdb is consistent with
// the changelogdb.
// if not, 2 potential reasons
// a/ : changelog has been purged (trim) - let's traverse the draftCNDb
// b/ : changelog is late .. let's traverse the changelogDb
// The following loop allows to loop until being on the same cn
// in the 2 dbs
// replogcn : the oldest change from the changelog db
while (true)
{
if (!isEndOfDraftCNReached)
{
// we did not reach yet the end of the DraftCNdb
// the next change from the DraftCN db
// are replogcn and DraftCNcn should be the same change ?
if (debugEnabled())
+ " comparing the 2 db DNs :"
{
// same domain and same CN => same change
// assign the DraftCN found to the change from the changelogdb
if (debugEnabled())
+ " to change=" + oldestChange);
break;
}
else
{
// replogcn and DraftCNcn are NOT on the same change
{
// the change from the DraftCNDb is older
// that means that the change has been purged from the
// changelogDb (and DraftCNdb not yet been trimmed)
try
{
// let's traverse the DraftCNdb searching for the change
// found in the changelogDb.
if (debugEnabled())
+ " will skip " + cnFromDraftCNDb
+ " and read next change from the DraftCNDb.");
if (debugEnabled())
+ " has skiped to "
+ " End of draftCNDb ?"+isEndOfDraftCNReached);
{
// we are at the end of the DraftCNdb in the append mode
// generate a new draftCN and assign to this change
// store in DraftCNdb the pair
// (draftCN_of_the_cur_change, state_before_this_change)
break;
}
}
catch(Exception e)
{
// TODO: At least log a warning
}
}
else
{
// the change from the changelogDb is older
// it should have been stored lately
// let's continue to traverse the changelogdb
if (debugEnabled())
+ " and read next from the regular changelog.");
continueLooping = true;
break; // TO BE CHECKED
}
}
}
else
{
// we are at the end of the DraftCNdb in the append mode
// store in DraftCNdb the pair
// (DraftCN of the current change, state before this change)
this.previousCookie.toString(),
break;
}
} // while DraftCN
} // if draftCompat
// here we have the right oldest change
// and in the draft case, we have its draft changenumber
// Set and test the domain of the oldestChange see if we reached
// the end of the phase for this domain
{
}
{
}
{
// populates the table with the next eligible msg from iDom
// in non blocking mode, return null when no more eligible msg
}
} // phase == INIT_PHASE
} // while (...)
if (searchPhase == PERSISTENT_PHASE)
{
if (debugEnabled())
clDomCtxtsToString("In getNextECLUpdate (persistent): " +
"looking for the generalized oldest change");
// get next msg
}
// take the oldest one
if (iDom != -1)
{
oldestChange = new ECLUpdateMsg(
null, // set later
suffix, 0);
if (draftCompat)
{
// should generate DraftCN
// store in DraftCNdb the pair
// (DraftCN of the current change, state before this change)
this.previousCookie.toString(),
}
}
}
}
catch(Exception e)
{
throw new DirectoryException(
e);
}
if (oldestChange != null)
{
if (debugEnabled())
// Update the current state
// Set the current value of global state in the returned message
if (debugEnabled())
}
return oldestChange;
}
/**
* Terminates the first (non persistent) phase of the search on the ECL.
*/
private void closeInitPhase()
{
// starvation of changelog messages:
// all domains have been deactivated, meaning they are all covered
if (debugEnabled())
+ dumpState());
// go to the persistent phase, if there is one
{
// INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
{
}
}
else
{
// INIT_PHASE is done AND search is not persistent => re-init
}
if (draftCNDbIter!=null)
{
// End of INIT_PHASE => always release the iterator
}
}
/**
* Get the index in the domainCtxt table of the domain with the oldest change.
* @return the index of the domain with the oldest change, -1 when none.
*/
private int getOldestChangeFromDomainCtxts()
{
int oldest = -1;
{
if ((domainCtxts[i].active))
{
// on the first loop, oldest==-1
// .msg is null when the previous (non blocking) nextMessage did
// not have any eligible msg to return
{
if ((oldest==-1) ||
{
oldest = i;
}
}
}
}
if (debugEnabled())
+ "," + this + " getOldestChangeFromDomainCtxts() returns " +
return oldest;
}
/**
* Returns the client operation id.
* @return The client operation id.
*/
{
return operationId;
}
/**
* Getter for the persistent property of the current search.
* @return The value of the persistent property, as the short code stored
*         in this handler (not a boolean).
*/
public short isPersistent()
{
return isPersistent;
}
/**
* Getter for the current search phase (INIT or PERSISTENT).
* @return The current search phase.
*/
public int getSearchPhase()
{
return searchPhase;
}
/**
* Refresh the eligibleCN by requesting the replication server.
*/
public void refreshEligibleCN()
{
}
}