ClusterManager.java revision b84068e6021ec8a830c26c4494f6e335d1f9c0ef
/**
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2013-2015 ForgeRock AS. All Rights Reserved
*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the License at
* See the License for the specific language governing
* permission and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* your own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
*/
/**
* A Cluster Management Service.
*
*/
@Properties({
/**
* Query ID for querying failed instances
*/
/**
* Query ID for querying all instances
*/
/**
* Query ID for getting pending cluster events
*/
/**
* Resource name when issuing requests over the router
*/
private static final ResourcePath REPO_RESOURCE_CONTAINER = new ResourcePath("repo", "cluster", "states");
/**
* Resource name when issuing cluster state requests directly with the Repository Service
*/
private static final ResourcePath STATES_RESOURCE_CONTAINER = new ResourcePath("cluster", "states");
/**
* Resource name when issuing cluster event requests directly with the Repository Service
*/
private static final ResourcePath EVENTS_RESOURCE_CONTAINER = new ResourcePath("cluster", "events");
/**
* The instance ID
*/
private String instanceId;
protected RepositoryService repoService;
/**
* The Connection Factory
*/
protected IDMConnectionFactory connectionFactory;
/** Enhanced configuration service. */
private EnhancedConfig enhancedConfig;
/**
* A list of listeners to notify when an instance fails
*/
/**
* A thread to perform cluster management
*/
/**
* The Cluster Manager Configuration
*/
private ClusterConfig clusterConfig;
/**
* The current state of this instance
*/
/**
* A flag indicating that this instance has not yet performed its first check-in
*/
private boolean firstCheckin = true;
/**
* A flag to indicate if this instance has failed
*/
private boolean failed = false;
/**
* A flag to indicate if the cluster management is enabled
*/
private boolean enabled = false;
logger.debug("Activating Cluster Management Service with configuration {}", compContext.getProperties());
}
/**
* Initializes the Cluster Manager configuration
*
* @param config an {@link JsonValue} object representing the configuration
*/
if (clusterConfig.isEnabled()) {
enabled = true;
}
}
if (clusterConfig.isEnabled()) {
synchronized (repoLock) {
try {
} catch (ResourceException e) {
}
}
}
}
/**
 * Returns the identifier of this cluster node.
 *
 * @return the instance ID held by this manager, or {@code null} if it has not been assigned yet
 */
public String getInstanceId() {
    final String id = this.instanceId;
    return id;
}
/**
 * Indicates whether cluster management is enabled for this instance.
 *
 * @return {@code true} once the cluster configuration has enabled clustering, {@code false} otherwise
 */
public boolean isEnabled() {
    final boolean clusteringOn = this.enabled;
    return clusteringOn;
}
/**
 * Starts cluster management for this instance.
 *
 * <p>The work runs while holding {@code startupLock}. {@link #stopClusterManagement()} takes
 * the same lock, so a start and a stop cannot interleave.
 */
public void startClusterManagement() {
synchronized (startupLock) {
// Start thread
}
}
}
/**
 * Stops cluster management for this instance and then checks this instance out via
 * {@code checkOut()}.
 *
 * <p>The work runs while holding {@code startupLock}. {@link #startClusterManagement()} takes
 * the same lock, so a start and a stop cannot interleave.
 */
public void stopClusterManagement() {
synchronized (startupLock) {
// Stop thread
checkOut();
}
}
}
/**
 * Reports whether the cluster management thread is currently running.
 *
 * @return {@code true} if the management thread exists and is running; {@code false} otherwise,
 *         including when the thread has never been created
 */
