Importer.java revision f0a048d41a13eca4cba405da9403c2469ca3d1ea
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2015 ForgeRock AS
*/
/**
* This class provides the engine that performs both importing of LDIF files and
* the rebuilding of indexes.
*/
final class Importer implements DiskSpaceMonitorHandler
{
private static final int TIMER_INTERVAL = 10000;
/** Defaults for DB cache. */
/**
* Defaults for LDIF reader buffers, min memory required to import and default
* size for byte buffers.
*/
private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE
private static final int BYTE_BUFFER_CAPACITY = 128;
/** Max size of phase one buffer. */
/** Min size of phase one buffer. */
/** Min size of phase two read-ahead cache. */
/** Small heap threshold used to give more memory to the JVM to attempt to avoid OOM errors. */
/** Minimum memory needed for import */
/** The DN attribute type. */
private static final AttributeType dnType;
new IndexOutputBuffer.IndexComparator();
/** Phase one buffer count. */
/** Phase one imported entries count. */
/** Phase one buffer size in bytes. */
private int bufferSize;
/** Temp scratch directory. */
/** Index count. */
private final int indexCount;
/** Thread count. */
private int threadCount;
/** Set to true when validation is skipped. */
private final boolean skipDNValidation;
/** Temporary environment used when DN validation is done in first phase. */
/** Root container. */
private RootContainer rootContainer;
/** Import configuration. */
private final LDIFImportConfig importConfiguration;
/** Backend configuration. */
private final LocalDBBackendCfg backendConfiguration;
/** LDIF reader. */
private ImportLDIFReader reader;
/** Migrated entry count. */
private int migratedCount;
/** Size in bytes of temporary env. */
private long tmpEnvCacheSize;
/** Available memory at the start of the import. */
private long availableMemory;
/** Size in bytes of DB cache. */
private long dbCacheSize;
/** The executor service used for the buffer sort tasks. */
private ExecutorService bufferSortService;
/** The executor service used for the scratch file processing tasks. */
private ExecutorService scratchFileWriterService;
/** Queue of free index buffers -- used to re-cycle index buffers. */
new LinkedBlockingQueue<IndexOutputBuffer>();
/**
* Map of index keys to index buffers. Used to allocate sorted index buffers
 * to an index writer thread.
*/
/** Map of DB containers to index managers. Used to start phase 2. */
/** Map of DB containers to DN-based index managers. Used to start phase 2. */
/**
* Futures used to indicate when the index file writers are done flushing
* their work queues and have exited. End of phase one.
*/
/**
* List of index file writer tasks. Used to signal stopScratchFileWriters to
 * the index file writer tasks when reading of the LDIF file is complete.
*/
/** Map of DNs to Suffix objects. */
/** Map of container ids to database containers. */
private final ConcurrentHashMap<Integer, Index> idContainerMap = new ConcurrentHashMap<Integer, Index>();
/** Map of container ids to entry containers. */
/** Used to synchronize when a scratch file index writer is first setup. */
/** Rebuild index manager used when rebuilding indexes. */
private final RebuildIndexManager rebuildManager;
/** Set to true if the backend was cleared. */
private final boolean clearedBackend;
/** Used to shutdown import if an error occurs in phase one. */
private volatile boolean isCanceled;
private volatile boolean isPhaseOneDone;
/** Number of phase one buffers. */
private int phaseOneBufferCount;
private final DiskSpaceMonitor diskSpaceMonitor;
static
{
{
}
}
/**
* Create a new import job with the specified rebuild index config.
*
* @param rebuildConfig
* The rebuild index configuration.
* @param cfg
* The local DB back-end configuration.
* @param envConfig
* The JEB environment config.
* @param serverContext
* The ServerContext for this Directory Server instance
* @throws InitializationException
* If a problem occurs during initialization.
* @throws JebException
* If an error occurred when opening the DB.
* @throws ConfigException
* If a problem occurs during initialization.
*/
{
this.importConfiguration = null;
this.backendConfiguration = cfg;
this.threadCount = 1;
this.clearedBackend = false;
this.scratchFileWriterList =
{
}
this.skipDNValidation = true;
}
/**
* Create a new import job with the specified ldif import config.
*
* @param importConfiguration
* The LDIF import configuration.
* @param localDBBackendCfg
* The local DB back-end configuration.
* @param envConfig
* The JEB environment config.
* @param serverContext
* The ServerContext for this Directory Server instance
* @throws InitializationException
* If a problem occurs during initialization.
* @throws ConfigException
* If a problem occurs reading the configuration.
* @throws DatabaseException
* If an error occurred when opening the DB.
*/
{
this.rebuildManager = null;
{
}
else
{
}
// Determine the number of indexes.
this.scratchFileWriterList =
{
}
// Set up temporary environment.
if (!skipDNValidation)
{
}
else
{
}
}
/**
* Returns whether the backend must be cleared.
*
* @param importCfg the import configuration object
* @param backendCfg the backend configuration object
* @return true if the backend must be cleared, false otherwise
*/
{
return !importCfg.appendToExistingData()
}
{
if (tmpDirectory != null)
{
}
else
{
}
}
throws ConfigException
{
{
{
}
else
{
}
}
return indexes;
}
/**
* Return the suffix instance in the specified map that matches the specified
* DN.
*
* @param dn
* The DN to search for.
* @param map
* The map to search.
* @return The suffix instance that matches the DN, or null if no match is
* found.
*/
{
{
{
}
}
return suffix;
}
/**
* Calculate buffer sizes and initialize JEB properties based on memory.
*
* @param envConfig
* The environment config to use in the calculations.
* @throws InitializationException
* If a problem occurs during calculation.
*/
{
// Calculate amount of usable memory. This will need to take into account
// various fudge factors, including the number of IO buffers used by the
// scratch writers (1 per index).
// We need caching when doing DN validation or rebuilding indexes.
{
// DN validation in progress: calculate memory for DB cache, DN2ID temporary
// cache, and buffers.
{
}
{
}
else if (!clearedBackend)
{
// Appending to existing data so reserve extra memory for the DB cache
// since it will be needed for dn2id queries.
}
else
{
}
}
else
{
// No DN validation: calculate memory for DB cache and buffers.
// No need for DN2ID cache.
tmpEnvCacheSize = 0;
{
}
else if (usableMemory < MIN_DB_CACHE_MEMORY)
{
}
else
{
// not being queried.
}
}
final int oldThreadCount = threadCount;
{
while (true)
{
// Scratch writers allocate 4 buffers per index as well.
// We need (2 * bufferSize) to fit in an int for the insertByteStream
// and deleteByteStream constructors.
if (bufferSize > MAX_BUFFER_SIZE)
{
if (!skipDNValidation)
{
// The buffers are big enough: the memory is best used for the DN2ID temp DB
if (!clearedBackend)
{
}
else
{
}
}
break;
}
else if (bufferSize > MIN_BUFFER_SIZE)
{
// This is acceptable.
break;
}
else if (threadCount > 1)
{
// Retry using less threads.
threadCount--;
}
else
{
// Not enough memory.
throw new InitializationException(message);
}
}
}
if (oldThreadCount != threadCount)
{
}
if (tmpEnvCacheSize > 0)
{
}
}
/**
* Calculates the amount of available memory which can be used by this import,
* taking into account whether or not the import is running offline or online
* as a task.
*/
private void calculateAvailableMemory()
{
final long totalAvailableMemory;
if (DirectoryServer.isRunning())
{
// call twice gc to ensure finalizers are called
// and young to old gen references are properly gc'd
final long configuredMemory;
{
}
else
{
configuredMemory = backendConfiguration.getDBCachePercent() * Runtime.getRuntime().maxMemory() / 100;
}
// Round up to minimum of 32MB (e.g. unit tests only use a small cache).
totalAvailableMemory = Math.max(Math.min(usableMemory, configuredMemory), MINIMUM_AVAILABLE_MEMORY);
}
else
{
}
// Now take into account various fudge factors.
int importMemPct = 90;
if (totalAvailableMemory <= SMALL_HEAP_SIZE)
{
// Be pessimistic when memory is low.
importMemPct -= 25;
}
if (rebuildManager != null)
{
// Rebuild seems to require more overhead.
importMemPct -= 15;
}
}
/**
 * Pre-allocates the phase-one index buffers, one per configured buffer slot.
 * NOTE(review): the loop body appears to have been elided in this revision —
 * presumably each iteration creates an index buffer and hands it to the
 * free-buffer queue; confirm against the original source.
 */
private void initializeIndexBuffers()
{
for (int i = 0; i < phaseOneBufferCount; i++)
{
}
}
{
{
{
}
}
}
/**
 * Mainly used to support multiple suffixes. Each index in each suffix gets a
* unique ID to identify which DB it needs to go to in phase two processing.
*/
{
{
if (!extensibleMap.isEmpty())
{
}
}
}
{
{
{
}
}
}
{
{
}
}
{
}
{
&& !importConfiguration.clearBackend())
{
{
{
// This entire base DN was explicitly excluded. Skip.
return null;
}
{
}
}
{
{
{
}
}
if (includeBranches.isEmpty())
{
/*
* There are no branches in the explicitly defined include list under
* this base DN. Skip this base DN all together.
*/
return null;
}
// Remove any overlapping include branches.
while (includeBranchIterator.hasNext())
{
{
}
}
// Remove any exclude branches that are not under an include branch, since
// they will be migrated as part of the existing entries outside of the
// include branches anyway.
while (excludeBranchIterator.hasNext())
{
{
}
}
if (excludeBranches.isEmpty()
{
// This entire base DN is explicitly included in the import with
// no exclude branches that we need to migrate. Just clear the entry
// container.
}
else
{
// Create a temp entry container
+ "_importTmp");
}
}
}
}
{
}
{
{
{
return false;
}
}
return true;
}
{
{
{
return true;
}
}
return false;
}
/**
* Rebuild the indexes using the specified root container.
*
* @param rootContainer
* The root container to rebuild indexes in.
* @throws ConfigException
* If a configuration error occurred.
* @throws InitializationException
* If an initialization error occurred.
* @throws JebException
* If the JEB database had an error.
* @throws InterruptedException
* If an interrupted error occurred.
* @throws ExecutionException
* If an execution error occurred.
*/
{
this.rootContainer = rootContainer;
try
{
}
finally
{
}
}
/**
* Import a LDIF using the specified root container.
*
* @param rootContainer
* The root container to use during the import.
* @return A LDIF result.
* @throws ConfigException
* If the import failed because of an configuration error.
* @throws InitializationException
* If the import failed because of an initialization error.
* @throws JebException
* If the import failed due to a database error.
* @throws InterruptedException
* If the import failed due to an interrupted error.
* @throws ExecutionException
* If the import failed due to an execution error.
*/
{
this.rootContainer = rootContainer;
try {
try
{
}
catch (IOException ioe)
{
}
setIndexesTrusted(false);
phaseOne();
isPhaseOneDone = true;
if (!skipDNValidation)
{
}
if (isCanceled)
{
throw new InterruptedException("Import processing canceled.");
}
phaseTwo();
if (isCanceled)
{
throw new InterruptedException("Import processing canceled.");
}
setIndexesTrusted(true);
float rate = 0;
if (importTime > 0)
{
}
}
finally
{
if (!skipDNValidation)
{
try
{
}
{
// Do nothing.
}
}
}
}
{
diskSpaceMonitor.registerMonitoredDirectory(backendConfiguration.getBackendId() + " " + backendSuffix, dir,
}
{
{
{
if (f.isDirectory())
{
recursiveDelete(f);
}
f.delete();
}
}
}
private void switchEntryContainers() throws DatabaseException, JebException, InitializationException
{
{
if (entryContainer != null)
{
replacement.lock();
}
}
}
{
try
{
{
}
}
catch (DatabaseException ex)
{
}
}
{
{
for (int i = 0; i < threadCount; i++)
{
}
}
else
{
for (int i = 0; i < threadCount; i++)
{
}
}
// Try to clear as much memory as possible.
}
{
}
{
{
}
{
}
}
{
{
}
}
{
try
{
}
finally
{
}
}
{
{
return;
}
if (dbThreads < 4)
{
dbThreads = 4;
}
// Calculate memory / buffer counts.
int readAheadSize;
int buffers;
while (true)
{
buffers = 0;
for (int i = 0; i < limit; i++)
{
}
if (readAheadSize > bufferSize)
{
// Cache size is never larger than the buffer size.
break;
}
else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
{
// This is acceptable.
break;
}
else if (dbThreads > 1)
{
// Reduce thread count.
dbThreads--;
}
else
{
// Not enough memory - will need to do batching for the biggest indexes.
break;
}
}
// Ensure that there are minimum two threads available for parallel
// processing of smaller indexes.
// Start indexing tasks.
// Start DN processing first.
}
private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService, Semaphore permits,
{
{
}
}
{
{
}
}
private void stopScratchFileWriters()
{
{
}
}
/** Task used to migrate excluded branch. */
private final class MigrateExcludedTask extends ImportTask
{
/** {@inheritDoc} */
{
{
{
try
{
{
{
// This is the base entry for a branch that was excluded in the
// import so we must migrate all entries in this branch over to
// the new entry container.
{
}
}
}
}
catch (Exception e)
{
isCanceled = true;
throw e;
}
finally
{
}
}
}
return null;
}
}
/** Task to migrate existing entries. */
private final class MigrateExistingTask extends ImportTask
{
/** {@inheritDoc} */
{
{
{
try
{
{
{
}
else
{
// This is the base entry for a branch that will be included
// in the import so we don't want to copy the branch to the
// new entry container.
/*
* Advance the cursor to next entry at the same level in the DIT
* skipping all the entries in this branch. Set the next
* starting value to a value of equal length but slightly
* greater than the previous DN. Since keys are compared in
* reverse order we must set the first byte (the comma). No
* possibility of overflow here.
*/
}
}
}
catch (Exception e)
{
isCanceled = true;
throw e;
}
finally
{
}
}
}
return null;
}
{
{
{
}
}
return includeBranches;
}
{
{
{
return true;
}
}
return false;
}
}
/**
*/
private class AppendReplaceTask extends ImportTask
{
/** {@inheritDoc} */
{
try
{
while (true)
{
{
return null;
}
{
break;
}
}
return null;
}
catch (Exception e)
{
isCanceled = true;
throw e;
}
}
{
{
}
{
{
return;
}
}
else
{
}
{
}
else
{
}
}
{
{
}
}
{
{
{
}
}
{
}
}
}
/**
* This task performs phase reading and processing of the entries read from
* the LDIF file(s). This task is used if the append flag wasn't specified.
*/
{
private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
/** {@inheritDoc} */
{
try
{
while (true)
{
{
return null;
}
{
break;
}
}
return null;
}
catch (Exception e)
{
isCanceled = true;
throw e;
}
}
{
{
return;
}
}
/** Examine the DN for duplicates and missing parents. */
throws JebException, InterruptedException
{
//Perform parent checking.
{
return false;
}
//If the backend was not cleared, then the dn2id needs to be checked first
//for DNs that might not exist in the DN cache. If the DN is not in
//the suffix's dn2id DB, then the DN cache is used.
if (!clearedBackend)
{
{
return false;
}
}
{
return false;
}
return true;
}
{
{
{
}
}
}
void fillIndexKey(Suffix suffix, AttributeIndex attrIndex, Entry entry, AttributeType attrType, EntryID entryID)
{
processAttribute(attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, entry, attrType, entryID, options);
processAttribute(attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, entry, attrType, entryID, options);
processAttribute(attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, entry, attrType, entryID, options);
processAttribute(attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, entry, attrType, entryID, options);
processAttribute(attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, entry, attrType, entryID, options);
{
}
if (!extensibleMap.isEmpty())
{
}
}
{
{
}
}
{
{
{
}
}
}
{
{
}
}
{
{
}
}
{
if (indexBuffer == null)
{
}
{
// complete the current buffer...
// ... and get a new one
}
return indexID;
}
{
if (size > bufferSize)
{
}
else
{
if (indexBuffer == null)
{
throw new InterruptedException("Index buffer processing error.");
}
}
if (indexBuffer.isPoison())
{
throw new InterruptedException("Cancel processing received.");
}
return indexBuffer;
}
{
}
{
{
}
else
{
}
}
}
/**
* This task reads sorted records from the temporary index scratch files,
* processes the records and writes the results to the index database. The DN
 * index is treated differently than non-DN indexes.
*/
{
private final IndexManager indexMgr;
private final int cacheSize;
private final int maxPermits;
private long lastBytesRead;
private RandomAccessFile bufferFile;
private DataInputStream bufferIndexFile;
private int remainingBuffers;
private volatile int totalBatches;
private int nextBufferID;
private int ownedPermits;
private volatile boolean isRunning;
/**
* Creates a new index DB writer.
*
* @param indexMgr
* The index manager.
* @param permits
* The semaphore used for restricting the number of buffer
* allocations.
* @param maxPermits
* The maximum number of buffers which can be allocated.
* @param cacheSize
* The buffer cache size.
*/
int maxPermits, int cacheSize)
{
this.maxPermits = maxPermits;
this.dbKey = new DatabaseEntry();
this.dbValue = new DatabaseEntry();
}
/**
* Initializes this task.
*
* @throws IOException
* If an IO error occurred.
*/
public void beginWriteTask() throws IOException
{
indexMgr.getBufferIndexFile())));
nextBufferID = 0;
ownedPermits = 0;
logger.info(NOTE_IMPORT_LDIF_INDEX_STARTED, indexMgr.getBufferFileName(), remainingBuffers, totalBatches);
indexMgr.setIndexDBWriteTask(this);
isRunning = true;
}
/**
* Returns the next batch of buffers to be processed, blocking until enough
* buffer permits are available.
*
* @return The next batch of buffers, or {@code null} if there are no more
* buffers to be processed.
* @throws Exception
* If an exception occurred.
*/
{
// First release any previously acquired permits.
if (ownedPermits > 0)
{
ownedPermits = 0;
}
// Block until we can either get enough permits for all buffers, or the
// maximum number of permits.
if (permitRequest == 0)
{
// No more work to do.
return null;
}
// Update counters.
// Create all the index buffers for the next batch.
for (int i = 0; i < permitRequest; i++)
{
final IndexInputBuffer b =
}
return buffers;
}
/**
* Finishes this task.
*/
public void endWriteTask()
{
isRunning = false;
// First release any previously acquired permits.
if (ownedPermits > 0)
{
ownedPermits = 0;
}
try
{
{
{
}
if (!isCanceled)
{
}
}
else
{
if (!isCanceled)
{
}
}
}
finally
{
}
}
/**
* Print out progress stats.
*
* @param deltaTime
* The time since the last update.
*/
public void printStats(long deltaTime)
{
if (isRunning)
{
final int bytesReadPercent =
// Kilo and milli approximately cancel out.
}
}
/** {@inheritDoc} */
{
if (isCanceled)
{
return null;
}
try
{
{
if (isCanceled)
{
return null;
}
{
{
indexID = b.getIndexID();
{
}
else
{
}
}
{
indexID = b.getIndexID();
{
}
else
{
}
{
}
}
else
{
}
if (b.hasMoreData())
{
b.fetchNextRecord();
}
}
{
}
}
return null;
}
catch (Exception e)
{
throw e;
}
finally
{
endWriteTask();
}
}
private void addToDB(int indexID, ImportIDSet insertSet, ImportIDSet deleteSet) throws DirectoryException
{
{
}
else
{
{
}
{
}
}
}
{
{
}
else
{
}
{
}
}
private void addBytesRead(int bytesRead)
{
}
/**
 * This class is used by an index DB merge thread performing DN processing
* to keep track of the state of individual DN2ID index processing.
*/
class DNState
{
private final EntryContainer entryContainer;
private final boolean isSubordinatesEnabled;
// Fields below are only needed if the isSubordinatesEnabled boolean is true.
private final int childLimit, subTreeLimit;
private final boolean childDoCount, subTreeDoCount;
{
this.entryContainer = entryContainer;
parentIDMap = new TreeMap<>();
dnKey = new DatabaseEntry();
dnValue = new DatabaseEntry();
}
{
if (parentIndex < 0)
{
// This is the root or base DN
return null;
}
return parent;
}
{
if (destBuffer == null
{
}
else
{
destBuffer.flip();
return destBuffer;
}
}
/** Why do we still need this if we are checking parents in the first phase? */
{
byte[] v = record.toDatabase();
//Bypass the cache for append data, lookup the parent in DN2ID and return.
if (importConfiguration != null
{
//If null is returned than this is a suffix DN.
{
{
}
else
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
return false;
}
}
}
else if (parentIDMap.isEmpty())
{
return true;
}
{
return true;
}
{
return true;
}
{
{
}
}
else
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
return false;
}
return true;
}
{
{
{
}
else
{
}
{
}
}
else
{
}
}
{
// Bypass the cache for append data, lookup the parent DN in the DN2ID db
{
}
{
}
return null;
}
{
{
{
}
else
{
}
// TODO:
// Instead of doing this,
// we can just walk to parent cache if available
{
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
break;
}
{
}
else
{
}
}
{
}
}
else
{
}
}
public void writeToDB() throws DirectoryException
{
{
}
}
boolean clearMap)
{
{
}
if (clearMap)
{
}
}
public void flush()
{
if (isSubordinatesEnabled) {
}
}
}
}
/**
* This task writes the temporary scratch index files using the sorted buffers
* read from a blocking queue private to each index.
*/
{
private static final int DRAIN_TO = 3;
private final IndexManager indexMgr;
private final DataOutputStream bufferStream;
private final DataOutputStream bufferIndexStream;
private final byte[] tmpArray = new byte[8];
private int insertKeyCount, deleteKeyCount;
private int bufferCount;
private boolean poisonSeen;
{
}
{
return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), READER_WRITER_BUFFER_SIZE));
}
/** {@inheritDoc} */
{
long offset = 0;
try
{
while (true)
{
long beginOffset = offset;
long bufferLen;
{
l.add(indexBuffer);
bufferLen = writeIndexBuffers(l);
for (IndexOutputBuffer id : l)
{
if (!id.isDiscarded())
{
}
}
l.clear();
}
else
{
if (indexBuffer.isPoison())
{
break;
}
if (!indexBuffer.isDiscarded())
{
indexBuffer.reset();
}
}
// Write buffer index information.
bufferCount++;
if (poisonSeen)
{
break;
}
}
return null;
}
catch (IOException e)
{
logger.error(ERR_IMPORT_LDIF_INDEX_FILEWRITER_ERR, indexMgr.getBufferFile().getAbsolutePath(), e.getMessage());
isCanceled = true;
throw e;
}
finally
{
}
}
{
resetStreams();
long bufferLen = 0;
for (int i = 0; i < numberKeys; i++)
{
{
continue;
}
if (!indexBuffer.compare(i))
{
resetStreams();
}
}
{
}
return bufferLen;
}
{
resetStreams();
long bufferID = 0;
long bufferLen = 0;
for (IndexOutputBuffer b : buffers)
{
if (b.isPoison())
{
poisonSeen = true;
}
else
{
b.setPosition(0);
b.setBufferID(bufferID++);
indexSortedSet.add(b);
}
}
int saveIndexID = 0;
while (!indexSortedSet.isEmpty())
{
{
saveIndexID = b.getIndexID();
insertOrDeleteKey(b, b.getPosition());
}
{
resetStreams();
saveIndexID = b.getIndexID();
insertOrDeleteKey(b, b.getPosition());
}
else
{
}
if (b.hasMoreData())
{
b.nextRecord();
indexSortedSet.add(b);
}
}
{
}
return bufferLen;
}
/**
 * Resets the per-record key counters before the next group of keys is written.
 * NOTE(review): only the insert/delete key counters are reset here; if the
 * underlying byte streams also require resetting, that code is not visible in
 * this revision — confirm against the original source.
 */
