/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
*/
import netscape.ldap.*;
import netscape.ldap.util.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.io.*;
class Writer extends Thread {
BlockingQueue<Change> q;
String hostport;
public Writer(BlockingQueue<Change> q, String hostport) {
this.q = q;
this.hostport = hostport;
}
public void run() {
try {
Server application = new Server( hostport );
ImprovedLDAPConnection applicationConnection = new ImprovedLDAPConnection();
// Connect to the stand-alone server
EclReadAndPlay.println("INFO", "****** Connecting to application "
+ application.host + ":" + application.port + " ......");
applicationConnection.connect( application.host, application.port );
applicationConnection.authenticate( 3, EclReadAndPlay.bindDn, EclReadAndPlay.bindPwd );
EclReadAndPlay.println("INFO", "****** ...... Connected to application "
+ application.host + ":" + application.port );
while (true) {
// Read change from the queue
Change change = q.take();
//EclReadAndPlay.println ("DEBUG", "Change read from the queue -----> : " + change.toString() );
CSN RUVcsn=EclReadAndPlay.RUV.get(change.replicaIdentifier);
if ( RUVcsn != null ) {
// if operation is not replicated
if ( change.csn == null )
continue;
if (change.csn.compareTo(RUVcsn) < 0) {
// EclReadAndPlay.println ("DEBUG", Integer.toHexString(i.intValue()) + " < " + Integer.toHexString(l.intValue()) );
EclReadAndPlay.println("DEBUG", "Operation " + change.changeNumberValue + " csn "
+ change.csn + " has already been replayed");
continue;
}
}
try {
// Write change on stand-alone server
applicationConnection.apply(change);
// Write change CSN to file under "db" directory
File f;
if (EclReadAndPlay.files.containsKey(change.replicaIdentifier)) {
f = EclReadAndPlay.files.get(change.replicaIdentifier);
// f.renameTo(new File(EclReadAndPlay.dbPath, new String(change.replicaIdentifier+".tmp") ));
} else {
f = new File(EclReadAndPlay.dbPath, change.replicaIdentifier + ".csn");
EclReadAndPlay.files.put(change.replicaIdentifier,f);
}
FileWriter out = new FileWriter(f);
out.write(change.csn.value);
out.flush();
out.close();
EclReadAndPlay.RUV.put(change.replicaIdentifier,change.csn);
if ( EclReadAndPlay.eclMode.equals("draft") )
EclReadAndPlay.inc_ops(change.changeNumber);
else if ( EclReadAndPlay.eclMode.equals("opends") )
EclReadAndPlay.inc_ops(change.changelogCookie);
// Log a message for the written change on "logs/access" file
EclReadAndPlay.accessOut.println(EclReadAndPlay.getDate()
+ "- INFO: " + change.type + " \""
+ change.dn + "\" (" + change.csn +" / "
+ change.changeNumber + ")" );
} catch (Exception e) {
EclReadAndPlay.println( "ERROR", e.toString() );
e.printStackTrace();
System.exit(1);
}
//nb_changes++;
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}