public boolean isStarted() {
    // clusterManagerThread is assigned elsewhere and may still be null (NOTE(review): it appears
    // to be created only when clustering is enabled; confirm). Report "not started" instead of
    // throwing NullPointerException.
    return clusterManagerThread != null && clusterManagerThread.isRunning();
}
public Promise<ResourceResponse, ResourceException> handleRead(Context context, ReadRequest request) {
try {
if (resourcePath.isEmpty()) {
// Return a list of all nodes in the cluster
return true;
}
});
} else {
ReadRequest readRequest = Requests.newReadRequest(REPO_RESOURCE_CONTAINER.child(resourcePath).toString());
}
} catch (ResourceException e) {
return e.asPromise();
}
}
}
}
/**
* Creates a map representing an instance's state and recovery statistics
* that can be used for responses to read requests.
*
* @param instanceValue
* an instance's state object
* @return a map representing an instance's state and recovery statistics
*/
case InstanceState.STATE_RUNNING:
break;
case InstanceState.STATE_DOWN:
if (!state.hasShutdown()) {
.getRecoveryStarted())));
.getRecoveryFinished())));
.getDetectedDown())));
} else {
// Should never reach this state
}
} else {
}
break;
}
return instanceInfo;
}
synchronized (repoLock) {
try {
// Update the recovery timestamp
} catch (ResourceException e) {
instanceId, e.getMessage());
}
}
}
}
/**
* Updates an instance's state.
*
* @param instanceId
* the id of the instance to update
* @param instanceState
* the updated InstanceState object
* @throws ResourceException
*/
throws ResourceException {
synchronized (repoLock) {
UpdateRequest updateRequest = Requests.newUpdateRequest(resourcePath.toString(), new JsonValue(instanceState.toMap()));
}
}
/**
* Gets a list of all instances in the cluster
*
* @return a list of Map objects representing each instance in the cluster
* @throws ResourceException
*/
}
return instanceList;
}
synchronized (repoLock) {
}
}
synchronized (repoLock) {
// create resource
}
return map;
}
}
try {
return resource.getContent();
} catch (NotFoundException e) {
}
}
/**
* Updates the timestamp for this instance in the instance check-in map.
*
* @return the InstanceState object, or null if an expected failure (MVCC)
* was encountered
*/
private InstanceState checkIn() {
try {
if (firstCheckin) {
firstCheckin = false;
}
case InstanceState.STATE_RUNNING:
// just update the timestamp
break;
case InstanceState.STATE_DOWN:
// instance has been recovered, so switch to "normal" state and
// update timestamp
break;
// rare case, do not update state or timestamp
// system may attempt to recover itself if recovery timeout has
// elapsed
return state;
}
} catch (ResourceException e) {
} else {
// MVCC failure, return null
return null;
}
}
return state;
}
/**
* Performs an instance check-out, setting the state to down if it is currently running.
*/
private void checkOut() {
try {
case InstanceState.STATE_RUNNING:
// just update the timestamp
break;
case InstanceState.STATE_DOWN:
// Already down
break;
// Some other instance is processing this down state
// Leave in this state
break;
}
} catch (ResourceException e) {
} else {
// MVCC failure, return null
}
}
}
/**
* Returns all instances that have timed out.
*
* @return a map of all instances who have timed out (failed).
*/
try {
case InstanceState.STATE_RUNNING:
// Found failed instance
break;
// Check if recovering has failed
}
break;
case InstanceState.STATE_DOWN:
// Already recovered instance, do nothing
break;
}
}
} catch (ResourceException e) {
}
return failedInstances;
}
/**
* Recovers a failed instance by looping through all listeners and calling
* their instanceFailed method.
*
* @param instanceId
* the id of the instance to recover
* @return true if any triggers were "freed", false otherwise
*/
// First attempt to "claim" the failed instance
try {
}
// Update the instance state to recovered
} catch (ResourceException e) {
}
return false;
}
// Then, attempt recovery
if (success) {
try {
// Update the instance state to recovered
} catch (ResourceException e) {
}
return false;
}
} else {
return false;
}
return true;
}
/**
* Sends a ClusterEvent to all registered listeners
*
* @param event
* the ClusterEvent to handle
* @return true if the event was handled appropriately, false otherwise
*/
boolean success = true;
success = false;
}
}
return success;
}
try {
// Loop through instances, creating a pending event for each instance in the cluster
CreateRequest createRequest = Requests.newCreateRequest(EVENTS_RESOURCE_CONTAINER.toString(), newEvent);
}
}
} catch (ResourceException e) {
}
}
/**
* Finds and processes any pending cluster events for this node. The event will then
* be deleted if the processing was successful.
*/
private void processPendingEvents() {
try {
// Find all pending cluster events for this instance
// Loop through results, processing each event
boolean success = false;
// Check if a listener ID is specified
if (listenerId != null) {
// Send the event to the corresponding listener
} else {
success = true;
}
} else {
// Send event to all listeners
}
// If the event was successfully processed, delete it
if (success) {
try {
DeleteRequest deleteRequest = Requests.newDeleteRequest(EVENTS_RESOURCE_CONTAINER.toString(), resource.getId());
} catch (ResourceException e) {
}
}
}
} catch (ResourceException e) {
}
}
try {
DeleteRequest deleteRequest = Requests.newDeleteRequest(EVENTS_RESOURCE_CONTAINER.toString(), eventId);
} catch (ResourceException e) {
}
}
/**
* A thread for managing this instance's lease and detecting cluster events.
*/
class ClusterManagerThread {
private long checkinInterval;
private long checkinOffset;
private ScheduledFuture<?> handler;
private boolean running = false;
this.checkinInterval = checkinInterval;
this.checkinOffset = checkinOffset;
}
public void startup() {
running = true;
public void run() {
try {
// Check in this instance
if (!failed) {
failed = true;
// Notify listeners that this instance has failed
// Set current state to null
currentState = null;
}
return;
} else if (failed) {
failed = false;
}
// If transitioning to a "running" state, send events
}
}
// Set current state
// Check for pending cluster events
// Find failed instances
// Recover failed instance's triggers
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void shutdown() {
}
running = false;
}
public boolean isRunning() {
return running;
}
}
public Promise<ActionResponse, ResourceException> handleAction(Context context, ActionRequest request) {
}
public Promise<ResourceResponse, ResourceException> handleCreate(Context context, CreateRequest request) {
}
public Promise<ResourceResponse, ResourceException> handleDelete(Context context, DeleteRequest request) {
}
public Promise<ResourceResponse, ResourceException> handlePatch(Context context, PatchRequest request) {
}
public Promise<QueryResponse, ResourceException> handleQuery(Context context, QueryRequest request, QueryResourceHandler handler) {
}
public Promise<ResourceResponse, ResourceException> handleUpdate(Context context, UpdateRequest request) {
}
}