5046N/A/*
5046N/A * CDDL HEADER START
5046N/A *
5046N/A * The contents of this file are subject to the terms of the
5046N/A * Common Development and Distribution License, Version 1.0 only
5046N/A * (the "License"). You may not use this file except in compliance
5046N/A * with the License.
5046N/A *
6982N/A * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
6982N/A * or http://forgerock.org/license/CDDLv1.0.html.
5046N/A * See the License for the specific language governing permissions
5046N/A * and limitations under the License.
5046N/A *
5046N/A * When distributing Covered Code, include this CDDL HEADER in each
6982N/A * file and include the License file at legal-notices/CDDLv1_0.txt.
6982N/A * If applicable, add the following below this CDDL HEADER, with the
6982N/A * fields enclosed by brackets "[]" replaced with your own identifying
6982N/A * information:
5046N/A * Portions Copyright [yyyy] [name of copyright owner]
5046N/A *
5046N/A * CDDL HEADER END
5046N/A *
5046N/A *
5046N/A * Copyright 2010 Sun Microsystems, Inc.
5046N/A */
5046N/A
5061N/A
5046N/Aimport netscape.ldap.*;
5046N/Aimport netscape.ldap.util.*;
5046N/Aimport java.util.*;
5046N/Aimport java.util.concurrent.BlockingQueue;
5046N/Aimport java.io.*;
5046N/A
5046N/Aclass Writer extends Thread {
5046N/A
5046N/A BlockingQueue<Change> q;
5046N/A String hostport;
5046N/A
5046N/A public Writer(BlockingQueue<Change> q, String hostport) {
5046N/A this.q = q;
5046N/A this.hostport = hostport;
5046N/A }
5046N/A
5046N/A public void run() {
5046N/A try {
5046N/A Server application = new Server( hostport );
5046N/A ImprovedLDAPConnection applicationConnection = new ImprovedLDAPConnection();
5046N/A
5046N/A // Connect to the stand-alone server
5046N/A EclReadAndPlay.println("INFO", "****** Connecting to application "
5046N/A + application.host + ":" + application.port + " ......");
5046N/A applicationConnection.connect( application.host, application.port );
5046N/A applicationConnection.authenticate( 3, EclReadAndPlay.bindDn, EclReadAndPlay.bindPwd );
5046N/A EclReadAndPlay.println("INFO", "****** ...... Connected to application "
5046N/A + application.host + ":" + application.port );
5046N/A
5046N/A while (true) {
5046N/A // Read change from the queue
5046N/A Change change = q.take();
5046N/A //EclReadAndPlay.println ("DEBUG", "Change read from the queue -----> : " + change.toString() );
5046N/A
5046N/A CSN RUVcsn=EclReadAndPlay.RUV.get(change.replicaIdentifier);
5046N/A if ( RUVcsn != null ) {
5046N/A // if operation is not replicated
5046N/A if ( change.csn == null )
5046N/A continue;
5046N/A
5046N/A if (change.csn.compareTo(RUVcsn) < 0) {
5046N/A // EclReadAndPlay.println ("DEBUG", Integer.toHexString(i.intValue()) + " < " + Integer.toHexString(l.intValue()) );
5046N/A EclReadAndPlay.println("DEBUG", "Operation " + change.changeNumberValue + " csn "
5046N/A + change.csn + " has already been replayed");
5046N/A continue;
5046N/A }
5046N/A }
5046N/A
5046N/A try {
5046N/A // Write change on stand-alone server
5046N/A applicationConnection.apply(change);
5046N/A
5046N/A // Write change CSN to file under "db" directory
5046N/A File f;
5046N/A if (EclReadAndPlay.files.containsKey(change.replicaIdentifier)) {
5046N/A f = EclReadAndPlay.files.get(change.replicaIdentifier);
5046N/A // f.renameTo(new File(EclReadAndPlay.dbPath, new String(change.replicaIdentifier+".tmp") ));
5046N/A } else {
5046N/A f = new File(EclReadAndPlay.dbPath, change.replicaIdentifier + ".csn");
5046N/A EclReadAndPlay.files.put(change.replicaIdentifier,f);
5046N/A }
5046N/A
5046N/A FileWriter out = new FileWriter(f);
5046N/A out.write(change.csn.value);
5046N/A out.flush();
5046N/A out.close();
5046N/A
5046N/A EclReadAndPlay.RUV.put(change.replicaIdentifier,change.csn);
5046N/A
5046N/A if ( EclReadAndPlay.eclMode.equals("draft") )
5046N/A EclReadAndPlay.inc_ops(change.changeNumber);
5046N/A else if ( EclReadAndPlay.eclMode.equals("opends") )
5046N/A EclReadAndPlay.inc_ops(change.changelogCookie);
5046N/A
5046N/A // Log a message for the written change on "logs/access" file
5046N/A EclReadAndPlay.accessOut.println(EclReadAndPlay.getDate()
5046N/A + "- INFO: " + change.type + " \""
5046N/A + change.dn + "\" (" + change.csn +" / "
5046N/A + change.changeNumber + ")" );
5046N/A } catch (Exception e) {
5046N/A EclReadAndPlay.println( "ERROR", e.toString() );
5046N/A e.printStackTrace();
5046N/A System.exit(1);
5046N/A }
5046N/A
5046N/A //nb_changes++;
5046N/A
5046N/A }
5046N/A } catch (Exception e) {
5046N/A e.printStackTrace();
5046N/A System.exit(1);
5046N/A }
5046N/A }
5046N/A
5046N/A}