private void resetStreams()
{
insertKeyCount = 0;
deleteKeyCount = 0;
}
{
{
}
else
{
}
}
{
{
{
}
}
else
{
}
}
private int writeByteStreams() throws IOException
{
{
// special handling when index entry limit has been exceeded
insertKeyCount = 1;
}
{
}
{
}
}
{
}
{
int keySize = b.getKeySize();
b.writeKey(bufferStream);
int bodySize = writeByteStreams();
}
{
bufferStream.write(k);
int bodySize = writeByteStreams();
}
{
return writeSize;
}
{
return writeSize;
}
/** {@inheritDoc} */
{
return getClass().getSimpleName() + "(" + indexMgr.getBufferFileName() + ": " + indexMgr.getBufferFile() + ")";
}
}
/**
 * This task's main function is to sort the index buffers given to it from the
 * import tasks reading the LDIF file. It will also create an index file writer
* task and corresponding queue if needed. The sorted index buffers are put on
* the index file writer queues for writing to a temporary file.
*/
{
private final IndexOutputBuffer indexBuffer;
{
this.indexBuffer = indexBuffer;
}
/** {@inheritDoc} */
{
|| isCanceled)
{
isCanceled = true;
return null;
}
indexBuffer.sort();
{
}
return null;
}
{
synchronized (synObj)
{
{
return;
}
if (isDN2ID)
{
}
else
{
}
}
}
}
/**
* The index manager class has several functions:
* <ol>
* <li>It is used to carry information about index processing created in phase one to phase two</li>
* <li>It collects statistics about phase two processing for each index</li>
* <li>It manages opening and closing the scratch index files</li>
* </ol>
*/
{
private final File bufferFile;
private final String bufferFileName;
private final File bufferIndexFile;
private final boolean isDN2ID;
private final int limit;
private int numberOfBuffers;
private long bufferFileSize;
private long totalDNs;
private volatile IndexDBWriteTask writer;
{
this.bufferFileName = fileName;
}
{
}
/** Returns the scratch file holding this index's sorted buffers. */
private File getBufferFile()
{
return bufferFile;
}
/** Returns the current size in bytes of the scratch buffer file. */
private long getBufferFileSize()
{
return bufferFileSize;
}
/** Returns the companion index file describing the scratch buffer file's layout. */
private File getBufferIndexFile()
{
return bufferIndexFile;
}
{
this.numberOfBuffers = numberOfBuffers;
this.bufferFileSize = bufferFileSize;
}
/**
* Updates the bytes read counter.
*
* @param bytesRead
* The number of bytes read.
*/
void addBytesRead(int bytesRead)
{
// NOTE(review): bytesRead is currently unused — the updating statement appears
// to have been elided in this revision; confirm against the original source.
{
}
}
/**
 * Adds the given delta to the running DN total reported by {@code getDNCount()}.
 *
 * @param delta
 *          The number of DNs to add to the total.
 */
private void addTotDNCount(int delta)
{
// Previously the delta was silently dropped, so the DN count never advanced.
totalDNs += delta;
}
/** Returns the total number of DNs accumulated for this index manager. */
private long getDNCount()
{
return totalDNs;
}
/** Returns {@code true} if this manager is for the DN2ID (DN-based) index. */
private boolean isDN2ID()
{
return isDN2ID;
}
private void printStats(long deltaTime)
{
{
}
}
/**
* Returns the file name associated with this index manager.
*
* @return The file name associated with this index manager.
*/
{
return bufferFileName;
}
/** Returns this index's limit (presumably the index entry limit — TODO confirm). */
private int getLimit()
{
return limit;
}
/** {@inheritDoc} */
{
}
/** Returns the number of scratch buffers recorded for this index manager. */
private int getNumberOfBuffers()
{
return numberOfBuffers;
}
/** {@inheritDoc} */
{
}
}
/**
* The rebuild index manager handles all rebuild index related processing.
*/
private class RebuildIndexManager extends ImportTask implements
{
/** Rebuild index configuration. */
private final RebuildConfig rebuildConfig;
/** Local DB backend configuration. */
private final LocalDBBackendCfg cfg;
/** Map of index keys to indexes. */
/** Map of index keys to extensible indexes. */
/** List of VLV indexes. */
/** The DN2ID index. */
/** The DN2URI index. */
/** Total entries to be processed. */
private long totalEntries;
/** Total entries processed. */
/** The suffix instance. */
/** The entry container. */
private EntryContainer entryContainer;
/**
* Create an instance of the rebuild index manager using the specified
* parameters.
*
* @param rebuildConfig
* The rebuild configuration to use.
* @param cfg
* The local DB configuration to use.
*/
{
this.rebuildConfig = rebuildConfig;
}
/**
* Initialize a rebuild index manager.
*
* @throws ConfigException
* If an configuration error occurred.
* @throws InitializationException
* If an initialization error occurred.
*/
{
}
/**
* Print start message.
*
* @throws DatabaseException
* If an database error occurred.
*/
public void printStartMessage() throws DatabaseException
{
switch (rebuildConfig.getRebuildMode())
{
case ALL:
break;
case DEGRADED:
break;
default:
&& logger.isInfoEnabled())
{
}
break;
}
}
/**
* Print stop message.
*
* @param startTime
* The time the rebuild started.
*/
public void printStopMessage(long startTime)
{
float rate = 0;
if (totalTime > 0)
{
}
if (!rebuildConfig.isClearDegradedState())
{
}
}
/** {@inheritDoc} */
{
try
{
{
if (isCanceled)
{
return null;
}
}
return null;
}
catch (Exception e)
{
logger.traceException(e);
isCanceled = true;
throw e;
}
finally
{
}
}
/**
* Perform rebuild index processing.
*
* @throws DatabaseException
* If an database error occurred.
* @throws InterruptedException
* If an interrupted error occurred.
* @throws ExecutionException
* If an Execution error occurred.
* @throws JebException
* If an JEB error occurred.
*/
public void rebuildIndexes() throws DatabaseException,
{
// Sets only the needed indexes.
if (!rebuildConfig.isClearDegradedState())
{
// If not in a 'clear degraded state' operation,
// need to rebuild the indexes.
setRebuildListIndexesTrusted(false);
clearIndexes(true);
phaseOne();
if (isCanceled)
{
throw new InterruptedException("Rebuild Index canceled.");
}
phaseTwo();
}
else
{
}
setRebuildListIndexesTrusted(true);
}
@SuppressWarnings("fallthrough")
private void setIndexesListsToBeRebuilt() throws JebException
{
// Depends on rebuild mode, (re)building indexes' lists.
switch (mode)
{
case ALL:
rebuildIndexMap(false);
// falls through
case DEGRADED:
{
}
{
}
{
rebuildIndexMap(true); // only degraded.
}
{
}
break;
case USER_DEFINED:
// false may be required if the user wants to rebuild specific index.
rebuildIndexMap(false);
break;
default:
break;
}
}
private void rebuildIndexMap(final boolean onlyDegraded)
{
// rebuildList contains the user-selected index(in USER_DEFINED mode).
{
{
// Get all existing indexes for all && degraded mode.
}
else if (!rebuildList.isEmpty())
{
// Get indexes for user defined index.
{
{
}
}
}
}
}
throws DatabaseException
{
if (!extensibleMap.isEmpty())
{
}
}
{
{
{
{
{
}
}
else
{
// This index is not a candidate for rebuilding.
}
}
if (!mutableCopy.isEmpty())
{
}
}
}
{
{
}
}
{
// Clears all the entry's container databases which are containing the indexes
if (!onlyDegraded)
{
// dn2uri does not have a trusted status.
}
if (!onlyDegraded
{
}
{
{
{
}
}
}
if (!extensibleIndexMap.isEmpty())
{
{
if (subIndexes != null)
{
{
}
}
}
}
{
{
}
}
}
private void setRebuildListIndexesTrusted(boolean trusted)
throws JebException
{
try
{
{
}
if (!vlvIndexes.isEmpty())
{
{
}
}
if (!extensibleIndexMap.isEmpty())
{
{
}
}
}
catch (DatabaseException ex)
{
}
}
{
{
{
}
}
}
{
for (int i = 0; i < threadCount; i++)
{
}
// Try to clear as much memory as possible.
}
{
try
{
}
finally
{
}
}
{
return timer;
}
{
switch (rebuildConfig.getRebuildMode())
{
case ALL:
return getTotalIndexCount(cfg);
case DEGRADED:
// FIXME: since the environment is not started we cannot determine which
// indexes are degraded. As a workaround, be conservative and assume all
// indexes need rebuilding.
return getTotalIndexCount(cfg);
default:
return getRebuildListIndexCount(cfg);
}
}
{
if (rebuildList.isEmpty())
{
return 0;
}
int indexCount = 0;
{
{
indexCount += 3;
}
{
indexCount++;
}
{
{
}
indexCount++;
}
{
throw attributeIndexNotConfigured(index);
}
else
{
{
throw attributeIndexNotConfigured(index);
}
{
throw attributeIndexNotConfigured(index);
}
{
{
{
indexCount++;
}
else
{
throw attributeIndexNotConfigured(index);
}
}
else // attrIndexParts.length == 3
{
{
throw attributeIndexNotConfigured(index);
}
indexCount++;
}
}
else
{
boolean found = false;
{
{
found = true;
}
}
if (!found)
{
throw attributeIndexNotConfigured(index);
}
}
}
}
return indexCount;
}
{
}
private boolean findExtensibleMatchingRule(LocalDBBackendCfg cfg, String indexExRuleName) throws ConfigException
{
{
{
{
{
return true;
}
}
}
}
return false;
}
{
int result = 0;
{
{
result++;
}
}
return result;
}
{
int result = 0;
{
boolean shared = false;
{
{
result++;
}
else if (!shared)
{
shared = true;
result++;
}
}
}
return result;
}
{
{
}
{
}
}
{
{
}
}
throws InterruptedException
{
this.extensibleIndexMap.entrySet())
{
{
{
}
}
}
}
{
{
{
}
}
}
/**
* Return the number of entries processed by the rebuild manager.
*
* @return The number of entries processed.
*/
public long getEntriesProcessed()
{
return this.entriesProcessed.get();
}
/**
 * Returns the total number of entries the rebuild manager has to process.
 *
 * @return The total number of entries to process.
 */
