3853N/A/*
3853N/A * CDDL HEADER START
3853N/A *
3853N/A * The contents of this file are subject to the terms of the
3853N/A * Common Development and Distribution License, Version 1.0 only
3853N/A * (the "License"). You may not use this file except in compliance
3853N/A * with the License.
3853N/A *
3853N/A * You can obtain a copy of the license at
3853N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE
3853N/A * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
3853N/A * See the License for the specific language governing permissions
3853N/A * and limitations under the License.
3853N/A *
3853N/A * When distributing Covered Code, include this CDDL HEADER in each
3853N/A * file and include the License file at
3853N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
3853N/A * add the following below this CDDL HEADER, with the fields enclosed
3853N/A * by brackets "[]" replaced with your own identifying information:
3853N/A * Portions Copyright [yyyy] [name of copyright owner]
3853N/A *
3853N/A * CDDL HEADER END
3853N/A *
3853N/A *
5086N/A * Copyright 2009-2010 Sun Microsystems, Inc.
5414N/A * Portions Copyright 2011 ForgeRock AS
3853N/A */
3853N/Apackage org.opends.server.replication.plugin;
3853N/A
5414N/Aimport org.opends.server.util.StaticUtils;
5414N/Aimport java.io.File;
3853N/Aimport java.io.IOException;
3853N/Aimport java.net.ServerSocket;
3853N/Aimport java.net.SocketException;
3853N/Aimport java.net.SocketTimeoutException;
3853N/Aimport java.util.ArrayList;
3853N/Aimport java.util.List;
3853N/Aimport java.util.SortedSet;
3853N/Aimport java.util.TreeSet;
3853N/Aimport java.util.concurrent.atomic.AtomicBoolean;
3988N/A
3853N/Aimport static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
3853N/Aimport static org.opends.server.loggers.ErrorLogger.logError;
3853N/Aimport static org.opends.server.loggers.debug.DebugLogger.getTracer;
3853N/Aimport org.opends.server.types.DirectoryException;
3853N/Aimport static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
3853N/A
3853N/Aimport org.opends.messages.Category;
3853N/Aimport org.opends.messages.Message;
3853N/Aimport org.opends.messages.Severity;
3853N/Aimport org.opends.server.TestCaseUtils;
4169N/Aimport org.opends.server.admin.server.ConfigurationChangeListener;
4169N/Aimport org.opends.server.admin.std.server.SynchronizationProviderCfg;
4169N/Aimport org.opends.server.api.SynchronizationProvider;
4169N/Aimport org.opends.server.core.DirectoryServer;
3853N/Aimport org.opends.server.loggers.debug.DebugTracer;
3853N/Aimport org.opends.server.replication.ReplicationTestCase;
3988N/Aimport org.opends.server.replication.service.ReplicationBroker;
3853N/Aimport org.opends.server.replication.common.ChangeNumberGenerator;
3853N/Aimport org.opends.server.replication.common.DSInfo;
3853N/Aimport org.opends.server.replication.common.ServerState;
3853N/Aimport org.opends.server.replication.common.ServerStatus;
3853N/Aimport org.opends.server.replication.protocol.AddMsg;
3853N/Aimport org.opends.server.replication.protocol.DoneMsg;
3853N/Aimport org.opends.server.replication.protocol.EntryMsg;
3853N/Aimport org.opends.server.replication.protocol.InitializeTargetMsg;
3853N/Aimport org.opends.server.replication.protocol.ReplSessionSecurity;
3853N/Aimport org.opends.server.replication.protocol.ReplicationMsg;
3853N/Aimport org.opends.server.replication.protocol.ResetGenerationIdMsg;
3853N/Aimport org.opends.server.replication.protocol.RoutableMsg;
3853N/Aimport org.opends.server.replication.server.ReplServerFakeConfiguration;
3853N/Aimport org.opends.server.replication.server.ReplicationServer;
3853N/Aimport org.opends.server.types.Attribute;
3853N/Aimport org.opends.server.types.DN;
3853N/Aimport org.opends.server.types.Entry;
3853N/Aimport org.testng.annotations.AfterClass;
3853N/Aimport org.testng.annotations.BeforeClass;
3853N/Aimport org.testng.annotations.DataProvider;
3853N/Aimport org.testng.annotations.Test;
3853N/Aimport static org.testng.Assert.*;
3853N/A
3853N/A/**
3853N/A * Some tests to go through the DS state machine and validate we get the
3853N/A * expected status according to the actions we perform.
3853N/A */
3853N/Apublic class StateMachineTest extends ReplicationTestCase
3853N/A{
3853N/A
3853N/A private static final String EXAMPLE_DN = "dc=example,dc=com"; // Server id definitions
3853N/A
4802N/A private static final int DS1_ID = 1;
4802N/A private static final int DS2_ID = 2;
4802N/A private static final int DS3_ID = 3;
4802N/A private static final int RS1_ID = 41;
3853N/A private int rs1Port = -1;
3988N/A private LDAPReplicationDomain ds1 = null;
3853N/A private ReplicationBroker ds2 = null;
3853N/A private ReplicationBroker ds3 = null;
3853N/A private ReplicationServer rs1 = null;
3853N/A // The tracer object for the debug logger
3853N/A private static final DebugTracer TRACER = getTracer();
5086N/A private int initWindow = 100;
3853N/A
3853N/A private void debugInfo(String s)
3853N/A {
3853N/A logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
3853N/A if (debugEnabled())
3853N/A {
3853N/A TRACER.debugInfo("** TEST **" + s);
3853N/A }
3853N/A }
3853N/A
3853N/A private void debugInfo(String message, Exception e)
3853N/A {
3853N/A debugInfo(message + stackTraceToSingleLineString(e));
3853N/A }
3853N/A
3853N/A private void initTest()
3853N/A {
3853N/A rs1Port = -1;
3853N/A ds1 = null;
3853N/A ds2 = null;
3853N/A ds3 = null;
3853N/A findFreePorts();
3853N/A }
3853N/A
3853N/A private void endTest()
3853N/A {
3853N/A if (ds1 != null)
3853N/A {
3853N/A ds1.shutdown();
3853N/A ds1 = null;
3853N/A }
3853N/A
3853N/A try
3853N/A {
3853N/A // Clear any reference to a domain in synchro plugin
3853N/A MultimasterReplication.deleteDomain(DN.decode(EXAMPLE_DN));
3853N/A } catch (DirectoryException ex)
3853N/A {
3853N/A fail("Error deleting reference to domain: " + EXAMPLE_DN);
3853N/A }
3853N/A
3853N/A if (ds2 != null)
3853N/A {
3853N/A ds2.stop();
3853N/A ds2 = null;
3853N/A }
3853N/A
3853N/A if (ds3 != null)
3853N/A {
3853N/A ds3.stop();
3853N/A ds3 = null;
3853N/A }
3853N/A
3853N/A if (rs1 != null)
3853N/A {
3853N/A rs1.clearDb();
3853N/A rs1.remove();
5414N/A StaticUtils.recursiveDelete(new File(DirectoryServer.getInstanceRoot(),
5414N/A rs1.getDbDirName()));
3853N/A rs1 = null;
3853N/A }
3853N/A
3853N/A rs1Port = -1;
3853N/A }
3853N/A
3853N/A private void sleep(long time)
3853N/A {
3853N/A try
3853N/A {
3853N/A Thread.sleep(time);
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A fail("Error sleeping " + stackTraceToSingleLineString(ex));
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Check connection of the provided ds to the
3853N/A * replication server. Waits for connection to be ok up to secTimeout seconds
3853N/A * before failing.
3853N/A */
4802N/A private void checkConnection(int secTimeout, int dsId)
3853N/A {
3853N/A
3853N/A ReplicationBroker rb = null;
3988N/A LDAPReplicationDomain rd = null;
3853N/A switch (dsId)
3853N/A {
3853N/A case DS1_ID:
3853N/A rd = ds1;
3853N/A break;
3853N/A case DS2_ID:
3853N/A rb = ds2;
3853N/A break;
3853N/A case DS3_ID:
3853N/A rb = ds3;
3853N/A break;
3853N/A default:
3853N/A fail("Unknown ds server id.");
3853N/A }
3853N/A
3853N/A int nSec = 0;
3853N/A
3853N/A // Go out of the loop only if connection is verified or if timeout occurs
3853N/A while (true)
3853N/A {
3853N/A // Test connection
3853N/A boolean connected = false;
3853N/A if (rd != null)
3853N/A connected = rd.isConnected();
3853N/A else
3853N/A connected = rb.isConnected();
3853N/A
3853N/A if (connected)
3853N/A {
3853N/A // Connection verified
3853N/A debugInfo("checkConnection: connection of DS " + dsId +
3853N/A " to RS obtained after " + nSec + " seconds.");
3853N/A return;
3853N/A }
3853N/A
3853N/A // Sleep 1 second
3853N/A try
3853N/A {
3988N/A Thread.sleep(100);
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A fail("Error sleeping " + stackTraceToSingleLineString(ex));
3853N/A }
3853N/A nSec++;
3853N/A
3988N/A if (nSec > secTimeout*10)
3853N/A {
3853N/A // Timeout reached, end with error
3853N/A fail("checkConnection: DS " + dsId + " is not connected to the RS after "
3853N/A + secTimeout + " seconds.");
3853N/A }
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Find needed free TCP ports.
3853N/A */
3853N/A private void findFreePorts()
3853N/A {
3853N/A try
3853N/A {
3853N/A ServerSocket socket1 = TestCaseUtils.bindFreePort();
3853N/A rs1Port = socket1.getLocalPort();
3853N/A socket1.close();
3853N/A } catch (IOException e)
3853N/A {
3853N/A fail("Unable to determinate some free ports " +
3853N/A stackTraceToSingleLineString(e));
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Creates a new ReplicationServer.
3853N/A */
3853N/A private ReplicationServer createReplicationServer(String testCase,
3853N/A int degradedStatusThreshold)
3853N/A {
3853N/A try
3853N/A {
3853N/A SortedSet<String> replServers = new TreeSet<String>();
3853N/A
3853N/A String dir = "stateMachineTest" + RS1_ID + testCase + "Db";
3853N/A ReplServerFakeConfiguration conf =
3853N/A new ReplServerFakeConfiguration(rs1Port, dir, 0, RS1_ID, 0, 100,
3853N/A replServers, 1, 1000, degradedStatusThreshold);
3853N/A ReplicationServer replicationServer = new ReplicationServer(conf);
3853N/A return replicationServer;
3853N/A
3853N/A } catch (Exception e)
3853N/A {
3853N/A fail("createReplicationServer " + stackTraceToSingleLineString(e));
3853N/A }
3853N/A return null;
3853N/A }
3853N/A
3853N/A /**
3853N/A * Creates and starts a new ReplicationDomain configured for the replication
3853N/A * server
3853N/A */
4169N/A @SuppressWarnings("unchecked")
4802N/A private LDAPReplicationDomain createReplicationDomain(int dsId)
3853N/A {
3853N/A try
3853N/A {
3853N/A SortedSet<String> replServers = new TreeSet<String>();
3853N/A replServers.add("localhost:" + rs1Port);
3853N/A
3853N/A DN baseDn = DN.decode(EXAMPLE_DN);
3853N/A DomainFakeCfg domainConf =
3853N/A new DomainFakeCfg(baseDn, dsId, replServers);
3988N/A LDAPReplicationDomain replicationDomain =
3853N/A MultimasterReplication.createNewDomain(domainConf);
3853N/A replicationDomain.start();
4169N/A SynchronizationProvider<SynchronizationProviderCfg> provider =
4169N/A DirectoryServer.getSynchronizationProviders().get(0);
4169N/A if (provider instanceof ConfigurationChangeListener)
4169N/A {
4169N/A ConfigurationChangeListener<MultimasterReplicationFakeConf> mmr =
4169N/A (ConfigurationChangeListener<MultimasterReplicationFakeConf>) provider;
4169N/A mmr.applyConfigurationChange(new MultimasterReplicationFakeConf());
4169N/A }
3853N/A
3853N/A return replicationDomain;
3853N/A
3853N/A } catch (Exception e)
3853N/A {
3853N/A fail("createReplicationDomain " + stackTraceToSingleLineString(e));
3853N/A }
3853N/A return null;
3853N/A }
3853N/A
3853N/A /**
3853N/A * Create and connect a replication broker to the replication server with
3853N/A * the given state and generation id (uses passed window for received changes)
3853N/A */
4802N/A private ReplicationBroker createReplicationBroker(int dsId,
3853N/A ServerState state, long generationId, int window)
3853N/A throws Exception, SocketException
3853N/A {
3853N/A ReplicationBroker broker = new ReplicationBroker(null,
3988N/A state, EXAMPLE_DN, dsId, 100, generationId, 0,
4641N/A new ReplSessionSecurity(null, null, null, true), (byte) 1, 500);
3853N/A ArrayList<String> servers = new ArrayList<String>(1);
3853N/A servers.add("localhost:" + rs1Port);
3853N/A broker.start(servers);
4870N/A checkConnection(30, broker, rs1Port);
3853N/A
3853N/A return broker;
3853N/A }
3853N/A
3853N/A /**
3853N/A * Create and connect a replication broker to the replication server with
3853N/A * the given state and generation id (uses 100 as window for received changes)
3853N/A */
4802N/A private ReplicationBroker createReplicationBroker(int dsId,
3853N/A ServerState state, long generationId)
3853N/A throws Exception, SocketException
3853N/A {
3853N/A return createReplicationBroker(dsId, state, generationId, 100);
3853N/A }
3853N/A
3853N/A /**
3853N/A * Make simple state machine test.
3853N/A *
3853N/A * NC = Not connected status
3853N/A * N = Normal status
3853N/A * D = Degraded status
3853N/A * FU = Full update status
3853N/A * BG = Bad generation id status
3853N/A *
3853N/A * The test path should be:
3853N/A * ->NC->N->NC
3853N/A * @throws Exception If a problem occurred
3853N/A */
3853N/A @Test(enabled=true)
3853N/A public void testStateMachineBasic() throws Exception
3853N/A {
3853N/A String testCase = "testStateMachineBasic";
3853N/A
3853N/A debugInfo("Starting " + testCase);
3853N/A
3853N/A initTest();
3853N/A
3853N/A try
3853N/A {
3853N/A
3853N/A /**
3853N/A * DS1 start, no RS available: DS1 should be in not connected status
3853N/A */
3853N/A ds1 = createReplicationDomain(DS1_ID);
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
3853N/A
3853N/A /**
3853N/A * RS1 starts , DS1 should connect to it and be in normal status
3853N/A */
3853N/A rs1 = createReplicationServer(testCase, 5000);
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * RS1 stops, DS1 should go in not connected status
3853N/A */
3853N/A rs1.remove();
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
3853N/A
3853N/A } finally
3853N/A {
3853N/A endTest();
3853N/A }
3853N/A }
3853N/A
3853N/A // Returns various init values for test testStateMachineStatusAnalyzer
3853N/A @DataProvider(name="stateMachineStatusAnalyzerTestProvider")
3853N/A public Object [][] stateMachineStatusAnalyzerTestProvider() throws Exception
3853N/A {
3853N/A return new Object [][] { {1} , {10}, {50}, {120} };
3853N/A }
3853N/A
3853N/A /**
3853N/A * Test the status analyzer system that allows to go from normal to degraded
3853N/A * and vice versa, using the configured threshold value
3853N/A *
3853N/A * NC = Not connected status
3853N/A * N = Normal status
3853N/A * D = Degraded status
3853N/A * FU = Full update status
3853N/A * BG = Bad generation id status
3853N/A *
3853N/A * Expected path:
3853N/A * ->NC->N->D->N->NC
3853N/A * @throws Exception If a problem occurred
3853N/A */
3853N/A @Test(enabled=true, groups="slow", dataProvider="stateMachineStatusAnalyzerTestProvider")
3853N/A public void testStateMachineStatusAnalyzer(int thresholdValue) throws Throwable
3853N/A {
3853N/A String testCase = "testStateMachineStatusAnalyzer with threhold " + thresholdValue;
3853N/A
3853N/A debugInfo("Starting " + testCase + " with " + thresholdValue);
3853N/A
3853N/A initTest();
3853N/A
3853N/A BrokerReader br3 = null;
3853N/A BrokerReader br2 = null;
3853N/A BrokerWriter bw = null;
3853N/A
3853N/A try
3853N/A {
3853N/A /**
3853N/A * RS1 starts with specified threshold value
3853N/A */
3853N/A rs1 = createReplicationServer(testCase, thresholdValue);
3853N/A
3853N/A /**
3853N/A * DS2 starts and connects to RS1. No reader and low window value at the
3853N/A * beginning so writer for DS2 in RS should enqueue changes after first
3853N/A * changes sent to DS. (window value reached: a window msg needed by RS for
3853N/A * following sending changes to DS)
3853N/A */
3853N/A ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID, 10);
3853N/A checkConnection(30, DS2_ID);
3853N/A
3853N/A /**
3853N/A * DS3 starts and connects to RS1
3853N/A */
3853N/A ds3 = createReplicationBroker(DS3_ID, new ServerState(), EMPTY_DN_GENID);
3853N/A br3 = new BrokerReader(ds3, DS3_ID);
3853N/A checkConnection(30, DS3_ID);
3853N/A
3853N/A // Send first changes to reach window and block DS2 writer queue. Writer will take them
3853N/A // from queue and block (no more changes removed from writer queue) after
3853N/A // having sent them to TCP receive queue of DS2.
3853N/A bw = new BrokerWriter(ds3, DS3_ID, false);
3853N/A bw.followAndPause(11);
3988N/A // sleep(1000);
3853N/A
3853N/A /**
3853N/A * DS3 sends changes (less than threshold): DS2 should still be in normal
3853N/A * status so no topo message should be sent (update topo message
3853N/A * for telling status of DS2 changed)
3853N/A */
3853N/A int nChangesSent = 0;
3853N/A if (thresholdValue > 1)
3853N/A {
3853N/A nChangesSent = thresholdValue - 1;
3853N/A bw.followAndPause(nChangesSent);
3988N/A sleep(1000); // Be sure status analyzer has time to test
3853N/A ReplicationMsg msg = br3.getLastMsg();
3853N/A debugInfo(testCase + " Step 1: last message from writer: " + msg);
3988N/A assertTrue(msg == null, (msg != null) ? msg.toString() : "null" );
3853N/A }
3853N/A
3853N/A /**
3853N/A * DS3 sends changes to reach the threshold value, DS3 should receive an
3853N/A * update topo message with status of DS2: degraded status
3853N/A */
3853N/A bw.followAndPause(thresholdValue - nChangesSent);
3988N/A // wait for a status MSG status analyzer to broker 3
3988N/A ReplicationMsg lastMsg = null;
3988N/A for (int count = 0; count< 50; count++)
3988N/A {
3988N/A List<DSInfo> dsList = ds3.getDsList();
3988N/A DSInfo ds3Info = null;
3988N/A if (dsList.size() > 0)
3988N/A {
3988N/A ds3Info = dsList.get(0);
3988N/A }
3988N/A if ((ds3Info != null) && (ds3Info.getDsId() == DS2_ID) &&
3988N/A (ds3Info.getStatus()== ServerStatus.DEGRADED_STATUS) )
3988N/A {
3988N/A break;
3988N/A }
3988N/A else
3988N/A {
3988N/A if (count < 50)
3988N/A sleep(200); // Be sure status analyzer has time to test
3988N/A else
3988N/A fail("DS2 did not get degraded : " + ds3Info);
3988N/A }
3988N/A }
3853N/A
3853N/A /**
3853N/A * DS3 sends 10 additional changes after threshold value, DS2 should still be
3853N/A * degraded so no topo message received.
3853N/A */
3853N/A bw.followAndPause(10);
3853N/A bw.shutdown();
3988N/A sleep(1000); // Be sure status analyzer has time to test
3853N/A lastMsg = br3.getLastMsg();
3853N/A ReplicationMsg msg = br3.getLastMsg();
3853N/A debugInfo(testCase + " Step 3: last message from writer: " + msg);
3853N/A assertTrue(lastMsg == null);
3853N/A
3853N/A /**
3853N/A * DS2 replays every changes and should go back to normal status
3853N/A * (create a reader to emulate replay of messages (messages read from queue))
3853N/A */
3853N/A br2 = new BrokerReader(ds2, DS2_ID);
3988N/A // wait for a status MSG status analyzer to broker 3
3988N/A for (int count = 0; count< 50; count++)
3988N/A {
3988N/A List<DSInfo> dsList = ds3.getDsList();
3988N/A DSInfo ds3Info = null;
3988N/A if (dsList.size() > 0)
3988N/A {
3988N/A ds3Info = dsList.get(0);
3988N/A }
3988N/A if ((ds3Info != null) && (ds3Info.getDsId() == DS2_ID) &&
3988N/A (ds3Info.getStatus()== ServerStatus.DEGRADED_STATUS) )
3988N/A {
3988N/A break;
3988N/A }
3988N/A else
3988N/A {
3988N/A if (count < 50)
3988N/A sleep(200); // Be sure status analyzer has time to test
3988N/A else
3988N/A fail("DS2 did not get degraded.");
3988N/A }
3988N/A }
3853N/A
3853N/A } finally
3853N/A {
3853N/A endTest();
3853N/A if (bw != null) bw.shutdown();
3853N/A if (br3 != null) br3.shutdown();
3853N/A if (br2 != null) br2.shutdown();
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Go through the possible state machine transitions:
3853N/A *
3853N/A * NC = Not connected status
3853N/A * N = Normal status
3853N/A * D = Degraded status
3853N/A * FU = Full update status
3853N/A * BG = Bad generation id status
3853N/A *
3853N/A * The test path should be:
3853N/A * ->NC->D->N->NC->N->D->NC->D->N->BG->NC->N->D->BG->FU->NC->N->D->FU->NC->BG->NC->N->FU->NC->N->NC
3853N/A * @throws Exception If a problem occurred
3853N/A */
4169N/A @Test(enabled = false, groups = "slow")
3853N/A public void testStateMachineFull() throws Exception
3853N/A {
3853N/A String testCase = "testStateMachineFull";
3853N/A
3853N/A debugInfo("Starting " + testCase);
3853N/A
3853N/A initTest();
3853N/A BrokerReader br = null;
3853N/A BrokerWriter bw = null;
3853N/A
3853N/A try
3853N/A {
3853N/A
3853N/A int DEGRADED_STATUS_THRESHOLD = 1;
3853N/A
3853N/A /**
3853N/A * RS1 starts with 1 message as degraded status threshold value
3853N/A */
3853N/A rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
3853N/A
3853N/A /**
3853N/A * DS2 starts and connects to RS1
3853N/A */
3853N/A ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
3853N/A br = new BrokerReader(ds2, DS2_ID);
3853N/A checkConnection(30, DS2_ID);
3853N/A
3853N/A /**
3853N/A * DS2 starts sending a lot of changes
3853N/A */
3853N/A bw = new BrokerWriter(ds2, DS2_ID, false);
3853N/A bw.follow();
3853N/A sleep(1000); // Let some messages being queued in RS
3853N/A
3853N/A /**
3853N/A * DS1 starts and connects to RS1, server state exchange should lead to
3853N/A * start in degraded status as some changes should be in queued in the RS
3853N/A * and the threshold value is 1 change in queue.
3853N/A */
3853N/A ds1 = createReplicationDomain(DS1_ID);
3853N/A checkConnection(30, DS1_ID);
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 stops sending changes: DS1 should replay pending changes and should
3853N/A * enter the normal status
3853N/A */
3853N/A bw.pause();
3853N/A // Sleep enough so that replay can be done and analyzer has time
3853N/A // to see that the queue length is now under the threshold value.
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * RS1 stops to make DS1 go to not connected status (from normal status)
3853N/A */
3853N/A rs1.remove();
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 restarts with up to date server state (this allows to have
3853N/A * restarting RS1 not sending him some updates he already sent)
3853N/A */
3853N/A ds2.stop();
3853N/A bw.shutdown();
3853N/A br.shutdown();
3853N/A ServerState curState = ds1.getServerState();
3853N/A ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
3853N/A br = new BrokerReader(ds2, DS2_ID);
3853N/A
3853N/A /**
3853N/A * RS1 restarts, DS1 should get back to normal status
3853N/A */
3853N/A rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
3853N/A checkConnection(30, DS2_ID);
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 sends again a lot of changes to make DS1 degraded again
3853N/A */
3853N/A bw = new BrokerWriter(ds2, DS2_ID, false);
3853N/A bw.follow();
3853N/A sleep(8000); // Let some messages being queued in RS, and analyzer see the change
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
3853N/A
3853N/A /**
3853N/A * RS1 stops to make DS1 go to not connected status (from degraded status)
3853N/A */
3853N/A rs1.remove();
3853N/A bw.pause();
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
3988N/A
3988N/A
3853N/A /**
3853N/A * DS2 restarts with up to date server state (this allows to have
3853N/A * restarting RS1 not sending him some updates he already sent)
3853N/A */
3853N/A ds2.stop();
3853N/A bw.shutdown();
3853N/A br.shutdown();
3853N/A curState = ds1.getServerState();
3853N/A ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
3853N/A br = new BrokerReader(ds2, DS2_ID);
3853N/A
3853N/A /**
3853N/A * RS1 restarts, DS1 should reconnect in degraded status (from not connected
3853N/A * this time, not from state machine entry)
3853N/A */
3853N/A rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
3853N/A // It is too difficult to tune the right sleep so disabling this test:
3853N/A // Sometimes the status analyzer may be fast and quickly change the status
3853N/A // of DS1 to NORMAL_STATUS
3853N/A //sleep(2000);
3853N/A //sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
3853N/A checkConnection(30, DS2_ID);
3853N/A
3853N/A /**
3853N/A * DS1 should come back in normal status after a while
3853N/A */
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 sends a reset gen id order with wrong gen id: DS1 should go into bad generation id status
3853N/A */
3853N/A long BAD_GEN_ID = 999999L;
3853N/A resetGenId(ds2, BAD_GEN_ID); // ds2 will also go bad gen
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 sends again a reset gen id order with right id: DS1 should be disconnected
3853N/A * by RS then reconnect and enter again in normal status. This goes through
3853N/A * not connected status but not possible to check as should reconnect immediately
3853N/A */
3853N/A resetGenId(ds2, EMPTY_DN_GENID); // ds2 will also be disconnected
3853N/A ds2.stop();
3853N/A br.shutdown(); // Reader could reconnect broker, but gen id would be bad: need to recreate a broker to send changex
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 sends again a lot of changes to make DS1 degraded again
3853N/A */
3853N/A curState = ds1.getServerState();
3853N/A ds2 = createReplicationBroker(DS2_ID, curState, EMPTY_DN_GENID);
3853N/A checkConnection(30, DS2_ID);
3853N/A bw = new BrokerWriter(ds2, DS2_ID, false);
3853N/A br = new BrokerReader(ds2, DS2_ID);
3853N/A bw.follow();
3853N/A sleep(8000); // Let some messages being queued in RS, and analyzer see the change
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 sends reset gen id order with bad gen id: DS1 should go in bad gen id
3853N/A * status (from degraded status this time)
3853N/A */
3988N/A resetGenId(ds2, -1); // -1 to allow next step full update and flush RS db so that DS1 can reconnect after full update
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
3853N/A bw.pause();
3853N/A
3853N/A /**
3853N/A * DS2 engages full update (while DS1 in bad gen id status), DS1 should go
3853N/A * in full update status
3853N/A */
3853N/A BrokerInitializer bi = new BrokerInitializer(ds2, DS2_ID, false);
3853N/A bi.initFullUpdate(DS1_ID, 200);
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
3853N/A * and come back to normal status (RS genid was -1 so RS will adopt ne genb id)
3853N/A */
3853N/A bi.runFullUpdate();
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 sends changes to DS1: DS1 should go in degraded status
3853N/A */
3853N/A ds2.stop(); // will need a new broker with another gen id restart it
3853N/A bw.shutdown();
3853N/A br.shutdown();
3988N/A long newGen = ds1.getGenerationID();
3853N/A curState = ds1.getServerState();
3853N/A ds2 = createReplicationBroker(DS2_ID, curState, newGen);
3853N/A checkConnection(30, DS2_ID);
3853N/A bw = new BrokerWriter(ds2, DS2_ID, false);
3853N/A br = new BrokerReader(ds2, DS2_ID);
3853N/A bw.follow();
3853N/A sleep(8000); // Let some messages being queued in RS, and analyzer see the change
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 engages full update (while DS1 in degraded status), DS1 should go
3853N/A * in full update status
3853N/A */
3853N/A bi = new BrokerInitializer(ds2, DS2_ID, false);
3853N/A bi.initFullUpdate(DS1_ID, 300);
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
3853N/A bw.pause();
3853N/A
3853N/A /**
3853N/A * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
3853N/A * and come back to bad gen id status (RS genid was another gen id (300 entries instead of 200))
3853N/A */
3853N/A bi.runFullUpdate();
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.BAD_GEN_ID_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 sends reset gen id with gen id same as DS1: DS1 will be disconnected
3853N/A * by RS (not connected status) and come back to normal status
3853N/A */
3853N/A ds2.stop(); // will need a new broker with another gen id restart it
3853N/A bw.shutdown();
3853N/A br.shutdown();
3988N/A newGen = ds1.getGenerationID();
3853N/A curState = ds1.getServerState();
3853N/A ds2 = createReplicationBroker(DS2_ID, curState, newGen);
3853N/A checkConnection(30, DS2_ID);
3853N/A br = new BrokerReader(ds2, DS2_ID);
3853N/A resetGenId(ds2, newGen); // Make DS1 reconnect in normal status
3853N/A
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 engages full update (while DS1 in normal status), DS1 should go
3853N/A * in full update status
3853N/A */
3853N/A bi = new BrokerInitializer(ds2, DS2_ID, false);
3853N/A bi.initFullUpdate(DS1_ID, 300); // 300 entries will compute same genid of the RS
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.FULL_UPDATE_STATUS);
3853N/A
3853N/A /**
3853N/A * DS2 terminates full update to DS1: DS1 should reconnect (goes through not connected status)
3853N/A * and come back to normal status (process full update with same data as
3853N/A * before so RS already has right gen id: version with 300 entries)
3853N/A */
3853N/A bi.runFullUpdate();
3853N/A ds2.stop();
3853N/A br.shutdown();
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NORMAL_STATUS);
3853N/A
3853N/A /**
3853N/A * RS1 stops, DS1 should go to not connected status
3853N/A */
3853N/A rs1.remove();
3853N/A sleepAssertStatusEquals(30, ds1, ServerStatus.NOT_CONNECTED_STATUS);
3853N/A
3853N/A } finally
3853N/A {
3853N/A // Finalize test
3853N/A endTest();
3853N/A if (bw != null) bw.shutdown();
3853N/A if (br != null) br.shutdown();
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Set up the environment.
3853N/A *
3853N/A * @throws Exception
3853N/A * If the environment could not be set up.
3853N/A */
3853N/A @BeforeClass
3853N/A @Override
3853N/A public void setUp() throws Exception
3853N/A {
3853N/A super.setUp();
3853N/A
3853N/A // Note: this test does not use the memory test backend as for having a DS
3853N/A // going into degraded status, we need to send a lot of updates. This makes
3853N/A // the memory test backend crash with OutOfMemoryError. So we prefer here
3853N/A // a backend backed up with a file
3853N/A
3853N/A // Clear the backend
3988N/A LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
3853N/A
3853N/A }
3853N/A
3853N/A /**
3853N/A * Clean up the environment.
3853N/A *
3853N/A * @throws Exception If the environment could not be set up.
3853N/A */
3853N/A @AfterClass
3853N/A @Override
3853N/A public void classCleanUp() throws Exception
3853N/A {
3853N/A callParanoiaCheck = false;
3853N/A super.classCleanUp();
3853N/A
3853N/A // Clear the backend
3988N/A LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
3988N/A
3853N/A paranoiaCheck();
3853N/A }
3853N/A
3853N/A /**
3853N/A * Sends a reset genid message through the given replication broker, with the
3853N/A * given new generation id
3853N/A */
3853N/A private void resetGenId(ReplicationBroker rb, long newGenId)
3853N/A {
3853N/A ResetGenerationIdMsg resetMsg = new ResetGenerationIdMsg(newGenId);
3853N/A rb.publish(resetMsg);
3853N/A }
3853N/A
/**
 * Utility class for making a full update through a broker. No separate
 * thread is used.
 * Usage:
 * BrokerInitializer bi = new BrokerInitializer(rb, sid);
 * bi.initFullUpdate(destId, nEntries); // Initializes a full update session by sending InitializeTargetMsg
 * bi.runFullUpdate(); // loops sending nEntries entries and finalizes the full update by sending the EntryDoneMsg
 */
3853N/A private class BrokerInitializer
3853N/A {
3853N/A
3853N/A private ReplicationBroker rb = null;
4802N/A private int serverId = -1;
3853N/A private long userId = 0;
4802N/A private int destId = -1; // Server id of server to initialize
3853N/A private long nEntries = -1; // Number of entries to send to dest
3853N/A private boolean createReader = false;
3988N/A
3853N/A /**
3853N/A * If the BrokerInitializer is to be used for a lot of entries to send
3853N/A * (which is often the case), the reader thread should be enabled to make
3853N/A * the window subsystem work and allow the broker to send as much entries as
3853N/A * he wants. If not enabled, the user is responsible to call the receive
3853N/A * method of the broker himself.
3853N/A */
3853N/A private BrokerReader reader = null;
3988N/A
3853N/A /**
3853N/A * Creates a broker initializer with a reader
3853N/A */
4802N/A public BrokerInitializer(ReplicationBroker rb, int serverId)
3853N/A {
3853N/A this(rb, serverId, true);
3853N/A }
3853N/A
3853N/A /**
3853N/A * Creates a broker initializer. Also creates a reader according to request
3853N/A */
4802N/A public BrokerInitializer(ReplicationBroker rb, int serverId,
3853N/A boolean createReader)
3853N/A {
3853N/A this.rb = rb;
3853N/A this.serverId = serverId;
3853N/A this.createReader = createReader;
3853N/A }
3988N/A
3853N/A /**
3853N/A * Initializes a full update session by sending InitializeTargetMsg
3853N/A */
4802N/A public void initFullUpdate(int destId, long nEntries)
3853N/A {
3853N/A // Also create reader ?
3853N/A if (createReader)
3853N/A {
3853N/A reader = new BrokerReader(rb, serverId);
3853N/A }
3853N/A
3853N/A debugInfo("Broker " + serverId + " initializer sending InitializeTargetMsg to server " + destId);
3853N/A
3853N/A this.destId = destId;
3853N/A this.nEntries = nEntries;
3853N/A
3853N/A // Send init msg to warn dest server it is going do be initialized
3853N/A RoutableMsg initTargetMsg = null;
3988N/A
3988N/A initTargetMsg =
3988N/A new InitializeTargetMsg(EXAMPLE_DN, serverId, destId,
5086N/A serverId, nEntries, initWindow);
3988N/A
3853N/A rb.publish(initTargetMsg);
3853N/A
3853N/A // Send top entry for the domain
3853N/A String topEntry = "dn: " + EXAMPLE_DN + "\n"
3853N/A + "objectClass: top\n"
3853N/A + "objectClass: domain\n"
3853N/A + "dc: example\n"
3853N/A + "entryUUID: 11111111-1111-1111-1111-111111111111\n\n";
5086N/A EntryMsg entryMsg = new EntryMsg(serverId, destId, topEntry.getBytes(), 1);
3853N/A rb.publish(entryMsg);
3853N/A }
3853N/A
3853N/A private EntryMsg createNextEntryMsg()
3853N/A {
3853N/A String userEntryUUID = "11111111-1111-1111-1111-111111111111";
5086N/A long curId = ++userId;
3853N/A String userdn = "uid=full_update_user" + curId + "," + EXAMPLE_DN;
3853N/A String entryWithUUIDldif = "dn: " + userdn + "\n" + "objectClass: top\n" +
3853N/A "objectClass: person\n" + "objectClass: organizationalPerson\n" +
3853N/A "objectClass: inetOrgPerson\n" +
3853N/A "uid: full_update_user" + curId + "\n" +
3853N/A "homePhone: 951-245-7634\n" +
3853N/A "description: This is the description for Aaccf Amar.\n" + "st: NC\n" +
3853N/A "mobile: 027-085-0537\n" +
3853N/A "postalAddress: Aaccf Amar$17984 Thirteenth Street" +
3853N/A "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" +
3853N/A "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" +
3853N/A "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" +
3853N/A "employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" +
3853N/A "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n" +
3853N/A "entryUUID: " + userEntryUUID + "\n\n";
3853N/A // -> WARNING: EntryMsg PDUs are concatenated before calling import on LDIF
3853N/A // file so need \n\n to separate LDIF entries to conform to LDIF file format
3853N/A
3853N/A // Create an entry message
3853N/A EntryMsg entryMsg = new EntryMsg(serverId, destId,
5086N/A entryWithUUIDldif.getBytes(), (int)userId);
3853N/A
3853N/A return entryMsg;
3853N/A }
3853N/A
3853N/A /**
3853N/A * Loops sending entries for full update (EntryMsg messages). When
3853N/A * terminates, sends the EntryDoneMsg to finalize full update. Number of
3853N/A * sent entries is determined at initFullUpdate call time.
3853N/A */
3853N/A public void runFullUpdate()
3853N/A {
3853N/A debugInfo("Broker " + serverId + " initializer starting sending entries to server " + destId);
3988N/A
3853N/A for(long i = 0 ; i<nEntries ; i++) {
3853N/A EntryMsg entryMsg = createNextEntryMsg();
3853N/A rb.publish(entryMsg);
3853N/A }
3853N/A
3853N/A debugInfo("Broker " + serverId + " initializer stopping sending entries");
3988N/A
3853N/A debugInfo("Broker " + serverId + " initializer sending EntryDoneMsg");
3853N/A DoneMsg doneMsg = new DoneMsg(serverId, destId);
3853N/A rb.publish(doneMsg);
3988N/A
3853N/A if (createReader)
3853N/A {
3853N/A reader.shutdown();
3853N/A }
3988N/A
3853N/A debugInfo("Broker " + serverId + " initializer thread is dying");
3853N/A }
3853N/A }
3988N/A
3853N/A /**
3853N/A * Thread for sending a lot of changes through a broker.
3853N/A */
3853N/A private class BrokerWriter extends Thread
3853N/A {
3853N/A
3853N/A private ReplicationBroker rb = null;
4802N/A private int serverId = -1;
3853N/A private long userId = 0;
3853N/A private AtomicBoolean shutdown = new AtomicBoolean(false);
3853N/A // The writer starts suspended
3853N/A private AtomicBoolean suspended = new AtomicBoolean(true);
3853N/A // Tells a sending session is finished
3853N/A // A session is sending messages between the follow and the pause calls,
3853N/A // or the time a followAndPause method runs.
3853N/A private AtomicBoolean sessionDone = new AtomicBoolean(true);
3853N/A private boolean careAboutAmountOfChanges = false;
3853N/A private int nChangesSent = 0; // Number of sent changes
3853N/A private int nChangesSentLimit = 0;
3853N/A ChangeNumberGenerator gen = null;
3988N/A private Object sleeper = new Object();
3853N/A /**
3853N/A * If the BrokerWriter is to be used for a lot of changes to send (which is
3853N/A * often the case), the reader thread should be enabled to make the window
3853N/A * subsystem work and allow the broker to send as much changes as he wants.
3853N/A * If not enabled, the user is responsible to call the receive method of
3853N/A * the broker himself.
3853N/A */
3853N/A private BrokerReader reader = null;
3853N/A
3853N/A /* Creates a broker writer with a reader */
4802N/A public BrokerWriter(ReplicationBroker rb, int serverId)
3853N/A {
3853N/A this(rb, serverId, true);
3853N/A }
3853N/A
3853N/A /* Creates a broker writer. Also creates a reader according to request */
4802N/A public BrokerWriter(ReplicationBroker rb, int serverId,
3853N/A boolean createReader)
3853N/A {
3853N/A super("BrokerWriter for broker " + serverId);
3853N/A this.rb = rb;
3853N/A this.serverId = serverId;
3853N/A // Create a Change number generator to generate new change numbers
3853N/A // when we need to send changes
3853N/A gen = new ChangeNumberGenerator(serverId, 0);
3853N/A
3853N/A // Start thread (is paused by default so will have to call follow anyway)
3853N/A start();
3853N/A
3853N/A // Also create reader ?
3853N/A if (createReader)
3853N/A {
3853N/A reader = new BrokerReader(rb, serverId);
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Loops sending changes: add operations creating users with different ids
3853N/A * This starts paused and has to be resumed calling a follow method.
3853N/A */
3853N/A public void run()
3853N/A {
3853N/A boolean dbg1Written = false, dbg2Written;
3853N/A // No stop msg when entering the loop (thread starts with paused writer)
3853N/A dbg2Written = true;
3853N/A while (!shutdown.get())
3853N/A {
3853N/A long startSessionTime = -1;
3853N/A boolean startedNewSession = false;
3853N/A // When not in pause, loop sending changes to RS
3853N/A while (!suspended.get())
3853N/A {
3853N/A startedNewSession = true;
3853N/A if (!dbg1Written)
3853N/A {
3853N/A startSessionTime = System.currentTimeMillis();
3853N/A debugInfo("Broker " + serverId +
3853N/A " writer starting sending changes session at: " + startSessionTime);
3853N/A dbg1Written = true;
3853N/A dbg2Written = false;
3853N/A }
3853N/A AddMsg addMsg = createNextAddMsg();
3853N/A rb.publish(addMsg);
3853N/A // End session if amount of changes sent has been requested
3853N/A if (careAboutAmountOfChanges)
3853N/A {
3853N/A nChangesSent++;
3853N/A if (nChangesSent == nChangesSentLimit)
3853N/A {
3853N/A // Requested number of changes to send sent, end session
3853N/A debugInfo("Broker " + serverId + " writer reached " +
3853N/A nChangesSent + " changes limit");
3853N/A suspended.set(true);
3853N/A break;
3853N/A }
3853N/A }
3853N/A }
3853N/A if (!dbg2Written)
3853N/A {
3853N/A long endSessionTime = System.currentTimeMillis();
3853N/A debugInfo("Broker " + serverId +
3853N/A " writer stopping sending changes session at: " + endSessionTime +
3853N/A " (duration: " + (endSessionTime - startSessionTime) + " ms)");
3853N/A dbg1Written = false;
3853N/A dbg2Written = true;
3853N/A }
3853N/A // Mark session is finished
3853N/A if (startedNewSession)
3853N/A sessionDone.set(true);
3853N/A try
3853N/A {
3853N/A // Writer in pause, sleep a while to let other threads work
3988N/A synchronized(sleeper)
3988N/A {
3988N/A sleeper.wait(1000);
3988N/A }
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A /* Don't care */
3853N/A }
3853N/A }
3853N/A debugInfo("Broker " + serverId + " writer thread is dying");
3853N/A }
3853N/A
3853N/A /**
3853N/A * Stops the writer thread
3853N/A */
3853N/A public void shutdown()
3853N/A {
3853N/A suspended.set(true); // If were working
3853N/A shutdown.set(true);
3988N/A synchronized (sleeper)
3988N/A {
3988N/A sleeper.notify();
3988N/A }
3853N/A try
3853N/A {
3853N/A join();
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A /* Don't care */
3853N/A }
3988N/A
3853N/A // Stop reader if any
3853N/A if (reader != null)
3853N/A {
3853N/A reader.shutdown();
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Suspends the writer thread
3853N/A */
3853N/A public void pause()
3853N/A {
3853N/A if (isPaused())
3853N/A return; // Already suspended
3853N/A suspended.set(true);
3853N/A // Wait for all messages sent
3853N/A while (!sessionDone.get())
3853N/A {
3853N/A try
3853N/A {
3988N/A Thread.sleep(200);
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A /* Don't care */
3853N/A }
3988N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Test if the writer is suspended
3853N/A */
3853N/A public boolean isPaused()
3853N/A {
3853N/A return (sessionDone.get());
3853N/A }
3853N/A
3853N/A /**
3853N/A * Resumes the writer thread until it is paused
3853N/A */
3853N/A public void follow()
3853N/A {
3853N/A sessionDone.set(false);
3853N/A suspended.set(false);
3853N/A }
3853N/A
3853N/A /**
3853N/A * Resumes the writer and suspends it after a given amount of ms
3853N/A * If the writer was working it will be paused anyway after the given amount
3853N/A * of time.
3853N/A * -> blocking call
3853N/A */
3853N/A public void followAndPause(long time)
3853N/A {
3853N/A debugInfo("Requested broker writer " + serverId + " to write for " + time + " ms.");
3853N/A pause(); // If however we were already working
3853N/A sessionDone.set(false);
3853N/A suspended.set(false);
3853N/A try
3853N/A {
3853N/A Thread.sleep(time);
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A /* Don't care */
3853N/A }
3853N/A pause();
3853N/A }
3853N/A
3853N/A /**
3853N/A * Resumes the writer and suspends it after a given amount of changes has been
3853N/A * sent. If the writer was working it will be paused anyway after the given
3853N/A * amount of changes, starting from the current call time.
3853N/A * -> blocking call
3853N/A */
3853N/A public void followAndPause(int nChanges)
3853N/A {
3988N/A debugInfo("Requested broker writer " + serverId + " to write " + nChanges + " change(s).");
3853N/A pause(); // If however we were already working
3853N/A
3853N/A // Initialize counter system variables
3853N/A nChangesSent = 0;
3853N/A nChangesSentLimit = nChanges;
3853N/A careAboutAmountOfChanges = true;
3853N/A
3853N/A // Start session
3853N/A sessionDone.set(false);
3853N/A suspended.set(false);
3853N/A
3853N/A // Wait for all messages sent
3853N/A while (!sessionDone.get())
3853N/A {
3853N/A try
3853N/A {
3853N/A Thread.sleep(1000);
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A /* Don't care */
3853N/A }
3853N/A }
3853N/A careAboutAmountOfChanges = false;
3853N/A }
3853N/A
3853N/A private AddMsg createNextAddMsg()
3853N/A {
3853N/A String userEntryUUID = "11111111-1111-1111-1111-111111111111";
3853N/A long curId = userId++;
3853N/A String userdn = "uid=user" + curId + "," + EXAMPLE_DN;
3853N/A String entryWithUUIDldif = "dn: " + userdn + "\n" + "objectClass: top\n" +
3853N/A "objectClass: person\n" + "objectClass: organizationalPerson\n" +
3853N/A "objectClass: inetOrgPerson\n" +
3853N/A "uid: user" + curId + "\n" +
3853N/A "homePhone: 951-245-7634\n" +
3853N/A "description: This is the description for Aaccf Amar.\n" + "st: NC\n" +
3853N/A "mobile: 027-085-0537\n" +
3853N/A "postalAddress: Aaccf Amar$17984 Thirteenth Street" +
3853N/A "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" +
3853N/A "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" +
3853N/A "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" +
3853N/A "employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" +
3853N/A "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n" +
3853N/A "entryUUID: " + userEntryUUID + "\n";
3853N/A
3853N/A Entry personWithUUIDEntry = null;
3853N/A try
3853N/A {
3853N/A personWithUUIDEntry = TestCaseUtils.entryFromLdifString(
3853N/A entryWithUUIDldif);
3853N/A } catch (Exception e)
3853N/A {
3853N/A fail(e.getMessage());
3853N/A }
3853N/A
3853N/A // Create an update message to add an entry.
3853N/A AddMsg addMsg = new AddMsg(gen.newChangeNumber(),
3853N/A personWithUUIDEntry.getDN().toString(),
3853N/A userEntryUUID,
3853N/A null,
3853N/A personWithUUIDEntry.getObjectClassAttribute(),
3853N/A personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
3853N/A
3853N/A return addMsg;
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * This simple reader just throws away the received
3853N/A * messages. It is used on a breaker we want to be able to send or read from some message
3853N/A * with (changes, entries (full update)...). Calling the receive method of the
3853N/A * broker allows to unblock the window mechanism and to send the desired messages.
3853N/A * Calling the updateWindowAfterReplay method allows to send when necessary the
3853N/A * window message to the RS to allow him send other messages he may want to send us.
3853N/A */
3853N/A private class BrokerReader extends Thread
3853N/A {
3853N/A
3853N/A private ReplicationBroker rb = null;
4802N/A private int serverId = -1;
3853N/A private boolean shutdown = false;
3853N/A private ReplicationMsg lastMsg = null;
3853N/A
4802N/A public BrokerReader(ReplicationBroker rb, int serverId)
3853N/A {
3853N/A super("BrokerReader for broker " + serverId);
3853N/A this.rb = rb;
3853N/A this.serverId = serverId;
3853N/A start();
3853N/A }
3853N/A // Loop reading and throwing update messages
3853N/A public void run()
3853N/A {
3853N/A while (!shutdown)
3853N/A {
3853N/A try
3853N/A {
3853N/A ReplicationMsg msg = rb.receive(); // Allow more messages to be sent by broker writer
3853N/A rb.updateWindowAfterReplay(); // Allow RS to send more messages to broker
3853N/A if (msg != null)
3853N/A debugInfo("Broker " + serverId + " reader received: " + msg);
3853N/A lastMsg = msg;
3853N/A } catch (SocketTimeoutException ex)
3853N/A {
3853N/A if (shutdown)
3853N/A return;
3853N/A }
3853N/A }
3853N/A debugInfo("Broker " + serverId + " reader thread is dying");
3853N/A }
3853N/A
3853N/A // Returns last received message from reader
3853N/A // When read, last value is cleared
3853N/A public ReplicationMsg getLastMsg()
3853N/A {
3853N/A ReplicationMsg toReturn = lastMsg;
3853N/A lastMsg = null;
3853N/A return toReturn;
3853N/A }
3853N/A
3853N/A // Stops reader thread
3853N/A public void shutdown()
3853N/A {
3853N/A shutdown = true;
3853N/A
3853N/A try
3853N/A {
3853N/A join();
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A /* Don't care */
3853N/A }
3853N/A }
3853N/A }
3853N/A
3853N/A /**
3853N/A * Waits for a long time for an equality condition to be true.
3853N/A * Every second, the equality check is performed. After the provided amount of
3853N/A * seconds, if the equality is false, an assertion error is raised.
3853N/A * This methods ends either because the equality is true or if the timeout
3853N/A * occurs after the provided number of seconds.
3853N/A * This method is convenient when the the equality can only occur after a
3853N/A * period of time which is difficult to establish, but we know it will occur
3853N/A * anyway. This has 2 advantages compared to a classical code like this:
3853N/A * - sleep(some time);
3853N/A * - assertEquals(testedValue, expectedValue);
3853N/A * 1. If the sleep value is too big, this will impact the total time of
3853N/A * running tests uselessly. It may also penalize a fast running machine where
3853N/A * the sleep time value may be unnecessarily to long.
3853N/A * 2. If the sleep value is too small, some slow machines may have the test
3853N/A * fail whereas some additional time would have made the test succeed.
3853N/A * @param secTimeout Number of seconds to wait before failing. The value for
3853N/A * this should be high. A timeout is needed anyway to have the test campaign
3853N/A * finish anyway.
3853N/A * @param testedValue The value we want to test
3853N/A * @param expectedValue The value the tested value should be equal to
3853N/A */
3988N/A private void sleepAssertStatusEquals(int secTimeout, LDAPReplicationDomain testedValue,
3853N/A ServerStatus expectedValue)
3853N/A {
3853N/A int nSec = 0;
3853N/A
3853N/A if ((testedValue == null) || (expectedValue == null))
3853N/A fail("sleepAssertStatusEquals: null parameters");
3853N/A
3853N/A // Go out of the loop only if equality is obtained or if timeout occurs
3853N/A while (true)
3853N/A {
3853N/A // Sleep 1 second
3853N/A try
3853N/A {
3853N/A Thread.sleep(1000);
3853N/A } catch (InterruptedException ex)
3853N/A {
3853N/A fail("Error sleeping " + stackTraceToSingleLineString(ex));
3853N/A }
3853N/A nSec++;
3853N/A
3853N/A // Test equality of values
3853N/A if (testedValue.getStatus().equals(expectedValue))
3853N/A {
3853N/A debugInfo("sleepAssertStatusEquals: equality obtained after "
3853N/A + nSec + " seconds (" + expectedValue + ").");
3853N/A return;
3853N/A }
3853N/A
3853N/A if (nSec == secTimeout)
3853N/A {
3853N/A // Timeout reached, end with error
3853N/A fail("sleepAssertStatusEquals: got <" +
3853N/A testedValue.getStatus().toString() + "> where expected <" +
3853N/A expectedValue.toString() + ">");
3853N/A }
3853N/A }
3853N/A }
3853N/A}