759N/A/*
759N/A * CDDL HEADER START
759N/A *
759N/A * The contents of this file are subject to the terms of the
759N/A * Common Development and Distribution License, Version 1.0 only
759N/A * (the "License"). You may not use this file except in compliance
759N/A * with the License.
759N/A *
759N/A * You can obtain a copy of the license at
759N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE
759N/A * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
759N/A * See the License for the specific language governing permissions
759N/A * and limitations under the License.
759N/A *
759N/A * When distributing Covered Code, include this CDDL HEADER in each
759N/A * file and include the License file at
759N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
759N/A * add the following below this CDDL HEADER, with the fields enclosed
873N/A * by brackets "[]" replaced with your own identifying information:
759N/A * Portions Copyright [yyyy] [name of copyright owner]
759N/A *
759N/A * CDDL HEADER END
759N/A *
759N/A *
3231N/A * Copyright 2008 Sun Microsystems, Inc.
6348N/A * Portions Copyright 2011-2013 ForgeRock AS
759N/A */
759N/A
1182N/Apackage org.opends.server.replication.protocol;
759N/A
759N/Aimport org.opends.server.api.DirectoryThread;
1400N/Aimport static org.opends.server.loggers.debug.DebugLogger.*;
1424N/A
1400N/Aimport org.opends.server.loggers.debug.DebugTracer;
5817N/Aimport org.opends.server.types.DebugLogLevel;
759N/A
759N/Aimport java.io.IOException;
759N/A
759N/A/**
759N/A * This thread publishes a heartbeat message on a given protocol session at
1183N/A * regular intervals when there are no other replication messages being
759N/A * published.
759N/A */
759N/Apublic class HeartbeatThread extends DirectoryThread
759N/A{
1400N/A /**
1400N/A * The tracer object for the debug logger.
1400N/A */
1400N/A private static final DebugTracer TRACER = getTracer();
1400N/A
759N/A
759N/A /**
759N/A * For test purposes only to simulate loss of heartbeats.
759N/A */
5305N/A private static volatile boolean heartbeatsDisabled = false;
759N/A
759N/A /**
759N/A * The session on which heartbeats are to be sent.
759N/A */
6348N/A private final Session session;
759N/A
759N/A
759N/A /**
759N/A * The time in milliseconds between heartbeats.
759N/A */
5305N/A private final long heartbeatInterval;
759N/A
759N/A
759N/A /**
759N/A * Set this to stop the thread.
759N/A */
5305N/A private volatile boolean shutdown = false;
5305N/A private final Object shutdownLock = new Object();
759N/A
759N/A
759N/A /**
759N/A * Create a heartbeat thread.
759N/A * @param threadName The name of the heartbeat thread.
759N/A * @param session The session on which heartbeats are to be sent.
759N/A * @param heartbeatInterval The desired interval between heartbeats in
759N/A * milliseconds.
759N/A */
6348N/A public HeartbeatThread(String threadName, Session session,
759N/A long heartbeatInterval)
759N/A {
759N/A super(threadName);
759N/A this.session = session;
759N/A this.heartbeatInterval = heartbeatInterval;
759N/A }
759N/A
759N/A /**
759N/A * {@inheritDoc}
759N/A */
759N/A @Override
759N/A public void run()
759N/A {
759N/A try
759N/A {
868N/A if (debugEnabled())
868N/A {
1400N/A TRACER.debugInfo("Heartbeat thread is starting, interval is %d",
868N/A heartbeatInterval);
868N/A }
3853N/A HeartbeatMsg heartbeatMessage = new HeartbeatMsg();
759N/A
759N/A while (!shutdown)
759N/A {
759N/A long now = System.currentTimeMillis();
868N/A if (debugEnabled())
868N/A {
1400N/A TRACER.debugVerbose("Heartbeat thread awoke at %d, last message " +
1400N/A "was sent at %d", now, session.getLastPublishTime());
868N/A }
759N/A
759N/A if (now > session.getLastPublishTime() + heartbeatInterval)
759N/A {
759N/A if (!heartbeatsDisabled)
759N/A {
868N/A if (debugEnabled())
868N/A {
1400N/A TRACER.debugVerbose("Heartbeat sent at %d", now);
868N/A }
759N/A session.publish(heartbeatMessage);
759N/A }
759N/A }
759N/A
5817N/A long sleepTime = session.getLastPublishTime() +
5817N/A heartbeatInterval - now;
5817N/A if (sleepTime <= 0)
759N/A {
5817N/A sleepTime = heartbeatInterval;
5817N/A }
759N/A
5817N/A if (debugEnabled())
5817N/A {
5817N/A TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
5817N/A }
5817N/A
5817N/A synchronized (shutdownLock)
5817N/A {
5817N/A if (!shutdown)
868N/A {
5817N/A try
1424N/A {
5305N/A shutdownLock.wait(sleepTime);
1424N/A }
5817N/A catch (InterruptedException e)
5817N/A {
5817N/A // Server shutdown monitor may interrupt slow threads.
5817N/A if (debugEnabled())
5817N/A {
5817N/A TRACER.debugCaught(DebugLogLevel.ERROR, e);
5817N/A }
5817N/A shutdown = true;
5817N/A }
1424N/A }
759N/A }
759N/A }
759N/A }
759N/A catch (IOException e)
759N/A {
868N/A if (debugEnabled())
868N/A {
1400N/A TRACER.debugInfo("Heartbeat thread could not send a heartbeat.");
868N/A }
759N/A // This will be caught in another thread.
759N/A }
759N/A finally
759N/A {
868N/A if (debugEnabled())
868N/A {
1400N/A TRACER.debugInfo("Heartbeat thread is exiting.");
868N/A }
759N/A }
759N/A }
759N/A
759N/A
759N/A /**
759N/A * Call this method to stop the thread.
1424N/A * This method is blocking until the thread has stopped.
759N/A */
759N/A public void shutdown()
759N/A {
5305N/A synchronized (shutdownLock)
1424N/A {
1424N/A shutdown = true;
5305N/A shutdownLock.notifyAll();
1424N/A if (debugEnabled())
1424N/A {
1424N/A TRACER.debugInfo("Going to notify Heartbeat thread.");
1424N/A }
1424N/A }
1424N/A if (debugEnabled())
1424N/A {
1424N/A TRACER.debugInfo("Returning from Heartbeat shutdown.");
1424N/A }
759N/A }
759N/A
759N/A
759N/A /**
759N/A * For testing purposes only to simulate loss of heartbeats.
759N/A * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
759N/A */
759N/A public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
759N/A {
759N/A HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
759N/A }
759N/A}