FAMHaDB.java revision f97667c80d9dd8155ff61164f82b9ad6deae5d95
/**
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2008 Sun Microsystems Inc. All Rights Reserved
*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the License at
* See the License for the specific language governing
* permission and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at opensso/legal/CDDLv1.0.txt.
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* your own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
* $Id: FAMHaDB.java,v 1.6 2009/04/16 15:37:49 subashvarma Exp $
*
*/
/*
* Portions Copyrighted 2010-2011 ForgeRock AS
*/
/* Operations */
static boolean debug = true;
static boolean shutdownStatus = false;
static public final String GET_RECORD_COUNT =
"GET_RECORD_COUNT";
/* JMQ Message attrs */
/* JMQ Properties */
// Private data members
// Used for the determination of master BDB node
private static long localStartTime;
private static boolean isMasterNode = false;
private static long localNodeID;
private Thread nodeStatusSender;
private Thread nodeStatusReceiver;
private static long nodeUpdateInterval
= 5000; // 5 seconds in milli-seconds
private static long nodeUpdateGraceperiod
= 1000; // 1 second in milli-seconds
private DataAccessor da;
// Encapsulates the database environment.
/* Config data - TODO : move to properties/CLI options */
private int MAX_RESPONSE_QUEUES = 1;
private int numCleanSessions = 1000;
private boolean verbose = false;
private boolean statsEnabled = false;
private boolean deleteDatabase = true;
private static final int INVALID = 0;
private static final int USER_NAME = 1;
private static final int PASSWORD = 2;
private static final int PASSWORD_FILE = 3;
private static final int CACHE_SIZE = 4;
private static final int DIRECTORY = 5;
private static final int CLUSTER_ADDRESS = 6;
private static final int NUM_CLEAN_SESSIONS = 7;
private static final int DELETE_DATABASE = 8;
private static final int VERBOSE = 9;
private static final int STATS_INTERVAL = 10;
private static final int HELP = 11;
private static final int VERSION = 12;
private static final int NODE_STATUS_UPDATE_INTERVAL = 13;
private static final int PROPERTIES_FILE =14;
private static boolean isServerUp = false;
private static boolean isDBUp = false;
private static int readCount = 0;
private static int writeCount = 0;
private static int deleteCount = 0;
private static int totalTrans = 0;
private static int cumTotalTrans = 0;
private static long minReadSessionCount = 0;
private static double averageReadSessionCount = 0;
private static long maxReadSessionCount = 0;
private static long cumulativeReadSessionCount = 0;
private static long minReadCount = 0;
private static double averageReadCount = 0;
private static long maxReadCount = 0;
private static long cumulativeReadCount = 0;
private static long minWriteCount = 0;
private static double averageWriteCount = 0;
private static long maxWriteCount = 0;
private static long cumulativeWriteCount = 0;
private static long minDeleteCount = 0;
private static double averageDeleteCount = 0;
private static long maxDeleteCount = 0;
private static long cumulativeDeleteCount = 0;
private static long minProcessRequestTime = 0;
private static double averageProcessRequestTime = 0;
private static long maxProcessRequestTime = 0;
private static long cumulativeProcessRequestTime = 0;
private static int cumReadCount = 0;
private static int cumWriteCount = 0;
private static int cumReadSessionCount = 0;
private static int cumDeleteCount = 0;
// Session Constraints
private static int scReadCount = 0;
private static final int SESSION_VALID = 1;
private Thread processThread;
static {
new Integer(PROPERTIES_FILE));
try {
Locale.getDefault());
} catch (MissingResourceException mre) {
}
}
}
try {
closeDB();
}
if (verbose) {
}
}
try {
false); // is this environment read-only?
// Open the data accessor. This is used to retrieve
// persistent objects.
isDBUp = true;
if (verbose) {
}
}
}
closeJMQ();
}
.getProvider();
true, true, userName, userPassword);
/*
* reseting the StartTime for MasterDBNodeChecker
* in case of Broker restart/connection failure
*/
isMasterNode = false;
isServerUp = true;
}
/**
* This method is used to clear out any existing connections before running
* the initJMQ method. Should only be called if the JMQ connection has
* already been initialised.
*
* @throws Exception
*/
private void closeJMQ() {
try {
if (tNodePubSession != null) {
}
}
}
if (tNodeSubSession != null) {
}
}
}
if(verbose) {
}
}
}
/**
*
* @param args
* @throws Exception
*/
private void closeDB()
throws DatabaseException {
}
initDB();
initJMQ();
processThread = new Thread(this);
}
private void initMasterDBNodeChecker() {
nodeStatusSender.setDaemon(true);
nodeStatusReceiver.setDaemon(true);
if(verbose) {
}
try {
// Wait until the NodeInfo of all the peer BDB nodes to
// be received by the local server.
} catch (Exception e) {
if(verbose) {
e.printStackTrace();
}
}
}
/**
* Shutdown amsessiondb
*
* @param exit true if we should call System.exit
*/
try {
shutdownStatus = true;
closeJMQ();
} catch(DatabaseException dbe) {
} catch (Exception e) {
}
if (exit) {
}
}
public int process()
throws Exception {
long processStart = 0;
long pDuration = 0;
if(statsEnabled) {
}
if(verbose) {
}
//showAllSessionRecords();
if(verbose) {
}
return 0;
}
if(verbose) {
}
try {
} catch (DatabaseException ex) {
}
if(statsEnabled) {
readCount++;
}
if (duration > maxReadCount) {
}
}
if(verbose) {
if (baseRecord != null) {
} else {
}
}
if (baseRecord != null) {
} else if(isMasterNode) {
}
if(verbose) {
}
return 0;
}
if(verbose) {
}
}
}
if(statsEnabled) {
writeCount++;
}
if (duration > maxWriteCount) {
}
}
if(verbose) {
}
if(verbose) {
}
if(verbose) {
}
return 0;
}
if(verbose) {
}
try {
} catch (Exception e) {
e.printStackTrace();
}
if (statsEnabled) {
deleteCount++;
}
if (duration > maxDeleteCount) {
}
}
shutdown(true);
return(1);
if(verbose) {
}
if(statsEnabled) {
scReadCount++;
}
if(verbose) {
}
if(statsEnabled) {
scReadCount++;
}
}
if(statsEnabled) {
}
if (pDuration > maxProcessRequestTime) {
}
}
return 0;
}
throws Exception {
if (!isMasterNode) {
if (verbose) {
}
return;
}
return;
}
if(verbose) {
}
int nrows = 0;
long start = 0;
long duration = 0;
try {
// Use the BaseRecord secondary key to retrieve
// these objects.
if(statsEnabled) {
}
if (duration > maxReadSessionCount) {
}
}
// only the "valid" non-expired sessions with the
// right secondaryKey will be counted
long currentTime =
nrows++;
new RecordExpTimeInfo();
}
}
} catch (Exception e) {
if (verbose) {
e.printStackTrace();
}
} finally {
}
// construct a response message which contains the
// session count
}
}
if (!isMasterNode) {
if (verbose) {
}
return;
}
return;
}
if(verbose) {
}
try {
// Use the BaseRecord secondary key to retrieve
// these objects.
}
} catch (Exception e) {
if (verbose) {
e.printStackTrace();
}
} finally {
}
for (int i=0; i<nrows; i++) {
}
}
private class RecordExpTimeInfo {
int auxDataLen;
byte[] auxyData;
long expTime;
}
throws Exception {
int count = 0;
if (verbose) {
}
try {
if (verbose) {
+ "th record has " + expdate);
}
if (count++ >= cleanCount) {
break;
}
} else {
break;
}
}
} catch (Exception e) {
+ e.toString());
e.printStackTrace();
}
}
}
}
throws Exception {
//TODO: loop through all services
}
// Displays all the session records in the store
throws DatabaseException {
// Get a cursor that will walk every
// inventory object in the store.
try {
}
} catch(DatabaseException de){
if(verbose) {
}
} finally {
}
}
throws DatabaseException {
assert theSession != null;
}
{
long pKeylen = 0;
try {
// this should always be < 256 see FAMRecordJMQPersister
if (pKeylen > 256) {
return null;
}
} catch (NegativeArraySizeException nae) {
return null;
}
}
{
if (keylen > 0) {
}
return(keybytes);
}
public void run() {
totalTrans = 0;
while (!shutdownStatus) {
try {
if (isServerUp) {
totalTrans++;
if (statsEnabled &&
printStats();
totalTrans = 0;
readCount = 0;
writeCount = 0;
deleteCount = 0;
scReadCount = 0;
minReadSessionCount = 0;
maxReadSessionCount = 0;
minReadCount = 0;
averageReadCount = 0;
maxReadCount = 0;
cumulativeReadCount = 0;
minWriteCount = 0;
averageWriteCount = 0;
maxWriteCount = 0;
cumulativeWriteCount = 0;
minDeleteCount = 0;
averageDeleteCount = 0;
maxDeleteCount = 0;
}
} else {
/*
* When server is down this thread runs with a sleep
* interval of 1 minute and cleans sessions 5 times the
* numCleanSessions value from the Database.
*/
if (isDBUp) {
}
if(verbose) {
}
if (!isDBUp) {
initDB();
}
if (isDBUp) {
initJMQ();
}
if(verbose) {
}
}
} catch (DatabaseException ex) {
isDBUp = false;
isServerUp = false;
closeJMQ();
if (verbose) {
}
isServerUp = false;
closeJMQ();
if (verbose) {
}
} catch (Throwable t) {
if (verbose) {
t.printStackTrace();
}
}
}
}
// TODO move out to seperate class?
throws Exception
{
"RANDOM");
"true");
userName);
}
{
try {
public void run() {
}
});
}
}
}
private void printUsage() {
}
private void printStats() {
statsWriter.println(bundle.getString("totaldelete") + " " + deleteCount + "(" + cumDeleteCount + ")");
statsWriter.println(bundle.getString("totalreadsessioncount") + " " + scReadCount + "(" + cumReadSessionCount + ")");
statsWriter.flush();
}
printUsage();
}
switch (opt) {
case USER_NAME:
i++;
printUsage();
}
}
break;
case PASSWORD:
i++;
printUsage();
}
userPassword = argv[i];
}
break;
case PASSWORD_FILE:
i++;
printUsage();
}
}
}
break;
case CACHE_SIZE:
i++;
printUsage();
}
}
try {
} catch (NumberFormatException e) {
}
break;
case DIRECTORY:
i++;
printUsage();
}
dbDirectory = argv[i];
}
break;
case CLUSTER_ADDRESS:
i++;
printUsage();
}
clusterAddress = argv[i];
}
break;
case NUM_CLEAN_SESSIONS:
i++;
printUsage();
}
}
try {
} catch (NumberFormatException e) {
}
break;
case VERBOSE:
verbose = true;
break;
case STATS_INTERVAL:
i++;
printUsage();
}
}
try {
} catch (NumberFormatException e) {
}
if (statsInterval <= 0) {
statsInterval = 60;
}
statsEnabled = true;
break;
case HELP:
break;
case VERSION:
break;
i++;
printUsage();
}
printCommandError("nonodestatusupdateinterval",
argv[i-1]);
}
try {
} catch (NumberFormatException e) {
}
if (nodeUpdateInterval <= 0) {
nodeUpdateInterval = 5000;
}
break;
case PROPERTIES_FILE:
i++;
printUsage();
}
propertiesfile = argv[i];
break;
default:
}
}
}
/**
* Return true if arguments are valid.
*
* @param argv
* Array of arguments.
* @param bundle
* Resource Bundle.
* @return true if arguments are valid.
*/
boolean hasClusterAddress = false;
boolean retValue = true;
boolean hasPassword = false;
boolean hasPwdFile = false;
if (len == 0) {
retValue = false;
} else if (len == 1) {
retValue = false;
}
} else {
hasClusterAddress = true;
}
hasPassword = true;
}
hasPwdFile = true;
}
}
if(hasPassword && hasPwdFile) {
retValue = false;
}
if(!hasClusterAddress) {
retValue = false;
}
}
return retValue;
}
try {
} catch(Exception e) {
return 0;
}
}
}
} else {
}
}
// Check if the local daemon process is the master (longest-lived)
// BDB node
static private void determineMasterDBNode() {
synchronized (serverStatusMap) {
boolean masterDB = true;
masterDB = false;
break;
}
}
}
}
if (debug) {
}
}
static void pendRunning() {
// System.in std input stream already opened by default.
// Wrap in a new reader stream to obtain 16 bit capability.
// Wrap the reader with a buffered reader.
try {
// Read a whole line a time. Check the string for
// the "quit" input to jump from the loop.
do {
// Read text from keyboard
debugMessage("------------>str is null !");
else
debugMessage("------------>str is not null !");
} catch (Exception e) {
debugMessage("Exception in pendRunning : "+e);
e.printStackTrace();
}
}
// NodeInfo data structure
private class NodeInfo {
long nodeID;
long startTime;
long lastUpdateTime;
}
// This NodeStatusSender thread keeps performing the following
// actions (interval defined 5 secs):
// (2) Remove the outdated NodeInfo entries from the
// serverStatusMap map if the information is obsolete.
// (3) Determine whether the local BDB node can become the master
// BDB node.
class NodeStatusSender implements Runnable {
NodeStatusSender() {
}
public void run() {
while (!shutdownStatus) {
try {
if (isServerUp) {
if (sleeptime > 0) {
}
} else {
}
} catch (Exception e) {
isServerUp = false;
if (verbose) {
e.printStackTrace();
}
} catch (Throwable t) {
if (verbose) {
t.printStackTrace();
}
}
}
}
// Remove the outdated NodeInfo from the serverStatusMap map
// if the information is obsolete.
void RemoveOutdatedNodeInfo() {
synchronized (serverStatusMap) {
if (currentTime >
}
}
}
}
}
// This NodeStatusReceiver thread keeps waiting for the message
// sent by its peer BDB nodes and update the local serverStatusMap
// accordingly.
class NodeStatusReceiver implements Runnable {
}
public void run() {
while (!shutdownStatus) {
try {
if (isServerUp) {
.receive();
if (nodeID == localNodeID) {
// ignore the message sent by the local server
continue;
}
synchronized (serverStatusMap) {
}
} else {
}
} catch (Exception e) {
isServerUp = false;
if (verbose) {
e.printStackTrace();
}
} catch (Throwable t) {
if (verbose) {
t.printStackTrace();
}
}
}
}
}
}