package com.sun.jmx.remote.internal;
import java.io.IOException;
import java.io.NotSerializableException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.NotificationFilter;
import javax.management.ObjectName;
import javax.management.MBeanServerNotification;
import javax.management.InstanceNotFoundException;
import javax.management.ListenerNotFoundException;
import javax.management.remote.NotificationResult;
import javax.management.remote.TargetedNotification;
import com.sun.jmx.remote.util.ClassLogger;
import com.sun.jmx.remote.util.EnvHelp;
public abstract class ClientNotifForwarder {
// Access-control context captured in the constructor via
// AccessController.getContext(); NotifFetcher.setContextClassLoader passes it
// to AccessController.doPrivileged.
private final AccessControlContext acc;
// Convenience constructor: delegates to the two-argument constructor with a
// null default class loader.
public ClientNotifForwarder(Map env) {
this(null, env);
// Counter shared by all LinearExecutor instances; incremented to build each
// worker thread's "ClientNotifForwarder-<n>" name.
private static int threadId;
/* An Executor that allows at most one executing and one pending
Runnable. It uses at most one thread -- as soon as there is
no pending Runnable the thread can exit. Another thread is
created as soon as there is a new pending Runnable. This
Executor is adapted for use in a situation where each Runnable
usually schedules up another Runnable. On return from the
first one, the second one is immediately executed. So this
just becomes a complicated way to write a while loop, but with
the advantage that you can replace it with another Executor,
for instance one that you are using to execute a bunch of other
unrelated work.
You might expect that a java.util.concurrent.ThreadPoolExecutor
with corePoolSize=0 and maximumPoolSize=1 would have the same
behavior, but it does not. A ThreadPoolExecutor only creates
a new thread when a new task is submitted and the number of
existing threads is < corePoolSize. This can never happen when
corePoolSize=0, so new threads are never created. Surprising,
but there you are.
// Executor that holds at most one pending Runnable and runs work on a single,
// lazily created worker thread.
// NOTE(review): this excerpt appears to be missing lines (closing braces, the
// point where the worker runs `r`, and where the thread is started).
private static class LinearExecutor implements Executor {
// Stores the command as the single pending task.
// Throws IllegalArgumentException if a command is already pending.
public synchronized void execute(Runnable command) {
if (this.command != null)
throw new IllegalArgumentException("More than one command");
this.command = command;
// No worker is alive: create one.
if (thread == null) {
thread = new Thread() {
public void run() {
while (true) {
Runnable r;
// Take the pending command under the executor's lock.
synchronized (LinearExecutor.this) {
if (LinearExecutor.this.command == null) {
// Nothing pending: clear the worker reference so a later
// execute() call creates a fresh thread.
thread = null;
} else {
r = LinearExecutor.this.command;
LinearExecutor.this.command = null;
thread.setName("ClientNotifForwarder-" + ++threadId);
// The single pending task, or null when none is queued.
private Runnable command;
// The current worker thread, or null when no worker is alive.
private Thread thread;
// Builds a forwarder from the connector environment.
//   defaultClassLoader - stored as-is; when non-null, NotifFetcher.run() passes
//                        it to setContextClassLoader before fetching.
//   env                - read through EnvHelp for the maximum number of
//                        notifications per fetch and the fetch timeout.
public ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env) {
maxNotifications = EnvHelp.getMaxFetchNotifNumber(env);
timeout = EnvHelp.getFetchTimeout(env);
/* You can supply an Executor in which the remote call to
fetchNotifications will be made. The Executor's execute
method reschedules another task, so you must not use
an Executor that executes tasks in the caller's thread. */
// NOTE(review): the operand of this cast (presumably a lookup in env) is not
// visible in this excerpt.
Executor ex = (Executor)
// No executor supplied: fall back to the private LinearExecutor.
if (ex == null)
ex = new LinearExecutor();
else if (logger.traceOn())
logger.trace("ClientNotifForwarder", "executor is " + ex);
this.defaultClassLoader = defaultClassLoader;
this.executor = ex;
// Capture the constructing thread's access-control context for later
// privileged calls.
this.acc = AccessController.getContext();
* Called to fetch notifications from a server.
// Implemented by the concrete connector to retrieve notifications.
//   clientSequenceNumber - sequence number to fetch from; init() passes -1 when
//                          it only wants the next sequence number.
//   maxNotifications     - upper bound on returned notifications; callers in
//                          this file pass 0 or 1 when probing.
//   timeout              - how long the call may wait; probes pass 0.
// Throws IOException or ClassNotFoundException; this file treats the latter
// as a notification that cannot be deserialized (see fetchOneNotif).
abstract protected NotificationResult fetchNotifs(long clientSequenceNumber,
int maxNotifications,
long timeout)
throws IOException, ClassNotFoundException;
// Implemented by the concrete connector. The returned Integer is stored in
// mbeanRemovedNotifID and compared against incoming listener IDs to recognise
// MBean-unregistration notifications.
abstract protected Integer addListenerForMBeanRemovedNotif()
throws IOException, InstanceNotFoundException;
// Presumably the counterpart of addListenerForMBeanRemovedNotif: `id` would be
// the listener ID that call returned — confirm against the implementation.
// NOTE(review): the throws clause is truncated in this excerpt (it ends on a
// comma).
abstract protected void removeListenerForMBeanRemovedNotif(Integer id)
throws IOException, InstanceNotFoundException,
* Used to send out a notification about lost notifs
//   message - human-readable description built by the caller.
//   number  - count the caller believes were lost (the sequence-number gap in
//             doRun, or the undeserializable count in fetchOneNotif).
abstract protected void lostNotifs(String message, long number);
// Records a client-side listener under the given listenerID.
// NOTE(review): most of the body is missing from this excerpt; the visible
// lines only trace the call and begin constructing a ClientListenerInfo.
public synchronized void addNotificationListener(Integer listenerID,
ObjectName name,
NotificationListener listener,
NotificationFilter filter,
Object handback,
Subject delegationSubject)
throws IOException, InstanceNotFoundException {
if (logger.traceOn()) {
"Add the listener "+listener+" at "+name);
new ClientListenerInfo(listenerID,
// Handles every registration of `listener` on MBean `name`.
// Returns the collected listener IDs as an array.
// Throws ListenerNotFoundException when no registration matched.
// NOTE(review): the statements executed for a match are not visible here.
public synchronized Integer[]
removeNotificationListener(ObjectName name,
NotificationListener listener)
throws ListenerNotFoundException, IOException {
if (logger.traceOn()) {
"Remove the listener "+listener+" from "+name);
List<Integer> ids = new ArrayList<Integer>();
// Iterate over a snapshot of infoList's values, last to first.
List<ClientListenerInfo> values =
new ArrayList<ClientListenerInfo>(infoList.values());
for (int i=values.size()-1; i>=0; i--) {
ClientListenerInfo li = values.get(i);
if (li.sameAs(name, listener)) {
if (ids.isEmpty())
throw new ListenerNotFoundException("Listener not found");
return ids.toArray(new Integer[0]);
// Handles the registration matching name, listener, filter and handback.
// Returns that registration's listener ID.
// Throws ListenerNotFoundException when no registration matched.
// NOTE(review): the statements executed for a match are not visible here.
public synchronized Integer
removeNotificationListener(ObjectName name,
NotificationListener listener,
NotificationFilter filter,
Object handback)
throws ListenerNotFoundException, IOException {
if (logger.traceOn()) {
"Remove the listener "+listener+" from "+name);
Integer id = null;
// Iterate over a snapshot of infoList's values, last to first.
List<ClientListenerInfo> values =
new ArrayList<ClientListenerInfo>(infoList.values());
for (int i=values.size()-1; i>=0; i--) {
ClientListenerInfo li = values.get(i);
if (li.sameAs(name, listener, filter, handback)) {
if (id == null)
throw new ListenerNotFoundException("Listener not found");
return id;
// Handles every registration on MBean `name`.
// Returns the collected listener IDs as an array. Unlike the other overloads,
// no exception is thrown in the visible code when nothing matched.
// NOTE(review): the statements executed for a match are not visible here.
public synchronized Integer[] removeNotificationListener(ObjectName name) {
if (logger.traceOn()) {
"Remove all listeners registered at "+name);
List<Integer> ids = new ArrayList<Integer>();
// Iterate over a snapshot of infoList's values, last to first.
List<ClientListenerInfo> values =
new ArrayList<ClientListenerInfo>(infoList.values());
for (int i=values.size()-1; i>=0; i--) {
ClientListenerInfo li = values.get(i);
if (li.sameAs(name)) {
return ids.toArray(new Integer[0]);
* Called when a connector is doing reconnection. Like <code>postReconnection</code>,
* this method is intended to be called only by a client connector:
* <code>RMIConnector</code> and <code>ClientIntermediary</code>.
* Calling this method sets the flag <code>beingReconnected</code> to <code>true</code>,
* and the thread used to fetch notifs will be stopped; a new thread can be
* created only after the method <code>postReconnection</code> is called.
* It is the caller's responsibility not to call this method again before calling
* <code>postReconnection</code>.
// Marks the start of a reconnection.
// Returns a snapshot array of the ClientListenerInfo entries in infoList.
// Throws IOException("Illegal state.") when the forwarder is TERMINATED or a
// reconnection is already flagged.
public synchronized ClientListenerInfo[] preReconnection() throws IOException {
if (state == TERMINATED || beingReconnected) { // should never
throw new IOException("Illegal state.");
// Snapshot the listeners before raising the reconnection flag.
final ClientListenerInfo[] tmp =
infoList.values().toArray(new ClientListenerInfo[0]);
beingReconnected = true;
return tmp;
* Called after reconnection is finished.
* This method is intended to be called only by a client connector:
* <code>RMIConnector</code> and <code>ClientIntermediary</code>.
// Completes a reconnection: puts each entry of listenerInfos back into infoList
// under its listener ID and clears beingReconnected. It then either refreshes
// only mbeanRemovedNotifID (this is the fetch thread, or the state is STARTING
// or STARTED) or calls init() to start fetching again.
// An InterruptedException during the STOPPING waits is rethrown as an
// IOException with the interrupt attached as its cause.
// NOTE(review): several statements (e.g. the bodies of the try blocks that
// wait) are not visible in this excerpt.
public synchronized void postReconnection(ClientListenerInfo[] listenerInfos)
throws IOException {
if (state == TERMINATED) {
while (state == STOPPING) {
try {
} catch (InterruptedException ire) {
IOException ioe = new IOException(ire.toString());
EnvHelp.initCause(ioe, ire);
throw ioe;
final boolean trace = logger.traceOn();
final int len = listenerInfos.length;
// Re-register every listener supplied by the caller.
for (int i=0; i<len; i++) {
if (trace) {
"Add a listener at "+
infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]);
beingReconnected = false;
if (currentFetchThread == Thread.currentThread() ||
state == STARTING || state == STARTED) { // doing or waiting reconnection
// only update mbeanRemovedNotifID
try {
mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
} catch (Exception e) {
// Best effort: a failure here is traced, not propagated.
final String msg =
"Failed to register a listener to the mbean " +
"server: the client will not do clean when an MBean " +
"is unregistered";
if (logger.traceOn()) {
logger.trace("init", msg, e);
} else {
while (state == STOPPING) {
try {
} catch (InterruptedException ire) {
IOException ioe = new IOException(ire.toString());
EnvHelp.initCause(ioe, ire);
throw ioe;
if (listenerInfos.length > 0) { // old listeners are re-added
init(true); // not update clientSequenceNumber
} else if (infoList.size() > 0) { // only new listeners added during reconnection
init(false); // need update clientSequenceNumber
// Terminates this forwarder. The visible code checks for an already TERMINATED
// state, traces "Terminating...", and has a branch for the STARTED state.
// NOTE(review): the bodies of both state checks and the final state change are
// not visible in this excerpt.
public synchronized void terminate() {
if (state == TERMINATED) {
if (logger.traceOn()) {
logger.trace("terminate", "Terminating...");
if (state == STARTED) {
// -------------------------------------------------
// private classes
// -------------------------------------------------
// The task handed to the executor by init() to fetch and forward notifications.
// It is an inner (non-static) class: it reads and updates the enclosing
// forwarder's state under the forwarder's lock.
private class NotifFetcher implements Runnable {
// Set by logOnce() so the diagnostic is emitted at most once per fetcher.
private volatile boolean alreadyLogged = false;
// Emits a diagnostic at most once for this fetcher; later calls return
// immediately. When x is non-null it is logged at "fine" level.
// NOTE(review): no statement logging `msg` is visible in this excerpt.
private void logOnce(String msg, SecurityException x) {
if (alreadyLogged) return;
// Log only once.
if (x != null) logger.fine("setContextClassLoader", x);
alreadyLogged = true;
// Sets a new context class loader and returns the previous one.
// The work runs inside AccessController.doPrivileged using the
// AccessControlContext captured by the enclosing forwarder.
// Throws SecurityException when that context is null, or when the privileged
// action itself throws one (it is logged once and rethrown).
// NOTE(review): the calls that read and set the thread's context class loader
// are not visible in this excerpt.
private final ClassLoader setContextClassLoader(final ClassLoader loader) {
final AccessControlContext ctxt = ClientNotifForwarder.this.acc;
// if ctxt is null, log a config message and throw a
// SecurityException.
if (ctxt == null) {
logOnce("AccessControlContext must not be null.",null);
throw new SecurityException("AccessControlContext must not be null");
return AccessController.doPrivileged(
new PrivilegedAction<ClassLoader>() {
public ClassLoader run() {
try {
// get context class loader - may throw
// SecurityException - though unlikely.
final ClassLoader previous =
// if nothing needs to be done, break here...
if (loader == previous) return previous;
// reset context class loader - may throw
// SecurityException
return previous;
} catch (SecurityException x) {
logOnce("Permission to set ContextClassLoader missing. " +
"Notifications will not be dispatched. " +
"Please check your Java policy configuration: " +
x, x);
throw x;
}, ctxt);
// Executor entry point. When the forwarder has a default class loader, it is
// installed through setContextClassLoader and the previous loader is kept; the
// finally block has a matching branch for the same condition.
// NOTE(review): the try body and the restore statement are not visible here.
public void run() {
final ClassLoader previous;
if (defaultClassLoader != null) {
previous = setContextClassLoader(defaultClassLoader);
} else {
previous = null;
try {
} finally {
if (defaultClassLoader != null) {
// One fetch-and-forward cycle:
//  1. Under the forwarder's lock, record this thread as currentFetchThread.
//  2. Unless shouldStop(), fetch a NotificationResult (null means the fetch
//     failed).
//  3. Under the lock, advance clientSequenceNumber and copy the
//     ClientListenerInfo for each targeted listener ID into a local map.
//     Notifications addressed to mbeanRemovedNotifID are examined as possible
//     MBean-unregistration notifications.
//  4. Outside the lock, report a positive `missed` count through lostNotifs
//     and iterate over the notifications to forward them.
//  5. Under the lock, clear currentFetchThread and check whether to stop.
// NOTE(review): many statements are missing from this excerpt (state
// transitions, the dispatch call, rescheduling); the steps above describe only
// what is visible.
private void doRun() {
synchronized (ClientNotifForwarder.this) {
currentFetchThread = Thread.currentThread();
if (state == STARTING) {
NotificationResult nr = null;
if (!shouldStop() && (nr = fetchNotifs()) != null) {
// nr == null means got exception
final TargetedNotification[] notifs =
final int len = notifs.length;
// Local snapshot of the listeners to notify, filled under the lock.
final Map<Integer, ClientListenerInfo> listeners;
final Integer myListenerID;
long missed = 0;
synchronized(ClientNotifForwarder.this) {
// check sequence number.
if (clientSequenceNumber >= 0) {
missed = nr.getEarliestSequenceNumber() -
clientSequenceNumber = nr.getNextSequenceNumber();
listeners = new HashMap<Integer, ClientListenerInfo>();
for (int i = 0 ; i < len ; i++) {
final TargetedNotification tn = notifs[i];
final Integer listenerID = tn.getListenerID();
// check if an mbean unregistration notif
if (!listenerID.equals(mbeanRemovedNotifID)) {
final ClientListenerInfo li = infoList.get(listenerID);
if (li != null) {
listeners.put(listenerID, li);
final Notification notif = tn.getNotification();
final String unreg =
if (notif instanceof MBeanServerNotification &&
notif.getType().equals(unreg)) {
MBeanServerNotification mbsn =
(MBeanServerNotification) notif;
ObjectName name = mbsn.getMBeanName();
myListenerID = mbeanRemovedNotifID;
// A positive gap between the server's earliest sequence number and ours is
// reported as possibly lost notifications.
if (missed > 0) {
final String msg =
"May have lost up to " + missed +
" notification" + (missed == 1 ? "" : "s");
lostNotifs(msg, missed);
logger.trace("NotifFetcher.run", msg);
// forward
for (int i = 0 ; i < len ; i++) {
final TargetedNotification tn = notifs[i];
synchronized (ClientNotifForwarder.this) {
currentFetchThread = null;
if (nr == null || shouldStop()) {
// tell that the thread is REALLY stopped
try {
} catch (Exception e) {
if (logger.traceOn()) {
"removeListenerForMBeanRemovedNotif", e);
} else {
// Delivers one targeted notification to its listener.
//   tn           - the notification and the ID of the listener it targets.
//   myListenerID - notifications addressed to this ID are skipped.
//   listeners    - map from listener ID to the listener's registration info.
// A RuntimeException thrown by the listener is caught and traced, not rethrown.
// NOTE(review): the handling of an ID missing from the map is only partly
// visible in this excerpt.
void dispatchNotification(TargetedNotification tn,
Integer myListenerID,
Map<Integer, ClientListenerInfo> listeners) {
final Notification notif = tn.getNotification();
final Integer listenerID = tn.getListenerID();
if (listenerID.equals(myListenerID)) return;
final ClientListenerInfo li = listeners.get(listenerID);
if (li == null) {
"Listener ID not in map");
NotificationListener l = li.getListener();
Object h = li.getHandback();
try {
l.handleNotification(notif, h);
} catch (RuntimeException e) {
final String msg =
"Failed to forward a notification " +
"to a listener";
logger.trace("NotifFetcher-run", msg, e);
// Fetches notifications through the enclosing forwarder and maps failures:
//   ClassNotFoundException / NotSerializableException -> traced, then retried
//     through fetchOneNotif();
//   any other IOException -> null is returned ("no more fetching").
// NotSerializableException is caught before IOException because it is a
// subclass of it.
// NOTE(review): the arguments of the delegated call are not visible here.
private NotificationResult fetchNotifs() {
try {
NotificationResult nr = ClientNotifForwarder.this.
if (logger.traceOn()) {
"Got notifications from the server: "+nr);
return nr;
} catch (ClassNotFoundException e) {
logger.trace("NotifFetcher.fetchNotifs", e);
return fetchOneNotif();
} catch (NotSerializableException e) {
logger.trace("NotifFetcher.fetchNotifs", e);
return fetchOneNotif();
} catch (IOException ioe) {
if (!shouldStop()) {
"Failed to fetch notification, " +
"stopping thread. Error is: " + ioe, ioe);
// no more fetching
return null;
/* Fetch one notification when we suspect that it might be a
notification that we can't deserialize (because of a
missing class). First we ask for 0 notifications with 0
timeout. This allows us to skip sequence numbers for
notifications that don't match our filters. Then we ask
for one notification. If that produces a
ClassNotFoundException or a NotSerializableException, we
increase our sequence number and ask again. Eventually we
will either get a successful notification, or a return with
0 notifications. In either case we can return a
NotificationResult. This algorithm works (albeit less
well) even if the server implementation doesn't optimize a
request for 0 notifications to skip sequence numbers for
notifications that don't match our filters.
If we had at least one ClassNotFoundException, then we
must emit a JMXConnectionNotification.LOST_NOTIFS.
// Fallback fetch used after a deserialization failure. Loops until a result is
// obtained or shouldStop() is true. Each iteration first requests 0
// notifications to advance startSequenceNumber, then requests exactly 1.
// Returns null on an IOException from the first request, on a stop request, or
// on a non-deserialization failure of the second request.
// A positive notFoundCount is reported through lostNotifs. A non-null result is
// rebuilt with the first earliest sequence number seen.
// NOTE(review): the statements that bump notFoundCount and startSequenceNumber
// after a deserialization failure are not visible in this excerpt.
private NotificationResult fetchOneNotif() {
ClientNotifForwarder cnf = ClientNotifForwarder.this;
long startSequenceNumber = clientSequenceNumber;
int notFoundCount = 0;
NotificationResult result = null;
// Earliest sequence number from the first successful probe; -1 until set.
long firstEarliest = -1;
while (result == null && !shouldStop()) {
NotificationResult nr;
try {
// 0 notifs to update startSequenceNumber
nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);
} catch (ClassNotFoundException e) {
"Impossible exception: " + e);
return null;
} catch (IOException e) {
if (!shouldStop())
logger.trace("NotifFetcher.fetchOneNotif", e);
return null;
if (shouldStop())
return null;
startSequenceNumber = nr.getNextSequenceNumber();
if (firstEarliest < 0)
firstEarliest = nr.getEarliestSequenceNumber();
try {
// 1 notif to skip possible missing class
result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);
} catch (Exception e) {
// Only deserialization failures are tolerated; anything else ends the fetch.
if (e instanceof ClassNotFoundException
|| e instanceof NotSerializableException) {
"Failed to deserialize a notification: "+e.toString());
if (logger.traceOn()) {
"Failed to deserialize a notification.", e);
} else {
if (!shouldStop())
logger.trace("NotifFetcher.fetchOneNotif", e);
return null;
if (notFoundCount > 0) {
final String msg =
"Dropped " + notFoundCount + " notification" +
(notFoundCount == 1 ? "" : "s") +
" because classes were missing locally";
lostNotifs(msg, notFoundCount);
// Even if result.getEarliestSequenceNumber() is now greater than
// it was initially, meaning some notifs have been dropped
// from the buffer, we don't want the caller to see that
// because it is then likely to renotify about the lost notifs.
// So we put back the first value of earliestSequenceNumber
// that we saw.
if (result != null) {
result = new NotificationResult(
firstEarliest, result.getNextSequenceNumber(),
return result;
// Decides, under the forwarder's lock, whether fetching should cease.
// Returns true when the state is not STARTED or when infoList is empty;
// false otherwise.
private boolean shouldStop() {
synchronized (ClientNotifForwarder.this) {
if (state != STARTED) {
return true;
} else if (infoList.size() == 0) {
// no more listener, stop fetching
return true;
return false;
// -------------------------------------------------
// private methods
// -------------------------------------------------
// Updates the lifecycle state while holding this object's monitor.
// The TERMINATED check precedes the assignment; its body is not visible here —
// presumably an early return so a terminated forwarder stays terminated
// (TODO confirm).
private synchronized void setState(int newState) {
if (state == TERMINATED) {
state = newState;
* Called to decide whether need to start a thread for fetching notifs.
* <P>The parameter reconnected decides whether to initialize the clientSequenceNumber;
* initializing the clientSequenceNumber means ignoring all notifications that arrived before.
* If it is reconnected, we do not initialize, in order to get all notifications that arrived
* during the reconnection. This may cause newly registered listeners to receive some
* notifications that arrived before their registration.
// Starts notification fetching if the current state calls for it.
//   reconnected - when false, clientSequenceNumber is first reset to the
//                 server's next sequence number (fetched with -1, 0, 0);
//                 when true, it is left alone.
// On the start path it registers the MBean-removal listener (a failure is only
// traced) and hands a new NotifFetcher to the executor.
// Throws IOException when the forwarder is terminated, when a wait is
// interrupted (the interrupt is attached as the cause), or for an unknown state.
// NOTE(review): the `case` labels of the switch, the wait statements, and the
// state transitions are not visible in this excerpt.
private synchronized void init(boolean reconnected) throws IOException {
switch (state) {
throw new IOException("The ClientNotifForwarder has been terminated.");
if (beingReconnected == true) {
// wait for another thread to do, which is doing reconnection
while (state == STOPPING) { // make sure only one fetching thread.
try {
} catch (InterruptedException ire) {
IOException ioe = new IOException(ire.toString());
EnvHelp.initCause(ioe, ire);
throw ioe;
// re-call this method to check the state again,
// the state can be other value like TERMINATED.
if (beingReconnected == true) {
// wait for another thread to do, which is doing reconnection
if (logger.traceOn()) {
logger.trace("init", "Initializing...");
// init the clientSequenceNumber if not reconnected.
if (!reconnected) {
try {
NotificationResult nr = fetchNotifs(-1, 0, 0);
clientSequenceNumber = nr.getNextSequenceNumber();
} catch (ClassNotFoundException e) {
// can't happen
logger.warning("init", "Impossible exception: "+ e);
// for cleaning
try {
mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
} catch (Exception e) {
final String msg =
"Failed to register a listener to the mbean " +
"server: the client will not do clean when an MBean " +
"is unregistered";
if (logger.traceOn()) {
logger.trace("init", msg, e);
// start fetching
executor.execute(new NotifFetcher());
// should not
throw new IOException("Unknown state.");
* Important: a listener should not be removed during reconnection; the reconnection
* needs to change the listener list, which could make the removal fail.
// Gate run while a reconnection is in progress: loops while beingReconnected
// is true, and fails fast if the forwarder is or becomes TERMINATED.
// Throws IOException("Terminated.") in that case, or an IOException wrapping
// an InterruptedException raised inside the loop.
// NOTE(review): the body of the try block (presumably a wait) is not visible
// in this excerpt.
private synchronized void beforeRemove() throws IOException {
while (beingReconnected) {
if (state == TERMINATED) {
throw new IOException("Terminated.");
try {
} catch (InterruptedException ire) {
IOException ioe = new IOException(ire.toString());
EnvHelp.initCause(ioe, ire);
throw ioe;
// Re-check after the loop: termination may have happened while waiting.
if (state == TERMINATED) {
throw new IOException("Terminated.");
// -------------------------------------------------
// private variables
// -------------------------------------------------
// Passed to setContextClassLoader by NotifFetcher.run() when non-null; may be
// null.
private final ClassLoader defaultClassLoader;
// Runs NotifFetcher tasks; either supplied by the caller or a LinearExecutor.
private final Executor executor;
// Registered listeners keyed by listener ID.
private final Map<Integer, ClientListenerInfo> infoList =
new HashMap<Integer, ClientListenerInfo>();
// notif stuff
// Next sequence number to request; the code treats a negative value as
// "not yet initialized".
private long clientSequenceNumber = -1;
// Values read from the environment through EnvHelp in the constructor.
private final int maxNotifications;
private final long timeout;
// Listener ID returned by addListenerForMBeanRemovedNotif(); null until set.
private Integer mbeanRemovedNotifID = null;
// Thread currently executing NotifFetcher.doRun(); cleared at the end of a
// cycle.
private Thread currentFetchThread;
// state
// Lifecycle states of the forwarder, stored in `state`. A new forwarder starts
// in STOPPED.
* This state means that a thread is being created for fetching and forwarding notifications.
private static final int STARTING = 0;
* This state tells that a thread has been started for fetching and forwarding notifications.
private static final int STARTED = 1;
* This state means that the fetching thread is informed to stop.
private static final int STOPPING = 2;
* This state means that the fetching thread is already stopped.
private static final int STOPPED = 3;
* This state means that this object is terminated and no more thread will be created
* for fetching notifications.
private static final int TERMINATED = 4;
// Current lifecycle state; one of the constants above.
private int state = STOPPED;
* This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
* is doing reconnection.
* This variable will be set to true by the method <code>preReconnection</code>, and set
* to false by <code>postReconnection</code>.
* When beingReconnected == true, no thread will be created for fetching notifications.
// In the visible code this flag is read and written only inside synchronized
// methods of this class.
private boolean beingReconnected = false;
private static final ClassLogger logger =
new ClassLogger("javax.management.remote.misc",