3339N/A/*
3339N/A * CDDL HEADER START
3339N/A *
3339N/A * The contents of this file are subject to the terms of the
3339N/A * Common Development and Distribution License, Version 1.0 only
3339N/A * (the "License"). You may not use this file except in compliance
3339N/A * with the License.
3339N/A *
3339N/A * You can obtain a copy of the license at
3339N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE
3339N/A * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
3339N/A * See the License for the specific language governing permissions
3339N/A * and limitations under the License.
3339N/A *
3339N/A * When distributing Covered Code, include this CDDL HEADER in each
3339N/A * file and include the License file at
3339N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
3339N/A * add the following below this CDDL HEADER, with the fields enclosed
3339N/A * by brackets "[]" replaced with your own identifying information:
3339N/A * Portions Copyright [yyyy] [name of copyright owner]
3339N/A *
3339N/A * CDDL HEADER END
3339N/A *
3339N/A *
5007N/A * Copyright 2008-2010 Sun Microsystems, Inc.
6029N/A * Portions Copyright 2011-2013 ForgeRock AS
3339N/A */
3339N/A
3339N/Apackage org.opends.server.backends.jeb.importLDIF;
3339N/A
4591N/Aimport static org.opends.messages.JebMessages.*;
5195N/Aimport static org.opends.server.loggers.ErrorLogger.logError;
5195N/Aimport static org.opends.server.util.DynamicConstants.BUILD_ID;
5195N/Aimport static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
4591N/Aimport static org.opends.server.util.ServerConstants.*;
5195N/Aimport static org.opends.server.util.StaticUtils.getFileForPath;
5195N/A
4591N/Aimport java.io.*;
5195N/Aimport java.nio.ByteBuffer;
4591N/Aimport java.util.*;
4591N/Aimport java.util.concurrent.*;
5195N/Aimport java.util.concurrent.atomic.AtomicInteger;
5195N/Aimport java.util.concurrent.atomic.AtomicLong;
5195N/A
5195N/Aimport org.opends.messages.Category;
4591N/Aimport org.opends.messages.Message;
4591N/Aimport org.opends.messages.Severity;
5195N/Aimport org.opends.server.admin.std.meta.LocalDBIndexCfgDefn;
5195N/Aimport org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType;
3339N/Aimport org.opends.server.admin.std.server.LocalDBBackendCfg;
4765N/Aimport org.opends.server.admin.std.server.LocalDBIndexCfg;
5207N/Aimport org.opends.server.api.DiskSpaceMonitorHandler;
4591N/Aimport org.opends.server.backends.jeb.*;
6032N/Aimport org.opends.server.backends.jeb.RebuildConfig.RebuildMode;
3339N/Aimport org.opends.server.config.ConfigException;
3339N/Aimport org.opends.server.core.DirectoryServer;
5207N/Aimport org.opends.server.extensions.DiskSpaceMonitor;
4591N/Aimport org.opends.server.types.*;
5195N/Aimport org.opends.server.util.LDIFReader;
5195N/Aimport org.opends.server.util.Platform;
5195N/Aimport org.opends.server.util.StaticUtils;
5195N/A
4591N/Aimport com.sleepycat.je.*;
4963N/Aimport com.sleepycat.util.PackedInteger;
3339N/A
3339N/A/**
4963N/A * This class provides the engine that performs both importing of LDIF files and
4963N/A * the rebuilding of indexes.
3339N/A */
5207N/Apublic final class Importer implements DiskSpaceMonitorHandler
4591N/A{
4963N/A private static final int TIMER_INTERVAL = 10000;
5195N/A private static final int KB = 1024;
6032N/A private static final int MB = (KB * KB);
4963N/A private static final String DEFAULT_TMP_DIR = "import-tmp";
4963N/A private static final String TMPENV_DIR = "tmp-env";
4963N/A
4963N/A //Defaults for DB cache.
4963N/A private static final int MAX_DB_CACHE_SIZE = 8 * MB;
4963N/A private static final int MAX_DB_LOG_SIZE = 10 * MB;
5007N/A private static final int MIN_DB_CACHE_SIZE = 4 * MB;
4963N/A
4963N/A //Defaults for LDIF reader buffers, min memory required to import and default
4963N/A //size for byte buffers.
5195N/A private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
6032N/A private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE
6032N/A + MAX_DB_LOG_SIZE;
4963N/A private static final int BYTE_BUFFER_CAPACITY = 128;
4963N/A
4963N/A //Min and MAX sizes of phase one buffer.
5195N/A private static final int MAX_BUFFER_SIZE = 2 * MB;
5195N/A private static final int MIN_BUFFER_SIZE = 4 * KB;
4963N/A
4963N/A //Min size of phase two read-ahead cache.
5007N/A private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
4963N/A
4963N/A //Small heap threshold used to give more memory to JVM to attempt OOM errors.
4963N/A private static final int SMALL_HEAP_SIZE = 256 * MB;
4963N/A
4963N/A //The DN attribute type.
4643N/A private static AttributeType dnType;
5195N/A static final IndexOutputBuffer.IndexComparator indexComparator =
6032N/A new IndexOutputBuffer.IndexComparator();
3339N/A
4963N/A //Phase one buffer and imported entries counts.
4591N/A private final AtomicInteger bufferCount = new AtomicInteger(0);
4679N/A private final AtomicLong importCount = new AtomicLong(0);
4963N/A
6032N/A //Phase one buffer size in bytes.
4963N/A private int bufferSize;
4963N/A
4963N/A //Temp scratch directory.
4591N/A private final File tempDir;
4963N/A
4963N/A //Index and thread counts.
5007N/A private final int indexCount;
5007N/A private int threadCount;
4963N/A
4963N/A //Set to true when validation is skipped.
4643N/A private final boolean skipDNValidation;
4963N/A
4963N/A //Temporary environment used when DN validation is done in first phase.
4963N/A private final TmpEnv tmpEnv;
4963N/A
4963N/A //Root container.
4963N/A private RootContainer rootContainer;
4963N/A
4963N/A //Import configuration.
4649N/A private final LDIFImportConfig importConfiguration;
4963N/A
5195N/A //Backend configuration.
5195N/A private final LocalDBBackendCfg backendConfiguration;
5195N/A
4963N/A //LDIF reader.
4591N/A private LDIFReader reader;
4963N/A
4963N/A //Migrated entry count.
4643N/A private int migratedCount;
4963N/A
5195N/A // Size in bytes of temporary env.
5195N/A private long tmpEnvCacheSize;
5195N/A
5275N/A // Available memory at the start of the import.
5275N/A private long availableMemory;
5275N/A
5195N/A // Size in bytes of DB cache.
5195N/A private long dbCacheSize;
4963N/A
4963N/A //The executor service used for the buffer sort tasks.
4963N/A private ExecutorService bufferSortService;
4963N/A
4963N/A //The executor service used for the scratch file processing tasks.
4963N/A private ExecutorService scratchFileWriterService;
3339N/A
4591N/A //Queue of free index buffers -- used to re-cycle index buffers;
5195N/A private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
6032N/A new LinkedBlockingQueue<IndexOutputBuffer>();
3339N/A
4643N/A //Map of index keys to index buffers. Used to allocate sorted
4591N/A //index buffers to a index writer thread.
6032N/A private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
6032N/A new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();
3339N/A
4591N/A //Map of DB containers to index managers. Used to start phase 2.
4643N/A private final List<IndexManager> indexMgrList =
6032N/A new LinkedList<IndexManager>();
3339N/A
4963N/A //Map of DB containers to DN-based index managers. Used to start phase 2.
4963N/A private final List<IndexManager> DNIndexMgrList =
6032N/A new LinkedList<IndexManager>();
4963N/A
4591N/A //Futures used to indicate when the index file writers are done flushing
4591N/A //their work queues and have exited. End of phase one.
4963N/A private final List<Future<?>> scratchFileWriterFutures;
4963N/A
4963N/A //List of index file writer tasks. Used to signal stopScratchFileWriters to
4963N/A //the index file writer tasks when the LDIF file has been done.
4963N/A private final List<ScratchFileWriterTask> scratchFileWriterList;
3598N/A
4643N/A //Map of DNs to Suffix objects.
4591N/A private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
3681N/A
4963N/A //Map of container ids to database containers.
4643N/A private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap =
6032N/A new ConcurrentHashMap<Integer, DatabaseContainer>();
4643N/A
4963N/A //Map of container ids to entry containers
4643N/A private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
6032N/A new ConcurrentHashMap<Integer, EntryContainer>();
4643N/A
4963N/A //Used to synchronize when a scratch file index writer is first setup.
4643N/A private final Object synObj = new Object();
4963N/A
5207N/A //Rebuild index manager used when rebuilding indexes.
4963N/A private final RebuildIndexManager rebuildManager;
4963N/A
4963N/A //Set to true if the backend was cleared.
4963N/A private boolean clearedBackend = false;
4963N/A
4963N/A //Used to shutdown import if an error occurs in phase one.
5207N/A private volatile boolean isCanceled = false;
5207N/A
5207N/A private volatile boolean isPhaseOneDone = false;
4963N/A
4963N/A //Number of phase one buffers
4963N/A private int phaseOneBufferCount;
4765N/A
4765N/A static
4643N/A {
4643N/A if ((dnType = DirectoryServer.getAttributeType("dn")) == null)
4643N/A {
4643N/A dnType = DirectoryServer.getDefaultAttributeType("dn");
4643N/A }
4643N/A }
4643N/A
3339N/A /**
5195N/A * Create a new import job with the specified rebuild index config.
3339N/A *
5195N/A * @param rebuildConfig
5195N/A * The rebuild index configuration.
5195N/A * @param cfg
5195N/A * The local DB back-end configuration.
5195N/A * @param envConfig
5195N/A * The JEB environment config.
5195N/A * @throws InitializationException
5195N/A * If a problem occurs during initialization.
5195N/A * @throws JebException
5195N/A * If an error occurred when opening the DB.
5195N/A * @throws ConfigException
5195N/A * If a problem occurs during initialization.
3339N/A */
5195N/A public Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
6032N/A EnvironmentConfig envConfig) throws InitializationException,
5195N/A JebException, ConfigException
4591N/A {
5195N/A this.importConfiguration = null;
5195N/A this.backendConfiguration = cfg;
5195N/A this.tmpEnv = null;
5195N/A this.threadCount = 1;
5195N/A this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
5195N/A this.indexCount = rebuildManager.getIndexCount();
6032N/A this.scratchFileWriterList =
6032N/A new ArrayList<ScratchFileWriterTask>(indexCount);
5195N/A this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
5195N/A
4591N/A File parentDir;
5195N/A if (rebuildConfig.getTmpDirectory() == null)
4591N/A {
4963N/A parentDir = getFileForPath(DEFAULT_TMP_DIR);
3339N/A }
4591N/A else
4591N/A {
5195N/A parentDir = getFileForPath(rebuildConfig.getTmpDirectory());
4591N/A }
5195N/A
5195N/A this.tempDir = new File(parentDir, cfg.getBackendId());
5117N/A recursiveDelete(tempDir);
5195N/A if (!tempDir.exists() && !tempDir.mkdirs())
5195N/A {
6032N/A Message message =
6032N/A ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String.valueOf(tempDir));
5195N/A throw new InitializationException(message);
5195N/A }
5195N/A this.skipDNValidation = true;
5195N/A initializeDBEnv(envConfig);
5195N/A }
5195N/A
5195N/A /**
5195N/A * Create a new import job with the specified ldif import config.
5195N/A *
5195N/A * @param importConfiguration
5195N/A * The LDIF import configuration.
5195N/A * @param localDBBackendCfg
5195N/A * The local DB back-end configuration.
5195N/A * @param envConfig
5195N/A * The JEB environment config.
5195N/A * @throws InitializationException
5195N/A * If a problem occurs during initialization.
5195N/A * @throws ConfigException
5195N/A * If a problem occurs reading the configuration.
5195N/A * @throws DatabaseException
5195N/A * If an error occurred when opening the DB.
5195N/A */
5195N/A public Importer(LDIFImportConfig importConfiguration,
5195N/A LocalDBBackendCfg localDBBackendCfg, EnvironmentConfig envConfig)
5195N/A throws InitializationException, ConfigException, DatabaseException
5195N/A {
5195N/A this.rebuildManager = null;
5195N/A this.importConfiguration = importConfiguration;
5195N/A this.backendConfiguration = localDBBackendCfg;
5195N/A
5195N/A if (importConfiguration.getThreadCount() == 0)
5195N/A {
5195N/A this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
5195N/A }
5195N/A else
4591N/A {
5195N/A this.threadCount = importConfiguration.getThreadCount();
5195N/A }
5195N/A
5195N/A // Determine the number of indexes.
5720N/A this.indexCount = getTotalIndexCount(localDBBackendCfg);
5195N/A
5195N/A if (!importConfiguration.appendToExistingData())
5195N/A {
5195N/A if (importConfiguration.clearBackend()
5195N/A || localDBBackendCfg.getBaseDN().size() <= 1)
5195N/A {
5195N/A this.clearedBackend = true;
5195N/A }
5195N/A }
6032N/A this.scratchFileWriterList =
6032N/A new ArrayList<ScratchFileWriterTask>(indexCount);
5195N/A this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
5195N/A File parentDir;
5195N/A if (importConfiguration.getTmpDirectory() == null)
5195N/A {
5195N/A parentDir = getFileForPath(DEFAULT_TMP_DIR);
5195N/A }
5195N/A else
5195N/A {
5195N/A parentDir = getFileForPath(importConfiguration.getTmpDirectory());
5195N/A }
5195N/A this.tempDir = new File(parentDir, localDBBackendCfg.getBackendId());
5195N/A recursiveDelete(tempDir);
5195N/A if (!tempDir.exists() && !tempDir.mkdirs())
5195N/A {
6032N/A Message message =
6032N/A ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String.valueOf(tempDir));
5179N/A throw new InitializationException(message);
4591N/A }
4649N/A skipDNValidation = importConfiguration.getSkipDNValidation();
4963N/A initializeDBEnv(envConfig);
5195N/A
5195N/A // Set up temporary environment.
5195N/A if (!skipDNValidation)
3339N/A {
5117N/A File envPath = new File(tempDir, TMPENV_DIR);
4963N/A envPath.mkdirs();
5195N/A this.tmpEnv = new TmpEnv(envPath);
3339N/A }
4591N/A else
4591N/A {
5195N/A this.tmpEnv = null;
3339N/A }
3339N/A }
3339N/A
5720N/A private int getTotalIndexCount(LocalDBBackendCfg localDBBackendCfg)
5720N/A throws ConfigException
5720N/A {
5720N/A int indexes = 2; // dn2id, dn2uri
5720N/A for (String indexName : localDBBackendCfg.listLocalDBIndexes())
5720N/A {
5720N/A LocalDBIndexCfg index = localDBBackendCfg.getLocalDBIndex(indexName);
5720N/A SortedSet<IndexType> types = index.getIndexType();
5720N/A if (types.contains(IndexType.EXTENSIBLE))
5720N/A {
6032N/A indexes +=
6032N/A types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
5720N/A }
5720N/A else
5720N/A {
5720N/A indexes += types.size();
5720N/A }
5720N/A }
5720N/A return indexes;
5720N/A }
5720N/A
3339N/A /**
4591N/A * Return the suffix instance in the specified map that matches the specified
4591N/A * DN.
4591N/A *
6032N/A * @param dn
6032N/A * The DN to search for.
6032N/A * @param map
6032N/A * The map to search.
4591N/A * @return The suffix instance that matches the DN, or null if no match is
4591N/A * found.
4591N/A */
4591N/A public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
4591N/A {
4591N/A Suffix suffix = null;
4591N/A DN nodeDN = dn;
4591N/A
6032N/A while (suffix == null && nodeDN != null)
6032N/A {
4591N/A suffix = map.get(nodeDN);
4591N/A if (suffix == null)
4591N/A {
4591N/A nodeDN = nodeDN.getParentDNInSuffix();
4591N/A }
4591N/A }
4591N/A return suffix;
4591N/A }
4591N/A
5195N/A /**
5195N/A * Calculate buffer sizes and initialize JEB properties based on memory.
5195N/A *
5195N/A * @param envConfig
5195N/A * The environment config to use in the calculations.
5195N/A * @throws InitializationException
5195N/A * If a problem occurs during calculation.
5195N/A */
5195N/A private void initializeDBEnv(EnvironmentConfig envConfig)
5195N/A throws InitializationException
4963N/A {
5195N/A // Calculate amount of usable memory. This will need to take into account
5195N/A // various fudge factors, including the number of IO buffers used by the
5195N/A // scratch writers (1 per index).
5275N/A calculateAvailableMemory();
5275N/A
6032N/A final long usableMemory =
6032N/A availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
5195N/A
5665N/A // We need caching when doing DN validation or rebuilding indexes.
5665N/A if (!skipDNValidation || (rebuildManager != null))
4963N/A {
5195N/A // No DN validation: calculate memory for DB cache, DN2ID temporary cache,
5195N/A // and buffers.
5195N/A if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
5007N/A {
5195N/A dbCacheSize = 500 * KB;
5195N/A tmpEnvCacheSize = 500 * KB;
5195N/A }
5195N/A else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
5195N/A {
5195N/A dbCacheSize = MIN_DB_CACHE_SIZE;
5195N/A tmpEnvCacheSize = MIN_DB_CACHE_SIZE;
5007N/A }
5195N/A else if (!clearedBackend)
5195N/A {
5195N/A // Appending to existing data so reserve extra memory for the DB cache
5195N/A // since it will be needed for dn2id queries.
5195N/A dbCacheSize = usableMemory * 33 / 100;
5195N/A tmpEnvCacheSize = usableMemory * 33 / 100;
5007N/A }
5195N/A else
5195N/A {
5195N/A dbCacheSize = MAX_DB_CACHE_SIZE;
5195N/A tmpEnvCacheSize = usableMemory * 66 / 100;
5007N/A }
5007N/A }
5007N/A else
5007N/A {
5195N/A // No DN validation: calculate memory for DB cache and buffers.
5195N/A
5195N/A // No need for DN2ID cache.
5195N/A tmpEnvCacheSize = 0;
5195N/A
5195N/A if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
5195N/A {
5195N/A dbCacheSize = 500 * KB;
5195N/A }
5195N/A else if (usableMemory < MIN_DB_CACHE_MEMORY)
5195N/A {
5195N/A dbCacheSize = MIN_DB_CACHE_SIZE;
5195N/A }
5195N/A else
5195N/A {
5195N/A // No need to differentiate between append/clear backend, since dn2id is
5195N/A // not being queried.
5195N/A dbCacheSize = MAX_DB_CACHE_SIZE;
5195N/A }
5007N/A }
5195N/A
6032N/A final long phaseOneBufferMemory =
6032N/A usableMemory - dbCacheSize - tmpEnvCacheSize;
5195N/A final int oldThreadCount = threadCount;
6114N/A if (indexCount != 0) // Avoid / by zero
5007N/A {
6114N/A while (true)
5195N/A {
6114N/A phaseOneBufferCount = 2 * indexCount * threadCount;
6114N/A
6114N/A // Scratch writers allocate 4 buffers per index as well.
6114N/A final int totalPhaseOneBufferCount =
6114N/A phaseOneBufferCount + (4 * indexCount);
6114N/A bufferSize = (int) (phaseOneBufferMemory / totalPhaseOneBufferCount);
6114N/A
6114N/A if (bufferSize > MAX_BUFFER_SIZE)
5195N/A {
6114N/A if (!skipDNValidation)
5195N/A {
6114N/A // The buffers are big enough: the memory is best used for the DN2ID
6114N/A // temp DB.
6114N/A bufferSize = MAX_BUFFER_SIZE;
6114N/A
6114N/A final long extraMemory =
6114N/A phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
6114N/A if (!clearedBackend)
6114N/A {
6114N/A dbCacheSize += extraMemory / 2;
6114N/A tmpEnvCacheSize += extraMemory / 2;
6114N/A }
6114N/A else
6114N/A {
6114N/A tmpEnvCacheSize += extraMemory;
6114N/A }
5195N/A }
6114N/A
6114N/A break;
5195N/A }
6114N/A else if (bufferSize > MIN_BUFFER_SIZE)
6114N/A {
6114N/A // This is acceptable.
6114N/A break;
6114N/A }
6114N/A else if (threadCount > 1)
6114N/A {
6114N/A // Retry using less threads.
6114N/A threadCount--;
6114N/A }
6114N/A else
6114N/A {
6114N/A // Not enough memory.
6114N/A final long minimumPhaseOneBufferMemory =
6114N/A totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
6114N/A Message message =
6114N/A ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
6114N/A minimumPhaseOneBufferMemory + dbCacheSize + tmpEnvCacheSize);
6114N/A throw new InitializationException(message);
6114N/A }
5007N/A }
5007N/A }
5195N/A
5195N/A if (oldThreadCount != threadCount)
5195N/A {
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT.get(oldThreadCount, threadCount);
5195N/A logError(message);
5195N/A }
5195N/A
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availableMemory,
6032N/A phaseOneBufferCount);
5195N/A logError(message);
5195N/A if (tmpEnvCacheSize > 0)
5195N/A {
5195N/A message = NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM.get(tmpEnvCacheSize);
5195N/A logError(message);
5195N/A }
5195N/A envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY, Long
5195N/A .toString(dbCacheSize));
5195N/A message = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize);
5195N/A logError(message);
5007N/A }
5007N/A
4591N/A /**
5275N/A * Calculates the amount of available memory which can be used by this import,
5195N/A * taking into account whether or not the import is running offline or online
5195N/A * as a task.
4591N/A */
5275N/A private void calculateAvailableMemory()
4591N/A {
5275N/A final long totalAvailableMemory;
5195N/A if (DirectoryServer.isRunning())
5195N/A {
5195N/A // Online import/rebuild.
4649N/A Runtime runTime = Runtime.getRuntime();
5195N/A runTime.gc();
5195N/A runTime.gc();
5195N/A final long usedMemory = runTime.totalMemory() - runTime.freeMemory();
5195N/A final long maxUsableMemory = Platform.getUsableMemoryForCaching();
5195N/A final long usableMemory = maxUsableMemory - usedMemory;
5195N/A
5195N/A final long configuredMemory;
5195N/A if (backendConfiguration.getDBCacheSize() > 0)
4963N/A {
5195N/A configuredMemory = backendConfiguration.getDBCacheSize();
4963N/A }
5007N/A else
5007N/A {
6032N/A configuredMemory =
6032N/A backendConfiguration.getDBCachePercent()
6032N/A * Runtime.getRuntime().maxMemory() / 100;
4963N/A }
5198N/A
5198N/A // Round up to minimum of 16MB (e.g. unit tests only use 2% cache).
6032N/A totalAvailableMemory =
6032N/A Math.max(Math.min(usableMemory, configuredMemory), 16 * MB);
5195N/A }
5195N/A else
5195N/A {
5195N/A // Offline import/rebuild.
5275N/A totalAvailableMemory = Platform.getUsableMemoryForCaching();
5195N/A }
5195N/A
5195N/A // Now take into account various fudge factors.
5195N/A int importMemPct = 90;
5275N/A if (totalAvailableMemory <= SMALL_HEAP_SIZE)
5195N/A {
5195N/A // Be pessimistic when memory is low.
5195N/A importMemPct -= 25;
5195N/A }
5195N/A if (rebuildManager != null)
5195N/A {
5195N/A // Rebuild seems to require more overhead.
5195N/A importMemPct -= 15;
5195N/A }
5195N/A
5275N/A availableMemory = (totalAvailableMemory * importMemPct / 100);
4591N/A }
4591N/A
4963N/A private void initializeIndexBuffers()
4591N/A {
6032N/A for (int i = 0; i < phaseOneBufferCount; i++)
4591N/A {
5195N/A IndexOutputBuffer b = new IndexOutputBuffer(bufferSize);
4649N/A freeBufferQueue.add(b);
4591N/A }
4591N/A }
4591N/A
6032N/A private void initializeSuffixes() throws DatabaseException, ConfigException,
6032N/A InitializationException
4591N/A {
6032N/A for (EntryContainer ec : rootContainer.getEntryContainers())
4643N/A {
4643N/A Suffix suffix = getSuffix(ec);
6032N/A if (suffix != null)
4643N/A {
4643N/A dnSuffixMap.put(ec.getBaseDN(), suffix);
4963N/A generateIndexID(suffix);
4963N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A //Mainly used to support multiple suffixes. Each index in each suffix gets
4963N/A //an unique ID to identify which DB it needs to go to in phase two processing.
4963N/A private void generateIndexID(Suffix suffix)
4963N/A {
6032N/A for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
6032N/A .getAttrIndexMap().entrySet())
6032N/A {
4963N/A AttributeIndex attributeIndex = mapEntry.getValue();
4963N/A DatabaseContainer container;
6032N/A if ((container = attributeIndex.getEqualityIndex()) != null)
6032N/A {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
6032N/A if ((container = attributeIndex.getPresenceIndex()) != null)
6032N/A {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
6032N/A if ((container = attributeIndex.getSubstringIndex()) != null)
6032N/A {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
6032N/A if ((container = attributeIndex.getOrderingIndex()) != null)
6032N/A {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
6032N/A if ((container = attributeIndex.getApproximateIndex()) != null)
6032N/A {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
6032N/A Map<String, Collection<Index>> extensibleMap =
6032N/A attributeIndex.getExtensibleIndexes();
6032N/A if (!extensibleMap.isEmpty())
6032N/A {
4963N/A Collection<Index> subIndexes =
6032N/A attributeIndex.getExtensibleIndexes().get(
6032N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
6032N/A if (subIndexes != null)
6032N/A {
6032N/A for (DatabaseContainer subIndex : subIndexes)
6032N/A {
4963N/A int id = System.identityHashCode(subIndex);
4963N/A idContainerMap.putIfAbsent(id, subIndex);
4963N/A }
4963N/A }
4963N/A Collection<Index> sharedIndexes =
6032N/A attributeIndex.getExtensibleIndexes().get(
6032N/A EXTENSIBLE_INDEXER_ID_SHARED);
6032N/A if (sharedIndexes != null)
6032N/A {
6032N/A for (DatabaseContainer sharedIndex : sharedIndexes)
6032N/A {
4963N/A int id = System.identityHashCode(sharedIndex);
4963N/A idContainerMap.putIfAbsent(id, sharedIndex);
4963N/A }
4963N/A }
4643N/A }
4643N/A }
4591N/A }
4591N/A
4643N/A private Suffix getSuffix(EntryContainer entryContainer)
6032N/A throws ConfigException, InitializationException
6032N/A {
6032N/A DN baseDN = entryContainer.getBaseDN();
6032N/A EntryContainer sourceEntryContainer = null;
6032N/A List<DN> includeBranches = new ArrayList<DN>();
6032N/A List<DN> excludeBranches = new ArrayList<DN>();
6032N/A
6032N/A if (!importConfiguration.appendToExistingData()
6032N/A && !importConfiguration.clearBackend())
6032N/A {
6032N/A for (DN dn : importConfiguration.getExcludeBranches())
6032N/A {
6032N/A if (baseDN.equals(dn))
6032N/A {
6032N/A // This entire base DN was explicitly excluded. Skip.
6032N/A return null;
6032N/A }
6032N/A if (baseDN.isAncestorOf(dn))
6032N/A {
6032N/A excludeBranches.add(dn);
6032N/A }
6032N/A }
6032N/A
6032N/A if (!importConfiguration.getIncludeBranches().isEmpty())
6032N/A {
6032N/A for (DN dn : importConfiguration.getIncludeBranches())
6032N/A {
6032N/A if (baseDN.isAncestorOf(dn))
6032N/A {
6032N/A includeBranches.add(dn);
6032N/A }
6032N/A }
6032N/A
6032N/A if (includeBranches.isEmpty())
6032N/A {
6032N/A /*
6032N/A * There are no branches in the explicitly defined include list under
6032N/A * this base DN. Skip this base DN all together.
4649N/A */
4643N/A
6032N/A return null;
6032N/A }
6032N/A
6032N/A // Remove any overlapping include branches.
6032N/A Iterator<DN> includeBranchIterator = includeBranches.iterator();
6032N/A while (includeBranchIterator.hasNext())
6032N/A {
6032N/A DN includeDN = includeBranchIterator.next();
6032N/A boolean keep = true;
6032N/A for (DN dn : includeBranches)
6032N/A {
6032N/A if (!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
6032N/A {
6032N/A keep = false;
6032N/A break;
6032N/A }
6032N/A }
6032N/A if (!keep)
6032N/A {
6032N/A includeBranchIterator.remove();
6032N/A }
6032N/A }
6032N/A
6032N/A // Remove any exclude branches that are not are not under a include
6032N/A // branch since they will be migrated as part of the existing entries
6032N/A // outside of the include branches anyways.
6032N/A Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
6032N/A while (excludeBranchIterator.hasNext())
6032N/A {
6032N/A DN excludeDN = excludeBranchIterator.next();
6032N/A boolean keep = false;
6032N/A for (DN includeDN : includeBranches)
6032N/A {
6032N/A if (includeDN.isAncestorOf(excludeDN))
6032N/A {
6032N/A keep = true;
6032N/A break;
6032N/A }
6032N/A }
6032N/A if (!keep)
6032N/A {
6032N/A excludeBranchIterator.remove();
6032N/A }
6032N/A }
6032N/A
6032N/A if ((includeBranches.size() == 1) && excludeBranches.isEmpty()
6032N/A && includeBranches.get(0).equals(baseDN))
6032N/A {
6032N/A // This entire base DN is explicitly included in the import with
6032N/A // no exclude branches that we need to migrate. Just clear the entry
6032N/A // container.
6032N/A entryContainer.lock();
6032N/A entryContainer.clear();
6032N/A entryContainer.unlock();
6032N/A }
6032N/A else
6032N/A {
6032N/A // Create a temp entry container
6032N/A sourceEntryContainer = entryContainer;
6032N/A entryContainer =
6032N/A rootContainer.openEntryContainer(baseDN, baseDN
6032N/A .toNormalizedString()
6032N/A + "_importTmp");
6032N/A }
6032N/A }
6032N/A }
6032N/A return Suffix.createSuffixContext(entryContainer, sourceEntryContainer,
6032N/A includeBranches, excludeBranches);
6032N/A }
4643N/A
4765N/A /**
4765N/A * Rebuild the indexes using the specified rootcontainer.
4765N/A *
6032N/A * @param rootContainer
6032N/A * The rootcontainer to rebuild indexes in.
6032N/A * @throws ConfigException
6032N/A * If a configuration error occurred.
6032N/A * @throws InitializationException
6032N/A * If an initialization error occurred.
6032N/A * @throws JebException
6032N/A * If the JEB database had an error.
6032N/A * @throws InterruptedException
6032N/A * If an interrupted error occurred.
6032N/A * @throws ExecutionException
6032N/A * If an execution error occurred.
4765N/A */
6032N/A public void rebuildIndexes(RootContainer rootContainer)
6032N/A throws ConfigException, InitializationException, JebException,
6032N/A InterruptedException, ExecutionException
4765N/A {
4765N/A this.rootContainer = rootContainer;
4765N/A long startTime = System.currentTimeMillis();
5207N/A
6032N/A DiskSpaceMonitor tmpMonitor =
6032N/A new DiskSpaceMonitor(backendConfiguration.getBackendId()
6032N/A + " backend index rebuild tmp directory", tempDir,
6032N/A backendConfiguration.getDiskLowThreshold(), backendConfiguration
6032N/A .getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
5207N/A tmpMonitor.initializeMonitorProvider(null);
5207N/A DirectoryServer.registerMonitorProvider(tmpMonitor);
5207N/A File parentDirectory =
5207N/A getFileForPath(backendConfiguration.getDBDirectory());
5207N/A File backendDirectory =
5207N/A new File(parentDirectory, backendConfiguration.getBackendId());
6032N/A DiskSpaceMonitor dbMonitor =
6032N/A new DiskSpaceMonitor(backendConfiguration.getBackendId()
6032N/A + " backend index rebuild DB directory", backendDirectory,
6032N/A backendConfiguration.getDiskLowThreshold(), backendConfiguration
6032N/A .getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
5207N/A dbMonitor.initializeMonitorProvider(null);
5207N/A DirectoryServer.registerMonitorProvider(dbMonitor);
5207N/A
5207N/A try
5207N/A {
5207N/A rebuildManager.initialize();
5207N/A rebuildManager.printStartMessage();
5207N/A rebuildManager.rebuildIndexes();
5207N/A recursiveDelete(tempDir);
5207N/A rebuildManager.printStopMessage(startTime);
5207N/A }
5207N/A finally
5207N/A {
6302N/A DirectoryServer.deregisterMonitorProvider(tmpMonitor);
6302N/A DirectoryServer.deregisterMonitorProvider(dbMonitor);
5207N/A tmpMonitor.finalizeMonitorProvider();
5207N/A dbMonitor.finalizeMonitorProvider();
5207N/A }
4765N/A }
4765N/A
4591N/A /**
4649N/A * Import a LDIF using the specified root container.
4591N/A *
6032N/A * @param rootContainer
6032N/A * The root container to use during the import.
4591N/A * @return A LDIF result.
6032N/A * @throws ConfigException
6032N/A * If the import failed because of an configuration error.
6032N/A * @throws InitializationException
6032N/A * If the import failed because of an initialization error.
6032N/A * @throws JebException
6032N/A * If the import failed due to a database error.
6032N/A * @throws InterruptedException
6032N/A * If the import failed due to an interrupted error.
6032N/A * @throws ExecutionException
6032N/A * If the import failed due to an execution error.
3339N/A */
6032N/A public LDIFImportResult processImport(RootContainer rootContainer)
6032N/A throws ConfigException, InitializationException, JebException,
6032N/A InterruptedException, ExecutionException
4591N/A {
4801N/A this.rootContainer = rootContainer;
5179N/A try
5179N/A {
6032N/A reader =
6032N/A new LDIFReader(importConfiguration, rootContainer,
6032N/A READER_WRITER_BUFFER_SIZE);
5179N/A }
5195N/A catch (IOException ioe)
5179N/A {
5179N/A Message message = ERR_JEB_IMPORT_LDIF_READER_IO_ERROR.get();
5179N/A throw new InitializationException(message, ioe);
5179N/A }
5179N/A
6032N/A DiskSpaceMonitor tmpMonitor =
6032N/A new DiskSpaceMonitor(backendConfiguration.getBackendId()
6032N/A + " backend import tmp directory", tempDir, backendConfiguration
6032N/A .getDiskLowThreshold(),
6032N/A backendConfiguration.getDiskFullThreshold(), 5, TimeUnit.SECONDS,
6032N/A this);
5207N/A tmpMonitor.initializeMonitorProvider(null);
5207N/A DirectoryServer.registerMonitorProvider(tmpMonitor);
5207N/A File parentDirectory =
5207N/A getFileForPath(backendConfiguration.getDBDirectory());
5207N/A File backendDirectory =
5207N/A new File(parentDirectory, backendConfiguration.getBackendId());
6032N/A DiskSpaceMonitor dbMonitor =
6032N/A new DiskSpaceMonitor(backendConfiguration.getBackendId()
6032N/A + " backend import DB directory", backendDirectory,
6032N/A backendConfiguration.getDiskLowThreshold(), backendConfiguration
6032N/A .getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
5207N/A dbMonitor.initializeMonitorProvider(null);
5207N/A DirectoryServer.registerMonitorProvider(dbMonitor);
5207N/A
4660N/A try
4660N/A {
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
6032N/A BUILD_ID, REVISION_NUMBER);
4660N/A logError(message);
4660N/A message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount);
4660N/A logError(message);
4660N/A initializeSuffixes();
5207N/A setIndexesTrusted(false);
4660N/A long startTime = System.currentTimeMillis();
4963N/A phaseOne();
5207N/A isPhaseOneDone = true;
4963N/A long phaseOneFinishTime = System.currentTimeMillis();
5195N/A if (!skipDNValidation)
4963N/A {
5195N/A tmpEnv.shutdown();
4963N/A }
5207N/A if (isCanceled)
4963N/A {
4963N/A throw new InterruptedException("Import processing canceled.");
4963N/A }
4963N/A long phaseTwoTime = System.currentTimeMillis();
4963N/A phaseTwo();
5207N/A if (isCanceled)
5207N/A {
5207N/A throw new InterruptedException("Import processing canceled.");
5207N/A }
4963N/A long phaseTwoFinishTime = System.currentTimeMillis();
5207N/A setIndexesTrusted(true);
4660N/A switchContainers();
5117N/A recursiveDelete(tempDir);
4660N/A long finishTime = System.currentTimeMillis();
4660N/A long importTime = (finishTime - startTime);
4660N/A float rate = 0;
6032N/A message =
6032N/A NOTE_JEB_IMPORT_PHASE_STATS.get(importTime / 1000,
6032N/A (phaseOneFinishTime - startTime) / 1000,
6032N/A (phaseTwoFinishTime - phaseTwoTime) / 1000);
4963N/A logError(message);
6032N/A if (importTime > 0)
6032N/A rate = 1000f * reader.getEntriesRead() / importTime;
6032N/A message =
6032N/A NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(), importCount
6032N/A .get(), reader.getEntriesIgnored(), reader.getEntriesRejected(),
6032N/A migratedCount, importTime / 1000, rate);
4660N/A logError(message);
4649N/A }
4643N/A finally
4643N/A {
4643N/A reader.close();
6302N/A DirectoryServer.deregisterMonitorProvider(tmpMonitor);
6302N/A DirectoryServer.deregisterMonitorProvider(dbMonitor);
5207N/A tmpMonitor.finalizeMonitorProvider();
5207N/A dbMonitor.finalizeMonitorProvider();
4643N/A }
6032N/A return new LDIFImportResult(reader.getEntriesRead(), reader
6032N/A .getEntriesRejected(), reader.getEntriesIgnored());
4591N/A }
4591N/A
5117N/A private void recursiveDelete(File dir)
5117N/A {
6032N/A if (dir.listFiles() != null)
5117N/A {
6032N/A for (File f : dir.listFiles())
5117N/A {
6032N/A if (f.isDirectory())
5117N/A {
5117N/A recursiveDelete(f);
5117N/A }
5117N/A f.delete();
5117N/A }
5117N/A }
5117N/A dir.delete();
5117N/A }
5117N/A
6032N/A private void switchContainers() throws DatabaseException, JebException,
6032N/A InitializationException
4813N/A {
4643N/A
6032N/A for (Suffix suffix : dnSuffixMap.values())
6032N/A {
6032N/A DN baseDN = suffix.getBaseDN();
6032N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
6032N/A if (entryContainer != null)
6032N/A {
6032N/A EntryContainer needRegisterContainer =
6032N/A rootContainer.unregisterEntryContainer(baseDN);
6032N/A
6032N/A needRegisterContainer.lock();
6032N/A needRegisterContainer.close();
6032N/A needRegisterContainer.delete();
6032N/A needRegisterContainer.unlock();
6032N/A EntryContainer newEC = suffix.getEntryContainer();
6032N/A newEC.lock();
6032N/A newEC.setDatabasePrefix(baseDN.toNormalizedString());
6032N/A newEC.unlock();
6032N/A rootContainer.registerEntryContainer(baseDN, newEC);
6032N/A }
6032N/A }
6032N/A }
4643N/A
5207N/A private void setIndexesTrusted(boolean trusted) throws JebException
4591N/A {
6032N/A try
6032N/A {
6032N/A for (Suffix s : dnSuffixMap.values())
6032N/A {
5207N/A s.setIndexesTrusted(trusted);
4591N/A }
4591N/A }
4591N/A catch (DatabaseException ex)
4591N/A {
4649N/A Message message =
6032N/A NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
4649N/A throw new JebException(message);
4591N/A }
4591N/A }
4591N/A
4963N/A private void phaseOne() throws InterruptedException, ExecutionException
4591N/A {
4963N/A initializeIndexBuffers();
4591N/A FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
6032N/A ScheduledThreadPoolExecutor timerService =
6032N/A new ScheduledThreadPoolExecutor(1);
5118N/A timerService.scheduleAtFixedRate(progressTask, TIMER_INTERVAL,
5195N/A TIMER_INTERVAL, TimeUnit.MILLISECONDS);
4963N/A scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
4963N/A bufferSortService = Executors.newFixedThreadPool(threadCount);
4643N/A ExecutorService execService = Executors.newFixedThreadPool(threadCount);
4643N/A List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
4643N/A tasks.add(new MigrateExistingTask());
4643N/A List<Future<Void>> results = execService.invokeAll(tasks);
5195N/A for (Future<Void> result : results)
5195N/A {
5195N/A if (!result.isDone())
5195N/A {
4765N/A result.get();
4765N/A }
4765N/A }
4643N/A tasks.clear();
4643N/A results.clear();
5195N/A if (importConfiguration.appendToExistingData()
5195N/A && importConfiguration.replaceExistingEntries())
4591N/A {
5195N/A for (int i = 0; i < threadCount; i++)
4643N/A {
4643N/A tasks.add(new AppendReplaceTask());
4643N/A }
4643N/A }
4643N/A else
4643N/A {
4643N/A for (int i = 0; i < threadCount; i++)
4643N/A {
4643N/A tasks.add(new ImportTask());
4643N/A }
4591N/A }
4643N/A results = execService.invokeAll(tasks);
4643N/A for (Future<Void> result : results)
5195N/A {
5195N/A if (!result.isDone())
5195N/A {
4765N/A result.get();
4765N/A }
5195N/A }
4643N/A tasks.clear();
4643N/A results.clear();
4643N/A tasks.add(new MigrateExcludedTask());
4643N/A results = execService.invokeAll(tasks);
4643N/A for (Future<Void> result : results)
5195N/A {
5195N/A if (!result.isDone())
5195N/A {
4765N/A result.get();
4765N/A }
5195N/A }
4963N/A stopScratchFileWriters();
4963N/A for (Future<?> result : scratchFileWriterFutures)
4591N/A {
5195N/A if (!result.isDone())
5195N/A {
4765N/A result.get();
4765N/A }
4591N/A }
5195N/A
5118N/A // Shutdown the executor services
5118N/A timerService.shutdown();
5118N/A timerService.awaitTermination(30, TimeUnit.SECONDS);
5118N/A execService.shutdown();
5118N/A execService.awaitTermination(30, TimeUnit.SECONDS);
5118N/A bufferSortService.shutdown();
5118N/A bufferSortService.awaitTermination(30, TimeUnit.SECONDS);
5118N/A scratchFileWriterService.shutdown();
5118N/A scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);
5118N/A
5195N/A // Try to clear as much memory as possible.
4963N/A scratchFileWriterList.clear();
4963N/A scratchFileWriterFutures.clear();
4718N/A indexKeyQueMap.clear();
4649N/A freeBufferQueue.clear();
4591N/A }
4591N/A
5207N/A private void phaseTwo() throws InterruptedException, ExecutionException
4591N/A {
6032N/A SecondPhaseProgressTask progress2Task =
6032N/A new SecondPhaseProgressTask(reader.getEntriesRead());
6032N/A ScheduledThreadPoolExecutor timerService =
6032N/A new ScheduledThreadPoolExecutor(1);
5195N/A timerService.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL,
5195N/A TIMER_INTERVAL, TimeUnit.MILLISECONDS);
5195N/A try
4963N/A {
5195N/A processIndexFiles();
4963N/A }
5195N/A finally
5195N/A {
5195N/A timerService.shutdown();
5195N/A timerService.awaitTermination(30, TimeUnit.SECONDS);
5195N/A }
4963N/A }
4963N/A
6032N/A private void processIndexFiles() throws InterruptedException,
6032N/A ExecutionException
4963N/A {
6032N/A if (bufferCount.get() == 0)
4591N/A {
4591N/A return;
4591N/A }
4963N/A int dbThreads = Runtime.getRuntime().availableProcessors();
6032N/A if (dbThreads < 4)
4591N/A {
4963N/A dbThreads = 4;
4591N/A }
5195N/A
5195N/A // Calculate memory / buffer counts.
5195N/A final long usableMemory = availableMemory - dbCacheSize;
5195N/A int readAheadSize;
5195N/A int buffers;
5195N/A while (true)
5195N/A {
6032N/A final List<IndexManager> totList =
6032N/A new ArrayList<IndexManager>(DNIndexMgrList);
5195N/A totList.addAll(indexMgrList);
5195N/A Collections.sort(totList, Collections.reverseOrder());
5195N/A
5195N/A buffers = 0;
5195N/A final int limit = Math.min(dbThreads, totList.size());
5195N/A for (int i = 0; i < limit; i++)
5195N/A {
5201N/A buffers += totList.get(i).numberOfBuffers;
5195N/A }
5195N/A
5195N/A readAheadSize = (int) (usableMemory / buffers);
5195N/A if (readAheadSize > bufferSize)
5195N/A {
5195N/A // Cache size is never larger than the buffer size.
5195N/A readAheadSize = bufferSize;
5195N/A break;
5195N/A }
5195N/A else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
5195N/A {
5195N/A // This is acceptable.
5195N/A break;
5195N/A }
5195N/A else if (dbThreads > 1)
5195N/A {
5195N/A // Reduce thread count.
5195N/A dbThreads--;
5195N/A }
5195N/A else
5195N/A {
5201N/A // Not enough memory - will need to do batching for the biggest indexes.
5201N/A readAheadSize = MIN_READ_AHEAD_CACHE_SIZE;
5201N/A buffers = (int) (usableMemory / readAheadSize);
5201N/A
5201N/A Message message = WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO.get(usableMemory);
5201N/A logError(message);
5201N/A break;
5195N/A }
5195N/A }
5195N/A
5201N/A // Ensure that there are always two threads available for parallel
5201N/A // processing of smaller indexes.
5201N/A dbThreads = Math.max(2, dbThreads);
5201N/A
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(availableMemory,
6032N/A readAheadSize, buffers);
5195N/A logError(message);
5195N/A
5195N/A // Start indexing tasks.
4963N/A List<Future<Void>> futures = new LinkedList<Future<Void>>();
5195N/A ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
5201N/A Semaphore permits = new Semaphore(buffers);
5195N/A
5195N/A // Start DN processing first.
5195N/A for (IndexManager dnMgr : DNIndexMgrList)
4963N/A {
5201N/A futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits,
5201N/A buffers, readAheadSize)));
4963N/A }
5195N/A for (IndexManager mgr : indexMgrList)
4591N/A {
5201N/A futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers,
5201N/A readAheadSize)));
4963N/A }
5195N/A
4963N/A for (Future<Void> result : futures)
5195N/A {
5195N/A if (!result.isDone())
5195N/A {
4963N/A result.get();
3339N/A }
5195N/A }
5195N/A
4963N/A dbService.shutdown();
4963N/A }
4963N/A
4963N/A private void stopScratchFileWriters()
4591N/A {
5195N/A IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
6032N/A for (ScratchFileWriterTask task : scratchFileWriterList)
4591N/A {
4649N/A task.queue.add(indexBuffer);
3339N/A }
3339N/A }
3339N/A
4660N/A /**
4643N/A * Task used to migrate excluded branch.
4591N/A */
4643N/A private final class MigrateExcludedTask extends ImportTask
4643N/A {
4643N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
5707N/A @Override
4643N/A public Void call() throws Exception
4643N/A {
6032N/A for (Suffix suffix : dnSuffixMap.values())
6032N/A {
4649N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
6032N/A if (entryContainer != null && !suffix.getExcludeBranches().isEmpty())
6032N/A {
4643N/A DatabaseEntry key = new DatabaseEntry();
4643N/A DatabaseEntry data = new DatabaseEntry();
4643N/A LockMode lockMode = LockMode.DEFAULT;
4643N/A OperationStatus status;
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_MIGRATION_START.get("excluded", String
6032N/A .valueOf(suffix.getBaseDN()));
4643N/A logError(message);
4643N/A Cursor cursor =
6032N/A entryContainer.getDN2ID().openCursor(null,
6032N/A CursorConfig.READ_COMMITTED);
4649N/A Comparator<byte[]> comparator =
6032N/A entryContainer.getDN2ID().getComparator();
6032N/A try
6032N/A {
6032N/A for (DN excludedDN : suffix.getExcludeBranches())
6032N/A {
6032N/A byte[] bytes =
6032N/A JebFormat.dnToDNKey(excludedDN, suffix.getBaseDN()
6032N/A .getNumComponents());
4643N/A key.setData(bytes);
4643N/A status = cursor.getSearchKeyRange(key, data, lockMode);
6032N/A if (status == OperationStatus.SUCCESS
6032N/A && Arrays.equals(key.getData(), bytes))
6032N/A {
4643N/A // This is the base entry for a branch that was excluded in the
4643N/A // import so we must migrate all entries in this branch over to
4643N/A // the new entry container.
6032N/A byte[] end = Arrays.copyOf(bytes, bytes.length + 1);
6032N/A end[end.length - 1] = 0x01;
6032N/A
6032N/A while (status == OperationStatus.SUCCESS
6032N/A && comparator.compare(key.getData(), end) < 0
6032N/A && !importConfiguration.isCancelled() && !isCanceled)
6032N/A {
4643N/A EntryID id = new EntryID(data);
6032N/A Entry entry =
6032N/A entryContainer.getID2Entry().get(null, id,
6032N/A LockMode.DEFAULT);
6032N/A processEntry(entry, rootContainer.getNextEntryID(), suffix);
4643N/A migratedCount++;
4643N/A status = cursor.getNext(key, data, lockMode);
4643N/A }
4643N/A }
4643N/A }
4963N/A flushIndexBuffers();
4643N/A }
4660N/A catch (Exception e)
4660N/A {
4660N/A message =
6032N/A ERR_JEB_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR.get(e
6032N/A .getMessage());
4660N/A logError(message);
6032N/A isCanceled = true;
4660N/A throw e;
4660N/A }
5207N/A finally
5207N/A {
5207N/A cursor.close();
5207N/A }
4643N/A }
4643N/A }
4643N/A return null;
4643N/A }
4643N/A }
4643N/A
4643N/A /**
4643N/A * Task to migrate existing entries.
4643N/A */
4643N/A private final class MigrateExistingTask extends ImportTask
4643N/A {
4643N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
5707N/A @Override
4643N/A public Void call() throws Exception
4643N/A {
6032N/A for (Suffix suffix : dnSuffixMap.values())
6032N/A {
5176N/A List<byte[]> includeBranches =
5176N/A new ArrayList<byte[]>(suffix.getIncludeBranches().size());
6032N/A for (DN includeBranch : suffix.getIncludeBranches())
5176N/A {
6032N/A if (includeBranch.isDescendantOf(suffix.getBaseDN()))
5176N/A {
6032N/A includeBranches.add(JebFormat.dnToDNKey(includeBranch, suffix
6032N/A .getBaseDN().getNumComponents()));
5176N/A }
5176N/A }
5176N/A
4660N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
6032N/A if (entryContainer != null && !suffix.getIncludeBranches().isEmpty())
6032N/A {
4660N/A DatabaseEntry key = new DatabaseEntry();
4660N/A DatabaseEntry data = new DatabaseEntry();
4660N/A LockMode lockMode = LockMode.DEFAULT;
4660N/A OperationStatus status;
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_MIGRATION_START.get("existing", String
6032N/A .valueOf(suffix.getBaseDN()));
4660N/A logError(message);
6032N/A Cursor cursor = entryContainer.getDN2ID().openCursor(null, null);
6032N/A try
6032N/A {
4660N/A status = cursor.getFirst(key, data, lockMode);
6032N/A while (status == OperationStatus.SUCCESS
6032N/A && !importConfiguration.isCancelled() && !isCanceled)
6032N/A {
5176N/A
5176N/A boolean found = false;
6032N/A for (byte[] includeBranch : includeBranches)
5176N/A {
6032N/A if (Arrays.equals(includeBranch, key.getData()))
5176N/A {
5176N/A found = true;
5176N/A break;
5176N/A }
5176N/A }
6032N/A if (!found)
6032N/A {
4660N/A EntryID id = new EntryID(data);
4660N/A Entry entry =
6032N/A entryContainer.getID2Entry()
6032N/A .get(null, id, LockMode.DEFAULT);
6032N/A processEntry(entry, rootContainer.getNextEntryID(), suffix);
4660N/A migratedCount++;
4660N/A status = cursor.getNext(key, data, lockMode);
6032N/A }
6032N/A else
6032N/A {
4660N/A // This is the base entry for a branch that will be included
4660N/A // in the import so we don't want to copy the branch to the
4660N/A // new entry container.
4643N/A
4660N/A /**
6032N/A * Advance the cursor to next entry at the same level in the DIT
6032N/A * skipping all the entries in this branch. Set the next
6032N/A * starting value to a value of equal length but slightly
6032N/A * greater than the previous DN. Since keys are compared in
6032N/A * reverse order we must set the first byte (the comma). No
6032N/A * possibility of overflow here.
4660N/A */
6032N/A byte[] begin = Arrays.copyOf(key.getData(), key.getSize() + 1);
6032N/A begin[begin.length - 1] = 0x01;
4660N/A key.setData(begin);
4660N/A status = cursor.getSearchKeyRange(key, data, lockMode);
4643N/A }
4643N/A }
4963N/A flushIndexBuffers();
4643N/A }
6032N/A catch (Exception e)
4660N/A {
4660N/A message =
6032N/A ERR_JEB_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR.get(e
6032N/A .getMessage());
4660N/A logError(message);
6032N/A isCanceled = true;
4660N/A throw e;
4660N/A }
5207N/A finally
5207N/A {
5207N/A cursor.close();
5207N/A }
4643N/A }
4660N/A }
4643N/A return null;
4643N/A }
4643N/A }
4643N/A
4643N/A /**
4963N/A * Task to perform append/replace processing.
4643N/A */
6032N/A private class AppendReplaceTask extends ImportTask
4643N/A {
4963N/A private final Set<byte[]> insertKeySet = new HashSet<byte[]>(),
6032N/A deleteKeySet = new HashSet<byte[]>();
4643N/A private final EntryInformation entryInfo = new EntryInformation();
4643N/A private Entry oldEntry;
4643N/A private EntryID entryID;
4591N/A
4591N/A /**
4591N/A * {@inheritDoc}
4591N/A */
5707N/A @Override
4591N/A public Void call() throws Exception
4591N/A {
4660N/A try
4591N/A {
4660N/A while (true)
4591N/A {
5207N/A if (importConfiguration.isCancelled() || isCanceled)
4660N/A {
5195N/A IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
4660N/A freeBufferQueue.add(indexBuffer);
4660N/A return null;
4660N/A }
4660N/A oldEntry = null;
4660N/A Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
4660N/A if (entry == null)
4660N/A {
4660N/A break;
4660N/A }
4660N/A entryID = entryInfo.getEntryID();
4660N/A Suffix suffix = entryInfo.getSuffix();
4660N/A processEntry(entry, suffix);
4591N/A }
4660N/A flushIndexBuffers();
4643N/A }
6032N/A catch (Exception e)
4660N/A {
4660N/A Message message =
6032N/A ERR_JEB_IMPORT_LDIF_APPEND_REPLACE_TASK_ERR.get(e.getMessage());
4660N/A logError(message);
5207N/A isCanceled = true;
4660N/A throw e;
4660N/A }
4643N/A return null;
4643N/A }
4643N/A
6032N/A void processEntry(Entry entry, Suffix suffix) throws DatabaseException,
6032N/A DirectoryException, JebException, InterruptedException
4643N/A
4643N/A {
4643N/A DN entryDN = entry.getDN();
4643N/A DN2ID dn2id = suffix.getDN2ID();
4643N/A EntryID oldID = dn2id.get(null, entryDN, LockMode.DEFAULT);
6032N/A if (oldID != null)
4643N/A {
4643N/A oldEntry = suffix.getID2Entry().get(null, oldID, LockMode.DEFAULT);
4643N/A }
6032N/A if (oldEntry == null)
4643N/A {
6032N/A if (!skipDNValidation)
4591N/A {
6032N/A if (!dnSanityCheck(entryDN, entry, suffix))
4591N/A {
4591N/A suffix.removePending(entryDN);
4643N/A return;
4591N/A }
4591N/A }
5147N/A suffix.removePending(entryDN);
5147N/A processDN2ID(suffix, entryDN, entryID);
4643N/A }
4643N/A else
4643N/A {
4643N/A suffix.removePending(entryDN);
4643N/A entryID = oldID;
4643N/A }
4700N/A processDN2URI(suffix, oldEntry, entry);
4643N/A suffix.getID2Entry().put(null, entryID, entry);
6032N/A if (oldEntry == null)
4643N/A {
4591N/A processIndexes(suffix, entry, entryID);
4591N/A }
4643N/A else
4643N/A {
4643N/A processAllIndexes(suffix, entry, entryID);
4643N/A }
5146N/A importCount.getAndIncrement();
4643N/A }
4643N/A
6032N/A void processAllIndexes(Suffix suffix, Entry entry, EntryID entryID)
6032N/A throws DatabaseException, DirectoryException, JebException,
6032N/A InterruptedException
4643N/A {
6032N/A for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
6032N/A .getAttrIndexMap().entrySet())
6032N/A {
4649N/A AttributeType attributeType = mapEntry.getKey();
6032N/A fillIndexKey(suffix, mapEntry, entry, attributeType, entryID);
4643N/A }
4643N/A }
4643N/A
5707N/A @Override
4649N/A void processAttribute(Index index, Entry entry, EntryID entryID,
6032N/A IndexKey indexKey) throws DatabaseException, InterruptedException
4643N/A {
6032N/A if (oldEntry != null)
4643N/A {
4643N/A deleteKeySet.clear();
4643N/A index.indexer.indexEntry(oldEntry, deleteKeySet);
6032N/A for (byte[] delKey : deleteKeySet)
4643N/A {
4643N/A processKey(index, delKey, entryID, indexComparator, indexKey, false);
4643N/A }
4643N/A }
4643N/A insertKeySet.clear();
4643N/A index.indexer.indexEntry(entry, insertKeySet);
6032N/A for (byte[] key : insertKeySet)
4643N/A {
4643N/A processKey(index, key, entryID, indexComparator, indexKey, true);
4643N/A }
4643N/A }
4643N/A }
4643N/A
4643N/A /**
4963N/A * This task performs phase reading and processing of the entries read from
4963N/A * the LDIF file(s). This task is used if the append flag wasn't specified.
4643N/A */
6032N/A private class ImportTask implements Callable<Void>
4643N/A {
5195N/A private final Map<IndexKey, IndexOutputBuffer> indexBufferMap =
6032N/A new HashMap<IndexKey, IndexOutputBuffer>();
4643N/A private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
4643N/A private final EntryInformation entryInfo = new EntryInformation();
4963N/A private DatabaseEntry keyEntry = new DatabaseEntry(),
6032N/A valEntry = new DatabaseEntry();
4643N/A
4643N/A /**
4643N/A * {@inheritDoc}
4643N/A */
5707N/A @Override
4643N/A public Void call() throws Exception
4643N/A {
4660N/A try
4643N/A {
4660N/A while (true)
4643N/A {
5207N/A if (importConfiguration.isCancelled() || isCanceled)
4660N/A {
5195N/A IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
4660N/A freeBufferQueue.add(indexBuffer);
4660N/A return null;
4660N/A }
4660N/A Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
4660N/A if (entry == null)
4660N/A {
4660N/A break;
4660N/A }
4660N/A EntryID entryID = entryInfo.getEntryID();
4660N/A Suffix suffix = entryInfo.getSuffix();
4660N/A processEntry(entry, entryID, suffix);
4643N/A }
4660N/A flushIndexBuffers();
4643N/A }
4660N/A catch (Exception e)
4660N/A {
4660N/A Message message =
6032N/A ERR_JEB_IMPORT_LDIF_IMPORT_TASK_ERR.get(e.getMessage());
4660N/A logError(message);
5207N/A isCanceled = true;
4660N/A throw e;
4660N/A }
4591N/A return null;
4591N/A }
4591N/A
4643N/A void processEntry(Entry entry, EntryID entryID, Suffix suffix)
6032N/A throws DatabaseException, DirectoryException, JebException,
6032N/A InterruptedException
4643N/A
4643N/A {
4643N/A DN entryDN = entry.getDN();
6032N/A if (!skipDNValidation)
4643N/A {
6032N/A if (!dnSanityCheck(entryDN, entry, suffix))
4643N/A {
4643N/A suffix.removePending(entryDN);
4643N/A return;
4643N/A }
4643N/A }
4963N/A suffix.removePending(entryDN);
4963N/A processDN2ID(suffix, entryDN, entryID);
4700N/A processDN2URI(suffix, null, entry);
4963N/A processIndexes(suffix, entry, entryID);
4643N/A suffix.getID2Entry().put(null, entryID, entry);
4823N/A importCount.getAndIncrement();
4643N/A }
4643N/A
4963N/A //Examine the DN for duplicates and missing parents.
4963N/A boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
6032N/A throws JebException, InterruptedException
4591N/A {
5117N/A //Perform parent checking.
5117N/A DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
6032N/A if (parentDN != null)
6032N/A {
6032N/A if (!suffix.isParentProcessed(parentDN, tmpEnv, clearedBackend))
6032N/A {
5117N/A Message message =
6032N/A ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
5117N/A reader.rejectEntry(entry, message);
5117N/A return false;
5117N/A }
5117N/A }
4963N/A //If the backend was not cleared, then the dn2id needs to checked first
4963N/A //for DNs that might not exist in the DN cache. If the DN is not in
4963N/A //the suffixes dn2id DB, then the dn cache is used.
6032N/A if (!clearedBackend)
4591N/A {
4963N/A EntryID id = suffix.getDN2ID().get(null, entryDN, LockMode.DEFAULT);
6032N/A if (id != null || !tmpEnv.insert(entryDN, keyEntry, valEntry))
4963N/A {
4963N/A Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
4963N/A reader.rejectEntry(entry, message);
4591N/A return false;
4591N/A }
4591N/A }
6032N/A else if (!tmpEnv.insert(entryDN, keyEntry, valEntry))
4591N/A {
6032N/A Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
6032N/A reader.rejectEntry(entry, message);
6032N/A return false;
4963N/A }
4591N/A return true;
4591N/A }
4591N/A
6032N/A void processIndexes(Suffix suffix, Entry entry, EntryID entryID)
6032N/A throws DatabaseException, DirectoryException, JebException,
6032N/A InterruptedException
4591N/A {
6032N/A for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
6032N/A .getAttrIndexMap().entrySet())
6032N/A {
4649N/A AttributeType attributeType = mapEntry.getKey();
6032N/A if (entry.hasAttribute(attributeType))
6032N/A {
6032N/A fillIndexKey(suffix, mapEntry, entry, attributeType, entryID);
6032N/A }
6032N/A }
6032N/A }
6032N/A
6032N/A void fillIndexKey(Suffix suffix,
6032N/A Map.Entry<AttributeType, AttributeIndex> mapEntry, Entry entry,
6032N/A AttributeType attributeType, EntryID entryID) throws DatabaseException,
6032N/A InterruptedException, DirectoryException, JebException
6032N/A {
6032N/A AttributeIndex attributeIndex = mapEntry.getValue();
6032N/A Index index;
6032N/A if ((index = attributeIndex.getEqualityIndex()) != null)
6032N/A {
6032N/A processAttribute(index, entry, entryID, new IndexKey(attributeType,
6032N/A ImportIndexType.EQUALITY, index.getIndexEntryLimit()));
6032N/A }
6032N/A if ((index = attributeIndex.getPresenceIndex()) != null)
6032N/A {
6032N/A processAttribute(index, entry, entryID, new IndexKey(attributeType,
6032N/A ImportIndexType.PRESENCE, index.getIndexEntryLimit()));
6032N/A }
6032N/A if ((index = attributeIndex.getSubstringIndex()) != null)
6032N/A {
6032N/A processAttribute(index, entry, entryID, new IndexKey(attributeType,
6032N/A ImportIndexType.SUBSTRING, index.getIndexEntryLimit()));
6032N/A }
6032N/A if ((index = attributeIndex.getOrderingIndex()) != null)
6032N/A {
6032N/A processAttribute(index, entry, entryID, new IndexKey(attributeType,
6032N/A ImportIndexType.ORDERING, index.getIndexEntryLimit()));
6032N/A }
6032N/A if ((index = attributeIndex.getApproximateIndex()) != null)
6032N/A {
6032N/A processAttribute(index, entry, entryID, new IndexKey(attributeType,
6032N/A ImportIndexType.APPROXIMATE, index.getIndexEntryLimit()));
6032N/A }
6032N/A for (VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes())
6032N/A {
6032N/A Transaction transaction = null;
6032N/A vlvIdx.addEntry(transaction, entryID, entry);
6032N/A }
6032N/A Map<String, Collection<Index>> extensibleMap =
6032N/A attributeIndex.getExtensibleIndexes();
6032N/A if (!extensibleMap.isEmpty())
6032N/A {
6032N/A Collection<Index> subIndexes =
6032N/A attributeIndex.getExtensibleIndexes().get(
6032N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
6032N/A if (subIndexes != null)
6032N/A {
6032N/A for (Index subIndex : subIndexes)
6032N/A {
6032N/A processAttribute(subIndex, entry, entryID, new IndexKey(
6032N/A attributeType, ImportIndexType.EX_SUBSTRING, subIndex
6032N/A .getIndexEntryLimit()));
4591N/A }
6032N/A }
6032N/A Collection<Index> sharedIndexes =
6032N/A attributeIndex.getExtensibleIndexes().get(
6032N/A EXTENSIBLE_INDEXER_ID_SHARED);
6032N/A if (sharedIndexes != null)
6032N/A {
6032N/A for (Index sharedIndex : sharedIndexes)
6032N/A {
6032N/A processAttribute(sharedIndex, entry, entryID, new IndexKey(
6032N/A attributeType, ImportIndexType.EX_SHARED, sharedIndex
6032N/A .getIndexEntryLimit()));
4591N/A }
4591N/A }
4591N/A }
4591N/A }
4591N/A
6032N/A void processAttribute(Index index, Entry entry, EntryID entryID,
6032N/A IndexKey indexKey) throws DatabaseException, InterruptedException
4591N/A {
4591N/A insertKeySet.clear();
4591N/A index.indexer.indexEntry(entry, insertKeySet);
6032N/A for (byte[] key : insertKeySet)
4591N/A {
4643N/A processKey(index, key, entryID, indexComparator, indexKey, true);
4591N/A }
4591N/A }
4591N/A
6032N/A void flushIndexBuffers() throws InterruptedException, ExecutionException
4591N/A {
6032N/A Set<Map.Entry<IndexKey, IndexOutputBuffer>> set =
6032N/A indexBufferMap.entrySet();
6032N/A Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> setIterator =
6032N/A set.iterator();
6032N/A while (setIterator.hasNext())
6032N/A {
6032N/A Map.Entry<IndexKey, IndexOutputBuffer> e = setIterator.next();
6032N/A IndexKey indexKey = e.getKey();
6032N/A IndexOutputBuffer indexBuffer = e.getValue();
6032N/A setIterator.remove();
6032N/A indexBuffer.setComparator(indexComparator);
6032N/A indexBuffer.setIndexKey(indexKey);
6032N/A indexBuffer.setDiscard();
6032N/A Future<Void> future =
6032N/A bufferSortService.submit(new SortTask(indexBuffer));
6032N/A future.get();
6032N/A }
4591N/A }
4591N/A
6032N/A int processKey(DatabaseContainer container, byte[] key, EntryID entryID,
6032N/A IndexOutputBuffer.ComparatorBuffer<byte[]> comparator,
6032N/A IndexKey indexKey, boolean insert) throws InterruptedException
4591N/A {
6344N/A int sizeNeeded = IndexOutputBuffer.getRequiredSize(
6344N/A key.length, entryID.longValue());
5195N/A IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
5195N/A if (indexBuffer == null)
4591N/A {
6344N/A indexBuffer = getNewIndexBuffer(sizeNeeded);
4643N/A indexBufferMap.put(indexKey, indexBuffer);
4591N/A }
6344N/A else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
4591N/A {
4591N/A indexBuffer.setComparator(comparator);
4643N/A indexBuffer.setIndexKey(indexKey);
4963N/A bufferSortService.submit(new SortTask(indexBuffer));
6344N/A indexBuffer = getNewIndexBuffer(sizeNeeded);
4643N/A indexBufferMap.put(indexKey, indexBuffer);
4591N/A }
4643N/A int id = System.identityHashCode(container);
4643N/A indexBuffer.add(key, entryID, id, insert);
4643N/A return id;
4591N/A }
4591N/A
6344N/A IndexOutputBuffer getNewIndexBuffer(int size)
6344N/A throws InterruptedException
4591N/A {
6344N/A IndexOutputBuffer indexBuffer;
6344N/A if (size > bufferSize)
6032N/A {
6344N/A indexBuffer = new IndexOutputBuffer(size);
6344N/A indexBuffer.setDiscard();
6344N/A }
6344N/A else
6344N/A {
6344N/A indexBuffer = freeBufferQueue.take();
6344N/A if (indexBuffer == null)
6344N/A {
6344N/A Message message =
6344N/A Message.raw(Category.JEB, Severity.SEVERE_ERROR,
6344N/A "Index buffer processing error.");
6344N/A throw new InterruptedException(message.toString());
6344N/A }
6032N/A }
6032N/A if (indexBuffer.isPoison())
6032N/A {
6032N/A Message message =
6032N/A Message.raw(Category.JEB, Severity.SEVERE_ERROR,
6032N/A "Cancel processing received.");
6032N/A throw new InterruptedException(message.toString());
6032N/A }
4591N/A return indexBuffer;
4591N/A }
4591N/A
4643N/A void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
6032N/A throws InterruptedException
4591N/A {
5176N/A DN2ID dn2id = suffix.getDN2ID();
5176N/A byte[] dnBytes =
5176N/A JebFormat.dnToDNKey(dn, suffix.getBaseDN().getNumComponents());
6032N/A int id =
6032N/A processKey(dn2id, dnBytes, entryID, indexComparator, new IndexKey(
6032N/A dnType, ImportIndexType.DN, 1), true);
4643N/A idECMap.putIfAbsent(id, suffix.getEntryContainer());
4591N/A }
4700N/A
4700N/A void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry)
6032N/A throws DatabaseException
4700N/A {
4700N/A DN2URI dn2uri = suffix.getDN2URI();
6032N/A if (oldEntry != null)
4700N/A {
4700N/A dn2uri.replaceEntry(null, oldEntry, newEntry);
4700N/A }
4700N/A else
4700N/A {
4700N/A dn2uri.addEntry(null, newEntry);
4700N/A }
4700N/A }
4591N/A }
4591N/A
4591N/A /**
4963N/A * This task reads sorted records from the temporary index scratch files,
6032N/A * processes the records and writes the results to the index database. The DN
6032N/A * index is treated differently then non-DN indexes.
4591N/A */
4963N/A private final class IndexDBWriteTask implements Callable<Void>
4643N/A {
4591N/A private final IndexManager indexMgr;
4591N/A private final DatabaseEntry dbKey, dbValue;
4591N/A private final int cacheSize;
4643N/A private final Map<Integer, DNState> dnStateMap =
6032N/A new HashMap<Integer, DNState>();
4643N/A private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
5201N/A private final Semaphore permits;
5201N/A private final int maxPermits;
5201N/A private final AtomicLong bytesRead = new AtomicLong();
5201N/A private long lastBytesRead = 0;
5201N/A private final AtomicInteger keyCount = new AtomicInteger();
5201N/A private RandomAccessFile bufferFile = null;
5201N/A private DataInputStream bufferIndexFile = null;
5201N/A private int remainingBuffers;
5201N/A private volatile int totalBatches;
5201N/A private AtomicInteger batchNumber = new AtomicInteger();
5201N/A private int nextBufferID;
5201N/A private int ownedPermits;
5201N/A private volatile boolean isRunning = false;
5201N/A
5201N/A /**
5201N/A * Creates a new index DB writer.
5201N/A *
5201N/A * @param indexMgr
5201N/A * The index manager.
5201N/A * @param permits
5201N/A * The semaphore used for restricting the number of buffer
5201N/A * allocations.
5201N/A * @param maxPermits
5201N/A * The maximum number of buffers which can be allocated.
5201N/A * @param cacheSize
5201N/A * The buffer cache size.
5201N/A */
5201N/A public IndexDBWriteTask(IndexManager indexMgr, Semaphore permits,
5201N/A int maxPermits, int cacheSize)
4591N/A {
4591N/A this.indexMgr = indexMgr;
5201N/A this.permits = permits;
5201N/A this.maxPermits = maxPermits;
5201N/A this.cacheSize = cacheSize;
5201N/A
4591N/A this.dbKey = new DatabaseEntry();
4591N/A this.dbValue = new DatabaseEntry();
5201N/A }
5201N/A
5201N/A /**
5201N/A * Initializes this task.
5201N/A *
5201N/A * @throws IOException
5201N/A * If an IO error occurred.
5201N/A */
5201N/A public void beginWriteTask() throws IOException
5201N/A {
5201N/A bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r");
6032N/A bufferIndexFile =
6032N/A new DataInputStream(new BufferedInputStream(new FileInputStream(
6032N/A indexMgr.getBufferIndexFile())));
5201N/A
5201N/A remainingBuffers = indexMgr.getNumberOfBuffers();
5201N/A totalBatches = (remainingBuffers / maxPermits) + 1;
5201N/A batchNumber.set(0);
5201N/A nextBufferID = 0;
5201N/A ownedPermits = 0;
5201N/A
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getBufferFileName(),
6032N/A remainingBuffers, totalBatches);
5201N/A logError(message);
5201N/A
5201N/A indexMgr.setIndexDBWriteTask(this);
5201N/A isRunning = true;
4591N/A }
4591N/A
5201N/A /**
5201N/A * Returns the next batch of buffers to be processed, blocking until enough
5201N/A * buffer permits are available.
5201N/A *
5201N/A * @return The next batch of buffers, or {@code null} if there are no more
5201N/A * buffers to be processed.
5201N/A * @throws Exception
5201N/A * If an exception occurred.
5201N/A */
5201N/A public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception
4591N/A {
5201N/A // First release any previously acquired permits.
5201N/A if (ownedPermits > 0)
5201N/A {
5201N/A permits.release(ownedPermits);
5201N/A ownedPermits = 0;
5201N/A }
5201N/A
5201N/A // Block until we can either get enough permits for all buffers, or the
5201N/A // maximum number of permits.
5201N/A final int permitRequest = Math.min(remainingBuffers, maxPermits);
5201N/A if (permitRequest == 0)
5201N/A {
5201N/A // No more work to do.
5201N/A return null;
5201N/A }
5201N/A permits.acquire(permitRequest);
5201N/A
5201N/A // Update counters.
5201N/A ownedPermits = permitRequest;
5201N/A remainingBuffers -= permitRequest;
5201N/A batchNumber.incrementAndGet();
5201N/A
5201N/A // Create all the index buffers for the next batch.
5201N/A final NavigableSet<IndexInputBuffer> buffers =
6032N/A new TreeSet<IndexInputBuffer>();
5201N/A for (int i = 0; i < permitRequest; i++)
4591N/A {
5201N/A final long bufferBegin = bufferIndexFile.readLong();
5201N/A final long bufferEnd = bufferIndexFile.readLong();
6032N/A final IndexInputBuffer b =
6032N/A new IndexInputBuffer(indexMgr, bufferFile.getChannel(),
6032N/A bufferBegin, bufferEnd, nextBufferID++, cacheSize);
5201N/A buffers.add(b);
5201N/A }
5201N/A
5201N/A return buffers;
5201N/A }
5201N/A
5201N/A /**
5201N/A * Finishes this task.
5201N/A */
5207N/A public void endWriteTask()
5201N/A {
5201N/A isRunning = false;
5201N/A
5201N/A // First release any previously acquired permits.
5201N/A if (ownedPermits > 0)
5201N/A {
5201N/A permits.release(ownedPermits);
5201N/A ownedPermits = 0;
5201N/A }
5201N/A
5201N/A try
5201N/A {
5201N/A if (indexMgr.isDN2ID())
5201N/A {
5201N/A for (DNState dnState : dnStateMap.values())
5201N/A {
5201N/A dnState.flush();
5201N/A }
6032N/A if (!isCanceled)
5207N/A {
6032N/A Message msg =
6032N/A NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount());
5207N/A logError(msg);
5207N/A }
5201N/A }
5201N/A else
5201N/A {
5201N/A for (Index index : indexMap.values())
5201N/A {
5201N/A index.closeCursor();
5201N/A }
6032N/A if (!isCanceled)
5207N/A {
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr
6032N/A .getBufferFileName());
5207N/A logError(message);
5207N/A }
5201N/A }
4591N/A }
5201N/A finally
5201N/A {
5201N/A if (bufferFile != null)
5201N/A {
5201N/A try
5201N/A {
5201N/A bufferFile.close();
5201N/A }
5201N/A catch (IOException ignored)
5201N/A {
5201N/A // Ignore.
5201N/A }
5201N/A }
5201N/A
5201N/A if (bufferIndexFile != null)
5201N/A {
5201N/A try
5201N/A {
5201N/A bufferIndexFile.close();
5201N/A }
5201N/A catch (IOException ignored)
5201N/A {
5201N/A // Ignore.
5201N/A }
5201N/A }
5201N/A
5201N/A indexMgr.getBufferFile().delete();
5201N/A indexMgr.getBufferIndexFile().delete();
5201N/A }
4643N/A }
4643N/A
5201N/A /**
5201N/A * Print out progress stats.
5201N/A *
5201N/A * @param deltaTime
5201N/A * The time since the last update.
5201N/A */
5201N/A public void printStats(long deltaTime)
5201N/A {
5201N/A if (isRunning)
5201N/A {
5201N/A final long bufferFileSize = indexMgr.getBufferFileSize();
5201N/A final long tmpBytesRead = bytesRead.get();
5201N/A final int currentBatch = batchNumber.get();
5201N/A
5201N/A final long bytesReadInterval = tmpBytesRead - lastBytesRead;
6032N/A final int bytesReadPercent =
6032N/A Math.round((100f * tmpBytesRead) / bufferFileSize);
5201N/A
5201N/A // Kilo and milli approximately cancel out.
5201N/A final long kiloBytesRate = bytesReadInterval / deltaTime;
5201N/A final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
5201N/A
6032N/A Message message =
6032N/A NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(indexMgr
6032N/A .getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
6032N/A kiloBytesRate, currentBatch, totalBatches);
5201N/A logError(message);
5201N/A
5201N/A lastBytesRead = tmpBytesRead;
5201N/A }
5201N/A }
5201N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
5707N/A @Override
6319N/A public Void call() throws Exception, DirectoryException
4643N/A {
5195N/A ByteBuffer key = null;
5195N/A ImportIDSet insertIDSet = null;
5195N/A ImportIDSet deleteIDSet = null;
5195N/A Integer indexID = null;
5195N/A
6032N/A if (isCanceled)
5207N/A {
5207N/A return null;
5207N/A }
5207N/A
4660N/A try
4591N/A {
5201N/A beginWriteTask();
5201N/A
5201N/A NavigableSet<IndexInputBuffer> bufferSet;
5201N/A while ((bufferSet = getNextBufferBatch()) != null)
4591N/A {
6032N/A if (isCanceled)
5207N/A {
5207N/A return null;
5207N/A }
5207N/A
5201N/A while (!bufferSet.isEmpty())
4591N/A {
5201N/A IndexInputBuffer b = bufferSet.pollFirst();
5201N/A if (key == null)
5201N/A {
5201N/A indexID = b.getIndexID();
5201N/A
5201N/A if (indexMgr.isDN2ID())
5201N/A {
5201N/A insertIDSet = new ImportIDSet(1, 1, false);
5201N/A deleteIDSet = new ImportIDSet(1, 1, false);
5201N/A }
5201N/A else
5201N/A {
5201N/A Index index = (Index) idContainerMap.get(indexID);
5201N/A int limit = index.getIndexEntryLimit();
5201N/A boolean doCount = index.getMaintainCount();
5201N/A insertIDSet = new ImportIDSet(1, limit, doCount);
5201N/A deleteIDSet = new ImportIDSet(1, limit, doCount);
5201N/A }
5201N/A
5201N/A key = ByteBuffer.allocate(b.getKeyLen());
5201N/A key.flip();
5201N/A b.getKey(key);
5201N/A
5201N/A b.mergeIDSet(insertIDSet);
5201N/A b.mergeIDSet(deleteIDSet);
5201N/A insertIDSet.setKey(key);
5201N/A deleteIDSet.setKey(key);
5201N/A }
5201N/A else if (b.compare(key, indexID) != 0)
4963N/A {
5201N/A addToDB(insertIDSet, deleteIDSet, indexID);
5201N/A keyCount.incrementAndGet();
5201N/A
5201N/A indexID = b.getIndexID();
5201N/A
5201N/A if (indexMgr.isDN2ID())
5201N/A {
5201N/A insertIDSet = new ImportIDSet(1, 1, false);
5201N/A deleteIDSet = new ImportIDSet(1, 1, false);
5201N/A }
5201N/A else
5201N/A {
5201N/A Index index = (Index) idContainerMap.get(indexID);
5201N/A int limit = index.getIndexEntryLimit();
5201N/A boolean doCount = index.getMaintainCount();
5201N/A insertIDSet = new ImportIDSet(1, limit, doCount);
5201N/A deleteIDSet = new ImportIDSet(1, limit, doCount);
5201N/A }
5201N/A
5201N/A key.clear();
5201N/A if (b.getKeyLen() > key.capacity())
5201N/A {
5201N/A key = ByteBuffer.allocate(b.getKeyLen());
5201N/A }
5201N/A key.flip();
5201N/A b.getKey(key);
5201N/A
5201N/A b.mergeIDSet(insertIDSet);
5201N/A b.mergeIDSet(deleteIDSet);
5201N/A insertIDSet.setKey(key);
5201N/A deleteIDSet.setKey(key);
5195N/A }
5195N/A else
5195N/A {
5201N/A b.mergeIDSet(insertIDSet);
5201N/A b.mergeIDSet(deleteIDSet);
4963N/A }
5195N/A
5201N/A if (b.hasMoreData())
5201N/A {
5201N/A b.getNextRecord();
5201N/A bufferSet.add(b);
5201N/A }
5195N/A }
5201N/A
5201N/A if (key != null)
5195N/A {
5195N/A addToDB(insertIDSet, deleteIDSet, indexID);
4591N/A }
4591N/A }
4591N/A }
4660N/A catch (Exception e)
4591N/A {
6032N/A Message message =
6032N/A ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(indexMgr
6032N/A .getBufferFileName(), e.getMessage());
4660N/A logError(message);
4660N/A throw e;
4591N/A }
5117N/A finally
5117N/A {
5201N/A endWriteTask();
5117N/A }
4591N/A return null;
4591N/A }
4591N/A
4963N/A private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
6319N/A int indexID) throws DirectoryException
4643N/A {
6032N/A if (!indexMgr.isDN2ID())
4643N/A {
4643N/A Index index;
6032N/A if ((deleteSet.size() > 0) || (!deleteSet.isDefined()))
4643N/A {
6032N/A dbKey.setData(deleteSet.getKey().array(), 0, deleteSet.getKey()
6032N/A .limit());
6032N/A index = (Index) idContainerMap.get(indexID);
4963N/A index.delete(dbKey, deleteSet, dbValue);
6032N/A if (!indexMap.containsKey(indexID))
4643N/A {
4643N/A indexMap.put(indexID, index);
4643N/A }
4643N/A }
6032N/A if ((insertSet.size() > 0) || (!insertSet.isDefined()))
4643N/A {
6032N/A dbKey.setData(insertSet.getKey().array(), 0, insertSet.getKey()
6032N/A .limit());
6032N/A index = (Index) idContainerMap.get(indexID);
4963N/A index.insert(dbKey, insertSet, dbValue);
6032N/A if (!indexMap.containsKey(indexID))
4643N/A {
4643N/A indexMap.put(indexID, index);
4643N/A }
4643N/A }
4643N/A }
4963N/A else
4643N/A {
4963N/A addDN2ID(insertSet, indexID);
4643N/A }
4643N/A }
4643N/A
4643N/A private void addDN2ID(ImportIDSet record, Integer indexID)
6319N/A throws DirectoryException
4591N/A {
4643N/A DNState dnState;
6032N/A if (!dnStateMap.containsKey(indexID))
4591N/A {
4643N/A dnState = new DNState(idECMap.get(indexID));
4643N/A dnStateMap.put(indexID, dnState);
4591N/A }
4643N/A else
4591N/A {
4643N/A dnState = dnStateMap.get(indexID);
4591N/A }
6032N/A if (!dnState.checkParent(record))
4643N/A {
4643N/A return;
4643N/A }
4643N/A dnState.writeToDB();
4591N/A }
4591N/A
5201N/A private void addBytesRead(int bytesRead)
5201N/A {
5201N/A this.bytesRead.addAndGet(bytesRead);
5201N/A }
5201N/A
5201N/A /**
4643N/A * This class is used to by a index DB merge thread performing DN processing
4643N/A * to keep track of the state of individual DN2ID index processing.
4643N/A */
4643N/A class DNState
4591N/A {
5707N/A private static final int DN_STATE_CACHE_SIZE = 64 * KB;
4963N/A
5176N/A private ByteBuffer parentDN, lastDN;
4736N/A private EntryID parentID, lastID, entryID;
5195N/A private final DatabaseEntry dnKey, dnValue;
5176N/A private final TreeMap<ByteBuffer, EntryID> parentIDMap;
4643N/A private final EntryContainer entryContainer;
4643N/A private final Map<byte[], ImportIDSet> id2childTree;
4643N/A private final Map<byte[], ImportIDSet> id2subtreeTree;
4736N/A private final int childLimit, subTreeLimit;
4736N/A private final boolean childDoCount, subTreeDoCount;
4643N/A
4643N/A DNState(EntryContainer entryContainer)
4591N/A {
4643N/A this.entryContainer = entryContainer;
5176N/A parentIDMap = new TreeMap<ByteBuffer, EntryID>();
4736N/A Comparator<byte[]> childComparator =
6032N/A entryContainer.getID2Children().getComparator();
4643N/A id2childTree = new TreeMap<byte[], ImportIDSet>(childComparator);
4736N/A childLimit = entryContainer.getID2Children().getIndexEntryLimit();
4736N/A childDoCount = entryContainer.getID2Children().getMaintainCount();
4736N/A Comparator<byte[]> subComparator =
6032N/A entryContainer.getID2Subtree().getComparator();
4736N/A subTreeLimit = entryContainer.getID2Subtree().getIndexEntryLimit();
4736N/A subTreeDoCount = entryContainer.getID2Subtree().getMaintainCount();
6032N/A id2subtreeTree = new TreeMap<byte[], ImportIDSet>(subComparator);
5195N/A dnKey = new DatabaseEntry();
5195N/A dnValue = new DatabaseEntry();
5176N/A lastDN = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
4591N/A }
4591N/A
5176N/A private ByteBuffer getParent(ByteBuffer buffer)
5176N/A {
5176N/A int parentIndex =
5176N/A JebFormat.findDNKeyParent(buffer.array(), 0, buffer.limit());
6032N/A if (parentIndex < 0)
5176N/A {
5176N/A // This is the root or base DN
5176N/A return null;
5176N/A }
5176N/A ByteBuffer parent = buffer.duplicate();
5176N/A parent.limit(parentIndex);
5176N/A return parent;
5176N/A }
5176N/A
5176N/A private ByteBuffer deepCopy(ByteBuffer srcBuffer, ByteBuffer destBuffer)
5176N/A {
6032N/A if (destBuffer == null
6032N/A || destBuffer.clear().remaining() < srcBuffer.limit())
5176N/A {
5176N/A byte[] bytes = new byte[srcBuffer.limit()];
6032N/A System.arraycopy(srcBuffer.array(), 0, bytes, 0, srcBuffer.limit());
5176N/A return ByteBuffer.wrap(bytes);
5176N/A }
5176N/A else
5176N/A {
5176N/A destBuffer.put(srcBuffer);
5176N/A destBuffer.flip();
5176N/A return destBuffer;
5176N/A }
5176N/A }
5176N/A
5176N/A // Why do we still need this if we are checking parents in the first
5176N/A // phase?
5207N/A private boolean checkParent(ImportIDSet record) throws DatabaseException
4591N/A {
6032N/A dnKey.setData(record.getKey().array(), 0, record.getKey().limit());
4643N/A byte[] v = record.toDatabase();
4643N/A long v1 = JebFormat.entryIDFromDatabase(v);
5195N/A dnValue.setData(v);
4963N/A
4856N/A entryID = new EntryID(v1);
5176N/A parentDN = getParent(record.getKey());
5176N/A
4835N/A //Bypass the cache for append data, lookup the parent in DN2ID and
4835N/A //return.
6032N/A if (importConfiguration != null
6032N/A && importConfiguration.appendToExistingData())
4591N/A {
5017N/A //If null is returned than this is a suffix DN.
6032N/A if (parentDN != null)
5017N/A {
5195N/A DatabaseEntry key =
6032N/A new DatabaseEntry(parentDN.array(), 0, parentDN.limit());
5176N/A DatabaseEntry value = new DatabaseEntry();
5176N/A OperationStatus status;
5176N/A status =
5176N/A entryContainer.getDN2ID().read(null, key, value,
6032N/A LockMode.DEFAULT);
6032N/A if (status == OperationStatus.SUCCESS)
5176N/A {
5176N/A parentID = new EntryID(value);
5176N/A }
5176N/A else
5176N/A {
5176N/A // We have a missing parent. Maybe parent checking was turned off?
5176N/A // Just ignore.
5176N/A parentID = null;
5176N/A return false;
5176N/A }
5017N/A }
4591N/A }
4591N/A else
4591N/A {
6032N/A if (parentIDMap.isEmpty())
4656N/A {
5176N/A parentIDMap.put(deepCopy(record.getKey(), null), entryID);
4835N/A return true;
4835N/A }
6032N/A else if (lastDN != null && lastDN.equals(parentDN))
4835N/A {
5176N/A parentIDMap.put(deepCopy(lastDN, null), lastID);
4835N/A parentID = lastID;
5176N/A lastDN = deepCopy(record.getKey(), lastDN);
4643N/A lastID = entryID;
4835N/A return true;
4835N/A }
6032N/A else if (parentIDMap.lastKey().equals(parentDN))
4835N/A {
4835N/A parentID = parentIDMap.get(parentDN);
5176N/A lastDN = deepCopy(record.getKey(), lastDN);
4835N/A lastID = entryID;
4835N/A return true;
4643N/A }
4643N/A else
4643N/A {
6032N/A if (parentIDMap.containsKey(parentDN))
4835N/A {
5176N/A EntryID newParentID = parentIDMap.get(parentDN);
5176N/A ByteBuffer key = parentIDMap.lastKey();
6032N/A while (!parentDN.equals(key))
6032N/A {
5176N/A parentIDMap.remove(key);
5176N/A key = parentIDMap.lastKey();
4835N/A }
5176N/A parentIDMap.put(deepCopy(record.getKey(), null), entryID);
4835N/A parentID = newParentID;
5176N/A lastDN = deepCopy(record.getKey(), lastDN);
4835N/A lastID = entryID;
4835N/A }
4835N/A else
4835N/A {
5176N/A // We have a missing parent. Maybe parent checking was turned off?
5176N/A // Just ignore.
5176N/A parentID = null;
4835N/A return false;
4835N/A }
4643N/A }
4591N/A }
4643N/A return true;
4591N/A }
4643N/A
6319N/A private void id2child(EntryID childID) throws DirectoryException
4591N/A {
4736N/A ImportIDSet idSet;
6319N/A if (parentID != null)
4736N/A {
6319N/A if (!id2childTree.containsKey(parentID.getDatabaseEntry().getData()))
6319N/A {
6319N/A idSet = new ImportIDSet(1, childLimit, childDoCount);
6319N/A id2childTree.put(parentID.getDatabaseEntry().getData(), idSet);
6319N/A }
6319N/A else
6319N/A {
6319N/A idSet = id2childTree.get(parentID.getDatabaseEntry().getData());
6319N/A }
6319N/A idSet.addEntryID(childID);
6319N/A if (id2childTree.size() > DN_STATE_CACHE_SIZE)
6319N/A {
6319N/A flushMapToDB(id2childTree, entryContainer.getID2Children(), true);
6319N/A }
4736N/A }
4736N/A else
4736N/A {
6319N/A throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION,
6319N/A ERR_PARENT_ENTRY_IS_MISSING.get());
4736N/A }
4591N/A }
4591N/A
5176N/A private EntryID getParentID(ByteBuffer dn) throws DatabaseException
4835N/A {
4835N/A EntryID nodeID;
4835N/A //Bypass the cache for append data, lookup the parent DN in the DN2ID
4835N/A //db.
6032N/A if (importConfiguration != null
6032N/A && importConfiguration.appendToExistingData())
4835N/A {
6032N/A DatabaseEntry key = new DatabaseEntry(dn.array(), 0, dn.limit());
6032N/A DatabaseEntry value = new DatabaseEntry();
6032N/A OperationStatus status;
6032N/A status =
6032N/A entryContainer.getDN2ID()
6032N/A .read(null, key, value, LockMode.DEFAULT);
6032N/A if (status == OperationStatus.SUCCESS)
6032N/A {
6032N/A nodeID = new EntryID(value);
6032N/A }
6032N/A else
6032N/A {
6032N/A nodeID = null;
6032N/A }
4835N/A }
4835N/A else
4835N/A {
4835N/A nodeID = parentIDMap.get(dn);
4835N/A }
4835N/A return nodeID;
4835N/A }
4643N/A
6319N/A private void id2SubTree(EntryID childID) throws DirectoryException
4591N/A {
6319N/A if (parentID != null)
4643N/A {
6319N/A ImportIDSet idSet;
6319N/A if (!id2subtreeTree
6319N/A .containsKey(parentID.getDatabaseEntry().getData()))
6319N/A {
6319N/A idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
6319N/A id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
6319N/A }
6319N/A else
6319N/A {
6319N/A idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
6319N/A }
6319N/A idSet.addEntryID(childID);
6319N/A // TODO:
6319N/A // Instead of doing this,
6319N/A // we can just walk to parent cache if available
6319N/A for (ByteBuffer dn = getParent(parentDN); dn != null; dn =
6319N/A getParent(dn))
6319N/A {
6319N/A EntryID nodeID = getParentID(dn);
6319N/A if (nodeID == null)
6319N/A {
6319N/A // We have a missing parent. Maybe parent checking was turned off?
6319N/A // Just ignore.
6319N/A break;
6319N/A }
6319N/A if (!id2subtreeTree
6319N/A .containsKey(nodeID.getDatabaseEntry().getData()))
6319N/A {
6319N/A idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
6319N/A id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
6319N/A }
6319N/A else
6319N/A {
6319N/A idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
6319N/A }
6319N/A idSet.addEntryID(childID);
6319N/A }
6319N/A if (id2subtreeTree.size() > DN_STATE_CACHE_SIZE)
6319N/A {
6319N/A flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), true);
6319N/A }
4591N/A }
4591N/A else
4591N/A {
6319N/A throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION,
6319N/A ERR_PARENT_ENTRY_IS_MISSING.get());
4736N/A }
4591N/A }
4643N/A
6319N/A public void writeToDB() throws DirectoryException
4591N/A {
5195N/A entryContainer.getDN2ID().put(null, dnKey, dnValue);
4736N/A indexMgr.addTotDNCount(1);
6032N/A if (parentDN != null)
4736N/A {
4736N/A id2child(entryID);
4736N/A id2SubTree(entryID);
4736N/A }
4591N/A }
4591N/A
4736N/A private void flushMapToDB(Map<byte[], ImportIDSet> map, Index index,
6032N/A boolean clearMap)
4736N/A {
6032N/A for (Map.Entry<byte[], ImportIDSet> e : map.entrySet())
4736N/A {
4736N/A byte[] key = e.getKey();
4736N/A ImportIDSet idSet = e.getValue();
5195N/A dnKey.setData(key);
5195N/A index.insert(dnKey, idSet, dnValue);
4736N/A }
4736N/A index.closeCursor();
6032N/A if (clearMap)
4736N/A {
6032N/A map.clear();
4736N/A }
4736N/A }
4591N/A
5207N/A public void flush()
4643N/A {
4736N/A flushMapToDB(id2childTree, entryContainer.getID2Children(), false);
6032N/A flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), false);
4643N/A }
4591N/A }
4591N/A }
4591N/A
4591N/A /**
6032N/A * This task writes the temporary scratch index files using the sorted buffers
6032N/A * read from a blocking queue private to each index.
4591N/A */
4963N/A private final class ScratchFileWriterTask implements Callable<Void>
4591N/A {
4963N/A private final int DRAIN_TO = 3;
4591N/A private final IndexManager indexMgr;
5195N/A private final BlockingQueue<IndexOutputBuffer> queue;
5201N/A private final ByteArrayOutputStream insertByteStream =
6032N/A new ByteArrayOutputStream(2 * bufferSize);
4643N/A private final ByteArrayOutputStream deleteByteStream =
6032N/A new ByteArrayOutputStream(2 * bufferSize);
5201N/A private final DataOutputStream bufferStream;
5201N/A private final DataOutputStream bufferIndexStream;
4963N/A private final byte[] tmpArray = new byte[8];
4963N/A private int insertKeyCount = 0, deleteKeyCount = 0;
5195N/A private int bufferCount = 0;
5195N/A private final SortedSet<IndexOutputBuffer> indexSortedSet;
4591N/A private boolean poisonSeen = false;
4963N/A
5195N/A public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
6032N/A IndexManager indexMgr) throws FileNotFoundException
4591N/A {
4649N/A this.queue = queue;
4591N/A this.indexMgr = indexMgr;
6032N/A this.bufferStream =
6032N/A new DataOutputStream(new BufferedOutputStream(new FileOutputStream(
6032N/A indexMgr.getBufferFile()), READER_WRITER_BUFFER_SIZE));
6032N/A this.bufferIndexStream =
6032N/A new DataOutputStream(new BufferedOutputStream(new FileOutputStream(
6032N/A indexMgr.getBufferIndexFile()), READER_WRITER_BUFFER_SIZE));
5201N/A this.indexSortedSet = new TreeSet<IndexOutputBuffer>();
4591N/A }
4591N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
5707N/A @Override
5837N/A public Void call() throws IOException, InterruptedException
4591N/A {
4591N/A long offset = 0;
5195N/A List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
6032N/A try
6032N/A {
6032N/A while (true)
4591N/A {
5837N/A final IndexOutputBuffer indexBuffer = queue.take();
5837N/A long beginOffset = offset;
5837N/A long bufferLen;
6032N/A if (!queue.isEmpty())
4591N/A {
5837N/A queue.drainTo(l, DRAIN_TO);
5837N/A l.add(indexBuffer);
5837N/A bufferLen = writeIndexBuffers(l);
6032N/A for (IndexOutputBuffer id : l)
4591N/A {
6032N/A if (!id.isDiscard())
4591N/A {
5837N/A id.reset();
5837N/A freeBufferQueue.add(id);
4963N/A }
4644N/A }
5837N/A l.clear();
5837N/A }
5837N/A else
5837N/A {
6032N/A if (indexBuffer.isPoison())
4643N/A {
4643N/A break;
4643N/A }
5837N/A bufferLen = writeIndexBuffer(indexBuffer);
6032N/A if (!indexBuffer.isDiscard())
5837N/A {
5837N/A indexBuffer.reset();
5837N/A freeBufferQueue.add(indexBuffer);
5837N/A }
5837N/A }
5837N/A offset += bufferLen;
5837N/A
5837N/A // Write buffer index information.
5837N/A bufferIndexStream.writeLong(beginOffset);
5837N/A bufferIndexStream.writeLong(offset);
5837N/A
5837N/A bufferCount++;
5837N/A Importer.this.bufferCount.incrementAndGet();
5837N/A
6032N/A if (poisonSeen)
5837N/A {
5837N/A break;
4591N/A }
4591N/A }
4591N/A }
5179N/A catch (IOException e)
4660N/A {
6032N/A Message message =
6032N/A ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(indexMgr
6032N/A .getBufferFile().getAbsolutePath(), e.getMessage());
4649N/A logError(message);
5207N/A isCanceled = true;
4963N/A throw e;
4591N/A }
4963N/A finally
4963N/A {
5201N/A bufferStream.close();
5201N/A bufferIndexStream.close();
5201N/A indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length());
4963N/A }
4963N/A return null;
4591N/A }
4591N/A
5195N/A private long writeIndexBuffer(IndexOutputBuffer indexBuffer)
6032N/A throws IOException
4591N/A {
4649N/A int numberKeys = indexBuffer.getNumberKeys();
4649N/A indexBuffer.setPosition(-1);
4649N/A long bufferLen = 0;
6032N/A insertByteStream.reset();
6032N/A insertKeyCount = 0;
6032N/A deleteByteStream.reset();
6032N/A deleteKeyCount = 0;
6032N/A for (int i = 0; i < numberKeys; i++)
4591N/A {
6032N/A if (indexBuffer.getPosition() == -1)
4591N/A {
4649N/A indexBuffer.setPosition(i);
6032N/A if (indexBuffer.isInsert(i))
4643N/A {
5201N/A indexBuffer.writeID(insertByteStream, i);
4963N/A insertKeyCount++;
4643N/A }
4643N/A else
4643N/A {
4963N/A indexBuffer.writeID(deleteByteStream, i);
4963N/A deleteKeyCount++;
4643N/A }
4591N/A continue;
4591N/A }
6032N/A if (!indexBuffer.compare(i))
4591N/A {
4963N/A bufferLen += writeRecord(indexBuffer);
4649N/A indexBuffer.setPosition(i);
6032N/A insertByteStream.reset();
6032N/A insertKeyCount = 0;
6032N/A deleteByteStream.reset();
6032N/A deleteKeyCount = 0;
4591N/A }
6032N/A if (indexBuffer.isInsert(i))
4643N/A {
6032N/A if (insertKeyCount++ <= indexMgr.getLimit())
4963N/A {
5201N/A indexBuffer.writeID(insertByteStream, i);
4963N/A }
4643N/A }
4643N/A else
4643N/A {
4963N/A indexBuffer.writeID(deleteByteStream, i);
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
6032N/A if (indexBuffer.getPosition() != -1)
4591N/A {
4963N/A bufferLen += writeRecord(indexBuffer);
4591N/A }
4649N/A return bufferLen;
4591N/A }
4591N/A
5195N/A private long writeIndexBuffers(List<IndexOutputBuffer> buffers)
6032N/A throws IOException
4591N/A {
4591N/A long id = 0;
4649N/A long bufferLen = 0;
6032N/A insertByteStream.reset();
6032N/A insertKeyCount = 0;
6032N/A deleteByteStream.reset();
6032N/A deleteKeyCount = 0;
6032N/A for (IndexOutputBuffer b : buffers)
4591N/A {
6032N/A if (b.isPoison())
4591N/A {
4591N/A poisonSeen = true;
4591N/A }
4591N/A else
4591N/A {
4649N/A b.setPosition(0);
4591N/A b.setID(id++);
4591N/A indexSortedSet.add(b);
4591N/A }
4591N/A }
4591N/A byte[] saveKey = null;
4643N/A int saveIndexID = 0;
6032N/A while (!indexSortedSet.isEmpty())
4591N/A {
5195N/A IndexOutputBuffer b = indexSortedSet.first();
4591N/A indexSortedSet.remove(b);
6032N/A if (saveKey == null)
4591N/A {
6032N/A saveKey = b.getKey();
4643N/A saveIndexID = b.getIndexID();
6032N/A if (b.isInsert(b.getPosition()))
4643N/A {
5201N/A b.writeID(insertByteStream, b.getPosition());
4963N/A insertKeyCount++;
4643N/A }
4643N/A else
4643N/A {
4963N/A b.writeID(deleteByteStream, b.getPosition());
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
4591N/A else
4591N/A {
6032N/A if (!b.compare(saveKey, saveIndexID))
4591N/A {
4963N/A bufferLen += writeRecord(saveKey, saveIndexID);
5201N/A insertByteStream.reset();
4643N/A deleteByteStream.reset();
4963N/A insertKeyCount = 0;
4963N/A deleteKeyCount = 0;
4963N/A saveKey = b.getKey();
6032N/A saveIndexID = b.getIndexID();
6032N/A if (b.isInsert(b.getPosition()))
4643N/A {
5201N/A b.writeID(insertByteStream, b.getPosition());
4963N/A insertKeyCount++;
4643N/A }
4643N/A else
4643N/A {
4963N/A b.writeID(deleteByteStream, b.getPosition());
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
4591N/A else
4591N/A {
6032N/A if (b.isInsert(b.getPosition()))
4643N/A {
6032N/A if (insertKeyCount++ <= indexMgr.getLimit())
4963N/A {
5201N/A b.writeID(insertByteStream, b.getPosition());
4963N/A }
4643N/A }
4643N/A else
4643N/A {
4963N/A b.writeID(deleteByteStream, b.getPosition());
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
4591N/A }
6032N/A if (b.hasMoreData())
4591N/A {
4591N/A b.getNextRecord();
4591N/A indexSortedSet.add(b);
4591N/A }
4591N/A }
6032N/A if (saveKey != null)
4591N/A {
4963N/A bufferLen += writeRecord(saveKey, saveIndexID);
4591N/A }
4649N/A return bufferLen;
4591N/A }
4963N/A
4963N/A private int writeByteStreams() throws IOException
4963N/A {
6032N/A if (insertKeyCount > indexMgr.getLimit())
4963N/A {
4963N/A insertKeyCount = 1;
5201N/A insertByteStream.reset();
4963N/A PackedInteger.writeInt(tmpArray, 0, -1);
5201N/A insertByteStream.write(tmpArray, 0, 1);
4963N/A }
4963N/A int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
4963N/A PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
5201N/A bufferStream.write(tmpArray, 0, insertSize);
6032N/A if (insertByteStream.size() > 0)
4963N/A {
5201N/A insertByteStream.writeTo(bufferStream);
4963N/A }
4963N/A int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
4963N/A PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
5201N/A bufferStream.write(tmpArray, 0, deleteSize);
6032N/A if (deleteByteStream.size() > 0)
4963N/A {
5201N/A deleteByteStream.writeTo(bufferStream);
4963N/A }
4963N/A return insertSize + deleteSize;
4963N/A }
4963N/A
4963N/A private int writeHeader(int indexID, int keySize) throws IOException
4963N/A {
5201N/A bufferStream.writeInt(indexID);
4963N/A int packedSize = PackedInteger.getWriteIntLength(keySize);
4963N/A PackedInteger.writeInt(tmpArray, 0, keySize);
5201N/A bufferStream.write(tmpArray, 0, packedSize);
4963N/A return packedSize;
4963N/A }
4963N/A
5195N/A private int writeRecord(IndexOutputBuffer b) throws IOException
4963N/A {
4963N/A int keySize = b.getKeySize();
4963N/A int packedSize = writeHeader(b.getIndexID(), keySize);
5201N/A b.writeKey(bufferStream);
4963N/A packedSize += writeByteStreams();
6032N/A return (packedSize + keySize + insertByteStream.size()
6032N/A + deleteByteStream.size() + 4);
4963N/A }
4963N/A
4963N/A private int writeRecord(byte[] k, int indexID) throws IOException
4963N/A {
4963N/A int packedSize = writeHeader(indexID, k.length);
5201N/A bufferStream.write(k);
4963N/A packedSize += writeByteStreams();
6032N/A return (packedSize + k.length + insertByteStream.size()
6032N/A + deleteByteStream.size() + 4);
4963N/A }
4591N/A }
4591N/A
4591N/A /**
6032N/A * This task main function is to sort the index buffers given to it from the
6032N/A * import tasks reading the LDIF file. It will also create a index file writer
6032N/A * task and corresponding queue if needed. The sorted index buffers are put on
6032N/A * the index file writer queues for writing to a temporary file.
4591N/A */
4591N/A private final class SortTask implements Callable<Void>
4591N/A {
4591N/A
5195N/A private final IndexOutputBuffer indexBuffer;
5195N/A
5195N/A public SortTask(IndexOutputBuffer indexBuffer)
4591N/A {
4591N/A this.indexBuffer = indexBuffer;
4591N/A }
4591N/A
4591N/A /**
4591N/A * {@inheritDoc}
4591N/A */
5707N/A @Override
4591N/A public Void call() throws Exception
4591N/A {
5195N/A if (importConfiguration != null && importConfiguration.isCancelled()
5207N/A || isCanceled)
4591N/A {
5207N/A isCanceled = true;
4591N/A return null;
4591N/A }
4591N/A indexBuffer.sort();
5195N/A if (indexKeyQueMap.containsKey(indexBuffer.getIndexKey()))
5195N/A {
6032N/A BlockingQueue<IndexOutputBuffer> q =
6032N/A indexKeyQueMap.get(indexBuffer.getIndexKey());
4591N/A q.add(indexBuffer);
4591N/A }
4591N/A else
4591N/A {
4643N/A createIndexWriterTask(indexBuffer.getIndexKey());
6032N/A BlockingQueue<IndexOutputBuffer> q =
6032N/A indexKeyQueMap.get(indexBuffer.getIndexKey());
4591N/A q.add(indexBuffer);
4591N/A }
4591N/A return null;
4591N/A }
4591N/A
4643N/A private void createIndexWriterTask(IndexKey indexKey)
6032N/A throws FileNotFoundException
4591N/A {
4649N/A boolean isDN = false;
6032N/A synchronized (synObj)
4643N/A {
6032N/A if (indexKeyQueMap.containsKey(indexKey))
4591N/A {
4591N/A return;
4591N/A }
6032N/A if (indexKey.getIndexType().equals(ImportIndexType.DN))
4591N/A {
4649N/A isDN = true;
4591N/A }
6032N/A IndexManager indexMgr =
6032N/A new IndexManager(indexKey.getName(), isDN,
6032N/A indexKey.getEntryLimit());
6032N/A if (isDN)
4963N/A {
4963N/A DNIndexMgrList.add(indexMgr);
4963N/A }
4963N/A else
4963N/A {
4963N/A indexMgrList.add(indexMgr);
4963N/A }
5195N/A BlockingQueue<IndexOutputBuffer> newQue =
6032N/A new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount);
4963N/A ScratchFileWriterTask indexWriter =
6032N/A new ScratchFileWriterTask(newQue, indexMgr);
4963N/A scratchFileWriterList.add(indexWriter);
6032N/A scratchFileWriterFutures.add(scratchFileWriterService
6032N/A .submit(indexWriter));
4643N/A indexKeyQueMap.put(indexKey, newQue);
4591N/A }
4591N/A }
4591N/A }
4591N/A
4591N/A /**
6032N/A * The index manager class has several functions: 1. It used to carry
6032N/A * information about index processing created in phase one to phase two. 2. It
6032N/A * collects statistics about phase two processing for each index. 3. It
6032N/A * manages opening and closing the scratch index files.
4591N/A */
5195N/A final class IndexManager implements Comparable<IndexManager>
4591N/A {
5201N/A private final File bufferFile;
5201N/A private final String bufferFileName;
5201N/A private final File bufferIndexFile;
5201N/A private final String bufferIndexFileName;
5201N/A private long bufferFileSize;
4591N/A private long totalDNS;
4649N/A private final boolean isDN;
4963N/A private final int limit;
5201N/A private int numberOfBuffers = 0;
5201N/A private volatile IndexDBWriteTask writer = null;
5195N/A
5195N/A private IndexManager(String fileName, boolean isDN, int limit)
4591N/A {
5201N/A this.bufferFileName = fileName;
5201N/A this.bufferIndexFileName = fileName + ".index";
5201N/A
5201N/A this.bufferFile = new File(tempDir, bufferFileName);
5201N/A this.bufferIndexFile = new File(tempDir, bufferIndexFileName);
5201N/A
4649N/A this.isDN = isDN;
5669N/A if (limit > 0)
5669N/A {
5669N/A this.limit = limit;
5669N/A }
5669N/A else
5669N/A {
5669N/A this.limit = Integer.MAX_VALUE;
5669N/A }
4591N/A }
4591N/A
5201N/A private void setIndexDBWriteTask(IndexDBWriteTask writer)
4591N/A {
5201N/A this.writer = writer;
4591N/A }
4591N/A
5201N/A private File getBufferFile()
4591N/A {
5201N/A return bufferFile;
4591N/A }
4591N/A
5201N/A private long getBufferFileSize()
4591N/A {
5201N/A return bufferFileSize;
4591N/A }
4591N/A
5201N/A private File getBufferIndexFile()
4591N/A {
5201N/A return bufferIndexFile;
4591N/A }
4591N/A
5201N/A private void setBufferInfo(int numberOfBuffers, long bufferFileSize)
4591N/A {
5201N/A this.numberOfBuffers = numberOfBuffers;
5201N/A this.bufferFileSize = bufferFileSize;
4591N/A }
4591N/A
5195N/A /**
5195N/A * Updates the bytes read counter.
5195N/A *
5195N/A * @param bytesRead
5195N/A * The number of bytes read.
5195N/A */
5195N/A void addBytesRead(int bytesRead)
4591N/A {
5201N/A if (writer != null)
5201N/A {
5201N/A writer.addBytesRead(bytesRead);
5201N/A }
4963N/A }
4963N/A
5195N/A private void addTotDNCount(int delta)
4591N/A {
5201N/A totalDNS += delta;
4591N/A }
4591N/A
5195N/A private long getDNCount()
4591N/A {
4591N/A return totalDNS;
4591N/A }
4591N/A
5195N/A private boolean isDN2ID()
4643N/A {
4649N/A return isDN;
4643N/A }
4591N/A
5195N/A private void printStats(long deltaTime)
4591N/A {
5201N/A if (writer != null)
4591N/A {
5201N/A writer.printStats(deltaTime);
4591N/A }
4591N/A }
4591N/A
5195N/A /**
5195N/A * Returns the file name associated with this index manager.
5195N/A *
5195N/A * @return The file name associated with this index manager.
5195N/A */
5201N/A String getBufferFileName()
4643N/A {
5201N/A return bufferFileName;
4643N/A }
4963N/A
5195N/A private int getLimit()
4963N/A {
4963N/A return limit;
4963N/A }
4966N/A
5195N/A /**
5195N/A * {@inheritDoc}
5195N/A */
5707N/A @Override
4966N/A public int compareTo(IndexManager mgr)
4966N/A {
5201N/A return numberOfBuffers - mgr.numberOfBuffers;
5201N/A }
5201N/A
5201N/A private int getNumberOfBuffers()
5201N/A {
5201N/A return numberOfBuffers;
4966N/A }
4591N/A }
4591N/A
4765N/A /**
4963N/A * The rebuild index manager handles all rebuild index related processing.
4765N/A */
6032N/A private class RebuildIndexManager extends ImportTask implements
6032N/A DiskSpaceMonitorHandler
5207N/A {
4963N/A
6032N/A //Rebuild index configuration.
6032N/A private final RebuildConfig rebuildConfig;
6032N/A
6032N/A //Local DB backend configuration.
6032N/A private final LocalDBBackendCfg cfg;
6032N/A
6032N/A //Map of index keys to indexes.
6032N/A private final Map<IndexKey, Index> indexMap =
6032N/A new LinkedHashMap<IndexKey, Index>();
6032N/A
6032N/A //Map of index keys to extensible indexes.
6032N/A private final Map<IndexKey, Collection<Index>> extensibleIndexMap =
6032N/A new LinkedHashMap<IndexKey, Collection<Index>>();
6032N/A
6032N/A //List of VLV indexes.
6032N/A private final List<VLVIndex> vlvIndexes = new LinkedList<VLVIndex>();
6032N/A
6032N/A //The DN2ID index.
6032N/A private DN2ID dn2id = null;
6032N/A
6032N/A //The DN2URI index.
6032N/A private DN2URI dn2uri = null;
6032N/A
6032N/A //Total entries to be processed.
6032N/A private long totalEntries = 0;
6032N/A
6032N/A //Total entries processed.
6032N/A private final AtomicLong entriesProcessed = new AtomicLong(0);
6032N/A
6032N/A //The suffix instance.
6032N/A private Suffix suffix = null;
6032N/A
6032N/A //The entry container.
6032N/A private EntryContainer entryContainer;
4765N/A
4765N/A /**
4765N/A * Create an instance of the rebuild index manager using the specified
4765N/A * parameters.
4765N/A *
6032N/A * @param rebuildConfig
6032N/A * The rebuild configuration to use.
6032N/A * @param cfg
6032N/A * The local DB configuration to use.
4765N/A */
4963N/A public RebuildIndexManager(RebuildConfig rebuildConfig,
6032N/A LocalDBBackendCfg cfg)
4765N/A {
4765N/A this.rebuildConfig = rebuildConfig;
4765N/A this.cfg = cfg;
4765N/A }
4765N/A
4765N/A /**
4963N/A * Initialize a rebuild index manager.
4765N/A *
6032N/A * @throws ConfigException
6032N/A * If an configuration error occurred.
6032N/A * @throws InitializationException
6032N/A * If an initialization error occurred.
4765N/A */
4765N/A public void initialize() throws ConfigException, InitializationException
4765N/A {
4963N/A entryContainer =
6032N/A rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
4963N/A suffix = Suffix.createSuffixContext(entryContainer, null, null, null);
6032N/A if (suffix == null)
4765N/A {
6032N/A Message msg =
6032N/A ERR_JEB_REBUILD_SUFFIX_ERROR.get(rebuildConfig.getBaseDN()
6032N/A .toString());
4765N/A throw new InitializationException(msg);
4765N/A }
4765N/A }
4765N/A
4765N/A /**
4765N/A * Print start message.
4765N/A *
6032N/A * @throws DatabaseException
6032N/A * If an database error occurred.
4765N/A */
4765N/A public void printStartMessage() throws DatabaseException
4765N/A {
4765N/A StringBuilder sb = new StringBuilder();
4765N/A List<String> rebuildList = rebuildConfig.getRebuildList();
6032N/A for (String index : rebuildList)
4765N/A {
6032N/A if (sb.length() > 0)
4765N/A {
4765N/A sb.append(", ");
4765N/A }
4765N/A sb.append(index);
4765N/A }
4765N/A totalEntries = suffix.getID2Entry().getRecordCount();
5720N/A
6075N/A Message message = null;
6032N/A switch (rebuildConfig.getRebuildMode())
6032N/A {
5720N/A case ALL:
4765N/A message = NOTE_JEB_REBUILD_ALL_START.get(totalEntries);
5720N/A break;
5720N/A case DEGRADED:
5720N/A message = NOTE_JEB_REBUILD_DEGRADED_START.get(totalEntries);
5720N/A break;
5720N/A default:
6075N/A if (!rebuildConfig.isClearDegradedState())
6075N/A {
6075N/A message = NOTE_JEB_REBUILD_START.get(sb.toString(), totalEntries);
6075N/A }
5720N/A break;
4765N/A }
6114N/A if ( message != null )
6114N/A {
6075N/A logError(message);
6075N/A }
4765N/A }
4765N/A
4765N/A /**
4765N/A * Print stop message.
4765N/A *
6032N/A * @param startTime
6032N/A * The time the rebuild started.
4765N/A */
4765N/A public void printStopMessage(long startTime)
4963N/A {
4765N/A long finishTime = System.currentTimeMillis();
4765N/A long totalTime = (finishTime - startTime);
4765N/A float rate = 0;
4765N/A if (totalTime > 0)
4765N/A {
6032N/A rate = 1000f * entriesProcessed.get() / totalTime;
4765N/A }
6075N/A
6075N/A if (!rebuildConfig.isClearDegradedState())
6075N/A {
6075N/A Message message =
6075N/A NOTE_JEB_REBUILD_FINAL_STATUS.get(entriesProcessed.get(),
6075N/A totalTime / 1000, rate);
6075N/A logError(message);
6075N/A }
4963N/A }
4765N/A
4765N/A /**
4765N/A * {@inheritDoc}
4765N/A */
5707N/A @Override
4765N/A public Void call() throws Exception
4765N/A {
4963N/A ID2Entry id2entry = entryContainer.getID2Entry();
5665N/A DiskOrderedCursor cursor =
5665N/A id2entry.openCursor(DiskOrderedCursorConfig.DEFAULT);
4765N/A DatabaseEntry key = new DatabaseEntry();
4765N/A DatabaseEntry data = new DatabaseEntry();
6032N/A try
6032N/A {
5665N/A while (cursor.getNext(key, data, null) == OperationStatus.SUCCESS)
4963N/A {
6032N/A if (isCanceled)
4963N/A {
4963N/A return null;
4963N/A }
4963N/A EntryID entryID = new EntryID(key);
6032N/A Entry entry =
6032N/A ID2Entry.entryFromDatabase(ByteString.wrap(data.getData()),
4963N/A entryContainer.getRootContainer().getCompressedSchema());
4963N/A processEntry(entry, entryID);
4963N/A entriesProcessed.getAndIncrement();
4963N/A }
4963N/A flushIndexBuffers();
4963N/A cursor.close();
4963N/A }
4963N/A catch (Exception e)
4765N/A {
4963N/A Message message =
6032N/A ERR_JEB_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR.get(e.getMessage());
4963N/A logError(message);
5207N/A isCanceled = true;
4963N/A throw e;
4765N/A }
5207N/A finally
5207N/A {
5207N/A cursor.close();
5207N/A }
4765N/A return null;
4963N/A }
4963N/A
4765N/A /**
4963N/A * Perform rebuild index processing.
4765N/A *
5195N/A * @throws DatabaseException
5195N/A * If an database error occurred.
5195N/A * @throws InterruptedException
5195N/A * If an interrupted error occurred.
5195N/A * @throws ExecutionException
6032N/A * If an Execution error occurred.
5195N/A * @throws JebException
5195N/A * If an JEB error occurred.
4765N/A */
6032N/A public void rebuildIndexes() throws DatabaseException,
6032N/A InterruptedException, ExecutionException, JebException
4963N/A {
6032N/A // Sets only the needed indexes.
6032N/A setIndexesListsToBeRebuilt();
6032N/A
6032N/A if (!rebuildConfig.isClearDegradedState())
6032N/A {
6032N/A // If not in a 'clear degraded state' operation,
6032N/A // need to rebuild the indexes.
6032N/A setRebuildListIndexesTrusted(false);
6032N/A clearIndexes(true);
6032N/A phaseOne();
6032N/A if (isCanceled)
6032N/A {
6032N/A throw new InterruptedException("Rebuild Index canceled.");
6032N/A }
6032N/A phaseTwo();
6032N/A }
6075N/A else
6075N/A {
6075N/A Message message =
6114N/A NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS.get(rebuildConfig
6114N/A .getRebuildList().toString());
6075N/A logError(message);
6075N/A }
6032N/A
6032N/A setRebuildListIndexesTrusted(true);
6032N/A }
6032N/A
6032N/A @SuppressWarnings("fallthrough")
6032N/A private void setIndexesListsToBeRebuilt() throws JebException
6032N/A {
6032N/A // Depends on rebuild mode, (re)building indexes' lists.
6032N/A final RebuildMode mode = rebuildConfig.getRebuildMode();
6032N/A switch (mode)
5207N/A {
5720N/A case ALL:
6032N/A rebuildIndexMap(false);
6032N/A // falls through
6032N/A case DEGRADED:
6032N/A if ((mode == RebuildMode.ALL)
6032N/A || (!entryContainer.getID2Children().isTrusted() || !entryContainer
6032N/A .getID2Subtree().isTrusted()))
6032N/A {
6032N/A dn2id = entryContainer.getDN2ID();
6032N/A }
6032N/A if ((mode == RebuildMode.ALL) || entryContainer.getDN2URI() == null)
6032N/A {
6032N/A dn2uri = entryContainer.getDN2URI();
6032N/A }
6032N/A if ((mode == RebuildMode.DEGRADED)
6032N/A || entryContainer.getAttributeIndexes().isEmpty())
6032N/A {
6032N/A rebuildIndexMap(true); // only degraded.
6032N/A }
6032N/A if ((mode == RebuildMode.ALL) || vlvIndexes.isEmpty())
6032N/A {
6032N/A vlvIndexes.addAll(new LinkedList<VLVIndex>(entryContainer
6032N/A .getVLVIndexes()));
6032N/A }
5720N/A break;
6032N/A
6032N/A case USER_DEFINED:
6032N/A // false may be required if the user wants to rebuild specific index.
6032N/A rebuildIndexMap(false);
5720N/A break;
5720N/A default:
5720N/A break;
4963N/A }
4963N/A }
4963N/A
6032N/A private void rebuildIndexMap(final boolean onlyDegraded)
6032N/A {
6032N/A // rebuildList contains the user-selected index(in USER_DEFINED mode).
6032N/A final List<String> rebuildList = rebuildConfig.getRebuildList();
6032N/A for (final Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
6032N/A .getAttrIndexMap().entrySet())
6032N/A {
6032N/A final AttributeType attributeType = mapEntry.getKey();
6032N/A final AttributeIndex attributeIndex = mapEntry.getValue();
6032N/A if (rebuildConfig.getRebuildMode() == RebuildMode.ALL
6032N/A || rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
6032N/A {
6032N/A // Get all existing indexes for all && degraded mode.
6032N/A rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
6032N/A }
6032N/A else
6032N/A {
6032N/A // Get indexes for user defined index.
6032N/A if (!rebuildList.isEmpty())
6032N/A {
6032N/A for (final String index : rebuildList)
6032N/A {
6032N/A if (attributeType.getNameOrOID().toLowerCase().equals(
6032N/A index.toLowerCase()))
6032N/A {
6032N/A rebuildAttributeIndexes(attributeIndex, attributeType,
6032N/A onlyDegraded);
6032N/A }
6032N/A }
6032N/A }
6032N/A }
6032N/A }
6032N/A }
6032N/A
6032N/A private void rebuildAttributeIndexes(final AttributeIndex attrIndex,
6032N/A final AttributeType attrType, final boolean onlyDegraded)
6032N/A throws DatabaseException
6032N/A {
6032N/A if (attrIndex.getSubstringIndex() != null)
6032N/A {
6032N/A fillIndexMap(attrType, attrIndex.getSubstringIndex(),
6032N/A ImportIndexType.SUBSTRING, onlyDegraded);
6032N/A }
6032N/A if (attrIndex.getOrderingIndex() != null)
6032N/A {
6032N/A fillIndexMap(attrType, attrIndex.getOrderingIndex(),
6032N/A ImportIndexType.ORDERING, onlyDegraded);
6032N/A }
6032N/A if (attrIndex.getEqualityIndex() != null)
6032N/A {
6032N/A fillIndexMap(attrType, attrIndex.getEqualityIndex(),
6032N/A ImportIndexType.EQUALITY, onlyDegraded);
6032N/A }
6032N/A if (attrIndex.getPresenceIndex() != null)
6032N/A {
6032N/A fillIndexMap(attrType, attrIndex.getPresenceIndex(),
6032N/A ImportIndexType.PRESENCE, onlyDegraded);
6032N/A }
6032N/A if (attrIndex.getApproximateIndex() != null)
6032N/A {
6032N/A fillIndexMap(attrType, attrIndex.getApproximateIndex(),
6032N/A ImportIndexType.APPROXIMATE, onlyDegraded);
6032N/A }
6032N/A final Map<String, Collection<Index>> extensibleMap =
6032N/A attrIndex.getExtensibleIndexes();
6032N/A if (!extensibleMap.isEmpty())
6032N/A {
6032N/A final Collection<Index> subIndexes =
6032N/A attrIndex.getExtensibleIndexes().get(
6032N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
6032N/A if (subIndexes != null && !subIndexes.isEmpty())
6032N/A {
6032N/A final List<Index> mutableCopy = new LinkedList<Index>(subIndexes);
6032N/A final Iterator<Index> i = mutableCopy.iterator();
6032N/A while (i.hasNext())
6032N/A {
6032N/A final Index subIndex = i.next();
6032N/A if (!onlyDegraded || !subIndex.isTrusted())
6032N/A {
6032N/A if ((rebuildConfig.isClearDegradedState() && subIndex
6032N/A .getRecordCount() == 0)
6032N/A || !rebuildConfig.isClearDegradedState())
6032N/A {
6032N/A int id = System.identityHashCode(subIndex);
6032N/A idContainerMap.putIfAbsent(id, subIndex);
6032N/A }
6032N/A }
6032N/A else
6032N/A {
6032N/A // This index is not a candidate for rebuilding.
6032N/A i.remove();
6032N/A }
6032N/A }
6032N/A if (!mutableCopy.isEmpty())
6032N/A {
6032N/A extensibleIndexMap.put(new IndexKey(attrType,
6032N/A ImportIndexType.EX_SUBSTRING, 0), mutableCopy);
6032N/A }
6032N/A }
6032N/A final Collection<Index> sharedIndexes =
6032N/A attrIndex.getExtensibleIndexes().get(EXTENSIBLE_INDEXER_ID_SHARED);
6032N/A if (sharedIndexes != null && !sharedIndexes.isEmpty())
6032N/A {
6032N/A final List<Index> mutableCopy = new LinkedList<Index>(sharedIndexes);
6032N/A final Iterator<Index> i = mutableCopy.iterator();
6032N/A while (i.hasNext())
6032N/A {
6032N/A final Index sharedIndex = i.next();
6032N/A if (!onlyDegraded || !sharedIndex.isTrusted())
6032N/A {
6032N/A if ((rebuildConfig.isClearDegradedState() && sharedIndex
6032N/A .getRecordCount() == 0)
6032N/A || !rebuildConfig.isClearDegradedState())
6032N/A {
6032N/A int id = System.identityHashCode(sharedIndex);
6032N/A idContainerMap.putIfAbsent(id, sharedIndex);
6032N/A }
6032N/A }
6032N/A else
6032N/A {
6032N/A // This index is not a candidate for rebuilding.
6032N/A i.remove();
6032N/A }
6032N/A }
6032N/A if (!mutableCopy.isEmpty())
6032N/A {
6032N/A extensibleIndexMap.put(new IndexKey(attrType,
6032N/A ImportIndexType.EX_SHARED, 0), mutableCopy);
6032N/A }
6032N/A }
6032N/A }
6032N/A }
6032N/A
6032N/A private void fillIndexMap(final AttributeType attrType,
6032N/A final Index partialAttrIndex, final ImportIndexType importIndexType,
6032N/A final boolean onlyDegraded)
6032N/A {
6032N/A if ((!onlyDegraded || !partialAttrIndex.isTrusted()))
6032N/A {
6032N/A if ((rebuildConfig.isClearDegradedState() && partialAttrIndex
6032N/A .getRecordCount() == 0)
6032N/A || !rebuildConfig.isClearDegradedState())
6032N/A {
6032N/A final int id = System.identityHashCode(partialAttrIndex);
6032N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
6032N/A final IndexKey indexKey =
6032N/A new IndexKey(attrType, importIndexType, partialAttrIndex
6032N/A .getIndexEntryLimit());
6032N/A indexMap.put(indexKey, partialAttrIndex);
6032N/A }
6032N/A }
6032N/A }
6032N/A
6032N/A private void clearIndexes(boolean onlyDegraded) throws DatabaseException
6032N/A {
6032N/A // Clears all the entry's container databases
6032N/A // which are containing the indexes.
6032N/A
6032N/A if (!onlyDegraded)
6032N/A {
6032N/A // dn2uri does not have a trusted status.
6032N/A entryContainer.clearDatabase(entryContainer.getDN2URI());
6032N/A }
6032N/A
6032N/A if (!onlyDegraded || !entryContainer.getID2Children().isTrusted()
6032N/A || !entryContainer.getID2Subtree().isTrusted())
6032N/A {
6032N/A entryContainer.clearDatabase(entryContainer.getDN2ID());
6032N/A entryContainer.clearDatabase(entryContainer.getID2Children());
6032N/A entryContainer.clearDatabase(entryContainer.getID2Subtree());
6032N/A }
6032N/A
6032N/A if (!indexMap.isEmpty())
6032N/A {
6032N/A for (final Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
6032N/A {
6032N/A if (!onlyDegraded || !mapEntry.getValue().isTrusted())
6032N/A {
6032N/A entryContainer.clearDatabase(mapEntry.getValue());
6032N/A }
6032N/A }
6032N/A }
6032N/A
6032N/A if (!extensibleIndexMap.isEmpty())
6032N/A {
6032N/A for (final Collection<Index> subIndexes : extensibleIndexMap.values())
6032N/A {
6032N/A if (subIndexes != null)
6032N/A {
6032N/A for (final Index subIndex : subIndexes)
6032N/A {
6032N/A entryContainer.clearDatabase(subIndex);
6032N/A }
6032N/A }
6032N/A }
6032N/A }
6032N/A
6032N/A for (final VLVIndex vlvIndex : entryContainer.getVLVIndexes())
6032N/A {
6032N/A if (!onlyDegraded || !vlvIndex.isTrusted())
6032N/A {
6032N/A entryContainer.clearDatabase(vlvIndex);
6032N/A }
6032N/A }
6032N/A }
4765N/A
5207N/A private void setRebuildListIndexesTrusted(boolean trusted)
5207N/A throws JebException
4895N/A {
4895N/A try
4895N/A {
6032N/A if (dn2id != null)
4895N/A {
4895N/A EntryContainer ec = suffix.getEntryContainer();
6032N/A ec.getID2Children().setTrusted(null, trusted);
5207N/A ec.getID2Subtree().setTrusted(null, trusted);
4895N/A }
6032N/A if (!indexMap.isEmpty())
4895N/A {
6032N/A for (Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
6032N/A {
4895N/A Index index = mapEntry.getValue();
5207N/A index.setTrusted(null, trusted);
4895N/A }
4895N/A }
6032N/A if (!vlvIndexes.isEmpty())
4895N/A {
6032N/A for (VLVIndex vlvIndex : vlvIndexes)
4895N/A {
5207N/A vlvIndex.setTrusted(null, trusted);
4895N/A }
4895N/A }
6032N/A if (!extensibleIndexMap.isEmpty())
4895N/A {
6032N/A for (Collection<Index> subIndexes : extensibleIndexMap.values())
5117N/A {
6032N/A if (subIndexes != null)
6032N/A {
6032N/A for (Index subIndex : subIndexes)
6032N/A {
5207N/A subIndex.setTrusted(null, trusted);
5117N/A }
4895N/A }
4895N/A }
4895N/A }
4895N/A }
4895N/A catch (DatabaseException ex)
4895N/A {
4895N/A Message message =
6032N/A NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
4963N/A throw new JebException(message);
4963N/A }
4963N/A }
4963N/A
6032N/A private void phaseOne() throws DatabaseException, InterruptedException,
6032N/A ExecutionException
6032N/A {
4963N/A initializeIndexBuffers();
4963N/A RebuildFirstPhaseProgressTask progressTask =
6032N/A new RebuildFirstPhaseProgressTask();
4963N/A Timer timer = new Timer();
4963N/A timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
4963N/A scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
4963N/A bufferSortService = Executors.newFixedThreadPool(threadCount);
4963N/A ExecutorService rebuildIndexService =
6032N/A Executors.newFixedThreadPool(threadCount);
4963N/A List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
4963N/A for (int i = 0; i < threadCount; i++)
4963N/A {
4963N/A tasks.add(this);
4963N/A }
4963N/A List<Future<Void>> results = rebuildIndexService.invokeAll(tasks);
6032N/A for (Future<Void> result : results)
6032N/A {
6032N/A if (!result.isDone())
6032N/A {
4963N/A result.get();
4963N/A }
4963N/A }
4963N/A stopScratchFileWriters();
4963N/A for (Future<?> result : scratchFileWriterFutures)
4963N/A {
6032N/A if (!result.isDone())
6032N/A {
4963N/A result.get();
4963N/A }
4963N/A }
5198N/A
5198N/A // Try to clear as much memory as possible.
5198N/A rebuildIndexService.shutdown();
5198N/A rebuildIndexService.awaitTermination(30, TimeUnit.SECONDS);
5198N/A bufferSortService.shutdown();
5198N/A bufferSortService.awaitTermination(30, TimeUnit.SECONDS);
5198N/A scratchFileWriterService.shutdown();
5198N/A scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);
5198N/A timer.cancel();
5198N/A
4963N/A tasks.clear();
4963N/A results.clear();
5198N/A scratchFileWriterList.clear();
5198N/A scratchFileWriterFutures.clear();
5198N/A indexKeyQueMap.clear();
4963N/A freeBufferQueue.clear();
4963N/A }
4963N/A
6032N/A private void phaseTwo() throws InterruptedException, ExecutionException
4963N/A {
6032N/A SecondPhaseProgressTask progressTask =
6032N/A new SecondPhaseProgressTask(entriesProcessed.get());
4963N/A Timer timer2 = new Timer();
4963N/A timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
4963N/A processIndexFiles();
4963N/A timer2.cancel();
4963N/A }
4963N/A
6114N/A private int getIndexCount() throws ConfigException, JebException,
6114N/A InitializationException
4963N/A {
5720N/A switch (rebuildConfig.getRebuildMode())
4963N/A {
5720N/A case ALL:
5720N/A return getTotalIndexCount(cfg);
5720N/A case DEGRADED:
5720N/A // FIXME: since the environment is not started we cannot determine which
5720N/A // indexes are degraded. As a workaround, be conservative and assume all
5720N/A // indexes need rebuilding.
5720N/A return getTotalIndexCount(cfg);
5720N/A default:
5720N/A return getRebuildListIndexCount(cfg);
4963N/A }
4963N/A }
4963N/A
4963N/A private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
6114N/A throws JebException, ConfigException, InitializationException
4765N/A {
4963N/A int indexCount = 0;
4963N/A List<String> rebuildList = rebuildConfig.getRebuildList();
6032N/A if (!rebuildList.isEmpty())
4963N/A {
4963N/A for (String index : rebuildList)
4963N/A {
4963N/A String lowerName = index.toLowerCase();
4963N/A if (lowerName.equals("dn2id"))
4963N/A {
4963N/A indexCount += 3;
4963N/A }
4963N/A else if (lowerName.equals("dn2uri"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else if (lowerName.startsWith("vlv."))
4963N/A {
6032N/A if (lowerName.length() < 5)
4963N/A {
4963N/A Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
4963N/A throw new JebException(msg);
4963N/A }
4963N/A indexCount++;
6032N/A }
6032N/A else if (lowerName.equals("id2subtree")
6032N/A || lowerName.equals("id2children"))
4963N/A {
4963N/A Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
6114N/A throw new InitializationException(msg);
4963N/A }
4963N/A else
4963N/A {
4963N/A String[] attrIndexParts = lowerName.split("\\.");
6032N/A if ((attrIndexParts.length <= 0) || (attrIndexParts.length > 3))
4963N/A {
4963N/A Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
6114N/A throw new InitializationException(msg);
4963N/A }
4963N/A AttributeType attrType =
6032N/A DirectoryServer.getAttributeType(attrIndexParts[0]);
4963N/A if (attrType == null)
4963N/A {
4963N/A Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
6114N/A throw new InitializationException(msg);
4963N/A }
6032N/A if (attrIndexParts.length != 1)
4963N/A {
6032N/A if (attrIndexParts.length == 2)
4963N/A {
6032N/A if (attrIndexParts[1].equals("presence"))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A else if (attrIndexParts[1].equals("equality"))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A else if (attrIndexParts[1].equals("substring"))
6032N/A {
6032N/A indexCount++;
6032N/A }
6032N/A else if (attrIndexParts[1].equals("ordering"))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A else if (attrIndexParts[1].equals("approximate"))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A else
4963N/A {
4963N/A Message msg =
6032N/A ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
6114N/A throw new InitializationException(msg);
4963N/A }
4963N/A }
4963N/A else
4963N/A {
4963N/A boolean found = false;
4963N/A String s = attrIndexParts[1] + "." + attrIndexParts[2];
4963N/A for (String idx : cfg.listLocalDBIndexes())
4963N/A {
4963N/A LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
6032N/A if (indexCfg.getIndexType().contains(
6032N/A LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
4963N/A {
4963N/A Set<String> extensibleRules =
6032N/A indexCfg.getIndexExtensibleMatchingRule();
6032N/A for (String exRule : extensibleRules)
4963N/A {
6032N/A if (exRule.equalsIgnoreCase(s))
4963N/A {
4963N/A found = true;
4963N/A break;
4963N/A }
4963N/A }
4963N/A }
6032N/A if (found)
4963N/A {
4963N/A break;
4963N/A }
4963N/A }
6032N/A if (!found)
6032N/A {
4963N/A Message msg =
6032N/A ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
6114N/A throw new InitializationException(msg);
4963N/A }
4963N/A indexCount++;
4963N/A }
4963N/A }
4963N/A else
4963N/A {
6114N/A boolean found = false;
6114N/A for (final String idx : cfg.listLocalDBIndexes())
4963N/A {
6032N/A if (!idx.equalsIgnoreCase(index))
4963N/A {
4963N/A continue;
4963N/A }
6114N/A found = true;
4963N/A LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
6032N/A if (indexCfg.getIndexType().contains(
6032N/A LocalDBIndexCfgDefn.IndexType.EQUALITY))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A if (indexCfg.getIndexType().contains(
6032N/A LocalDBIndexCfgDefn.IndexType.ORDERING))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A if (indexCfg.getIndexType().contains(
6032N/A LocalDBIndexCfgDefn.IndexType.PRESENCE))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A if (indexCfg.getIndexType().contains(
6032N/A LocalDBIndexCfgDefn.IndexType.SUBSTRING))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A if (indexCfg.getIndexType().contains(
6032N/A LocalDBIndexCfgDefn.IndexType.APPROXIMATE))
4963N/A {
4963N/A indexCount++;
4963N/A }
6032N/A if (indexCfg.getIndexType().contains(
6032N/A LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
4963N/A {
4963N/A Set<String> extensibleRules =
6032N/A indexCfg.getIndexExtensibleMatchingRule();
4963N/A boolean shared = false;
6114N/A for (final String exRule : extensibleRules)
4963N/A {
6032N/A if (exRule.endsWith(".sub"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else
4963N/A {
6032N/A if (!shared)
4963N/A {
6032N/A shared = true;
4963N/A indexCount++;
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
6114N/A if (!found)
6114N/A {
6114N/A Message msg =
6114N/A ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
6114N/A throw new InitializationException(msg);
6114N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A return indexCount;
4765N/A }
4963N/A
6032N/A private void processEntry(Entry entry, EntryID entryID)
6032N/A throws DatabaseException, DirectoryException, JebException,
6032N/A InterruptedException
4963N/A {
6032N/A if (dn2id != null)
4963N/A {
4765N/A processDN2ID(suffix, entry.getDN(), entryID);
4963N/A }
6032N/A if (dn2uri != null)
4963N/A {
4765N/A processDN2URI(suffix, null, entry);
4963N/A }
4963N/A processIndexes(entry, entryID);
4963N/A processExtensibleIndexes(entry, entryID);
4963N/A processVLVIndexes(entry, entryID);
4963N/A }
4963N/A
4963N/A private void processVLVIndexes(Entry entry, EntryID entryID)
6032N/A throws DatabaseException, JebException, DirectoryException
4963N/A {
6032N/A for (VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes())
6032N/A {
4963N/A Transaction transaction = null;
4963N/A vlvIdx.addEntry(transaction, entryID, entry);
4963N/A }
4963N/A }
4963N/A
6032N/A private void processExtensibleIndexes(Entry entry, EntryID entryID)
6032N/A throws InterruptedException
4963N/A {
6032N/A for (Map.Entry<IndexKey, Collection<Index>> mapEntry :
6114N/A this.extensibleIndexMap.entrySet())
6032N/A {
4963N/A IndexKey key = mapEntry.getKey();
4963N/A AttributeType attrType = key.getAttributeType();
6032N/A if (entry.hasAttribute(attrType))
6032N/A {
4963N/A Collection<Index> indexes = mapEntry.getValue();
6032N/A for (Index index : indexes)
6032N/A {
4765N/A processAttribute(index, entry, entryID, key);
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A
6032N/A private void processIndexes(Entry entry, EntryID entryID)
6032N/A throws DatabaseException, InterruptedException
4963N/A {
6032N/A for (Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet())
6032N/A {
4963N/A IndexKey key = mapEntry.getKey();
4963N/A AttributeType attrType = key.getAttributeType();
6032N/A if (entry.hasAttribute(attrType))
6032N/A {
4963N/A ImportIndexType indexType = key.getIndexType();
4963N/A Index index = mapEntry.getValue();
6032N/A if (indexType == ImportIndexType.SUBSTRING)
4963N/A {
6032N/A processAttribute(index, entry, entryID, new IndexKey(attrType,
6032N/A ImportIndexType.SUBSTRING, index.getIndexEntryLimit()));
4963N/A }
4963N/A else
4963N/A {
6032N/A processAttribute(index, entry, entryID, new IndexKey(attrType,
6032N/A indexType, index.getIndexEntryLimit()));
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A /**
4963N/A * Return the number of entries processed by the rebuild manager.
4963N/A *
4963N/A * @return The number of entries processed.
4963N/A */
4963N/A public long getEntriesProcess()
4963N/A {
4963N/A return this.entriesProcessed.get();
4963N/A }
4963N/A
4963N/A /**
4963N/A * Return the total number of entries to process by the rebuild manager.
4963N/A *
4963N/A * @return The total number for entries to process.
4963N/A */
4963N/A public long getTotEntries()
4963N/A {
4963N/A return this.totalEntries;
4963N/A }
5207N/A
5707N/A @Override
6032N/A public void diskLowThresholdReached(DiskSpaceMonitor monitor)
6032N/A {
5207N/A diskFullThresholdReached(monitor);
5207N/A }
5207N/A
5707N/A @Override
6032N/A public void diskFullThresholdReached(DiskSpaceMonitor monitor)
6032N/A {
5207N/A isCanceled = true;
6032N/A Message msg =
6032N/A ERR_REBUILD_INDEX_LACK_DISK.get(monitor.getDirectory().getPath(),
6032N/A monitor.getFreeSpace(), monitor.getLowThreshold());
5207N/A logError(msg);
5207N/A }
5207N/A
5707N/A @Override
6032N/A public void diskSpaceRestored(DiskSpaceMonitor monitor)
6032N/A {
5207N/A // Do nothing
5207N/A }
4765N/A }
4765N/A
4765N/A /**
6032N/A * This class reports progress of rebuild index processing at fixed intervals.
4963N/A */
5195N/A private class RebuildFirstPhaseProgressTask extends TimerTask
4963N/A {
4963N/A /**
6032N/A * The number of records that had been processed at the time of the previous
6032N/A * progress report.
4963N/A */
4963N/A private long previousProcessed = 0;
4963N/A
4963N/A /**
4963N/A * The time in milliseconds of the previous progress report.
4963N/A */
4963N/A private long previousTime;
4963N/A
4963N/A /**
4963N/A * The environment statistics at the time of the previous report.
4963N/A */
4963N/A private EnvironmentStats prevEnvStats;
4963N/A
4963N/A /**
4963N/A * Create a new rebuild index progress task.
4963N/A *
6032N/A * @throws DatabaseException
6032N/A * If an error occurred while accessing the JE database.
4963N/A */
4963N/A public RebuildFirstPhaseProgressTask() throws DatabaseException
4963N/A {
4963N/A previousTime = System.currentTimeMillis();
4963N/A prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
4963N/A }
4765N/A
4765N/A /**
4963N/A * The action to be performed by this timer task.
4963N/A */
5707N/A @Override
4963N/A public void run()
4963N/A {
4963N/A long latestTime = System.currentTimeMillis();
4963N/A long deltaTime = latestTime - previousTime;
4963N/A
4963N/A if (deltaTime == 0)
4963N/A {
4963N/A return;
4963N/A }
4963N/A long entriesProcessed = rebuildManager.getEntriesProcess();
4963N/A long deltaCount = (entriesProcessed - previousProcessed);
6032N/A float rate = 1000f * deltaCount / deltaTime;
4963N/A float completed = 0;
6032N/A if (rebuildManager.getTotEntries() > 0)
4963N/A {
6032N/A completed = 100f * entriesProcessed / rebuildManager.getTotEntries();
4963N/A }
6032N/A Message message =
6032N/A NOTE_JEB_REBUILD_PROGRESS_REPORT.get(completed, entriesProcessed,
6032N/A rebuildManager.getTotEntries(), rate);
4963N/A logError(message);
4963N/A try
4963N/A {
4963N/A Runtime runtime = Runtime.getRuntime();
4963N/A long freeMemory = runtime.freeMemory() / MB;
4963N/A EnvironmentStats envStats =
6032N/A rootContainer.getEnvironmentStats(new StatsConfig());
4963N/A long nCacheMiss =
6032N/A envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
4963N/A
4963N/A float cacheMissRate = 0;
4963N/A if (deltaCount > 0)
4963N/A {
6032N/A cacheMissRate = nCacheMiss / (float) deltaCount;
4963N/A }
6032N/A message =
6032N/A NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get(freeMemory,
6032N/A cacheMissRate);
4963N/A logError(message);
4963N/A prevEnvStats = envStats;
4963N/A }
4963N/A catch (DatabaseException e)
4963N/A {
4963N/A
4963N/A }
4963N/A previousProcessed = entriesProcessed;
4963N/A previousTime = latestTime;
4963N/A }
4963N/A }
4765N/A
4591N/A /**
6032N/A * This class reports progress of first phase of import processing at fixed
6032N/A * intervals.
3339N/A */
4591N/A private final class FirstPhaseProgressTask extends TimerTask
3339N/A {
3339N/A /**
6032N/A * The number of entries that had been read at the time of the previous
6032N/A * progress report.
3339N/A */
3339N/A private long previousCount = 0;
3339N/A
3339N/A /**
3339N/A * The time in milliseconds of the previous progress report.
3339N/A */
3339N/A private long previousTime;
3339N/A
3339N/A /**
3339N/A * The environment statistics at the time of the previous report.
3339N/A */
4649N/A private EnvironmentStats previousStats;
3339N/A
4591N/A // Determines if eviction has been detected.
3339N/A private boolean evicting = false;
3339N/A
4591N/A // Entry count when eviction was detected.
3339N/A private long evictionEntryCount = 0;
3339N/A
4963N/A /**
3339N/A * Create a new import progress task.
3339N/A */
4591N/A public FirstPhaseProgressTask()
3339N/A {
3339N/A previousTime = System.currentTimeMillis();
4591N/A try
4591N/A {
6032N/A previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
4591N/A }
4591N/A catch (DatabaseException e)
4591N/A {
4591N/A throw new RuntimeException(e);
4591N/A }
3339N/A }
3339N/A
4963N/A /**
4963N/A * The action to be performed by this timer task.
4963N/A */
4963N/A @Override
4963N/A public void run()
4963N/A {
4963N/A long latestCount = reader.getEntriesRead() + 0;
4963N/A long deltaCount = (latestCount - previousCount);
4963N/A long latestTime = System.currentTimeMillis();
4963N/A long deltaTime = latestTime - previousTime;
4963N/A Message message;
4963N/A if (deltaTime == 0)
4963N/A {
4963N/A return;
4963N/A }
4963N/A long entriesRead = reader.getEntriesRead();
4963N/A long entriesIgnored = reader.getEntriesIgnored();
4963N/A long entriesRejected = reader.getEntriesRejected();
4963N/A float rate = 1000f * deltaCount / deltaTime;
6032N/A message =
6032N/A NOTE_JEB_IMPORT_PROGRESS_REPORT.get(entriesRead, entriesIgnored,
6032N/A entriesRejected, 0, rate);
4963N/A logError(message);
4963N/A try
4591N/A {
4963N/A Runtime runTime = Runtime.getRuntime();
6032N/A long freeMemory = runTime.freeMemory() / MB;
4963N/A EnvironmentStats environmentStats;
4963N/A
4963N/A //If first phase skip DN validation is specified use the root container
4963N/A //stats, else use the temporary environment stats.
6032N/A if (skipDNValidation)
4963N/A {
4963N/A environmentStats =
6032N/A rootContainer.getEnvironmentStats(new StatsConfig());
4963N/A }
4963N/A else
4963N/A {
4963N/A environmentStats = tmpEnv.getEnvironmentStats(new StatsConfig());
4963N/A }
6032N/A long nCacheMiss =
6032N/A environmentStats.getNCacheMiss() - previousStats.getNCacheMiss();
4963N/A
4963N/A float cacheMissRate = 0;
4963N/A if (deltaCount > 0)
4963N/A {
4963N/A cacheMissRate = nCacheMiss / (float) deltaCount;
4963N/A }
4963N/A message =
6032N/A NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
6032N/A cacheMissRate);
4963N/A logError(message);
4963N/A long evictPasses = environmentStats.getNEvictPasses();
4963N/A long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
4963N/A long evictBinsStrip = environmentStats.getNBINsStripped();
4963N/A long cleanerRuns = environmentStats.getNCleanerRuns();
4963N/A long cleanerDeletions = environmentStats.getNCleanerDeletions();
6032N/A long cleanerEntriesRead = environmentStats.getNCleanerEntriesRead();
4963N/A long cleanerINCleaned = environmentStats.getNINsCleaned();
4963N/A long checkPoints = environmentStats.getNCheckpoints();
4963N/A if (evictPasses != 0)
4963N/A {
4963N/A if (!evicting)
4649N/A {
4963N/A evicting = true;
4963N/A evictionEntryCount = reader.getEntriesRead();
4963N/A message =
6032N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
4963N/A logError(message);
3339N/A }
4963N/A message =
6032N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
6032N/A evictNodes, evictBinsStrip);
4963N/A logError(message);
4963N/A }
4963N/A if (cleanerRuns != 0)
4963N/A {
4963N/A message =
6032N/A NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
6032N/A cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
4963N/A logError(message);
4963N/A }
4963N/A if (checkPoints > 1)
4963N/A {
6032N/A message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
4963N/A logError(message);
4963N/A }
4963N/A previousStats = environmentStats;
4591N/A }
4963N/A catch (DatabaseException e)
4963N/A {
4963N/A // Unlikely to happen and not critical.
4963N/A }
4963N/A previousCount = latestCount;
4963N/A previousTime = latestTime;
4963N/A }
3339N/A }
4591N/A
4591N/A /**
4963N/A * This class reports progress of the second phase of import processing at
4963N/A * fixed intervals.
4591N/A */
5195N/A private class SecondPhaseProgressTask extends TimerTask
4591N/A {
4591N/A /**
6032N/A * The number of entries that had been read at the time of the previous
6032N/A * progress report.
4591N/A */
4591N/A private long previousCount = 0;
4591N/A
4591N/A /**
4591N/A * The time in milliseconds of the previous progress report.
4591N/A */
4591N/A private long previousTime;
4591N/A
4591N/A /**
4591N/A * The environment statistics at the time of the previous report.
4591N/A */
4649N/A private EnvironmentStats previousStats;
4591N/A
4591N/A // Determines if eviction has been detected.
4591N/A private boolean evicting = false;
4591N/A
4765N/A private long latestCount;
4591N/A
5201N/A /**
4591N/A * Create a new import progress task.
4963N/A *
5201N/A * @param latestCount
5201N/A * The latest count of entries processed in phase one.
4591N/A */
5201N/A public SecondPhaseProgressTask(long latestCount)
4591N/A {
4591N/A previousTime = System.currentTimeMillis();
4765N/A this.latestCount = latestCount;
4591N/A try
4591N/A {
5201N/A previousStats = rootContainer.getEnvironmentStats(new StatsConfig());
4591N/A }
4591N/A catch (DatabaseException e)
4591N/A {
4591N/A throw new RuntimeException(e);
4591N/A }
4591N/A }
4591N/A
4591N/A /**
4591N/A * The action to be performed by this timer task.
4591N/A */
4591N/A @Override
4591N/A public void run()
4591N/A {
4591N/A long deltaCount = (latestCount - previousCount);
4591N/A long latestTime = System.currentTimeMillis();
4591N/A long deltaTime = latestTime - previousTime;
4591N/A Message message;
4591N/A if (deltaTime == 0)
4591N/A {
4591N/A return;
4591N/A }
4591N/A try
4591N/A {
4649N/A Runtime runTime = Runtime.getRuntime();
4649N/A long freeMemory = runTime.freeMemory() / MB;
4649N/A EnvironmentStats environmentStats =
6032N/A rootContainer.getEnvironmentStats(new StatsConfig());
6032N/A long nCacheMiss =
6032N/A environmentStats.getNCacheMiss() - previousStats.getNCacheMiss();
4591N/A
4591N/A float cacheMissRate = 0;
4591N/A if (deltaCount > 0)
4591N/A {
4591N/A cacheMissRate = nCacheMiss / (float) deltaCount;
4591N/A }
4591N/A message =
6032N/A NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
6032N/A cacheMissRate);
4591N/A logError(message);
4649N/A long evictPasses = environmentStats.getNEvictPasses();
4649N/A long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
4649N/A long evictBinsStrip = environmentStats.getNBINsStripped();
4649N/A long cleanerRuns = environmentStats.getNCleanerRuns();
4649N/A long cleanerDeletions = environmentStats.getNCleanerDeletions();
4649N/A long cleanerEntriesRead = environmentStats.getNCleanerEntriesRead();
4649N/A long cleanerINCleaned = environmentStats.getNINsCleaned();
4649N/A long checkPoints = environmentStats.getNCheckpoints();
4591N/A if (evictPasses != 0)
4591N/A {
4591N/A if (!evicting)
4591N/A {
4591N/A evicting = true;
4591N/A }
4591N/A message =
6032N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
6032N/A evictNodes, evictBinsStrip);
4591N/A logError(message);
4591N/A }
4591N/A if (cleanerRuns != 0)
4591N/A {
4591N/A message =
6032N/A NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
6032N/A cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
4591N/A logError(message);
4591N/A }
4591N/A if (checkPoints > 1)
4591N/A {
6032N/A message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
4591N/A logError(message);
4591N/A }
4649N/A previousStats = environmentStats;
4591N/A }
4591N/A catch (DatabaseException e)
4591N/A {
4591N/A // Unlikely to happen and not critical.
4591N/A }
4591N/A previousCount = latestCount;
4591N/A previousTime = latestTime;
4591N/A
4963N/A //Do DN index managers first.
6032N/A for (IndexManager indexMgrDN : DNIndexMgrList)
4963N/A {
4963N/A indexMgrDN.printStats(deltaTime);
4963N/A }
4963N/A //Do non-DN index managers.
6032N/A for (IndexManager indexMgr : indexMgrList)
4591N/A {
4591N/A indexMgr.printStats(deltaTime);
4591N/A }
4591N/A }
4591N/A }
4643N/A
4643N/A /**
4643N/A * A class to hold information about the entry determined by the LDIF reader.
4963N/A * Mainly the suffix the entry belongs under and the ID assigned to it by the
4963N/A * reader.
4643N/A */
4643N/A public class EntryInformation
4643N/A {
4643N/A private EntryID entryID;
4643N/A private Suffix suffix;
4643N/A
4643N/A /**
4643N/A * Return the suffix associated with the entry.
4643N/A *
4643N/A * @return Entry's suffix instance;
4643N/A */
4643N/A public Suffix getSuffix()
4643N/A {
4643N/A return suffix;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Set the suffix instance associated with the entry.
4643N/A *
6032N/A * @param suffix
6032N/A * The suffix associated with the entry.
4643N/A */
4643N/A public void setSuffix(Suffix suffix)
4643N/A {
4643N/A this.suffix = suffix;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Set the entry's ID.
4643N/A *
6032N/A * @param entryID
6032N/A * The entry ID to set the entry ID to.
4643N/A */
4643N/A public void setEntryID(EntryID entryID)
4643N/A {
4643N/A this.entryID = entryID;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Return the entry ID associated with the entry.
4643N/A *
4643N/A * @return The entry ID associated with the entry.
4643N/A */
4643N/A public EntryID getEntryID()
4643N/A {
4643N/A return entryID;
4643N/A }
4643N/A }
4643N/A
4643N/A /**
4643N/A * This class defines the individual index type available.
4643N/A */
6032N/A public enum ImportIndexType
6032N/A {
4643N/A /**
4643N/A * The DN index type.
4643N/A **/
4643N/A DN,
4643N/A
4643N/A /**
4643N/A * The equality index type.
4643N/A **/
4643N/A EQUALITY,
4643N/A
4643N/A /**
4643N/A * The presence index type.
4643N/A **/
4643N/A PRESENCE,
4643N/A
4643N/A /**
4649N/A * The sub-string index type.
4643N/A **/
4643N/A SUBSTRING,
4643N/A
4643N/A /**
4643N/A * The ordering index type.
4643N/A **/
4643N/A ORDERING,
4643N/A
4643N/A /**
4643N/A * The approximate index type.
4643N/A **/
4643N/A APPROXIMATE,
4643N/A
4643N/A /**
6032N/A * The extensible sub-string index type.
4643N/A **/
4643N/A EX_SUBSTRING,
4643N/A
4643N/A /**
4643N/A * The extensible shared index type.
4643N/A **/
4765N/A EX_SHARED,
4765N/A
4765N/A /**
4765N/A * The vlv index type.
4765N/A */
4765N/A VLV
4643N/A }
4643N/A
4643N/A /**
6032N/A * This class is used as an index key for hash maps that need to process
6032N/A * multiple suffix index elements into a single queue and/or maps based on
6032N/A * both attribute type and index type (ie., cn.equality, sn.equality,...).
4643N/A */
6032N/A public class IndexKey
6032N/A {
4643N/A
4963N/A private final AttributeType attributeType;
4765N/A private final ImportIndexType indexType;
4963N/A private final int entryLimit;
4765N/A
6032N/A /**
4963N/A * Create index key instance using the specified attribute type, index type
4963N/A * and index entry limit.
4643N/A *
6032N/A * @param attributeType
6032N/A * The attribute type.
6032N/A * @param indexType
6032N/A * The index type.
6032N/A * @param entryLimit
6032N/A * The entry limit for the index.
4643N/A */
4963N/A IndexKey(AttributeType attributeType, ImportIndexType indexType,
6032N/A int entryLimit)
4643N/A {
4963N/A this.attributeType = attributeType;
4643N/A this.indexType = indexType;
4963N/A this.entryLimit = entryLimit;
4643N/A }
4643N/A
4765N/A /**
4765N/A * An equals method that uses both the attribute type and the index type.
6032N/A * Only returns {@code true} if the attribute type and index type are equal.
4765N/A *
6032N/A * @param obj
6032N/A * the object to compare.
4963N/A * @return {@code true} if the objects are equal, or {@code false} if they
4963N/A * are not.
4765N/A */
5707N/A @Override
4765N/A public boolean equals(Object obj)
4765N/A {
6032N/A if (obj instanceof IndexKey)
6032N/A {
4765N/A IndexKey oKey = (IndexKey) obj;
6032N/A if (attributeType.equals(oKey.getAttributeType())
6032N/A && indexType.equals(oKey.getIndexType()))
4765N/A {
4963N/A return true;
4765N/A }
4643N/A }
4963N/A return false;
4765N/A }
4643N/A
4643N/A /**
4649N/A * A hash code method that adds the hash codes of the attribute type and
4643N/A * index type and returns that value.
4643N/A *
4963N/A * @return The combined hash values of attribute type hash code and the
4963N/A * index type hash code.
4643N/A */
5707N/A @Override
4643N/A public int hashCode()
4643N/A {
4963N/A return attributeType.hashCode() + indexType.hashCode();
4643N/A }
4643N/A
4643N/A /**
4643N/A * Return the attribute type.
4643N/A *
4643N/A * @return The attribute type.
4643N/A */
4963N/A public AttributeType getAttributeType()
4643N/A {
4963N/A return attributeType;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Return the index type.
4963N/A *
4643N/A * @return The index type.
4643N/A */
4765N/A public ImportIndexType getIndexType()
4643N/A {
4643N/A return indexType;
4643N/A }
4643N/A
4643N/A /**
6032N/A * Return the index key name, which is the attribute type primary name, a
6032N/A * period, and the index type name. Used for building file names and
4963N/A * progress output.
4643N/A *
6032N/A * @return The index key name.
4643N/A */
4643N/A public String getName()
4643N/A {
6032N/A return attributeType.getPrimaryName() + "."
6032N/A + StaticUtils.toLowerCase(indexType.name());
4643N/A }
4963N/A
4963N/A /**
4963N/A * Return the entry limit associated with the index.
4963N/A *
6032N/A * @return The entry limit.
4963N/A */
4963N/A public int getEntryLimit()
4963N/A {
4963N/A return entryLimit;
4963N/A }
4963N/A }
4963N/A
4963N/A /**
6032N/A * The temporary environment will be shared when multiple suffixes are being
4963N/A * processed. This interface is used by those suffix instance to do parental
4963N/A * checking of the DN cache.
4963N/A */
6032N/A public static interface DNCache
6032N/A {
4963N/A
4963N/A /**
6032N/A * Returns {@code true} if the specified DN is contained in the DN cache, or
6032N/A * {@code false} otherwise.
4963N/A *
6032N/A * @param dn
6032N/A * The DN to check the presence of.
4963N/A * @return {@code true} if the cache contains the DN, or {@code false} if it
6032N/A * is not.
6032N/A * @throws DatabaseException
6032N/A * If an error occurs reading the database.
4963N/A */
4963N/A public boolean contains(DN dn) throws DatabaseException;
4963N/A }
4963N/A
4963N/A /**
4963N/A * Temporary environment used to check DN's when DN validation is performed
4963N/A * during phase one processing. It is deleted after phase one processing.
4963N/A */
4963N/A
4963N/A public final class TmpEnv implements DNCache
4963N/A {
4963N/A private String envPath;
4963N/A private Environment environment;
4963N/A private static final String DB_NAME = "dn_cache";
4963N/A private Database dnCache;
4963N/A
4963N/A /**
4963N/A * Create a temporary DB environment and database to be used as a cache of
4963N/A * DNs when DN validation is performed in phase one processing.
4963N/A *
6032N/A * @param envPath
6032N/A * The file path to create the environment under.
6032N/A * @throws DatabaseException
6032N/A * If an error occurs either creating the environment or the DN
6032N/A * database.
4963N/A */
4963N/A public TmpEnv(File envPath) throws DatabaseException
4963N/A {
4963N/A EnvironmentConfig envConfig = new EnvironmentConfig();
4963N/A envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
4963N/A envConfig.setReadOnly(false);
4963N/A envConfig.setAllowCreate(true);
4963N/A envConfig.setTransactional(false);
4963N/A envConfig.setConfigParam(EnvironmentConfig.ENV_IS_LOCKING, "true");
4963N/A envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CHECKPOINTER, "false");
4963N/A envConfig.setConfigParam(EnvironmentConfig.EVICTOR_LRU_ONLY, "false");
4963N/A envConfig.setConfigParam(EnvironmentConfig.EVICTOR_NODES_PER_SCAN, "128");
6032N/A envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY, Long
6032N/A .toString(tmpEnvCacheSize));
4963N/A DatabaseConfig dbConfig = new DatabaseConfig();
4963N/A dbConfig.setAllowCreate(true);
4963N/A dbConfig.setTransactional(false);
4963N/A dbConfig.setTemporary(true);
4963N/A environment = new Environment(envPath, envConfig);
4963N/A dnCache = environment.openDatabase(null, DB_NAME, dbConfig);
4963N/A this.envPath = envPath.getPath();
4963N/A }
4963N/A
4963N/A private static final long FNV_INIT = 0xcbf29ce484222325L;
4963N/A private static final long FNV_PRIME = 0x100000001b3L;
4963N/A
4963N/A //Hash the DN bytes. Uses the FNV-1a hash.
4963N/A private byte[] hashCode(byte[] b)
4963N/A {
4963N/A long hash = FNV_INIT;
6032N/A for (byte aB : b)
6032N/A {
5117N/A hash ^= aB;
4963N/A hash *= FNV_PRIME;
4963N/A }
4963N/A return JebFormat.entryIDToDatabase(hash);
4963N/A }
4963N/A
4963N/A /**
4963N/A * Shutdown the temporary environment.
6032N/A *
6032N/A * @throws JebException
6032N/A * If error occurs.
4963N/A */
4963N/A public void shutdown() throws JebException
4963N/A {
4963N/A dnCache.close();
4963N/A environment.close();
4963N/A EnvManager.removeFiles(envPath);
4963N/A }
4963N/A
4963N/A /**
4963N/A * Insert the specified DN into the DN cache. It will return {@code true} if
4963N/A * the DN does not already exist in the cache and was inserted, or
4963N/A * {@code false} if the DN exists already in the cache.
4963N/A *
6032N/A * @param dn
6032N/A * The DN to insert in the cache.
6032N/A * @param val
6032N/A * A database entry to use in the insert.
6032N/A * @param key
6032N/A * A database entry to use in the insert.
6032N/A * @return {@code true} if the DN was inserted in the cache, or
6032N/A * {@code false} if the DN exists in the cache already and could not
6032N/A * be inserted.
6032N/A * @throws JebException
6032N/A * If an error occurs accessing the database.
4963N/A */
4963N/A public boolean insert(DN dn, DatabaseEntry val, DatabaseEntry key)
6032N/A throws JebException
4963N/A {
4963N/A byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
4963N/A int len = PackedInteger.getWriteIntLength(dnBytes.length);
4963N/A byte[] dataBytes = new byte[dnBytes.length + len];
4963N/A int pos = PackedInteger.writeInt(dataBytes, 0, dnBytes.length);
4963N/A System.arraycopy(dnBytes, 0, dataBytes, pos, dnBytes.length);
4963N/A val.setData(dataBytes);
4963N/A key.setData(hashCode(dnBytes));
4963N/A return insert(key, val, dnBytes);
4963N/A }
4963N/A
4963N/A private boolean insert(DatabaseEntry key, DatabaseEntry val, byte[] dnBytes)
6032N/A throws JebException
4963N/A {
4963N/A boolean inserted = true;
4963N/A Cursor cursor = null;
4963N/A try
4963N/A {
4963N/A cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
4963N/A OperationStatus status = cursor.putNoOverwrite(key, val);
6032N/A if (status == OperationStatus.KEYEXIST)
4963N/A {
4963N/A DatabaseEntry dns = new DatabaseEntry();
4963N/A inserted = false;
4963N/A status = cursor.getSearchKey(key, dns, LockMode.RMW);
6032N/A if (status == OperationStatus.NOTFOUND)
4963N/A {
6032N/A Message message =
6032N/A Message.raw(Category.JEB, Severity.SEVERE_ERROR,
4963N/A "Search DN cache failed.");
4963N/A throw new JebException(message);
4963N/A }
6032N/A if (!isDNMatched(dns, dnBytes))
4963N/A {
4963N/A addDN(dns, cursor, dnBytes);
4963N/A inserted = true;
4963N/A }
4963N/A }
4963N/A }
4963N/A finally
4963N/A {
6032N/A if (cursor != null)
4963N/A {
4963N/A cursor.close();
4963N/A }
4963N/A }
4963N/A return inserted;
4963N/A }
4963N/A
4963N/A //Add the DN to the DNs as because of a hash collision.
6032N/A private void addDN(DatabaseEntry val, Cursor cursor, byte[] dnBytes)
6032N/A throws JebException
4963N/A {
4963N/A byte[] bytes = val.getData();
4963N/A int pLen = PackedInteger.getWriteIntLength(dnBytes.length);
4963N/A int totLen = bytes.length + (pLen + dnBytes.length);
4963N/A byte[] newRec = new byte[totLen];
4963N/A System.arraycopy(bytes, 0, newRec, 0, bytes.length);
5707N/A int pos = bytes.length;
4963N/A pos = PackedInteger.writeInt(newRec, pos, dnBytes.length);
4963N/A System.arraycopy(dnBytes, 0, newRec, pos, dnBytes.length);
4963N/A DatabaseEntry newVal = new DatabaseEntry(newRec);
4963N/A OperationStatus status = cursor.putCurrent(newVal);
6032N/A if (status != OperationStatus.SUCCESS)
4963N/A {
6032N/A Message message =
6032N/A Message.raw(Category.JEB, Severity.SEVERE_ERROR,
4963N/A "Add of DN to DN cache failed.");
4963N/A throw new JebException(message);
4963N/A }
4963N/A }
4963N/A
4963N/A //Return true if the specified DN is in the DNs saved as a result of hash
4963N/A //collisions.
4963N/A private boolean isDNMatched(DatabaseEntry dns, byte[] dnBytes)
4963N/A {
5707N/A int pos = 0;
4963N/A byte[] bytes = dns.getData();
6032N/A while (pos < dns.getData().length)
4963N/A {
4963N/A int pLen = PackedInteger.getReadIntLength(bytes, pos);
6032N/A int len = PackedInteger.readInt(bytes, pos);
6032N/A if (indexComparator.compare(bytes, pos + pLen, len, dnBytes,
6032N/A dnBytes.length) == 0)
4963N/A {
4963N/A return true;
4963N/A }
4963N/A pos += pLen + len;
4963N/A }
4963N/A return false;
4963N/A }
4963N/A
4963N/A /**
4963N/A * Check if the specified DN is contained in the temporary DN cache.
4963N/A *
6032N/A * @param dn
6032N/A * A DN check for.
6032N/A * @return {@code true} if the specified DN is in the temporary DN cache, or
6032N/A * {@code false} if it is not.
4963N/A */
5707N/A @Override
4963N/A public boolean contains(DN dn)
4963N/A {
4963N/A boolean dnExists = false;
4963N/A Cursor cursor = null;
4963N/A DatabaseEntry key = new DatabaseEntry();
4963N/A byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
4963N/A key.setData(hashCode(dnBytes));
6032N/A try
6032N/A {
4963N/A cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
4963N/A DatabaseEntry dns = new DatabaseEntry();
4963N/A OperationStatus status =
6032N/A cursor.getSearchKey(key, dns, LockMode.DEFAULT);
6032N/A if (status == OperationStatus.SUCCESS)
4963N/A {
4963N/A dnExists = isDNMatched(dns, dnBytes);
4963N/A }
4963N/A }
4963N/A finally
4963N/A {
6032N/A if (cursor != null)
4963N/A {
4963N/A cursor.close();
4963N/A }
4963N/A }
4963N/A return dnExists;
4963N/A }
4963N/A
4963N/A /**
4963N/A * Return temporary environment stats.
4963N/A *
6032N/A * @param statsConfig
6032N/A * A stats configuration instance.
4963N/A * @return Environment stats.
6032N/A * @throws DatabaseException
6032N/A * If an error occurs retrieving the stats.
4963N/A */
4963N/A public EnvironmentStats getEnvironmentStats(StatsConfig statsConfig)
6032N/A throws DatabaseException
4963N/A {
4963N/A return environment.getStats(statsConfig);
4963N/A }
4963N/A }
4963N/A
5207N/A /**
5207N/A * {@inheritDoc}
5207N/A */
5707N/A @Override
6032N/A public void diskLowThresholdReached(DiskSpaceMonitor monitor)
6032N/A {
5207N/A diskFullThresholdReached(monitor);
5207N/A }
5207N/A
5207N/A /**
5207N/A * {@inheritDoc}
5207N/A */
5707N/A @Override
6032N/A public void diskFullThresholdReached(DiskSpaceMonitor monitor)
6032N/A {
5207N/A isCanceled = true;
5207N/A Message msg;
6032N/A if (!isPhaseOneDone)
5207N/A {
6032N/A msg =
6032N/A ERR_IMPORT_LDIF_LACK_DISK_PHASE_ONE.get(monitor.getDirectory()
6032N/A .getPath(), monitor.getFreeSpace(), monitor.getLowThreshold());
5207N/A }
5207N/A else
5207N/A {
6032N/A msg =
6032N/A ERR_IMPORT_LDIF_LACK_DISK_PHASE_TWO.get(monitor.getDirectory()
6032N/A .getPath(), monitor.getFreeSpace(), monitor.getLowThreshold());
5207N/A }
5207N/A logError(msg);
5207N/A }
5207N/A
5207N/A /**
5207N/A * {@inheritDoc}
5207N/A */
5707N/A @Override
6032N/A public void diskSpaceRestored(DiskSpaceMonitor monitor)
6032N/A {
5207N/A // Do nothing.
5207N/A }
3339N/A}