/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010 Sun Microsystems, Inc.
*/
import netscape.ldap.*;
import netscape.ldap.util.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.io.*;
class Reader extends Thread {
BlockingQueue<Change> queue;
ArrayList<Server> masters;
public Reader(BlockingQueue<Change> q, ArrayList<Server> masters) {
this.queue = q;
this.masters = masters;
}
public void run() {
// master number in the array list
int masterN = 0;
// ECL "draft" mode index
int changeNumber = 0;
// ECL "opends" mode index
String eclCookie = EclReadAndPlay.INITIAL_COOKIE;
try {
Server master = null;
LDAPConnection masterConnection = new LDAPConnection();
LDAPSearchResults results = null;
LDAPEntry entry = null;
LDAPAttribute attr = null;
int idleTime = 0;
while (true) {
try {
master = masters.get(masterN);
// Connect to the Directory master
EclReadAndPlay.println("INFO", "Connecting to master " + master.host + ":" + master.port + " ......");
masterConnection.connect( master.host, master.port );
masterConnection.authenticate( 3, EclReadAndPlay.bindDn, EclReadAndPlay.bindPwd );
EclReadAndPlay.println("INFO", "...... Connected to master " + master.host + ":" + master.port );
// Set changenumber
// Try to retrieve the ECL index (changenumber|changelogcookie) of the last update read
// ---> use the CSN stored in the file under "db" directory
for ( CSN csn: EclReadAndPlay.RUV.values() ) {
String filter = "(& (objectclass=changelogentry)(replicationCSN="
+ csn.getValue() + ") )";
results = masterConnection.search( "cn=changelog", LDAPv3.SCOPE_SUB, filter,
new String[] {"changeNumber", "changeLogCookie"} ,
false );
entry = results.next();
if ( EclReadAndPlay.eclMode.equals("draft") ) {
if ( entry != null ) {
attr = entry.getAttribute("changeNumber");
if (attr != null) {
String changeNumberString = attr.getStringValueArray()[0];
EclReadAndPlay.println("DEBUG", "Found changeNumber " + changeNumberString
+ " for csn " + csn.getValue() );
int c = Integer.parseInt(changeNumberString);
if ( ( changeNumber == 0 ) || ( changeNumber > c ) ) {
EclReadAndPlay.println("DEBUG", "Setting changeNumber to " + ++c );
changeNumber = c;
}
} else {
EclReadAndPlay.println("WARNING", "Cannot find changenumber, setting it to 1");
changeNumber = EclReadAndPlay.INITIAL_CHANGENUMBER;
}
} else {
EclReadAndPlay.println("WARNING", "Cannot find a changelog entry for csn " + csn );
EclReadAndPlay.println("WARNING", "Will start from the first changelog entry");
results = masterConnection.search( "", LDAPv3.SCOPE_BASE, "(objectclass=*)",
new String[]{"firstChangeNumber"} , false );
entry = results.next();
attr = entry.getAttribute("firstChangeNumber");
if ( attr != null ) {
String changeNumberString = attr.getStringValueArray()[0];
EclReadAndPlay.println("DEBUG", "Found firstChangeNumber " + changeNumberString);
int c = Integer.parseInt(changeNumberString);
if ( ( changeNumber == 0 ) || ( changeNumber > c ) ) {
EclReadAndPlay.println("DEBUG", "Setting changeNumber to " + c );
changeNumber = c;
}
} else {
EclReadAndPlay.println("WARNING", "Cannot find firstChangeNumber, setting it to 1");
changeNumber = EclReadAndPlay.INITIAL_CHANGENUMBER;
}
}
} else if ( EclReadAndPlay.eclMode.equals("opends") ) {
if ( entry != null ) {
attr = entry.getAttribute("changeLogCookie");
if ( attr!= null ) {
eclCookie = attr.getStringValueArray()[0];
EclReadAndPlay.println("DEBUG", "Found changeLogCookie " + eclCookie
+ " for csn " + csn.getValue() );
} else {
EclReadAndPlay.println("WARNING", "Cannot find a changelog entry for csn " + csn );
EclReadAndPlay.println("WARNING", "Will start from the first changelog entry");
eclCookie = EclReadAndPlay.INITIAL_COOKIE;
}
} else {
EclReadAndPlay.println("WARNING", "Cannot find a changelog entry for csn " + csn );
EclReadAndPlay.println("WARNING", "Will start from the first changelog entry");
eclCookie = EclReadAndPlay.INITIAL_COOKIE;
}
}
} /* for (CSN csn: ...) */
synchronized (EclReadAndPlay.lock) {
EclReadAndPlay.lock.notifyAll();
}
String[] attributes = new String[] {"replicationCSN", "replicaIdentifier", "targetDN",
"targetEntryUUID", "changeType", "changes",
"deleteOldRDN", "newRDN", "newSuperior",
"changeNumber", "changeHasReplFixupOp",
"changeLogCookie"};
while (idleTime < EclReadAndPlay.MAX_IDLE_TIME) {
if ( EclReadAndPlay.eclMode.equals("draft") ) {
int limit = changeNumber + (EclReadAndPlay.queueSize - 1);
String filter = "(& (changeNumber>=" + changeNumber + ")(changeNumber<="
+ limit + ") )";
EclReadAndPlay.println("DEBUG", "Getting changes " + changeNumber + " to " + limit);
results = masterConnection.search("cn=changelog", LDAPv3.SCOPE_SUB, filter,
attributes , false );
} else if ( EclReadAndPlay.eclMode.equals("opends") ) {
// --control "1.3.6.1.4.1.26027.1.5.4:false:;"
String filter = "changetype=*";
LDAPSearchConstraints controls = new LDAPSearchConstraints();
LDAPControl eclControl = new LDAPControl("1.3.6.1.4.1.26027.1.5.4", false,
eclCookie.getBytes());
controls.setMaxResults(199);
controls.setServerControls(eclControl);
EclReadAndPlay.println("DEBUG", "Getting changes from cookie: " + eclCookie);
results = masterConnection.search("cn=changelog", LDAPv3.SCOPE_SUB, filter,
attributes , false, controls );
}
if ( ! results.hasMoreElements() ) {
// No new change found in retrocl => sleep 100 ms.
sleep(100);
idleTime += 100;
EclReadAndPlay.println("DEBUG", "No new change found in ECL => have slept for 100ms");
} else {
idleTime = 0;
// Forward all the results found to the application
while ( results.hasMoreElements() ) {
EclReadAndPlay.println("DEBUG", "Going through change entries found in the ECL.");
try {
entry = results.next();
} catch (LDAPException ldapEx) {
if ( ldapEx.getLDAPResultCode() == LDAPException.SIZE_LIMIT_EXCEEDED )
continue;
else
throw ldapEx;
}
//EclReadAndPlay.println("DEBUG", "Changelog entry: " + entry.toString());
try {
// Write the change in the queue
Change change = new Change(entry);
queue.put(change);
} catch (Exception e) {
EclReadAndPlay.println("DEBUG", "Ignoring change " + entry.getDN() );
if ( EclReadAndPlay.eclMode.equals("draft") )
EclReadAndPlay.inc_ignored(changeNumber);
else if ( EclReadAndPlay.eclMode.equals("opends") )
EclReadAndPlay.inc_ignored(eclCookie);
}
if ( EclReadAndPlay.eclMode.equals("draft") ) {
changeNumber++;
EclReadAndPlay.println("DEBUG", "change=" + entry.getDN() + ", changenumber = "
+ changeNumber + ", count =" + results.getCount());
} else if ( EclReadAndPlay.eclMode.equals("opends") ) {
attr = entry.getAttribute("changeLogCookie");
if ( attr != null ) {
eclCookie = attr.getStringValueArray()[0];
EclReadAndPlay.println ("DEBUG", " ECL cookie value ========> " + eclCookie );
}
}
} /* while (result.hasMoreElements()) */
}
if ( EclReadAndPlay.displayMissingChanges == true ) {
if ( EclReadAndPlay.eclMode.equals("draft") ) {
results = masterConnection.search( "", LDAPv3.SCOPE_BASE, "(objectclass=*)",
new String[]{"lastChangeNumber"} , false );
entry = results.next();
attr = entry.getAttribute("lastChangeNumber");
if ( attr != null ) {
EclReadAndPlay.lastChangeNumber = Integer.parseInt(attr.getStringValueArray()[0]);
}
} else if ( EclReadAndPlay.eclMode.equals("opends") ) {
results = masterConnection.search( "", LDAPv3.SCOPE_BASE, "(objectclass=*)",
new String[]{"lastExternalChangelogCookie"} , false );
entry = results.next();
attr = entry.getAttribute("lastExternalChangelogCookie");
if ( attr != null ) {
EclReadAndPlay.lastExternalChangelogCookie = attr.getStringValueArray()[0];
}
}
}
} /* while (idleTime <= EclReadAndPlay.MAX_IDLE_TIME) */
EclReadAndPlay.println("WARNING", "No new changes read in the ECL for " + Integer.toString(idleTime) +
" milliseconds. ======> EXIT");
System.exit(0);
} catch( LDAPException e ) {
int errorCode = e.getLDAPResultCode();
// if server is down => switch
if ( ( errorCode == 91 ) || ( errorCode == 81 ) || ( errorCode == 80 ) ) {
// clear the queue of changes
queue.clear();
EclReadAndPlay.println( "WARNING", "Connection lost to " + master.host + ":" + master.port + ".");
masterN = (masterN+1) % masters.size();
} else {
EclReadAndPlay.println( "ERROR" , e.toString() );
e.printStackTrace();
System.exit(1);
}
}
} /* while (true) */
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}