public long getTotalEntries()
{
return totalEntries;
}
{
}
{
isCanceled = true;
}
{
// Do nothing
}
}
/**
* This class reports progress of rebuild index processing at fixed intervals.
*/
private class RebuildFirstPhaseProgressTask extends TimerTask
{
/**
* The number of records that had been processed at the time of the previous
* progress report.
*/
private long previousProcessed;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** The environment statistics at the time of the previous report. */
private EnvironmentStats prevEnvStats;
/**
* Create a new rebuild index progress task.
*
* @throws DatabaseException
*           If an error occurred while accessing the JE database.
*/
public RebuildFirstPhaseProgressTask() throws DatabaseException
{
// NOTE(review): the constructor body is missing from this excerpt; it
// presumably snapshots previousTime and prevEnvStats — confirm against the
// original source.
}
/**
* The action to be performed by this timer task: log a rebuild progress
* report (completion percentage, rate, cache statistics) for the interval
* since the previous run.
*/
public void run()
{
// NOTE(review): the statements computing latestCount, deltaTime,
// entriesProcessed, rate and deltaCount are missing from this excerpt;
// as written these identifiers are undeclared. Confirm against the
// original source.
if (deltaTime == 0)
{
// Avoid a divide-by-zero when no measurable time has elapsed.
return;
}
float completed = 0;
{
}
logger.info(NOTE_REBUILD_PROGRESS_REPORT, completed, entriesProcessed, rebuildManager.getTotalEntries(), rate);
try
{
float cacheMissRate = 0;
if (deltaCount > 0)
{
}
}
catch (DatabaseException e)
{
// Unlikely to happen and not critical: progress reporting is best-effort.
}
}
}
/**
* This class reports progress of first phase of import processing at fixed
* intervals.
*/
private final class FirstPhaseProgressTask extends TimerTask
{
/**
* The number of entries that had been read at the time of the previous
* progress report.
*/
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** The environment statistics at the time of the previous report. */
private EnvironmentStats previousStats;
/** Determines if eviction has been detected. */
private boolean evicting;
/** Entry count when eviction was detected. */
private long evictionEntryCount;
/** Create a new import progress task. */
public FirstPhaseProgressTask()
{
try
{
// NOTE(review): the statements initializing previousTime/previousStats
// are missing from this excerpt — confirm against the original source.
}
catch (DatabaseException e)
{
// Fail fast: a progress task that cannot read stats is unusable.
throw new RuntimeException(e);
}
}
/**
* The action to be performed by this timer task: log phase-one import
* progress and JE cache/cleaner/checkpoint statistics for the interval
* since the previous run.
*/
public void run()
{
// NOTE(review): the statements computing deltaTime, deltaCount,
// evictPasses, cleanerRuns and checkPoints are missing from this excerpt;
// as written these identifiers are undeclared. Confirm against the
// original source.
if (deltaTime == 0)
{
// Avoid a divide-by-zero when no measurable time has elapsed.
return;
}
try
{
//If first phase skip DN validation is specified use the root container
//stats, else use the temporary environment stats.
if (skipDNValidation)
{
}
else
{
}
// NOTE(review): the right-hand side of this assignment is truncated in
// this excerpt.
long nCacheMiss =
float cacheMissRate = 0;
if (deltaCount > 0)
{
}
if (evictPasses != 0)
{
if (!evicting)
{
// Remember that eviction started so it is only reported once.
evicting = true;
}
}
if (cleanerRuns != 0)
{
}
if (checkPoints > 1)
{
}
}
catch (DatabaseException e)
{
// Unlikely to happen and not critical.
}
}
}
/**
* This class reports progress of the second phase of import processing at
* fixed intervals.
*/
private class SecondPhaseProgressTask extends TimerTask
{
/**
* The number of entries that had been read at the time of the previous
* progress report.
*/
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** The environment statistics at the time of the previous report. */
private EnvironmentStats previousStats;
/** Determines if eviction has been detected. */
private boolean evicting;
/** Most recent phase-one entry count, used as the baseline for phase two. */
private long latestCount;
/**
* Create a new import progress task.
*
* @param latestCount
*          The latest count of entries processed in phase one.
*/
public SecondPhaseProgressTask(long latestCount)
{
this.latestCount = latestCount;
try
{
// NOTE(review): the statements initializing previousTime/previousStats
// are missing from this excerpt — confirm against the original source.
}
catch (DatabaseException e)
{
// Fail fast: a progress task that cannot read stats is unusable.
throw new RuntimeException(e);
}
}
/**
* The action to be performed by this timer task: log phase-two import
* progress and JE cache/cleaner/checkpoint statistics, then report on the
* per-index managers.
*/
public void run()
{
// NOTE(review): the statements computing deltaTime, deltaCount,
// evictPasses, cleanerRuns and checkPoints are missing from this excerpt;
// as written these identifiers are undeclared. Confirm against the
// original source.
if (deltaTime == 0)
{
// Avoid a divide-by-zero when no measurable time has elapsed.
return;
}
try
{
// NOTE(review): the right-hand side of this assignment is truncated in
// this excerpt.
long nCacheMiss =
float cacheMissRate = 0;
if (deltaCount > 0)
{
}
if (evictPasses != 0)
{
if (!evicting)
{
// Remember that eviction started so it is only reported once.
evicting = true;
}
}
if (cleanerRuns != 0)
{
}
if (checkPoints > 1)
{
}
}
catch (DatabaseException e)
{
// Unlikely to happen and not critical.
}
//Do DN index managers first.
// NOTE(review): the loop header iterating the DN index managers is
// missing from this excerpt.
{
}
//Do non-DN index managers.
// NOTE(review): the loop header iterating the remaining index managers is
// missing from this excerpt.
{
}
}
}
/**
* A class to hold information about the entry determined by the LDIF reader.
* Mainly the suffix the entry belongs under and the ID assigned to it by the
* reader.
*/
public class EntryInformation
{
// NOTE(review): the declarations of the 'suffix' and 'entryID' fields and
// the signatures of the getter/setter methods below are missing from this
// excerpt; only the method bodies and their Javadoc survive. Confirm
// against the original source.
/**
* Return the suffix associated with the entry.
*
* @return Entry's suffix instance.
*/
{
return suffix;
}
/**
* Set the suffix instance associated with the entry.
*
* @param suffix
* The suffix associated with the entry.
*/
{
// NOTE(review): assignment statement missing from this excerpt.
}
/**
* Set the entry's ID.
*
* @param entryID
* The entry ID to set the entry ID to.
*/
{
// NOTE(review): assignment statement missing from this excerpt.
}
/**
* Return the entry ID associated with the entry.
*
* @return The entry ID associated with the entry.
*/
public EntryID getEntryID()
{
return entryID;
}
}
/**
* This class defines the individual index type available.
*/
private enum ImportIndexType
{
  /** The DN index type. */
  DN,
  /** The equality index type. */
  EQUALITY,
  /** The presence index type. */
  PRESENCE,
  /** The sub-string index type. */
  SUBSTRING,
  /** The ordering index type. */
  ORDERING,
  /** The approximate index type. */
  APPROXIMATE,
  /** The extensible sub-string index type. */
  EX_SUBSTRING,
  /** The extensible shared index type. */
  EX_SHARED,
  /** The vlv index type. */
  VLV
}
/**
* This class is used as an index key for hash maps that need to process
* both attribute type and index type (i.e., cn.equality, sn.equality, ...).
*/
public static class IndexKey
{
private final AttributeType attributeType;
private final ImportIndexType indexType;
private final int entryLimit;
/**
* Create index key instance using the specified attribute type, index type
* and index entry limit.
*
* @param attributeType
* The attribute type.
* @param indexType
* The index type.
* @param entryLimit
* The entry limit for the index.
*/
// NOTE(review): the constructor signature line is missing from this
// excerpt, and the final field 'indexType' is never assigned in the
// surviving body — as written this cannot compile. Confirm against the
// original source.
{
this.attributeType = attributeType;
this.entryLimit = entryLimit;
}
/**
* An equals method that uses both the attribute type and the index type.
* Only returns {@code true} if the attribute type and index type are equal.
*
* @param obj
* the object to compare.
* @return {@code true} if the objects are equal, or {@code false} if they
* are not.
*/
// NOTE(review): the method signature and the instanceof/field-comparison
// guards are missing from this excerpt; per the Javadoc, the surviving
// 'return true' is presumably reached when both the attribute type and the
// index type match. Confirm against the original source.
{
{
{
return true;
}
}
return false;
}
/**
* A hash code method that adds the hash codes of the attribute type and
* index type and returns that value.
*
* @return The combined hash values of attribute type hash code and the
* index type hash code.
*/
public int hashCode()
{
// NOTE(review): the body is missing from this excerpt — per the Javadoc it
// should return attributeType.hashCode() + indexType.hashCode(); as
// written there is no return statement, which cannot compile.
}
/**
* Return the attribute type.
*
* @return The attribute type.
*/
public AttributeType getAttributeType()
{
return attributeType;
}
/**
* Return the index type.
*
* @return The index type.
*/
public ImportIndexType getIndexType()
{
return indexType;
}
/**
* Return the index key name, which is the attribute type primary name, a
* period, and the index type name. Used for building file names and
* progress output.
*
* @return The index key name.
*/
// NOTE(review): the getName() signature and body are missing from this
// excerpt. Confirm against the original source.
{
}
/**
* Return the entry limit associated with the index.
*
* @return The entry limit.
*/
public int getEntryLimit()
{
return entryLimit;
}
/** {@inheritDoc} */
// NOTE(review): the toString() signature and the middle of the string
// concatenation (presumably the index name portion) are missing from this
// excerpt. Confirm against the original source.
{
return getClass().getSimpleName()
+ ", entryLimit=" + entryLimit
+ ")";
}
}
/**
* The temporary environment will be shared when multiple suffixes are being
* processed. This interface is used by those suffix instance to do parental
* checking of the DN cache.
*/
public static interface DNCache
{
/**
* Returns {@code true} if the specified DN is contained in the DN cache, or
* {@code false} otherwise.
*
* @param dn
* The DN to check the presence of.
* @return {@code true} if the cache contains the DN, or {@code false} if it
* is not.
* @throws DatabaseException
* If an error occurs reading the database.
*/
// NOTE(review): the method declaration this Javadoc documents (presumably
// a contains(DN) accessor) is missing from this excerpt. Confirm against
// the original source.
}
/**
* Temporary environment used to check DN's when DN validation is performed
* during phase one processing. It is deleted after phase one processing.
*/
{
private final Environment environment;
/**
* Create a temporary DB environment and database to be used as a cache of
* DNs when DN validation is performed in phase one processing.
*
* @param envPath
* The file path to create the environment under.
* @throws DatabaseException
* If an error occurs either creating the environment or the DN
* database.
*/
{
envConfig.setReadOnly(false);
envConfig.setAllowCreate(true);
envConfig.setTransactional(false);
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(false);
dbConfig.setTemporary(true);
}
/** FNV-1a 64-bit offset basis. */
private static final long FNV_INIT = 0xcbf29ce484222325L;
/** FNV-1a 64-bit prime. */
private static final long FNV_PRIME = 0x100000001b3L;
/**
 * Hash the DN bytes. Uses the 64-bit FNV-1a hash and returns the result as
 * an 8-byte big-endian array, giving a compact fixed-size cache key. The
 * encoding only needs to be internally consistent: keys are both produced
 * and consumed by this class within a single import run.
 *
 * @param b
 *          The DN bytes to hash.
 * @return An 8-byte array holding the FNV-1a hash of {@code b}.
 */
private byte[] hashCode(byte[] b)
{
  long hash = FNV_INIT;
  for (byte aB : b)
  {
    // FNV-1a: XOR the octet in first, then multiply by the prime.
    hash ^= aB;
    hash *= FNV_PRIME;
  }
  final byte[] result = new byte[8];
  for (int i = 7; i >= 0; i--)
  {
    result[i] = (byte) (hash & 0xFF);
    hash >>>= 8;
  }
  return result;
}
/**
 * Shutdown the temporary environment.
 *
 * NOTE(review): the DN-cache database was opened with
 * dbConfig.setTemporary(true), so JE is expected to discard it when the
 * environment closes — confirm that no explicit database close/remove is
 * required here.
 *
 * @throws JebException
 *           If error occurs.
 */
private void shutdown() throws JebException
{
environment.close();
}
/**
* Insert the specified DN into the DN cache. It will return {@code true} if
* the DN does not already exist in the cache and was inserted, or
* {@code false} if the DN exists already in the cache.
*
* @param dn
* The DN to insert in the cache.
* @param val
* A database entry to use in the insert.
* @param key
* A database entry to use in the insert.
* @return {@code true} if the DN was inserted in the cache, or
* {@code false} if the DN exists in the cache already and could not
* be inserted.
* @throws JebException
* If an error occurs accessing the database.
*/
throws JebException
{
// Use a compact representation for key
// Use a reversible representation for value
}
throws JebException
{
try
{
{
{
}
{
return true;
}
return false;
}
return true;
}
finally
{
}
}
/** Add the DN to the saved DNs because of a hash collision. */
{
{
}
}
/** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
{
int pos = 0;
{
{
return true;
}
}
return false;
}
/**
* Check if the specified DN is contained in the temporary DN cache.
*
* @param dn
* A DN check for.
* @return {@code true} if the specified DN is in the temporary DN cache, or
* {@code false} if it is not.
*/
{
try
{
}
finally
{
}
}
/**
* Return temporary environment stats.
*
* @param statsConfig
* A stats configuration instance.
* @return Environment stats.
* @throws DatabaseException
* If an error occurs retrieving the stats.
*/
throws DatabaseException
{
}
}
/** {@inheritDoc} */
{
}
/** {@inheritDoc} */
{
isCanceled = true;
}
/** {@inheritDoc} */
{
// Do nothing.
}
}