/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2012-2015 ForgeRock AS. */ package org.opends.server.replication; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import org.assertj.core.api.Assertions; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.ldap.ResultCode; import org.opends.server.TestCaseUtils; import org.opends.server.api.SynchronizationProvider; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.plugin.EntryHistorical; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.DN; import org.opends.server.types.Modification; import org.opends.server.types.Operation; import org.opends.server.types.RawModification; import org.opends.server.util.TestTimer; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static java.util.concurrent.TimeUnit.*; import static org.forgerock.opendj.ldap.ModificationType.*; import static org.opends.server.core.DirectoryServer.*; import static org.opends.server.util.CollectionUtils.*; import static org.testng.Assert.*; /** Test for the schema replication. */ @SuppressWarnings("javadoc") public class SchemaReplicationTest extends ReplicationTestCase { private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private List rcvdMods; private int replServerPort; /** Set up the environment for performing the tests in this Class. */ @Override @BeforeClass public void setUp() throws Exception { super.setUp(); // This test suite depends on having the schema available. replServerPort = TestCaseUtils.findFreePort(); // Create an internal connection connection = InternalClientConnection.getRootConnection(); // Change log String replServerLdif = "dn: " + "cn=Replication Server, " + SYNCHRO_PLUGIN_DN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-replication-server\n" + "cn: Replication Server\n" + "ds-cfg-replication-port: " + replServerPort + "\n" + "ds-cfg-replication-db-directory: SchemaReplicationTest\n" + "ds-cfg-replication-server-id: 105\n"; // suffix synchronized String testName = "schemaReplicationTest"; String domainLdif = "dn: cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-replication-domain\n" + "cn: " + testName + "\n" + "ds-cfg-base-dn: cn=schema\n" + "ds-cfg-replication-server: localhost:" + replServerPort + "\n" + "ds-cfg-server-id: 1\n"; configureReplication(replServerLdif, domainLdif); } /** Checks that changes done to the schema are pushed to the replicationServer clients. */ @Test public void pushSchemaChange() throws Exception { logger.error(LocalizableMessage.raw("Starting replication test : pushSchemaChange ")); cleanUpReplicationServersDB(); final DN baseDN = DN.valueOf("cn=schema"); ReplicationBroker broker = openReplicationSession(baseDN, 2, 100, replServerPort, 5000); try { // Modify the schema Attribute attr = Attributes.create("attributetypes", "( 2.5.44.77.33 NAME 'dummy' )"); Modification mod = new Modification(ADD, attr); processModify(baseDN, mod); // See if the client has received the msg ModifyMsg modMsg = receiveModifyMsg(broker); assertModReceived(mod, baseDN, modMsg); /* Now cleanup the schema for the next test */ processModify(baseDN, new Modification(DELETE, attr)); // See if the client has received the msg receiveModifyMsg(broker); } finally { broker.stop(); } } private void processModify(final DN baseDN, Modification mod) { ModifyOperation modOp = connection.processModify(baseDN, newArrayList(mod)); assertEquals(modOp.getResultCode(), ResultCode.SUCCESS); } /** * Checks that changes to the schema pushed to the replicationServer * are received and correctly replayed by replication plugin. */ @Test(enabled=true,dependsOnMethods = { "pushSchemaChange" }) public void replaySchemaChange() throws Exception { logger.error(LocalizableMessage.raw("Starting replication test : replaySchemaChange ")); cleanUpReplicationServersDB(); final DN baseDN = DN.valueOf("cn=schema"); ReplicationBroker broker = openReplicationSession(baseDN, 2, 100, replServerPort, 5000); try { CSNGenerator gen = new CSNGenerator( 2, 0); ModifyMsg modMsg = new ModifyMsg(gen.newCSN(), baseDN, rcvdMods, EntryHistorical.getEntryUUID(DirectoryServer.getEntry(baseDN))); broker.publish(modMsg); checkEntryHasAttributeValue(baseDN, "attributetypes", "( 2.5.44.77.33 NAME 'dummy' )", 10, "The modification has not been correctly replayed."); } finally { broker.stop(); } } /** * Checks that changes done to the schema files are pushed to the * ReplicationServers and that the ServerState is updated in the schema * file. */ @Test(enabled=true, dependsOnMethods = { "replaySchemaChange" }) public void pushSchemaFilesChange() throws Exception { logger.error(LocalizableMessage.raw("Starting replication test : pushSchemaFilesChange ")); cleanUpReplicationServersDB(); final DN baseDN = DN.valueOf("cn=schema"); ReplicationBroker broker = openReplicationSession(baseDN, 3, 100, replServerPort, 5000); try { // create a schema change Notification Attribute attr = Attributes.create("attributetypes", "( 2.5.44.76.35 NAME 'push' )"); Modification mod = new Modification(ADD, attr); List mods = newArrayList(mod); for (SynchronizationProvider provider : getSynchronizationProviders()) { provider.processSchemaChange(mods); } // receive the message on the broker side. ModifyMsg modMsg = receiveModifyMsg(broker); assertModReceived(mod, baseDN, modMsg); // check that the schema files were updated with the new ServerState. // by checking that the CSN of msg we just received has been // added to the user schema file. // build the string to find in the schema file final String stateStr = modMsg.getCSN().toString(); // open the schema file final File schemaFile = getSchemaFile(); // it is necessary to loop on this check because the state is not // written immediately but only every so often. TestTimer timer = new TestTimer.Builder() .maxSleep(5, SECONDS) .sleepTimes(100, MILLISECONDS) .toTimer(); timer.repeatUntilSuccess(new Callable() { @Override public Void call() throws Exception { String fileStr = readAsString(schemaFile); assertTrue(fileStr.contains(stateStr), "The Schema persistentState (CSN:" + stateStr + ") has not been saved to " + schemaFile + " : " + fileStr); return null; } }); } finally { broker.stop(); } logger.error(LocalizableMessage.raw("Ending replication test : pushSchemaFilesChange ")); } private File getSchemaFile() { String sep = File.separator; String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String buildDir = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + sep + "target"); final String path = buildDir + sep + "unit-tests" + sep + "package-instance" + sep + "config" + sep + "schema" + sep + "99-user.ldif"; return new File(path); } private ModifyMsg receiveModifyMsg(ReplicationBroker broker) throws SocketTimeoutException { ReplicationMsg msg = broker.receive(); Assertions.assertThat(msg).isInstanceOf(ModifyMsg.class); return (ModifyMsg) msg; } private void assertModReceived(Modification mod, final DN baseDN, ModifyMsg modMsg) throws Exception { Operation receivedOp = modMsg.createOperation(connection); assertEquals(modMsg.getDN(), baseDN, "The received message is not for cn=schema"); Assertions.assertThat(receivedOp).isInstanceOf(ModifyOperation.class); ModifyOperation receivedModifyOperation = (ModifyOperation) receivedOp; this.rcvdMods = new ArrayList<>(); for (RawModification m : receivedModifyOperation.getRawModifications()) { this.rcvdMods.add(m.toModification()); } Assertions.assertThat(this.rcvdMods) .as("The received mod does not contain the original change") .contains(mod); } private String readAsString(File file) throws FileNotFoundException, IOException { FileInputStream input = new FileInputStream(file); byte[] bytes = new byte[input.available()]; input.read(bytes); return new String(bytes); } }