ReplicationDomainTest.java revision 6c5c0af35aabc59a56c71e9c9296a7398a3e9176
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2015 ForgeRock AS
*/
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.CollectionUtils.*;
import static org.testng.Assert.*;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.Task;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationDomain.ImportExportContext;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
* Test the Generic Replication Service.
*/
@SuppressWarnings("javadoc")
public class ReplicationDomainTest extends ReplicationTestCase
{
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
private static final Task NO_INIT_TASK = null;
@DataProvider(name = "publishAndReceiveData")
public Object[][] createpublishAndReceiveData()
{
return new Object[][] {
{1, 2, 3, 4},
{1, 2, 1, 2},
{1, 2, 45891, 45672},
{45610, 45720, 1, 2},
{45610, 45720, 45891, 45672}
};
}
/**
* Test that a ReplicationDomain is able to publish and receive UpdateMsg.
* Also test the ReplicationDomain.resetReplicationLog() method.
*/
@Test(dataProvider = "publishAndReceiveData", enabled=true)
public void publishAndReceive(
int replServerID1, int replServerID2,
int domain1ServerId, int domain2ServerId)
throws Exception
{
DN testService = DN.valueOf("o=test");
ReplicationServer replServer1 = null;
ReplicationServer replServer2 = null;
FakeReplicationDomain domain1 = null;
FakeReplicationDomain domain2 = null;
try
{
int[] ports = TestCaseUtils.findFreePorts(2);
int replServerPort1 = ports[0];
int replServerPort2 = ports[1];
replServer1 = createReplicationServer(replServerID1, replServerPort1,
"ReplicationDomainTestDb1", 100, "localhost:" + replServerPort2);
replServer2 = createReplicationServer(replServerID2, replServerPort2,
"ReplicationDomainTestDb2", 100, "localhost:" + replServerPort1);
SortedSet<String> servers = newTreeSet("localhost:" + replServerPort1);
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<>();
domain1 = new FakeReplicationDomain(
testService, domain1ServerId, servers, 100, 1000, rcvQueue1);
SortedSet<String> servers2 = newTreeSet("localhost:" + replServerPort2);
BlockingQueue<UpdateMsg> rcvQueue2 = new LinkedBlockingQueue<>();
domain2 = new FakeReplicationDomain(
testService, domain2ServerId, servers2, 100, 1000, rcvQueue2);
Thread.sleep(500);
/*
* Publish a message from domain1,
* Check that domain2 receives it shortly after.
*/
byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5};
publish(domain1, test);
UpdateMsg rcvdMsg = rcvQueue2.poll(20, TimeUnit.SECONDS);
assertNotNull(rcvdMsg);
assertEquals(test, rcvdMsg.getPayload());
for (RSInfo replServerInfo : domain1.getRsInfos())
{
// The generation Id of the remote should be 1
assertEquals(replServerInfo.getGenerationId(), 1,
"Unexpected value of generationId in RSInfo for RS=" + replServerInfo);
}
for (DSInfo serverInfo : domain1.getReplicaInfos().values())
{
assertEquals(serverInfo.getStatus(), ServerStatus.NORMAL_STATUS);
}
domain1.setGenerationID(2);
domain1.resetReplicationLog();
Thread.sleep(500);
for (RSInfo replServerInfo : domain1.getRsInfos())
{
// The generation Id of the remote should now be 2
assertEquals(replServerInfo.getGenerationId(), 2,
"Unexpected value of generationId in RSInfo for RS=" + replServerInfo);
}
int sleepTime = 50;
while (true)
{
try
{
assertExpectedServerStatuses(domain1.getReplicaInfos(),
domain1ServerId, domain2ServerId);
assertExpectedServerStatuses(domain2.getReplicaInfos(),
domain1ServerId, domain2ServerId);
Map<Integer, ServerState> states1 = domain1.getReplicaStates();
ServerState state2 = states1.get(domain2ServerId);
assertNotNull(state2, "getReplicaStates is not showing DS2");
Map<Integer, ServerState> states2 = domain2.getReplicaStates();
ServerState state1 = states2.get(domain1ServerId);
assertNotNull(state1, "getReplicaStates is not showing DS1");
// if we reach this point all tests are OK
break;
}
catch (AssertionError e)
{
if (sleepTime >= 30000)
{
throw e;
}
Thread.sleep(sleepTime);
sleepTime *= 2;
}
}
}
finally
{
disable(domain1, domain2);
remove(replServer1, replServer2);
}
}
/**
* Publish information to the Replication Service (not assured mode).
*
* @param msg The byte array containing the information that should
* be sent to the remote entities.
*/
void publish(FakeReplicationDomain domain, byte[] msg)
{
UpdateMsg updateMsg;
synchronized (this)
{
updateMsg = new UpdateMsg(domain.getGenerator().newCSN(), msg);
// If assured replication is configured,
// this will prepare blocking mechanism.
// If assured replication is disabled, this returns immediately
domain.prepareWaitForAckIfAssuredEnabled(updateMsg);
domain.publish(updateMsg);
}
try
{
// If assured replication is enabled,
// this will wait for the matching ack or time out.
// If assured replication is disabled, this returns immediately
domain.waitForAckIfAssuredEnabled(updateMsg);
}
catch (TimeoutException ex)
{
// This exception may only be raised if assured replication is enabled
logger.info(NOTE_DS_ACK_TIMEOUT, domain.getBaseDN(), domain.getAssuredTimeout(), updateMsg);
}
}
private void assertExpectedServerStatuses(Map<Integer, DSInfo> dsInfos,
int domain1ServerId, int domain2ServerId)
{
for (DSInfo serverInfo : dsInfos.values())
{
if (serverInfo.getDsId() == domain2ServerId)
{
assertEquals(serverInfo.getStatus(), ServerStatus.BAD_GEN_ID_STATUS);
}
else
{
assertEquals(serverInfo.getDsId(), domain1ServerId);
assertEquals(serverInfo.getStatus(), ServerStatus.NORMAL_STATUS);
}
}
}
/**
* Publish performance test.
* The test loops calling the publish methods of the ReplicationDomain.
* It should not be enabled by default as it will use a lot of time.
* Its call is only to investigate performance issues with the replication.
*/
@Test(enabled=false)
public void publishPerf() throws Exception
{
DN testService = DN.valueOf("o=test");
ReplicationServer replServer1 = null;
int replServerID1 = 10;
FakeReplicationDomain domain1 = null;
int domain1ServerId = 1;
try
{
int replServerPort = TestCaseUtils.findFreePort();
replServer1 = createReplicationServer(replServerID1, replServerPort,
"ReplicationDomainTestDb", 100000, "localhost:" + replServerPort);
SortedSet<String> servers = newTreeSet("localhost:" + replServerPort);
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<>();
domain1 = new FakeReplicationDomain(
testService, domain1ServerId, servers, 1000, 100000, rcvQueue1);
/*
* Publish a message from domain1,
* Check that domain2 receives it shortly after.
*/
byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5};
long timeNow = System.nanoTime();
timeNow = publishRepeatedly(domain1, test, timeNow);
timeNow = publishRepeatedly(domain1, test, timeNow);
timeNow = publishRepeatedly(domain1, test, timeNow);
timeNow = publishRepeatedly(domain1, test, timeNow);
}
finally
{
disable(domain1);
remove(replServer1);
}
}
private long publishRepeatedly(FakeReplicationDomain domain1, byte[] test, long timeNow)
{
long timeStart = timeNow;
for (int i = 0; i < 100000; i++)
{
publish(domain1, test);
}
timeNow = System.nanoTime();
System.out.println(timeNow - timeStart);
return timeNow;
}
private ReplicationServer createReplicationServer(int serverId,
int replicationPort, String dirName, int windowSize,
String... replServers) throws Exception
{
return createReplicationServer(serverId, replicationPort, dirName, windowSize, newTreeSet(replServers));
}
private ReplicationServer createReplicationServer(int serverId,
int replicationPort, String dirName, int windowSize,
SortedSet<String> replServers) throws Exception
{
return new ReplicationServer(
new ReplServerFakeConfiguration(replicationPort, dirName, 0, serverId, 0, windowSize, replServers));
}
private void disable(ReplicationDomain... domains)
{
for (ReplicationDomain domain : domains)
{
if (domain != null)
{
domain.disableService();
}
}
}
@DataProvider(name = "exportAndImportData")
public Object[][] createExportAndimportData()
{
return new Object[][] {
{1, 2},
{45610, 45720}
};
}
/**
* Test that a ReplicationDomain is able to export and import its database
* When there is only one replication server.
*/
@Test(dataProvider = "exportAndImportData", enabled=true)
public void exportAndImport(int serverId1, int serverId2) throws Exception
{
final int ENTRYCOUNT=5000;
DN testService = DN.valueOf("o=test");
ReplicationServer replServer = null;
int replServerID = 11;
FakeReplicationDomain domain1 = null;
FakeReplicationDomain domain2 = null;
try
{
int replServerPort = TestCaseUtils.findFreePort();
replServer = createReplicationServer(replServerID, replServerPort,
"exportAndImportData", 100);
SortedSet<String> servers = newTreeSet("localhost:" + replServerPort);
String exportedData = buildExportedData(ENTRYCOUNT);
domain1 = new FakeReplicationDomain(
testService, serverId1, servers, 0, exportedData, null, ENTRYCOUNT);
StringBuilder importedData = new StringBuilder();
domain2 = new FakeReplicationDomain(
testService, serverId2, servers, 0, null, importedData, 0);
/*
* Trigger a total update from domain1 to domain2.
* Check that the exported data is correctly received on domain2.
*/
assertTrue(initializeFromRemote(domain2));
waitEndExport(exportedData, importedData);
assertExportSucessful(domain1, domain2, exportedData, importedData);
}
finally
{
disable(domain1, domain2);
remove(replServer);
}
}
private boolean initializeFromRemote(ReplicationDomain domain) throws DirectoryException
{
for (DSInfo remoteDS : domain.getReplicaInfos().values())
{
if (remoteDS.getDsId() != domain.getServerId())
{
domain.initializeFromRemote(remoteDS.getDsId(), NO_INIT_TASK);
return true;
}
}
return false;
}
/**
* Test that a ReplicationDomain is able to export and import its database
* across 2 replication servers.
*/
@Test(enabled=true)
public void exportAndImportAcross2ReplServers() throws Exception
{
final int ENTRYCOUNT=5000;
DN testService = DN.valueOf("o=test");
ReplicationServer replServer2 = null;
ReplicationServer replServer1 = null;
int replServerID = 11;
int replServerID2 = 12;
FakeReplicationDomain domain1 = null;
FakeReplicationDomain domain2 = null;
try
{
int[] ports = TestCaseUtils.findFreePorts(2);
int replServerPort1 = ports[0];
int replServerPort2 = ports[1];
replServer1 = createReplicationServer(replServerID, replServerPort1,
"exportAndImportservice1", 100);
replServer2 = createReplicationServer(replServerID2, replServerPort2,
"exportAndImportservice2", 100, "localhost:" + replServerPort1);
SortedSet<String> servers1 = newTreeSet("localhost:" + replServerPort1);
SortedSet<String> servers2 = newTreeSet("localhost:" + replServerPort2);
String exportedData = buildExportedData(ENTRYCOUNT);
domain1 = new FakeReplicationDomain(
testService, 1, servers1, 0, exportedData, null, ENTRYCOUNT);
StringBuilder importedData = new StringBuilder();
domain2 = new FakeReplicationDomain(
testService, 2, servers2, 0, null, importedData, 0);
domain2.initializeFromRemote(1, NO_INIT_TASK);
waitEndExport(exportedData, importedData);
assertExportSucessful(domain1, domain2, exportedData, importedData);
}
finally
{
disable(domain1, domain2);
remove(replServer1, replServer2);
}
}
private String buildExportedData(final int ENTRYCOUNT)
{
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < ENTRYCOUNT; i++)
{
sb.append("key : value").append(i).append("\n\n");
}
return sb.toString();
}
private void waitEndExport(String exportedData, StringBuilder importedData) throws Exception
{
int count = 0;
while (importedData.length() < exportedData.length() && count < 500)
{
count ++;
Thread.sleep(100);
}
}
private void assertExportSucessful(ReplicationDomain domain1,
ReplicationDomain domain2, String exportedData, StringBuilder importedData)
{
assertEquals(getLeftEntryCount(domain2), 0, "Wrong LeftEntryCount for export");
assertEquals(getLeftEntryCount(domain1), 0, "Wrong LeftEntryCount for import");
assertEquals(importedData.length(), exportedData.length());
assertEquals(importedData.toString(), exportedData);
}
private long getLeftEntryCount(ReplicationDomain domain)
{
final ImportExportContext ieContext = domain.getImportExportContext();
if (ieContext != null)
{
return ieContext.getLeftEntryCount();
}
return 0; // import/export is finished
}
/**
* Sender side of the Total Update Perf test.
* The goal of this test is to measure the performance
* of the total update code.
* It is not intended to be run as part of the daily unit test but
* should only be used manually by developer in need of testing the
* performance improvement or non-regression of the total update code.
* Use this test in combination with the receiverInitialize() :
* - enable the test
* - start the senderInitialize first using
* ./build.sh \
* -Dorg.opends.test.suppressOutput=false \
* -Dtest.methods=org.opends.server.replication.service.ReplicationDomainTest.senderInitialize test
* - start the receiverInitialize second.
* - you may want to change HOST1 and HOST2 to use 2 different hosts
* if you don't want to do a loopback test.
* - don't forget to disable again the tests after running them
*/
final String HOST1 = "localhost:";
final String HOST2 = "localhost:";
final int SENDERPORT = 10102;
final int RECEIVERPORT = 10101;
@Test(enabled=false)
public void senderInitialize() throws Exception
{
DN testService = DN.valueOf("o=test");
ReplicationServer replServer = null;
int replServerID = 12;
FakeStressReplicationDomain domain1 = null;
try
{
SortedSet<String> servers =
newTreeSet(HOST1 + SENDERPORT, HOST2 + RECEIVERPORT);
replServer = createReplicationServer(replServerID, SENDERPORT,
"ReplicationDomainTestDb", 100, servers);
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<>();
domain1 = new FakeStressReplicationDomain(
testService, 2, servers, 1000, rcvQueue1);
System.out.println("waiting");
Thread.sleep(1000000000);
}
finally
{
disable(domain1);
remove(replServer);
}
}
/**
* See comments in senderInitialize() above.
*/
@Test(enabled=false)
public void receiverInitialize() throws Exception
{
DN testService = DN.valueOf("o=test");
ReplicationServer replServer = null;
int replServerID = 11;
FakeStressReplicationDomain domain1 = null;
try
{
SortedSet<String> servers =
newTreeSet(HOST1 + SENDERPORT, HOST2 + RECEIVERPORT);
replServer = createReplicationServer(replServerID, RECEIVERPORT,
"ReplicationDomainTestDb", 100, servers);
BlockingQueue<UpdateMsg> rcvQueue1 = new LinkedBlockingQueue<>();
domain1 = new FakeStressReplicationDomain(
testService, 1, servers, 100000, rcvQueue1);
/*
* Trigger a total update from domain1 to domain2.
* Check that the exported data is correctly received on domain2.
*/
while (!initializeFromRemote(domain1))
{
System.out.println("trying...");
Thread.sleep(1000);
}
System.out.println("waiting");
Thread.sleep(10000000);
}
finally
{
disable(domain1);
remove(replServer);
}
}
}