/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2010 Sun Microsystems, Inc.
 */
5046N/A
5046N/Aimport netscape.ldap.*;
5046N/Aimport netscape.ldap.util.*;
5046N/Aimport java.util.*;
5046N/Aimport java.util.concurrent.BlockingQueue;
5046N/Aimport java.io.*;
5046N/A
5046N/Aclass Reader extends Thread {
5046N/A
5046N/A BlockingQueue<Change> queue;
5046N/A ArrayList<Server> masters;
5046N/A
5046N/A public Reader(BlockingQueue<Change> q, ArrayList<Server> masters) {
5046N/A this.queue = q;
5046N/A this.masters = masters;
5046N/A }
5046N/A
5046N/A public void run() {
5046N/A // master number in the array list
5046N/A int masterN = 0;
5046N/A
5046N/A // ECL "draft" mode index
5046N/A int changeNumber = 0;
5046N/A
5046N/A // ECL "opends" mode index
5046N/A String eclCookie = EclReadAndPlay.INITIAL_COOKIE;
5046N/A
5046N/A
5046N/A try {
5046N/A Server master = null;
5046N/A LDAPConnection masterConnection = new LDAPConnection();
5046N/A LDAPSearchResults results = null;
5046N/A LDAPEntry entry = null;
5046N/A LDAPAttribute attr = null;
5046N/A int idleTime = 0;
5046N/A
5046N/A while (true) {
5046N/A try {
5046N/A master = masters.get(masterN);
5046N/A
5046N/A // Connect to the Directory master
5046N/A EclReadAndPlay.println("INFO", "Connecting to master " + master.host + ":" + master.port + " ......");
5046N/A masterConnection.connect( master.host, master.port );
5046N/A masterConnection.authenticate( 3, EclReadAndPlay.bindDn, EclReadAndPlay.bindPwd );
5046N/A EclReadAndPlay.println("INFO", "...... Connected to master " + master.host + ":" + master.port );
5046N/A
5046N/A // Set changenumber
5046N/A
5046N/A
5046N/A // Try to retrieve the ECL index (changenumber|changelogcookie) of the last update read
5046N/A // ---> use the CSN stored in the file under "db" directory
5046N/A for ( CSN csn: EclReadAndPlay.RUV.values() ) {
5046N/A String filter = "(& (objectclass=changelogentry)(replicationCSN="
5046N/A + csn.getValue() + ") )";
5046N/A results = masterConnection.search( "cn=changelog", LDAPv3.SCOPE_SUB, filter,
5046N/A new String[] {"changeNumber", "changeLogCookie"} ,
5046N/A false );
5046N/A entry = results.next();
5046N/A
5046N/A if ( EclReadAndPlay.eclMode.equals("draft") ) {
5046N/A if ( entry != null ) {
5046N/A attr = entry.getAttribute("changeNumber");
5046N/A if (attr != null) {
5046N/A String changeNumberString = attr.getStringValueArray()[0];
5046N/A EclReadAndPlay.println("DEBUG", "Found changeNumber " + changeNumberString
5046N/A + " for csn " + csn.getValue() );
5046N/A
5046N/A int c = Integer.parseInt(changeNumberString);
5046N/A if ( ( changeNumber == 0 ) || ( changeNumber > c ) ) {
5046N/A EclReadAndPlay.println("DEBUG", "Setting changeNumber to " + ++c );
5046N/A changeNumber = c;
5046N/A }
5046N/A } else {
5046N/A EclReadAndPlay.println("WARNING", "Cannot find changenumber, setting it to 1");
5046N/A changeNumber = EclReadAndPlay.INITIAL_CHANGENUMBER;
5046N/A }
5046N/A } else {
5046N/A EclReadAndPlay.println("WARNING", "Cannot find a changelog entry for csn " + csn );
5046N/A EclReadAndPlay.println("WARNING", "Will start from the first changelog entry");
5046N/A
5046N/A results = masterConnection.search( "", LDAPv3.SCOPE_BASE, "(objectclass=*)",
5046N/A new String[]{"firstChangeNumber"} , false );
5046N/A entry = results.next();
5046N/A attr = entry.getAttribute("firstChangeNumber");
5046N/A if ( attr != null ) {
5046N/A String changeNumberString = attr.getStringValueArray()[0];
5046N/A EclReadAndPlay.println("DEBUG", "Found firstChangeNumber " + changeNumberString);
5046N/A
5046N/A int c = Integer.parseInt(changeNumberString);
5046N/A if ( ( changeNumber == 0 ) || ( changeNumber > c ) ) {
5046N/A EclReadAndPlay.println("DEBUG", "Setting changeNumber to " + c );
5046N/A changeNumber = c;
5046N/A }
5046N/A } else {
5046N/A EclReadAndPlay.println("WARNING", "Cannot find firstChangeNumber, setting it to 1");
5046N/A changeNumber = EclReadAndPlay.INITIAL_CHANGENUMBER;
5046N/A }
5046N/A }
5046N/A } else if ( EclReadAndPlay.eclMode.equals("opends") ) {
5046N/A if ( entry != null ) {
5046N/A attr = entry.getAttribute("changeLogCookie");
5046N/A if ( attr!= null ) {
5046N/A eclCookie = attr.getStringValueArray()[0];
5046N/A EclReadAndPlay.println("DEBUG", "Found changeLogCookie " + eclCookie
5046N/A + " for csn " + csn.getValue() );
5046N/A } else {
5046N/A EclReadAndPlay.println("WARNING", "Cannot find a changelog entry for csn " + csn );
5046N/A EclReadAndPlay.println("WARNING", "Will start from the first changelog entry");
5046N/A eclCookie = EclReadAndPlay.INITIAL_COOKIE;
5046N/A }
5046N/A } else {
5046N/A EclReadAndPlay.println("WARNING", "Cannot find a changelog entry for csn " + csn );
5046N/A EclReadAndPlay.println("WARNING", "Will start from the first changelog entry");
5046N/A eclCookie = EclReadAndPlay.INITIAL_COOKIE;
5046N/A }
5046N/A }
5046N/A } /* for (CSN csn: ...) */
5046N/A
5046N/A synchronized (EclReadAndPlay.lock) {
5046N/A EclReadAndPlay.lock.notifyAll();
5046N/A }
5046N/A
5046N/A String[] attributes = new String[] {"replicationCSN", "replicaIdentifier", "targetDN",
5046N/A "targetEntryUUID", "changeType", "changes",
5046N/A "deleteOldRDN", "newRDN", "newSuperior",
5046N/A "changeNumber", "changeHasReplFixupOp",
5046N/A "changeLogCookie"};
5046N/A
5046N/A while (idleTime < EclReadAndPlay.MAX_IDLE_TIME) {
5046N/A if ( EclReadAndPlay.eclMode.equals("draft") ) {
5046N/A int limit = changeNumber + (EclReadAndPlay.queueSize - 1);
5046N/A
5046N/A String filter = "(& (changeNumber>=" + changeNumber + ")(changeNumber<="
5046N/A + limit + ") )";
5046N/A
5046N/A EclReadAndPlay.println("DEBUG", "Getting changes " + changeNumber + " to " + limit);
5046N/A results = masterConnection.search("cn=changelog", LDAPv3.SCOPE_SUB, filter,
5046N/A attributes , false );
5046N/A
5046N/A } else if ( EclReadAndPlay.eclMode.equals("opends") ) {
5046N/A // --control "1.3.6.1.4.1.26027.1.5.4:false:;"
5046N/A String filter = "changetype=*";
5046N/A LDAPSearchConstraints controls = new LDAPSearchConstraints();
5046N/A LDAPControl eclControl = new LDAPControl("1.3.6.1.4.1.26027.1.5.4", false,
5046N/A eclCookie.getBytes());
5046N/A controls.setMaxResults(199);
5046N/A controls.setServerControls(eclControl);
5046N/A
5046N/A EclReadAndPlay.println("DEBUG", "Getting changes from cookie: " + eclCookie);
5046N/A results = masterConnection.search("cn=changelog", LDAPv3.SCOPE_SUB, filter,
5046N/A attributes , false, controls );
5046N/A }
5046N/A
5046N/A
5046N/A if ( ! results.hasMoreElements() ) {
5046N/A // No new change found in retrocl => sleep 100 ms.
5046N/A sleep(100);
5046N/A idleTime += 100;
5046N/A EclReadAndPlay.println("DEBUG", "No new change found in ECL => have slept for 100ms");
5046N/A } else {
5046N/A idleTime = 0;
5046N/A
5046N/A // Forward all the results found to the application
5046N/A while ( results.hasMoreElements() ) {
5046N/A EclReadAndPlay.println("DEBUG", "Going through change entries found in the ECL.");
5046N/A try {
5046N/A entry = results.next();
5046N/A } catch (LDAPException ldapEx) {
5046N/A if ( ldapEx.getLDAPResultCode() == LDAPException.SIZE_LIMIT_EXCEEDED )
5046N/A continue;
5046N/A else
5046N/A throw ldapEx;
5046N/A }
5046N/A //EclReadAndPlay.println("DEBUG", "Changelog entry: " + entry.toString());
5046N/A try {
5046N/A // Write the change in the queue
5046N/A Change change = new Change(entry);
5046N/A queue.put(change);
5046N/A } catch (Exception e) {
5046N/A EclReadAndPlay.println("DEBUG", "Ignoring change " + entry.getDN() );
5046N/A if ( EclReadAndPlay.eclMode.equals("draft") )
5046N/A EclReadAndPlay.inc_ignored(changeNumber);
5046N/A else if ( EclReadAndPlay.eclMode.equals("opends") )
5046N/A EclReadAndPlay.inc_ignored(eclCookie);
5046N/A }
5046N/A
5046N/A if ( EclReadAndPlay.eclMode.equals("draft") ) {
5046N/A changeNumber++;
5046N/A EclReadAndPlay.println("DEBUG", "change=" + entry.getDN() + ", changenumber = "
5046N/A + changeNumber + ", count =" + results.getCount());
5046N/A } else if ( EclReadAndPlay.eclMode.equals("opends") ) {
5046N/A attr = entry.getAttribute("changeLogCookie");
5046N/A if ( attr != null ) {
5046N/A eclCookie = attr.getStringValueArray()[0];
5046N/A EclReadAndPlay.println ("DEBUG", " ECL cookie value ========> " + eclCookie );
5046N/A }
5046N/A }
5046N/A } /* while (result.hasMoreElements()) */
5046N/A
5046N/A }
5046N/A
5046N/A if ( EclReadAndPlay.displayMissingChanges == true ) {
5046N/A if ( EclReadAndPlay.eclMode.equals("draft") ) {
5046N/A results = masterConnection.search( "", LDAPv3.SCOPE_BASE, "(objectclass=*)",
5046N/A new String[]{"lastChangeNumber"} , false );
5046N/A entry = results.next();
5046N/A attr = entry.getAttribute("lastChangeNumber");
5046N/A if ( attr != null ) {
5046N/A EclReadAndPlay.lastChangeNumber = Integer.parseInt(attr.getStringValueArray()[0]);
5046N/A }
5046N/A } else if ( EclReadAndPlay.eclMode.equals("opends") ) {
5046N/A results = masterConnection.search( "", LDAPv3.SCOPE_BASE, "(objectclass=*)",
5046N/A new String[]{"lastExternalChangelogCookie"} , false );
5046N/A entry = results.next();
5046N/A attr = entry.getAttribute("lastExternalChangelogCookie");
5046N/A if ( attr != null ) {
5046N/A EclReadAndPlay.lastExternalChangelogCookie = attr.getStringValueArray()[0];
5046N/A }
5046N/A }
5046N/A }
5046N/A
5046N/A } /* while (idleTime <= EclReadAndPlay.MAX_IDLE_TIME) */
5046N/A
5046N/A EclReadAndPlay.println("WARNING", "No new changes read in the ECL for " + Integer.toString(idleTime) +
5046N/A " milliseconds. ======> EXIT");
5046N/A System.exit(0);
5046N/A } catch( LDAPException e ) {
5046N/A
5046N/A int errorCode = e.getLDAPResultCode();
5046N/A
5046N/A // if server is down => switch
5046N/A if ( ( errorCode == 91 ) || ( errorCode == 81 ) || ( errorCode == 80 ) ) {
5046N/A // clear the queue of changes
5046N/A queue.clear();
5046N/A EclReadAndPlay.println( "WARNING", "Connection lost to " + master.host + ":" + master.port + ".");
5046N/A masterN = (masterN+1) % masters.size();
5046N/A } else {
5046N/A EclReadAndPlay.println( "ERROR" , e.toString() );
5046N/A e.printStackTrace();
5046N/A System.exit(1);
5046N/A }
5046N/A }
5046N/A } /* while (true) */
5046N/A
5046N/A } catch (Exception e) {
5046N/A e.printStackTrace();
5046N/A System.exit(1);
5046N/A }
5046N/A }
5046N/A
5046N/A}