// Importer.java revision 5198
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
*/
/**
* This class provides the engine that performs both importing of LDIF files and
* the rebuilding of indexes.
*/
public final class Importer
{
private static final int TIMER_INTERVAL = 10000;
private static final int KB = 1024;
//Defaults for DB cache.
//Defaults for LDIF reader buffers, min memory required to import and default
//size for byte buffers.
private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE +
private static final int BYTE_BUFFER_CAPACITY = 128;
//Min and MAX sizes of phase one buffer.
//Min size of phase two read-ahead cache.
//Small heap threshold used to give more memory to the JVM to attempt to avoid OOM errors.
//The DN attribute type.
private static AttributeType dnType;
new IndexOutputBuffer.IndexComparator();
//Phase one buffer and imported entries counts.
//Phase one buffer size in bytes.
private int bufferSize;
//Temp scratch directory.
//Index and thread counts.
private final int indexCount;
private int threadCount;
//Set to true when validation is skipped.
private final boolean skipDNValidation;
//Temporary environment used when DN validation is done in first phase.
//Root container.
private RootContainer rootContainer;
//Import configuration.
private final LDIFImportConfig importConfiguration;
//Backend configuration.
private final LocalDBBackendCfg backendConfiguration;
//LDIF reader.
private LDIFReader reader;
//Migrated entry count.
private int migratedCount;
// Size in bytes of temporary env.
private long tmpEnvCacheSize;
// Size in bytes of DB cache.
private long dbCacheSize;
//The executor service used for the buffer sort tasks.
private ExecutorService bufferSortService;
//The executor service used for the scratch file processing tasks.
private ExecutorService scratchFileWriterService;
//Queue of free index buffers -- used to re-cycle index buffers;
new LinkedBlockingQueue<IndexOutputBuffer>();
//Map of index keys to index buffers. Used to allocate sorted
//index buffers to a index writer thread.
private final
//Map of DB containers to index managers. Used to start phase 2.
new LinkedList<IndexManager>();
//Map of DB containers to DN-based index managers. Used to start phase 2.
new LinkedList<IndexManager>();
//Futures used to indicate when the index file writers are done flushing
//their work queues and have exited. End of phase one.
//List of index file writer tasks. Used to signal stopScratchFileWriters to
//the index file writer tasks when the LDIF file has been done.
//Map of DNs to Suffix objects.
//Map of container ids to database containers.
//Map of container ids to entry containers
//Used to synchronize when a scratch file index writer is first setup.
//Rebuild index manager used when rebuilding indexes.
private final RebuildIndexManager rebuildManager;
//Set to true if the backend was cleared.
private boolean clearedBackend = false;
//Used to shutdown import if an error occurs in phase one.
private volatile boolean isPhaseOneCanceled = false;
//Number of phase one buffers
private int phaseOneBufferCount;
static
{
{
}
}
/**
* Create a new import job with the specified rebuild index config.
*
* @param rebuildConfig
* The rebuild index configuration.
* @param cfg
* The local DB back-end configuration.
* @param envConfig
* The JEB environment config.
* @throws InitializationException
* If a problem occurs during initialization.
* @throws JebException
* If an error occurred when opening the DB.
* @throws ConfigException
* If a problem occurs during initialization.
*/
{
this.importConfiguration = null;
this.backendConfiguration = cfg;
this.threadCount = 1;
{
}
else
{
}
{
throw new InitializationException(message);
}
this.skipDNValidation = true;
}
/**
* Create a new import job with the specified ldif import config.
*
* @param importConfiguration
* The LDIF import configuration.
* @param localDBBackendCfg
* The local DB back-end configuration.
* @param envConfig
* The JEB environment config.
* @throws InitializationException
* If a problem occurs during initialization.
* @throws ConfigException
* If a problem occurs reading the configuration.
* @throws DatabaseException
* If an error occurred when opening the DB.
*/
{
this.rebuildManager = null;
{
}
else
{
}
// Determine the number of indexes.
{
{
}
else
{
}
}
this.indexCount = indexes;
{
{
this.clearedBackend = true;
}
}
{
}
else
{
}
{
throw new InitializationException(message);
}
// Set up temporary environment.
if (!skipDNValidation)
{
}
else
{
}
}
/**
* Return the suffix instance in the specified map that matches the specified
* DN.
*
* @param dn The DN to search for.
* @param map The map to search.
* @return The suffix instance that matches the DN, or null if no match is
* found.
*/
{
{
}
}
return suffix;
}
/**
* Calculate buffer sizes and initialize JEB properties based on memory.
*
* @param envConfig
* The environment config to use in the calculations.
* @throws InitializationException
* If a problem occurs during calculation.
*/
throws InitializationException
{
// Calculate amount of usable memory. This will need to take into account
// various fudge factors, including the number of IO buffers used by the
// scratch writers (1 per index).
final long availableMemory = calculateAvailableMemory();
final long usableMemory = availableMemory
if (!skipDNValidation)
{
// No DN validation: calculate memory for DB cache, DN2ID temporary cache,
// and buffers.
{
}
{
}
else if (!clearedBackend)
{
// Appending to existing data so reserve extra memory for the DB cache
// since it will be needed for dn2id queries.
}
else
{
}
}
else
{
// No DN validation: calculate memory for DB cache and buffers.
// No need for DN2ID cache.
tmpEnvCacheSize = 0;
{
}
else if (usableMemory < MIN_DB_CACHE_MEMORY)
{
}
else
{
// not being queried.
}
}
final int oldThreadCount = threadCount;
while (true)
{
// Scratch writers allocate 4 buffers per index as well.
final int totalPhaseOneBufferCount = phaseOneBufferCount
+ (4 * indexCount);
if (bufferSize > MAX_BUFFER_SIZE)
{
if (!skipDNValidation)
{
// The buffers are big enough: the memory is best used for the DN2ID
// temp DB.
final long extraMemory = phaseOneBufferMemory
if (!clearedBackend)
{
}
else
{
}
}
break;
}
else if (bufferSize > MIN_BUFFER_SIZE)
{
// This is acceptable.
break;
}
else if (threadCount > 1)
{
// Retry using less threads.
threadCount--;
}
else
{
// Not enough memory.
final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount
throw new InitializationException(message);
}
}
if (oldThreadCount != threadCount)
{
}
if (tmpEnvCacheSize > 0)
{
}
.toString(dbCacheSize));
}
/**
* Returns the amount of available memory which can be used by this import,
* taking into account whether or not the import is running offline or online
* as a task.
*/
private long calculateAvailableMemory()
{
final long availableMemory;
if (DirectoryServer.isRunning())
{
final long configuredMemory;
{
}
else
{
}
// Round up to minimum of 16MB (e.g. unit tests only use 2% cache).
16 * MB);
}
else
{
}
// Now take into account various fudge factors.
int importMemPct = 90;
if (availableMemory <= SMALL_HEAP_SIZE)
{
// Be pessimistic when memory is low.
importMemPct -= 25;
}
if (rebuildManager != null)
{
// Rebuild seems to require more overhead.
importMemPct -= 15;
}
}
// Pre-allocates the phase-one index output buffers and places them on the
// free-buffer queue for re-use by the import worker threads.
private void initializeIndexBuffers()
{
for(int i = 0; i < phaseOneBufferCount; i++)
{
// NOTE(review): `b` is never declared in this body — the line constructing
// the IndexOutputBuffer (presumably sized by bufferSize) appears to have
// been lost; as written this cannot compile. Restore from version control.
freeBufferQueue.add(b);
}
}
{
{
{
}
}
}
//Mainly used to support multiple suffixes. Each index in each suffix gets
//a unique ID to identify which DB it needs to go to in phase two processing.
{
}
}
}
}
}
if(!extensibleMap.isEmpty()) {
if(subIndexes != null) {
}
}
if(sharedIndexes !=null) {
}
}
}
}
}
if(!importConfiguration.appendToExistingData() &&
{
{
{
// This entire base DN was explicitly excluded. Skip.
return null;
}
{
}
}
{
{
{
}
}
if(includeBranches.isEmpty())
{
/*
There are no branches in the explicitly defined include list under
this base DN. Skip this base DN all together.
*/
return null;
}
// Remove any overlapping include branches.
while(includeBranchIterator.hasNext())
{
boolean keep = true;
{
{
keep = false;
break;
}
}
if(!keep)
{
}
}
// Remove any exclude branches that are not under an include
// branch since they will be migrated as part of the existing entries
// outside of the include branches anyways.
while(excludeBranchIterator.hasNext())
{
boolean keep = false;
{
{
keep = true;
break;
}
}
if(!keep)
{
}
}
{
// This entire base DN is explicitly included in the import with
// no exclude branches that we need to migrate. Just clear the entry
// container.
}
else
{
// Create a temp entry container
"_importTmp");
}
}
}
}
/**
* Rebuild the indexes using the specified rootcontainer.
*
* @param rootContainer The rootcontainer to rebuild indexes in.
*
* @throws ConfigException If a configuration error occurred.
* @throws InitializationException If an initialization error occurred.
* @throws JebException If the JEB database had an error.
* @throws InterruptedException If an interrupted error occurred.
* @throws ExecutionException If an execution error occurred.
*/
public void
{
this.rootContainer = rootContainer;
}
/**
* Import a LDIF using the specified root container.
*
* @param rootContainer The root container to use during the import.
*
* @return A LDIF result.
* @throws ConfigException If the import failed because of an configuration
* error.
* @throws InitializationException If the import failed because of an
* initialization error.
* @throws JebException If the import failed due to a database error.
* @throws InterruptedException If the import failed due to an interrupted
* error.
* @throws ExecutionException If the import failed due to an execution error.
*/
public LDIFImportResult
{
this.rootContainer = rootContainer;
try
{
}
catch (IOException ioe)
{
}
try
{
phaseOne();
if (!skipDNValidation)
{
}
if (isPhaseOneCanceled)
{
throw new InterruptedException("Import processing canceled.");
}
phaseTwo();
float rate = 0;
}
finally
{
}
}
{
{
{
if(f.isDirectory())
{
recursiveDelete(f);
}
f.delete();
}
}
}
private void switchContainers()
{
if(entryContainer != null) {
}
}
}
// Marks the indexes of each import suffix as trusted after successful
// completion of phase two processing.
private void setIndexesTrusted() throws JebException
{
try {
// NOTE(review): `s` is undeclared and the braces below are unbalanced — the
// enclosing loop over the suffixes (for each Suffix s ...) appears to have
// been lost, as has the declaration/initialization of `message` used in the
// catch clause. Restore both from version control; this cannot compile as-is.
s.setIndexesTrusted();
}
}
catch (DatabaseException ex)
{
throw new JebException(message);
}
}
{
1);
{
{
}
}
{
for (int i = 0; i < threadCount; i++)
{
}
}
else
{
for (int i = 0; i < threadCount; i++)
{
}
}
{
{
}
}
{
{
}
}
{
{
}
}
// Shutdown the executor services
// Try to clear as much memory as possible.
}
{
reader.getEntriesRead());
1);
try
{
}
finally
{
}
}
private void processIndexFiles() throws InitializationException,
{
{
return;
}
if(dbThreads < 4)
{
dbThreads = 4;
}
// Calculate memory / buffer counts.
final long availableMemory = calculateAvailableMemory();
int readAheadSize;
int buffers;
while (true)
{
buffers = 0;
for (int i = 0; i < limit; i++)
{
}
if (readAheadSize > bufferSize)
{
// Cache size is never larger than the buffer size.
break;
}
else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
{
// This is acceptable.
break;
}
else if (dbThreads > 1)
{
// Reduce thread count.
dbThreads--;
}
else
{
// Not enough memory.
final long minimumPhaseTwoBufferMemory = buffers
throw new InitializationException(message);
}
}
// Start indexing tasks.
// Start DN processing first.
{
}
{
}
{
{
}
}
}
// Signals the scratch file writer tasks that phase one is complete and no
// more index buffers will arrive.
private void stopScratchFileWriters()
{
{
// NOTE(review): empty loop body — the statement that queues the stop/poison
// buffer to each scratch file writer task (see the comment on the writer
// task list near the top of the class) appears to be missing; restore it.
}
}
/**
* Task used to migrate excluded branch.
*/
private final class MigrateExcludedTask extends ImportTask
{
/**
* {@inheritDoc}
*/
{
if(entryContainer != null &&
Comparator<byte[]> comparator =
try {
// This is the base entry for a branch that was excluded in the
// import so we must migrate all entries in this branch over to
// the new entry container.
suffix);
}
}
}
}
catch (Exception e)
{
message =
isPhaseOneCanceled =true;
throw e;
}
}
}
return null;
}
}
/**
* Task to migrate existing entries.
*/
private final class MigrateExistingTask extends ImportTask
{
/**
* {@inheritDoc}
*/
{
List<byte[]> includeBranches =
{
{
}
}
if(entryContainer != null &&
null);
try {
boolean found = false;
for(byte[] includeBranch : includeBranches)
{
{
found = true;
break;
}
}
if(!found) {
} else {
// This is the base entry for a branch that will be included
// in the import so we don't want to copy the branch to the
// new entry container.
/**
* Advance the cursor to next entry at the same level in the
* DIT
* skipping all the entries in this branch.
* Set the next starting value to a value of equal length but
* slightly greater than the previous DN. Since keys are
* compared in reverse order we must set the first byte
* (the comma).
* No possibility of overflow here.
*/
}
}
}
catch(Exception e)
{
message =
isPhaseOneCanceled =true;
throw e;
}
}
}
return null;
}
}
/**
*/
private class AppendReplaceTask extends ImportTask
{
deleteKeySet = new HashSet<byte[]>();
/**
* {@inheritDoc}
*/
{
try
{
while (true)
{
{
return null;
}
{
break;
}
}
}
catch(Exception e)
{
isPhaseOneCanceled = true;
throw e;
}
return null;
}
{
{
}
{
if(!skipDNValidation)
{
{
return;
}
}
}
else
{
}
{
}
else
{
}
}
void
{
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
}
if(!extensibleMap.isEmpty()) {
if(subIndexes != null) {
}
}
if(sharedIndexes !=null) {
}
}
}
}
}
{
{
for(byte[] delKey : deleteKeySet)
{
}
}
for(byte[] key : insertKeySet)
{
}
}
}
/**
* This task performs phase reading and processing of the entries read from
* the LDIF file(s). This task is used if the append flag wasn't specified.
*/
{
valEntry = new DatabaseEntry();
/**
* {@inheritDoc}
*/
{
try
{
while (true)
{
{
return null;
}
{
break;
}
}
}
catch (Exception e)
{
isPhaseOneCanceled = true;
throw e;
}
return null;
}
{
if(!skipDNValidation)
{
{
return;
}
}
}
//Examine the DN for duplicates and missing parents.
throws JebException, InterruptedException
{
//Perform parent checking.
return false;
}
}
//If the backend was not cleared, then the dn2id needs to be checked first
//for DNs that might not exist in the DN cache. If the DN is not in
//the suffixes dn2id DB, then the dn cache is used.
if(!clearedBackend)
{
{
return false;
}
}
{
return false;
}
return true;
}
void
{
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
index.getIndexEntryLimit()));
}
}
if(!extensibleMap.isEmpty()) {
if(subIndexes != null) {
}
}
if(sharedIndexes !=null) {
}
}
}
}
}
}
{
for(byte[] key : insertKeySet)
{
}
}
void flushIndexBuffers() throws InterruptedException,
{
while(setIterator.hasNext())
{
}
}
int
throws ConfigException, InterruptedException
{
if (indexBuffer == null)
{
}
{
}
return id;
}
{
if(indexBuffer == null)
{
"Index buffer processing error.");
}
if(indexBuffer.isPoison())
{
"Cancel processing received.");
}
return indexBuffer;
}
throws ConfigException, InterruptedException
{
byte[] dnBytes =
}
throws DatabaseException
{
{
}
else
{
}
}
}
/**
* This task reads sorted records from the temporary index scratch files,
* processes the records and writes the results to the index database. The
* DN index is treated differently than non-DN indexes.
*/
{
private final IndexManager indexMgr;
private final int cacheSize;
{
this.dbKey = new DatabaseEntry();
this.dbValue = new DatabaseEntry();
}
throws IOException
{
new TreeSet<IndexInputBuffer>();
{
indexMgr.bufferIndexID[i]);
}
// GC arrays.
return bufferSet;
}
/**
* {@inheritDoc}
*/
{
new DefaultExceptionHandler());
try
{
{
{
indexID = b.getIndexID();
{
}
else
{
}
}
{
indexID = b.getIndexID();
{
}
else
{
}
{
}
}
else
{
}
if(b.hasMoreData())
{
b.getNextRecord();
}
}
{
}
}
catch (Exception e)
{
message =
e.getMessage());
e.printStackTrace();
throw e;
}
finally
{
cleanUP();
}
return null;
}
{
try
{
{
{
}
}
else
{
{
index.closeCursor();
}
}
}
finally
{
}
}
int indexID) throws InterruptedException,
{
{
{
{
}
}
{
{
}
}
}
else
{
}
}
throws DatabaseException, DirectoryException
{
{
}
else
{
}
{
return;
}
}
/**
* This class is used to by a index DB merge thread performing DN processing
* to keep track of the state of individual DN2ID index processing.
*/
class DNState
{
private final EntryContainer entryContainer;
private final int childLimit, subTreeLimit;
private final boolean childDoCount, subTreeDoCount;
{
this.entryContainer = entryContainer;
Comparator<byte[]> childComparator =
Comparator<byte[]> subComparator =
dnKey = new DatabaseEntry();
dnValue = new DatabaseEntry();
}
{
int parentIndex =
if(parentIndex < 0)
{
// This is the root or base DN
return null;
}
return parent;
}
{
if(destBuffer == null ||
{
}
else
{
destBuffer.flip();
return destBuffer;
}
}
// Why do we still need this if we are checking parents in the first
// phase?
{
byte[] v = record.toDatabase();
//Bypass the cache for append data, lookup the parent in DN2ID and
//return.
if(importConfiguration != null &&
{
//If null is returned than this is a suffix DN.
{
status =
{
}
else
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
return false;
}
}
}
else
{
if(parentIDMap.isEmpty())
{
return true;
}
{
return true;
}
{
return true;
}
else
{
{
}
}
else
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
return false;
}
}
}
return true;
}
throws DatabaseException, DirectoryException
{
{
}
else
{
}
{
}
}
{
//Bypass the cache for append data, lookup the parent DN in the DN2ID
//db.
if (importConfiguration != null &&
{
status =
{
}
else
{
}
}
else
{
}
return nodeID;
}
throws DatabaseException, DirectoryException
{
{
}
else
{
}
// TODO:
// Instead of doing this, we can just walk to parent cache if available
{
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
break;
}
{
}
else
{
}
}
{
}
}
{
{
}
}
boolean clearMap)
throws DatabaseException, DirectoryException
{
{
}
index.closeCursor();
if(clearMap)
{
}
}
{
}
}
}
/**
* This task writes the temporary scratch index files using the sorted
* buffers read from a blocking queue private to each index.
*/
{
private final int DRAIN_TO = 3;
private final IndexManager indexMgr;
private final ByteArrayOutputStream insetByteStream =
private final ByteArrayOutputStream deleteByteStream =
private final byte[] tmpArray = new byte[8];
private final DataOutputStream dataStream;
private int bufferCount = 0;
private boolean poisonSeen = false;
{
}
/**
* {@inheritDoc}
*/
{
long offset = 0;
try {
while(true)
{
if(indexBuffer != null)
{
long beginOffset = offset;
long bufferLen;
{
l.add(indexBuffer);
bufferLen = writeIndexBuffers(l);
for(IndexOutputBuffer id : l)
{
{
}
}
l.clear();
}
else
{
if(indexBuffer.isPoison())
{
break;
}
if(!indexBuffer.isDiscard())
{
indexBuffer.reset();
}
}
bufferCount++;
if(poisonSeen)
{
break;
}
}
}
}
catch (IOException e)
{
isPhaseOneCanceled = true;
throw e;
}
finally
{
dataStream.close();
}
return null;
}
throws IOException
{
long bufferLen = 0;
for(int i = 0; i < numberKeys; i++)
{
{
if(indexBuffer.isInsert(i))
{
}
else
{
}
continue;
}
if(!indexBuffer.compare(i))
{
}
if(indexBuffer.isInsert(i))
{
{
}
}
else
{
}
}
{
}
return bufferLen;
}
throws IOException
{
long id = 0;
long bufferLen = 0;
for(IndexOutputBuffer b : buffers)
{
if(b.isPoison())
{
poisonSeen = true;
}
else
{
b.setPosition(0);
indexSortedSet.add(b);
}
}
int saveIndexID = 0;
while(!indexSortedSet.isEmpty())
{
indexSortedSet.remove(b);
{
saveIndexID = b.getIndexID();
if(b.isInsert(b.getPosition()))
{
}
else
{
}
}
else
{
{
insertKeyCount = 0;
deleteKeyCount = 0;
saveIndexID = b.getIndexID();
if(b.isInsert(b.getPosition()))
{
}
else
{
}
}
else
{
if(b.isInsert(b.getPosition()))
{
{
}
}
else
{
}
}
}
if(b.hasMoreData())
{
b.getNextRecord();
indexSortedSet.add(b);
}
}
{
}
return bufferLen;
}
private int writeByteStreams() throws IOException
{
{
insertKeyCount = 1;
}
{
}
{
}
return insertSize + deleteSize;
}
{
return packedSize;
}
{
int keySize = b.getKeySize();
b.writeKey(dataStream);
packedSize += writeByteStreams();
}
{
dataStream.write(k);
packedSize += writeByteStreams();
}
}
/**
* This task's main function is to sort the index buffers given to it from
* the import tasks reading the LDIF file. It will also create a index
* file writer task and corresponding queue if needed. The sorted index
* buffers are put on the index file writer queues for writing to a temporary
* file.
*/
{
private final IndexOutputBuffer indexBuffer;
{
this.indexBuffer = indexBuffer;
}
/**
* {@inheritDoc}
*/
{
{
isPhaseOneCanceled = true;
return null;
}
indexBuffer.sort();
{
.getIndexKey());
q.add(indexBuffer);
}
else
{
.getIndexKey());
q.add(indexBuffer);
}
return null;
}
throws FileNotFoundException
{
boolean isDN = false;
synchronized(synObj)
{
{
return;
}
{
isDN = true;
}
if(isDN)
{
}
else
{
}
}
}
}
/**
* The index manager class has several functions:
*
* 1. It is used to carry information about index processing created in phase
* one to phase two.
*
* 2. It collects statistics about phase two processing for each index.
*
* 3. It manages opening and closing the scratch index files.
*/
{
private static final int BUFFER_SIZE = 128;
private long totalDNS;
private final boolean isDN;
private final int limit;
private long[] bufferIndexBegin = new long[BUFFER_SIZE];
private long[] bufferIndexEnd = new long[BUFFER_SIZE];
private int[] bufferIndexID = new int[BUFFER_SIZE];
private int bufferIndexCount = 0;
{
}
private void openIndexFile() throws FileNotFoundException
{
}
/**
* Returns the file channel associated with this index manager.
*
* @return The file channel associated with this index manager.
*/
{
return rFile.getChannel();
}
{
if (bufferIndexCount >= size)
{
size += BUFFER_SIZE;
}
}
{
return file;
}
// Deletes this index manager's temporary scratch index file.
// NOTE(review): a boolean-returning method with an empty body cannot
// compile — the return statement (presumably the result of deleting the
// scratch file) has been truncated; restore it from version control.
private boolean deleteIndexFile()
{
}
// Closes the scratch-file resources held by this index manager.
// NOTE(review): body is empty — the stream/channel close calls appear to
// have been truncated; as written this would leak the scratch file handles.
private void close() throws IOException
{
}
// NOTE(review): body is empty — presumably this trimmed the scratch file to
// the number of bytes actually written; restore the implementation from
// version control before relying on this class.
private void setFileLength()
{
}
/**
* Updates the bytes read counter.
*
* @param bytesRead
* The number of bytes read.
*/
void addBytesRead(int bytesRead)
{
// NOTE(review): empty body — the bytes-read counter update promised by this
// method's javadoc is missing; restore it from version control.
}
/**
 * Flags this index manager as having completed its processing.
 */
private void setDone()
{
  done = true;
}
/**
 * Flags this index manager as having begun its processing.
 */
private void setStarted()
{
  this.started = true;
}
// Adds the given delta to this manager's total DN counter.
// NOTE(review): empty body — the update of totalDNS is missing, so
// getDNCount() below would always return zero; restore from version control.
private void addTotDNCount(int delta)
{
}
/**
 * Returns the current value of the totalDNS counter.
 */
private long getDNCount()
{
  return this.totalDNS;
}
/**
 * Indicates whether this manager handles a DN-based (DN2ID) index.
 */
private boolean isDN2ID()
{
  return this.isDN;
}
private void printStats(long deltaTime)
{
{
}
}
// NOTE(review): empty body — the key-counter increment this method's name
// promises is missing; restore it from version control.
private void incrementKeyCount()
{
}
/**
* Returns the file name associated with this index manager.
*
* @return The file name associated with this index manager.
*/
{
return fileName;
}
/**
 * Returns the limit value configured for this index manager.
 */
private int getLimit()
{
  return this.limit;
}
/**
* {@inheritDoc}
*/
{
}
}
/**
* The rebuild index manager handles all rebuild index related processing.
*/
private class RebuildIndexManager extends ImportTask {
//Rebuild index configuration.
private final RebuildConfig rebuildConfig;
//Local DB backend configuration.
private final LocalDBBackendCfg cfg;
//Map of index keys to indexes.
//Map of index keys to extensible indexes.
//List of VLV indexes.
//The DN2ID index.
//The DN2URI index.
//Total entries to be processed.
private long totalEntries =0;
//Total entries processed.
//The suffix instance.
//Set to true if the rebuild all flag was specified.
private final boolean rebuildAll;
//The entry container.
private EntryContainer entryContainer;
/**
* Create an instance of the rebuild index manager using the specified
* parameters.
*
* @param rebuildConfig The rebuild configuration to use.
* @param cfg The local DB configuration to use.
*/
{
this.rebuildConfig = rebuildConfig;
}
/**
* Initialize a rebuild index manager.
*
* @throws ConfigException If an configuration error occurred.
* @throws InitializationException If an initialization error occurred.
*/
{
{
throw new InitializationException(msg);
}
}
/**
* Print start message.
*
* @throws DatabaseException If an database error occurred.
*/
public void printStartMessage() throws DatabaseException
{
{
{
}
}
if(rebuildAll) {
}
}
/**
* Print stop message.
*
* @param startTime The time the rebuild started.
*/
public void printStopMessage(long startTime)
{
float rate = 0;
if (totalTime > 0)
{
}
}
/**
* {@inheritDoc}
*/
{
try {
{
{
return null;
}
}
}
catch (Exception e)
{
isPhaseOneCanceled = true;
throw e;
}
return null;
}
/**
* Perform rebuild index processing.
*
* @throws InitializationException
* If an initialization error occurred.
* @throws DatabaseException
* If an database error occurred.
* @throws InterruptedException
* If an interrupted error occurred.
* @throws ExecutionException
* If an Execution error occurred.
* @throws JebException
* If an JEB error occurred.
*/
public void rebuldIndexes() throws InitializationException,
{
phaseOne();
if (isPhaseOneCanceled)
{
throw new InterruptedException("Rebuild Index canceled.");
}
phaseTwo();
if (rebuildAll)
{
}
else
{
}
}
private void setRebuildListIndexesTrusted() throws JebException
{
try
{
{
}
{
}
}
if(!vlvIndexes.isEmpty())
{
{
}
}
if(!extensibleIndexMap.isEmpty())
{
{
if(subIndexes != null) {
}
}
}
}
}
catch (DatabaseException ex)
{
throw new JebException(message);
}
}
private void setAllIndexesTrusted() throws JebException
{
try {
}
catch (DatabaseException ex)
{
throw new JebException(message);
}
}
private void phaseOne() throws DatabaseException,
if(rebuildAll)
{
}
else
{
}
for (int i = 0; i < threadCount; i++)
{
}
}
}
{
}
}
// Try to clear as much memory as possible.
}
private void phaseTwo() throws InitializationException,
{
entriesProcessed.get());
}
{
int indexCount;
if(!rebuildAll)
{
}
else
{
}
return indexCount;
}
{
//Add four for: DN, id2subtree, id2children and dn2uri.
indexCount += 4;
return indexCount;
}
throws JebException, ConfigException
{
int indexCount = 0;
if(!rebuildList.isEmpty())
{
{
{
indexCount += 3;
}
{
indexCount++;
}
{
{
throw new JebException(msg);
}
indexCount++;
{
throw new JebException(msg);
}
else
{
{
throw new JebException(msg);
}
{
throw new JebException(msg);
}
{
{
{
indexCount++;
}
{
indexCount++;
}
{
indexCount++;
}
{
indexCount++;
}
{
indexCount++;
} else {
throw new JebException(msg);
}
}
else
{
boolean found = false;
{
if (indexCfg.getIndexType().
{
{
if(exRule.equalsIgnoreCase(s))
{
found = true;
break;
}
}
}
if(found)
{
break;
}
}
if(!found) {
throw new JebException(msg);
}
indexCount++;
}
}
else
{
{
{
continue;
}
if(indexCfg.getIndexType().
{
indexCount++;
}
if(indexCfg.getIndexType().
{
indexCount++;
}
if(indexCfg.getIndexType().
{
indexCount++;
}
if(indexCfg.getIndexType().
{
indexCount++;
}
if(indexCfg.getIndexType().
{
indexCount++;
}
if (indexCfg.getIndexType().
{
boolean shared = false;
{
{
indexCount++;
}
else
{
if(!shared)
{
shared=true;
indexCount++;
}
}
}
}
}
}
}
}
}
return indexCount;
}
private void clearRebuildListIndexes() throws DatabaseException
{
if(!rebuildList.isEmpty())
{
{
{
}
{
clearDN2URI();
}
{
}
else
{
{
{
}
{
}
{
}
{
}
{
}
else
{
{
dbPart = "substring";
}
if(!extensibleMap.isEmpty()) {
if(subIndexes != null) {
{
break;
}
}
if(sharedIndexes !=null) {
{
break;
}
}
}
}
}
}
}
else
{
}
}
}
}
}
// NOTE(review): empty body — the per-container index clearing this method's
// name promises (used by the rebuild-all path) is missing; restore it from
// version control.
private void clearAllIndexes() throws DatabaseException
{
}
}
{
clearDN2URI();
}
}
throws DatabaseException
{
}
// NOTE(review): empty body — this method is invoked from the rebuild paths
// above, so an empty implementation is almost certainly truncation; restore
// the dn2uri-clearing logic from version control.
private void clearDN2URI() throws DatabaseException
{
}
// NOTE(review): empty body — the clearing of the dn2id/id2children/id2subtree
// indexes appears to have been truncated; restore from version control.
private void clearDN2IDIndexes() throws DatabaseException
{
}
throws DatabaseException
{
{
}
{
}
{
}
{
}
{
}
if(!extensibleMap.isEmpty()) {
if(subIndexes != null) {
}
}
if(sharedIndexes !=null) {
}
}
}
}
private
{
{
}
{
}
}
{
}
}
private
{
this.extensibleIndexMap.entrySet()) {
}
}
}
}
private void
{
{
index.getIndexEntryLimit()));
}
else
{
index.getIndexEntryLimit()));
}
}
}
}
/**
* Return the number of entries processed by the rebuild manager.
*
* @return The number of entries processed.
*/
public long getEntriesProcess()
{
  // NOTE(review): name is likely a typo for getEntriesProcessed(), but
  // renaming would break existing callers, so it is kept as-is.
  return entriesProcessed.get();
}
/**
* Return the total number of entries to process by the rebuild manager.
*
* @return The total number for entries to process.
*/
public long getTotEntries()
{
  return totalEntries;
}
}
/**
* This class reports progress of rebuild index processing at fixed
* intervals.
*/
private class RebuildFirstPhaseProgressTask extends TimerTask
{
/**
* The number of records that had been processed at the time of the
* previous progress report.
*/
private long previousProcessed = 0;
/**
* The time in milliseconds of the previous progress report.
*/
private long previousTime;
/**
* The environment statistics at the time of the previous report.
*/
private EnvironmentStats prevEnvStats;
/**
* Create a new rebuild index progress task.
*
* @throws DatabaseException If an error occurred while accessing the JE
* database.
*/
public RebuildFirstPhaseProgressTask() throws DatabaseException
{
}
/**
* The action to be performed by this timer task.
*/
public void run()
{
if (deltaTime == 0)
{
return;
}
float completed = 0;
{
}
try
{
long nCacheMiss =
float cacheMissRate = 0;
if (deltaCount > 0)
{
}
}
catch (DatabaseException e)
{
}
}
}
/**
* This class reports progress of first phase of import processing at
* fixed intervals.
*/
private final class FirstPhaseProgressTask extends TimerTask
{
/**
* The number of entries that had been read at the time of the
* previous progress report.
*/
private long previousCount = 0;
/**
* The time in milliseconds of the previous progress report.
*/
private long previousTime;
/**
* The environment statistics at the time of the previous report.
*/
private EnvironmentStats previousStats;
// Determines if eviction has been detected.
private boolean evicting = false;
// Entry count when eviction was detected.
private long evictionEntryCount = 0;
/**
* Create a new import progress task.
*/
public FirstPhaseProgressTask()
{
try
{
}
catch (DatabaseException e)
{
throw new RuntimeException(e);
}
}
/**
* The action to be performed by this timer task.
*/
public void run()
{
if (deltaTime == 0)
{
return;
}
try
{
//If first phase skip DN validation is specified use the root container
//stats, else use the temporary environment stats.
if(skipDNValidation)
{
}
else
{
}
float cacheMissRate = 0;
if (deltaCount > 0)
{
}
message =
long cleanerEntriesRead =
if (evictPasses != 0)
{
if (!evicting)
{
evicting = true;
message =
}
message =
}
if (cleanerRuns != 0)
{
message =
}
if (checkPoints > 1)
{
message =
}
}
catch (DatabaseException e)
{
// Unlikely to happen and not critical.
}
}
}
/**
* This class reports progress of the second phase of import processing at
* fixed intervals.
*/
private class SecondPhaseProgressTask extends TimerTask
{
/**
* The number of entries that had been read at the time of the
* previous progress report.
*/
private long previousCount = 0;
/**
* The time in milliseconds of the previous progress report.
*/
private long previousTime;
/**
* The environment statistics at the time of the previous report.
*/
private EnvironmentStats previousStats;
// Determines if eviction has been detected.
private boolean evicting = false;
// Total entries processed in phase one; used as the baseline for
// phase-two progress reporting.
private long latestCount;
/**
* Create a new import progress task.
*
* @param latestCount The latest count of entries processed in phase one.
*/
public SecondPhaseProgressTask (long latestCount)
{
this.latestCount = latestCount;
try
{
// NOTE(review): empty try body — the original presumably snapshotted
// the starting time and environment stats here; listing appears
// truncated. Confirm against the original revision.
}
catch (DatabaseException e)
{
// Stats are required for progress reporting; abort task creation.
throw new RuntimeException(e);
}
}
/**
* The action to be performed by this timer task.
*
* Reports phase-two (index writing) progress plus JE cache, cleaner and
* checkpoint activity, then per-index-manager progress (DN managers
* first, then the rest).
* NOTE(review): several statements appear truncated in this listing
* (bare "message =" assignments, empty blocks) — verify against the
* original revision.
*/
public void run()
{
// Nothing to report if no time has elapsed since the last report.
if (deltaTime == 0)
{
return;
}
try
{
float cacheMissRate = 0;
if (deltaCount > 0)
{
}
message =
// A non-zero evict-pass count means the JE cache is under memory
// pressure and is evicting nodes.
if (evictPasses != 0)
{
if (!evicting)
{
// Remember that eviction started so it is only reported once.
evicting = true;
}
message =
}
if (cleanerRuns != 0)
{
message =
}
if (checkPoints > 1)
{
message =
}
}
catch (DatabaseException e)
{
// Unlikely to happen and not critical.
}
//Do DN index managers first.
{
}
//Do non-DN index managers.
{
}
}
}
/**
* A class to hold information about the entry determined by the LDIF reader.
* Mainly the suffix the entry belongs under and the ID assigned to it by the
* reader.
*
*/
public class EntryInformation
{
/**
* Return the suffix associated with the entry.
*
* @return Entry's suffix instance;
*/
// NOTE(review): the method signature (presumably "public Suffix
// getSuffix()") is missing from this listing — confirm against the
// original revision.
{
return suffix;
}
/**
* Set the suffix instance associated with the entry.
*
* @param suffix The suffix associated with the entry.
*/
// NOTE(review): setter signature and assignment statement are missing
// from this listing.
{
}
/**
* Set the entry's ID.
*
* @param entryID The entry ID to set the entry ID to.
*/
// NOTE(review): setter signature and assignment statement are missing
// from this listing.
{
}
/**
* Return the entry ID associated with the entry.
*
* @return The entry ID associated with the entry.
*/
public EntryID getEntryID()
{
return entryID;
}
}
/**
* This class defines the individual index type available.
*
*/
public enum ImportIndexType {
/**
* The DN index type.
**/
DN,
// NOTE(review): the enum constants following DN (equality, presence,
// substring, ordering, approximate, extensible substring, extensible
// shared, vlv) appear to have been lost from this listing — only their
// javadoc comments remain. Confirm against the original revision.
/**
* The equality index type.
**/
/**
* The presence index type.
**/
/**
* The sub-string index type.
**/
/**
* The ordering index type.
**/
/**
* The approximate index type.
**/
/**
* The extensible sub-string index type.
**/
/**
* The extensible shared index type.
**/
/**
* The vlv index type.
*/
}
/**
* This class is used as an index key for hash maps that need to be keyed
* on both attribute type and index type
* (ie., cn.equality, sn.equality,...).
*/
public class IndexKey {
private final AttributeType attributeType;
private final ImportIndexType indexType;
// Index entry limit, carried along so index writers can honor it.
private final int entryLimit;
/**
* Create index key instance using the specified attribute type, index type
* and index entry limit.
*
* @param attributeType The attribute type.
* @param indexType The index type.
* @param entryLimit The entry limit for the index.
*/
// NOTE(review): the start of the constructor signature and the assignment
// of the indexType field are missing from this listing — confirm against
// the original revision.
int entryLimit)
{
this.attributeType = attributeType;
this.entryLimit = entryLimit;
}
/**
* An equals method that uses both the attribute type and the index type.
* Only returns {@code true} if the attribute type and index type are
* equal.
*
* @param obj the object to compare.
* @return {@code true} if the objects are equal, or {@code false} if they
* are not.
*/
// NOTE(review): the equals(Object) signature and the type/field
// comparison logic are missing from this listing.
{
{
return true;
}
}
return false;
}
/**
* A hash code method that adds the hash codes of the attribute type and
* index type and returns that value.
*
* @return The combined hash values of attribute type hash code and the
* index type hash code.
*/
public int hashCode()
{
// NOTE(review): body missing from this listing — per the javadoc it
// presumably returned attributeType.hashCode() + indexType.hashCode().
}
/**
* Return the attribute type.
*
* @return The attribute type.
*/
public AttributeType getAttributeType()
{
return attributeType;
}
/**
* Return the index type.
*
* @return The index type.
*/
public ImportIndexType getIndexType()
{
return indexType;
}
/**
* Return the index key name, which is the attribute type primary name,
* a period, and the index type name. Used for building file names and
* progress output.
*
* @return The index key name.
*/
// NOTE(review): the getName() signature and body are missing from this
// listing.
{
}
/**
* Return the entry limit associated with the index.
*
* @return The entry limit.
*/
public int getEntryLimit()
{
return entryLimit;
}
}
/**
* The temporary environment will be shared when multiple suffixes are being
* processed. This interface is used by those suffix instances to do parental
* checking of the DN cache.
*/
public static interface DNCache {
/**
* Returns {@code true} if the specified DN is contained in the DN cache,
* or {@code false} otherwise.
*
* @param dn The DN to check the presence of.
* @return {@code true} if the cache contains the DN, or {@code false} if it
* is not.
* @throws DatabaseException If an error occurs reading the database.
*/
// NOTE(review): the method declaration itself (presumably
// "boolean contains(DN dn) throws DatabaseException;") is missing from
// this listing — confirm against the original revision.
}
/**
* Temporary environment used to check DNs when DN validation is performed
* during phase one processing. It is deleted after phase one processing.
*/
// NOTE(review): the class declaration line (presumably a class
// implementing DNCache) is missing from this listing — confirm against
// the original revision.
{
// JE environment backing the temporary DN-cache database.
private Environment environment;
/**
* Create a temporary DB environment and database to be used as a cache of
* DNs when DN validation is performed in phase one processing.
*
* @param envPath The file path to create the environment under.
* @throws DatabaseException If an error occurs either creating the
* environment or the DN database.
*/
// NOTE(review): the constructor signature and the creation of the
// envConfig / dbConfig / environment / database objects are missing from
// this listing.
{
// Writable, auto-creating, non-transactional environment: this cache is
// throwaway scratch space, so durability is deliberately disabled.
envConfig.setReadOnly(false);
envConfig.setAllowCreate(true);
envConfig.setTransactional(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(false);
// Temporary database: JE removes it automatically when it is closed.
dbConfig.setTemporary(true);
}
// FNV-1 64-bit hash constants (offset basis and prime).
private static final long FNV_INIT = 0xcbf29ce484222325L;
private static final long FNV_PRIME = 0x100000001b3L;
//Hash the DN bytes. Uses the FNV-1a hash.
private byte[] hashCode(byte[] b)
{
// NOTE(review): the hash accumulation inside the loop and the return
// statement are missing from this listing.
for (byte aB : b) {
}
}
/**
* Shutdown the temporary environment.
* @throws JebException If error occurs.
*/
public void shutdown() throws JebException
{
environment.close();
}
/**
* Insert the specified DN into the DN cache. It will return {@code true} if
* the DN does not already exist in the cache and was inserted, or
* {@code false} if the DN exists already in the cache.
*
* @param dn The DN to insert in the cache.
* @param val A database entry to use in the insert.
* @param key A database entry to use in the insert.
* @return {@code true} if the DN was inserted in the cache, or
* {@code false} if the DN exists in the cache already and could
* not be inserted.
* @throws JebException If an error occurs accessing the database.
*/
// NOTE(review): the insert(...) signature and body, and the signature of
// the collision-resolving helper below, are missing from this listing.
throws JebException
{
}
throws JebException
{
boolean inserted = true;
try
{
{
// Key already present: a DN with the same hash exists; distinguish
// a true duplicate DN from a mere hash collision.
inserted = false;
{
"Search DN cache failed.");
throw new JebException(message);
}
{
// Same hash but different DN bytes — not a duplicate, so the DN
// is considered inserted.
inserted = true;
}
}
}
finally
{
{
}
}
return inserted;
}
//Add the DN to the saved DNs because of a hash collision.
// NOTE(review): the helper's signature start and the database write
// statements are missing from this listing.
byte[] dnBytes) throws JebException
{
int pos = 0;
{
"Add of DN to DN cache failed.");
throw new JebException(message);
}
}
//Return true if the specified DN is in the DNs saved as a result of hash
//collisions.
// NOTE(review): signature and iteration logic are missing from this
// listing.
{
{
{
return true;
}
}
return false;
}
/**
* Check if the specified DN is contained in the temporary DN cache.
*
* @param dn A DN to check for.
* @return {@code true} if the specified DN is in the temporary DN cache,
* or {@code false} if it is not.
*/
// NOTE(review): signature and the cache lookup statements are missing
// from this listing.
{
boolean dnExists = false;
try {
{
}
}
finally
{
{
}
}
return dnExists;
}
/**
* Return temporary environment stats.
*
* @param statsConfig A stats configuration instance.
*
* @return Environment stats.
* @throws DatabaseException If an error occurs retrieving the stats.
*/
// NOTE(review): signature and the environment.getStats(...) return
// statement are missing from this listing.
throws DatabaseException
{
}
}
/**
* Uncaught exception handler. Try and catch any uncaught exceptions, log
* them and print a stack trace.
*/
// NOTE(review): the class declaration is cut off after "public", and the
// uncaughtException(Thread, Throwable) method signature plus its logging
// call are missing from this listing — confirm against the original
// revision.
public
/**
* {@inheritDoc}
*/
e.printStackTrace();
}
}
}