/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
*/
import netscape.ldap.*;
import netscape.ldap.util.*;
import java.util.*;
import java.io.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class EclReadAndPlay {
// dbPath --> stores files containing csn of identifiers
static final String dbPath = "."; /*"db";*/
// configPath --> stores masters config file
static final String configPath = "."; /*"config";*/
static final String mastersFilename = "masters";
static final String logsPath = "."; /*"logs";*/
static final String accessFilename = "access";
// Maximum time (in milliseconds) without update being read from the master: 60 s
static final int MAX_IDLE_TIME = 60000;
// ECL "draft" mode -- Initial changeNumber
static final int INITIAL_CHANGENUMBER = 1;
// ECL "opends" mode -- Initial control value:
// --control "1.3.6.1.4.1.26027.1.5.4:false:;" ==> first cookie: ";"
static final String INITIAL_COOKIE = ";";
static PrintWriter standardOut = null;
static PrintWriter accessOut = null;
static HashMap<String,CSN> RUV;
static Object lock;
static HashMap<String,File> files;
static int nb_ops = 0;
static int total_nb_ops = 0;
static int nb_ignored = 0;
static int changeNumber = 0;
static int lastChangeNumber = 0;
static String changelogCookie = null;
static String lastExternalChangelogCookie = null;
static int missingChanges = 0;
static int lastMissingChanges = 1;
static String eclMode = null;
static int queueSize = 0;
static String bindDn = null;
static String bindPwd = null;
static boolean displayMissingChanges = false;
static String outputFilename = null;
public static void main( String[] args )
{
FileWriter out = null;
files = new HashMap<String,File>();
// Load latest read CSN values from files in db/ directory
File csnDir = new File(dbPath);
RUV = new HashMap<String,CSN>();
try {
FilenameFilter csnFileFilter = new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.endsWith(".csn");
}
};
File[] csnFiles = csnDir.listFiles(csnFileFilter);
if ( csnFiles != null ) {
for (File f: csnFiles) {
String csnFilename = f.getName();
String id =
csnFilename.substring(0, csnFilename.indexOf(".csn"));
BufferedReader in = new BufferedReader (new FileReader(f));
CSN mycsn = new CSN(in.readLine());
if ( mycsn.value == null )
mycsn = new CSN("00000000000000000000");
// System.out.println(files[i] + "\t" + mycsn);
RUV.put(id, mycsn);
files.put(id, f);
}
}
} catch (IOException e) {
println("ERROR", e.toString());
e.printStackTrace();
System.exit(1);
}
/********** Parse arguments **********/
int masterN=0;
String hostport = null;
ArrayList<Server> masters = new ArrayList<Server>();
for (int k = 0; k < args.length; k++) {
String opt = args[k];
String val = args[k+1];
// ECL mode: "opends" or "draft"
if (opt.equals("-m")) {
eclMode = val;
}
// Queue size
if (opt.equals("-q")) {
queueSize = Integer.parseInt(val);
}
// Display missing changes?
if (opt.equals("-x")) {
if ( val.equals("true") )
displayMissingChanges = true;
else
displayMissingChanges = false;
}
// Bind DN
if (opt.equals("-D")) {
bindDn = val;
System.out.println(".......... bindDN: " + bindDn);
}
// Bind password
if (opt.equals("-w")) {
bindPwd = val;
System.out.println(".......... bindPwd: " + bindPwd);
}
// Stand-alone server:port
if (opt.equals("-s")) {
hostport = val;
System.out.println(".......... stand-alone server: " + hostport);
}
// Replicated masters
if (opt.equals("-p")) {
masters.add(new Server(val));
}
// Standard output file
if (opt.equals("-o")) {
outputFilename = val;
}
k++;
} /* for() */
if ( eclMode == null || queueSize == 0 || bindDn == null ||
bindPwd == null || hostport == null || masters.size() == 0 ||
outputFilename == null ) {
System.out.println("usage: -m {draft|opends} -q {queue size} "
+ "-x {(displayMissingChanges):true|false} "
+ "-o {outputFilename} -D {bindD} -w {bindPwd} "
+ "-s {standalone-host:port} "
+ "-p {master1-host:port} "
+ "-p {master2-host:port}...");
System.exit(1);
}
/* try {
File mastersFile= new File(configPath, mastersFilename);
LineNumberReader in=new LineNumberReader (new FileReader(mastersFile) );
String line;
while ( in.ready() ) {
line=in.readLine();
masters.add(new Server(line));
}
} catch (IOException e) {
println ("ERROR", e.toString());
System.exit(1);
} */
masters.trimToSize();
// System.out.println(masters);
/*************************************/
// Output file (logs are appended)
try {
standardOut = new PrintWriter(new BufferedWriter(new FileWriter( new File(outputFilename)) ) );
} catch (IOException e) {
println ("ERROR", e.toString() );
e.printStackTrace();
System.exit(1);
}
// Access log (data is appended)
try {
accessOut = new PrintWriter(new BufferedWriter(new FileWriter( new File(logsPath, accessFilename)) ) );
} catch (IOException e) {
println ("ERROR", e.toString() );
e.printStackTrace();
System.exit(1);
}
/********** Initialise reader/writer threads **********/
// Create a bounded blocking queue of integers
BlockingQueue<Change> queue = new ArrayBlockingQueue<Change>(queueSize);
// Initialise reader thread --> read updates from replicated master
Reader reader = new Reader(queue, masters);
reader.start();
// Initialise writer thread --> write updates onto stand-alone server
Writer writer = new Writer(queue, hostport);
writer.start();
lock = new Object();
synchronized (lock) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
long start = System.currentTimeMillis();
int i=0;
while (true) {
int loopPeriod = 10; /* 10 s */
sleep(loopPeriod * 1000);
total_nb_ops += nb_ops;
long duration = ((System.currentTimeMillis() - start)/1000);
println("INFO", "Replayed " + nb_ops/loopPeriod
+ " ops/sec. (Avg = " + (total_nb_ops/duration)
+ " ops/s, Total = " + total_nb_ops
+ " , ignored = " + nb_ignored + " )");
nb_ops = 0;
if ( i++ == 3 && displayMissingChanges == true ) {
if ( eclMode.equals("draft") ) {
missingChanges = lastChangeNumber - changeNumber;
float percentage = (lastMissingChanges - missingChanges);
println("INFO", "Current changeNumber = " + changeNumber
+ ", lastChangeNumber = " + lastChangeNumber
+ ", missing changes = " + missingChanges + "/"
+ lastMissingChanges + " (" + percentage + ")");
lastMissingChanges = missingChanges;
if (lastMissingChanges == 0)
lastMissingChanges = 1;
} else if ( eclMode.equals("opends") ) {
println("INFO", "Current changelogCookie = "
+ changelogCookie
+ ", lastExternalChangelogCookie = "
+ lastExternalChangelogCookie);
}
i = 0;
}
}
}
public static void inc_ops(int c) {
nb_ops++;
changeNumber = c;
}
public static void inc_ops(String c) {
nb_ops++;
changelogCookie = c;
}
public static void inc_ignored(int c) {
nb_ignored++;
changeNumber = c;
}
public static void inc_ignored(String c) {
nb_ignored++;
changelogCookie = c;
}
public static String getDate() {
// Initialize the today's date string
String DATE_FORMAT = "yyyy/MM/dd:HH:mm:ss";
java.text.SimpleDateFormat sdf =
new java.text.SimpleDateFormat(DATE_FORMAT);
Calendar c1 = Calendar.getInstance(); // today
return("[" + sdf.format(c1.getTime()) + "]");
}
public static void println(String level, String msg) {
standardOut.println (getDate() + " - " + level + ": " + msg );
}
public static void sleep(int time) {
try {
Thread.sleep(time);
}
catch ( InterruptedException e )
{
println( "ERROR" , e.toString() );
}
}
}