EventService.java revision 57a1b25dcdf865eacb2fe2e17c5ca83e942da047
/**
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2005 Sun Microsystems Inc. All Rights Reserved
*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the License at
* See the License for the specific language governing
* permission and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at opensso/legal/CDDLv1.0.txt.
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* your own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
* $Id: EventService.java,v 1.19 2009/09/28 21:47:33 ww203982 Exp $
*
*/
/*
* Portions Copyrighted 2010-2011 ForgeRock AS
*/
/**
* Event Service monitors changes on the server. Implemented with the persistant
* search control. Uses ldapjdk asynchronous interfaces so that multiple search
* requests can be processed by a single thread
*
* The Type of changes that can be monitored are: -
* LDAPPersistSearchControl.ADD -
* LDAPPersistSearchControl.DELETE - LDAPPersistSearchControl.MODIFY -
* LDAPPersistSearchControl.MODDN
*
* A single connection is established initially and reused to service all
* notification requests.
* @supported.api
*/
public class EventService implements Runnable {
// list that holds notification requests
// Thread that listens to DS notifications
// search listener for asynch ldap searches
static LDAPSearchListener _msgQueue;
// A singelton patern
// Don't want the server to return all the
// entries. return only the changes.
private static final boolean CHANGES_ONLY = true;
// Want the server to return the entry
// change control in the search result
private static final boolean RETURN_CONTROLS = true;
// Don't perform search if Persistent
// Search control is not supported.
private static final boolean IS_CRITICAL = true;
// Parameters in AMConfig, that provide values for connection retries
protected static final String EVENT_CONNECTION_NUM_RETRIES =
"com.iplanet.am.event.connection.num.retries";
protected static final String EVENT_CONNECTION_RETRY_INTERVAL =
"com.iplanet.am.event.connection.delay.between.retries";
protected static final String EVENT_CONNECTION_ERROR_CODES =
"com.iplanet.am.event.connection.ldap.error.codes.retries";
// Idle timeout in minutes
protected static final String EVENT_IDLE_TIMEOUT_INTERVAL =
"com.sun.am.event.connection.idle.timeout";
protected static final String EVENT_LISTENER_DISABLE_LIST =
"com.sun.am.event.connection.disable.list";
private static boolean _allDisabled = false;
private static int _numRetries = 3;
private static int _retryInterval = 3000;
private static int _retryCount = 1;
private static long _lastResetTime = 0;
protected static HashSet _retryErrorCodes;
// Connection Time Out parameters
protected static long _idleTimeOutMills;
// List of know listeners. The order of the listeners is important
// since it is used to enable & disable the listeners
private static final String[] ALL_LISTENERS = {
"com.iplanet.am.sdk.ldap.ACIEventListener",
"com.iplanet.am.sdk.ldap.EntryEventListener",
"com.sun.identity.sm.ldap.LDAPEventManager"
};
protected static volatile boolean _isThreadStarted = false;
protected static volatile boolean _shutdownCalled = false;
while (stz.hasMoreTokens()) {
}
}
return codes;
}
int value = defaultValue;
try {
} catch (NumberFormatException e) {
if (debugger.warningEnabled()) {
+ "Invalid value for property: "
+ " Defaulting to value: " + defaultValue);
}
}
}
if (debugger.messageEnabled()) {
+ " = " + value);
}
return value;
}
/**
* Determine the listener list based on the diable list property
* and SMS DataStore notification property in Realm mode
*/
private static void getListenerList() {
if (debugger.messageEnabled()) {
}
if (debugger.messageEnabled()) {
"com.sun.identity.sm.enableDataStoreNotification: " +
}
if (debugger.messageEnabled()) {
}
// Copy the default listeners
// Process the configured disabled list first
while (st.hasMoreTokens()) {
disableACI = true;
disableUM = true;
disableSM = true;
} else {
"Invalid listener name: " + listener);
}
}
}
if (!disableUM || !disableACI) {
// Check if AMSDK is configured
boolean disableAMSDK = true;
if (!configTime) {
try {
disableAMSDK = false;
}
} catch (SMSException ex) {
if (debugger.warningEnabled()) {
"Unable to obtain idrepo service", ex);
}
} catch (SSOException ex) {
// Should not happen, ignore the exception
}
}
if (disableAMSDK) {
disableUM = true;
disableACI = true;
if (debugger.messageEnabled()) {
"List(): AMSDK is not configured or config time. " +
"Disabling UM and ACI event listeners");
}
}
}
// Verify if SMSnotification should be enabled
if (debugger.messageEnabled()) {
"mode or config time, SMS listener is set to datastore " +
"notification flag: " + enableDataStoreNotification);
}
}
// Disable the selected listeners
if (disableACI) {
}
if (disableUM) {
}
if (disableSM) {
}
// if all disabled, signal to not start the thread
if (debugger.messageEnabled()) {
"all listeners are disabled, EventService won't start");
}
_allDisabled = true;
} else {
_allDisabled = false;
}
}
/**
* Private Constructor
*/
protected EventService() throws EventException {
}
/**
* create the singelton EventService object if it doesn't exist already.
* Check if directory server supports the Persistent Search Control and the
* Proxy Auth Control
* @supported.api
*/
public synchronized static EventService getEventService()
throws EventException, LDAPException {
if (_shutdownCalled) {
return null;
}
// Make sure only one instance of this class is created.
// scenarios. Value == 0 imples no idle timeout.
if (_idleTimeOut == 0) {
_instance = new EventService();
} else {
_instance = new EventServicePolling();
}
ShutdownListener() {
public void shutdown() {
}
}
});
}
return _instance;
}
return "EventService";
}
/**
* At the end, close THE Event Manager's connections Abandon all previous
* persistent search requests
* @supported.api
*/
public void finalize() {
synchronized (this) {
_shutdownCalled = true;
_isThreadStarted = false;
}
}
synchronized (_requestList) {
}
}
}
/**
* Adds a listener to the directory.
* @supported.api
*/
if (_shutdownCalled) {
throw new EventException(i18n
}
try {
// Check for SMS listener and use "sms" group if present
"com.sun.identity.sm.ldap.LDAPEventManager")) &&
} else {
}
} catch (LDAPServiceException le) {
throw new EventException(i18n
}
// Create Persistent Search Control object
// Add LDAPControl array to the search constraint object
// Listeners can not read attributes from the event.
// Request only javaClassName to be able to determine object type
// Set (asynchronous) persistent search request in the DS
try {
if (debugger.messageEnabled()) {
+ listener);
}
} catch (LDAPException le) {
try {
lc.disconnect();
//ignored
}
}
throw le;
}
// Add this search request to the m_msgQueue so it can be
// processed by the monitor thread
} else {
}
if (!_isThreadStarted) {
} else {
notify();
}
}
if (debugger.messageEnabled()) {
+ " on to message Queue. No. of current outstanding "
}
// Create new (EventService) Thread, if one doesn't exist.
return reqID;
}
}
public static boolean isThreadStarted() {
return _isThreadStarted;
}
/**
* Main monitor thread loop. Wait for persistent search change notifications
*
* @supported.api
*/
public void run() {
try {
if (debugger.messageEnabled()) {
}
boolean successState = true;
while (successState) {
try {
if (debugger.messageEnabled()) {
+ "response");
}
synchronized (this) {
if (_requestList.isEmpty()) {
wait();
}
}
} catch (LDAPInterruptedException ex) {
if (_shutdownCalled) {
break;
} else {
if (debugger.warningEnabled()) {
"LDAPInterruptedException received:", ex);
}
}
} catch (LDAPException ex) {
if (_shutdownCalled) {
break;
} else {
if (debugger.warningEnabled()) {
+ "received:", ex);
}
// Catch special error codition in
// LDAPSearchListener.getResponse
// We should not try to resetError and retry
} else {
resetErrorSearches(true);
} else { // Some other network error
}
}
}
}
} // end of while loop
} catch (InterruptedException ex) {
if (!_shutdownCalled) {
if (debugger.warningEnabled()) {
+ " caught.", ex);
}
}
} catch (RuntimeException ex) {
if (debugger.warningEnabled()) {
+ "caught.", ex);
}
// rethrow the Runtime exception to let the container handle the
// exception.
throw ex;
if (debugger.warningEnabled()) {
+ "caught.", ex);
}
// no need to rethrow.
} catch (Throwable t) {
// Catching Throwable to prevent the thread from exiting.
if (debugger.warningEnabled()) {
+ "caught. Sleeping for a while.. ", t);
}
// rethrow the Error to let the container handle the error.
throw new Error(t);
} finally {
synchronized (this) {
if (!_shutdownCalled) {
// try to restart the monitor thread.
}
}
}
} // end of thread
private static synchronized void startMonitorThread() {
!_shutdownCalled) {
// Even if the monitor thread is not alive, we should use the
// same instance of Event Service object (as it maintains all
// the listener information)
_monitorThread.setDaemon(true);
// Since this is a singleton class once a getEventService()
// is invoked the thread will be started and the variable
// will be set to true. This will help other components
// to avoid starting it once again if the thread has
// started.
_isThreadStarted = true;
}
}
protected boolean retryManager(boolean clearCaches) {
// reset _retryCount to 1 after 12 hours
_retryCount = 1;
}
int i = _retryCount * _retryInterval;
if (i > _retryMaxInterval) {
i = _retryMaxInterval;
} else {
_retryCount *= 2;
}
if (debugger.messageEnabled()) {
(i / 1000) +" seconds before calling resetAllSearches");
}
return resetAllSearches(clearCaches);
}
/**
* Method which process the Response received from the DS.
*
* @param message -
* the LDAPMessage received as response
* @return true if the reset was successful. False Otherwise.
*/
// Some problem with the message queue. We should
// try to reset it.
return retryManager(false);
}
if (debugger.messageEnabled()) {
}
// To determine if the monitor thread needs to be stopped.
boolean successState = true;
// If no listeners, abandon this message id
// We do not have anything stored about this message id.
// So, just log a message and do nothing.
if (debugger.messageEnabled()) {
+ "ldap message with unknown id = "
+ message.getMessageID());
}
} else if (message.getMessageType() ==
// then must be a LDAPSearchResult carrying change control
} else if (message.getMessageType() ==
// Check for error message ...
} else if (message.getMessageType() ==
}
return successState;
}
/**
* removes the listener from the list of Persistent Search listeners of the
* asynchronous seach for the given search ID.
*
* @param request
* The request returned by the addListener
* @supported.api
*/
if (connection != null) {
if (debugger.messageEnabled()) {
}
try {
}
} catch (LDAPException le) {
// Might have to check the reset codes and try to reset
if (debugger.warningEnabled()) {
+ "LDAPException, when trying to remove listener",
le);
}
}
}
}
/**
* Reset error searches. Clear cache only if true is passed to argument
*
* @param clearCaches
*/
protected void resetErrorSearches(boolean clearCaches) {
}
}
}
}
/**
* Reset all searches. Clear cache only if true is passed to argument
*
* @param clearCaches
* @return <code>true</code> if the reset was successful, otherwise <code>false</code>
*/
public synchronized boolean resetAllSearches(boolean clearCaches) {
if (_shutdownCalled) {
return false;
}
// Make a copy of the existing psearches
// Clear the cache, if parameter is set
}
}
// Get the list of psearches to be enabled
if (_allDisabled) {
// All psearches are disabled, remove listeners if any and return
if (debugger.messageEnabled()) {
"All psearches have been disabled");
}
if (debugger.messageEnabled()) {
"Psearch disabled: " +
}
}
}
return true;
}
// Psearches are enabled, verify and reinitilize
// Maintain the listeners to reinitialized in tmpListenerList
// Check if the listener is present in reqList
boolean present = false;
present = true;
}
}
if (!present) {
// Add the listner object
if (debugger.messageEnabled()) {
"Psearch being added: " + listeners[i]);
}
}
}
}
// Remove the listeners not configured
if (debugger.messageEnabled()) {
"Psearch disabled due to configuration changes: " +
}
}
}
// Reset the requested list
// Determine the number of retry attempts in case of failure
// If retry property is set to -1, retries will be done infinitely
int retry = 1;
(retry <= _numRetries))) ? true : false;
while (doItAgain) {
if (debugger.messageEnabled()) {
+ "retrying = " + str);
}
// Note: Avoid setting the messageQueue to null and just
// try to disconnect the connections. That way we can be sure
// that we have not lost any responses.
try {
// First add a new listener and then remove the old one
// that we do don't loose any responses to the message
// Queue.
} catch (LDAPServiceException e) {
// Ignore exception and retry as we are in the process of
// re-establishing the searches. Notify Listeners after the
// attempt
if (retry == _numRetries) {
}
} catch (LDAPException le) {
// Ignore exception and retry as we are in the process of
// re-establishing the searches. Notify Listeners after the
// attempt
if (retry == _numRetries) {
}
}
}
// Check if new listeners need to be added
try {
if (debugger.messageEnabled()) {
"successfully initialized: " + listnerClass);
}
} catch (Exception e) {
"Unable to start listener " + listnerClass, e);
}
}
return true;
} else {
if (_numRetries != -1) {
if (!doItAgain) {
// remove the requests fail to be resetted
// would try to reinitialized the next time
"Searches(): unable to restart: " +
}
"Searches(): unable add listener: " + req);
}
}
}
}
if (doItAgain) {
// Sleep before retry
}
} // end while loop
return false;
}
protected void sleepRetryInterval() {
try {
} catch (InterruptedException ie) {
// ignore
}
}
protected void sleepRetryInterval(int interval) {
try {
}
}
/**
* get a handle to the Directory Server Configuration Manager sets the value
*/
protected static void getConfigManager() throws EventException {
try {
} catch (LDAPServiceException lse) {
+ "handle to Configuration Manager", lse);
throw new EventException(i18n
}
}
}
/**
* Dispatch naming event to all listeners
*/
}
/**
* On network error, create ExceptionEvent and delever it to all listeners
* on all events.
*/
}
}
}
}
/**
* Response message carries a LDAP error. Response with the code 0
* (SUCCESS), should never be received as persistent search never completes,
* it has to be abandon. Referral messages are ignored
*/
if (debugger.messageEnabled()) {
+ "received LDAP Response for requestID: "
}
resetErrorSearches(false);
} else if (resultCode != 0
// If not neither of the cases then
return retryManager(false);
}
rsp.getMatchedDN());
}
return true;
}
/**
* Process change notification attached as the change control to the message
*/
if (debugger.messageEnabled()) {
}
/* Get any entry change controls. */
// Can not create event without change control
+ "NamingEvent, no change control info");
} else {
// Multiple controls might be in the message
if (debugger.messageEnabled()) {
+ "processSearchResultMessage() changeCtrl = "
+ changeCtrl.toString());
}
// Can not create event without change control
+ "create NamingEvent, no change control info");
}
// Convert control into a DSEvent and dispatch to listeners
try {
}
}
}
}
}
/**
* Search continuation messages are ignored.
*/
// Do nothing, message ignored, do not dispatch ExceptionEvent
if (debugger.messageEnabled()) {
+ "Ignoring..");
}
}
}
/**
* Find event entry by message ID
*/
}
/**
* Create naming event from a change control
*/
if (debugger.messageEnabled()) {
}
// Get the dn from the entry
// Get information on the type of change made
// Pass the search ID as the event's change info
// set the object class name
return dsEvent;
}
class RetryTask extends GeneralTaskRunnable {
private long runPeriod;
private boolean clearCaches;
private int numRetries;
this.runPeriod = getPropertyIntValue(
this.numRetries = _numRetries;
}
public void clearCache(boolean cc) {
clearCaches = cc;
}
public void run() {
try {
// First add a new listener and then remove the old one
// that we do don't loose any responses to the message
// Queue. However before adding check if request list
// already has this listener initialized
}
if (clearCaches) {
// Send all entries changed notifications
// only after successful establishment of psearch
}
} catch (Exception e) {
// Ignore exception and retry as we are in the process of
// re-establishing the searches. Notify Listeners after the
// attempt
}
}
if (--numRetries == 0) {
runPeriod = -1;
}
}
public long getRunPeriod() {
return runPeriod;
}
public boolean isEmpty() {
return true;
}
return false;
}
return false;
}
}
}