Importer.java revision 5117
3339N/A/*
3339N/A * CDDL HEADER START
3339N/A *
3339N/A * The contents of this file are subject to the terms of the
3339N/A * Common Development and Distribution License, Version 1.0 only
3339N/A * (the "License"). You may not use this file except in compliance
3339N/A * with the License.
3339N/A *
3339N/A * You can obtain a copy of the license at
3339N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE
3339N/A * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
3339N/A * See the License for the specific language governing permissions
3339N/A * and limitations under the License.
3339N/A *
3339N/A * When distributing Covered Code, include this CDDL HEADER in each
3339N/A * file and include the License file at
3339N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
3339N/A * add the following below this CDDL HEADER, with the fields enclosed
3339N/A * by brackets "[]" replaced with your own identifying information:
3339N/A * Portions Copyright [yyyy] [name of copyright owner]
3339N/A *
3339N/A * CDDL HEADER END
3339N/A *
3339N/A *
5007N/A * Copyright 2008-2010 Sun Microsystems, Inc.
3339N/A */
3339N/A
3339N/Apackage org.opends.server.backends.jeb.importLDIF;
3339N/A
4591N/Aimport static org.opends.messages.JebMessages.*;
4591N/Aimport static org.opends.server.loggers.ErrorLogger.*;
4591N/Aimport static org.opends.server.util.DynamicConstants.*;
4591N/Aimport static org.opends.server.util.ServerConstants.*;
4591N/Aimport java.io.*;
4591N/Aimport java.nio.*;
4591N/Aimport java.nio.channels.FileChannel;
4591N/Aimport java.util.*;
4591N/Aimport java.util.concurrent.*;
4591N/Aimport java.util.concurrent.atomic.*;
4591N/Aimport static org.opends.server.util.StaticUtils.getFileForPath;
4591N/Aimport org.opends.messages.Message;
4591N/Aimport org.opends.messages.Category;
4591N/Aimport org.opends.messages.Severity;
3339N/Aimport org.opends.server.admin.std.server.LocalDBBackendCfg;
4765N/Aimport org.opends.server.admin.std.server.LocalDBIndexCfg;
4765N/Aimport org.opends.server.admin.std.meta.LocalDBIndexCfgDefn;
4591N/Aimport org.opends.server.backends.jeb.*;
3339N/Aimport org.opends.server.config.ConfigException;
3339N/Aimport org.opends.server.core.DirectoryServer;
4591N/Aimport org.opends.server.types.*;
4591N/Aimport org.opends.server.util.*;
4591N/Aimport com.sleepycat.je.*;
4963N/Aimport com.sleepycat.util.PackedInteger;
3339N/A
3339N/A
3339N/A/**
4963N/A * This class provides the engine that performs both importing of LDIF files and
4963N/A * the rebuilding of indexes.
3339N/A */
4591N/Apublic class Importer
4591N/A{
4963N/A private static final int TIMER_INTERVAL = 10000;
4963N/A final static int KB = 1024;
4963N/A private static final int MB = (KB * KB);
4963N/A private static final String DEFAULT_TMP_DIR = "import-tmp";
4963N/A private static final String TMPENV_DIR = "tmp-env";
4963N/A
4963N/A //Defaults for DB cache.
4963N/A private static final int MAX_DB_CACHE_SIZE = 8 * MB;
4963N/A private static final int MAX_DB_LOG_SIZE = 10 * MB;
5007N/A private static final int MIN_DB_CACHE_SIZE = 4 * MB;
4963N/A
4963N/A //Defaults for LDIF reader buffers, min memory required to import and default
4963N/A //size for byte buffers.
5007N/A private static final int READER_WRITER_BUFFER_SIZE = 1 * MB;
5007N/A private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE +
5007N/A MAX_DB_LOG_SIZE;
4963N/A private static final int BYTE_BUFFER_CAPACITY = 128;
4963N/A
4963N/A //Min and MAX sizes of phase one buffer.
5007N/A private static final int MAX_BUFFER_SIZE = 100 * MB;
5007N/A private static final int MIN_BUFFER_SIZE = 8 * KB;
4963N/A
4963N/A //Min size of phase two read-ahead cache.
5007N/A private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
4963N/A
4963N/A //Set aside this much for the JVM from free memory.
4963N/A private static final int JVM_MEM_PCT = 45;
4963N/A
4963N/A //Percent of import memory to use for temporary environment if the
4963N/A //skip DN validation flag isn't specified.
4963N/A private static final int TMPENV_MEM_PCT = 50;
4963N/A //Small heap threshold used to give more memory to JVM to attempt OOM errors.
4963N/A private static final int SMALL_HEAP_SIZE = 256 * MB;
4963N/A
4963N/A //The DN attribute type.
4643N/A private static AttributeType dnType;
4963N/A
4963N/A //Comparators for DN and indexes respectively.
4963N/A private static final IndexBuffer.DNComparator dnComparator
4643N/A = new IndexBuffer.DNComparator();
4643N/A private static final IndexBuffer.IndexComparator indexComparator =
4643N/A new IndexBuffer.IndexComparator();
3339N/A
4963N/A //Phase one buffer and imported entries counts.
4591N/A private final AtomicInteger bufferCount = new AtomicInteger(0);
4679N/A private final AtomicLong importCount = new AtomicLong(0);
4963N/A
4963N/A //Phase one buffer size in bytes.
4963N/A private int bufferSize;
4963N/A
4963N/A //Temp scratch directory.
4591N/A private final File tempDir;
4963N/A
4963N/A //Index and thread counts.
5007N/A private final int indexCount;
5007N/A private int threadCount;
4963N/A
4963N/A //Set to true when validation is skipped.
4643N/A private final boolean skipDNValidation;
4963N/A
4963N/A //Temporary environment used when DN validation is done in first phase.
4963N/A private final TmpEnv tmpEnv;
4963N/A
4963N/A //Root container.
4963N/A private RootContainer rootContainer;
4963N/A
4963N/A //Import configuration.
4649N/A private final LDIFImportConfig importConfiguration;
4963N/A
4963N/A //LDIF reader.
4591N/A private LDIFReader reader;
4963N/A
4963N/A //Migrated entry count.
4643N/A private int migratedCount;
4963N/A
5007N/A //Size in bytes of temporary env, DB cache, DB log buf size.
5007N/A private long tmpEnvCacheSize = 0, dbCacheSize = MAX_DB_CACHE_SIZE,
5007N/A dbLogBufSize = MAX_DB_LOG_SIZE;
4963N/A
4963N/A //The executor service used for the buffer sort tasks.
4963N/A private ExecutorService bufferSortService;
4963N/A
4963N/A //The executor service used for the scratch file processing tasks.
4963N/A private ExecutorService scratchFileWriterService;
3339N/A
4591N/A //Queue of free index buffers -- used to re-cycle index buffers;
4649N/A private final BlockingQueue<IndexBuffer> freeBufferQueue =
4591N/A new LinkedBlockingQueue<IndexBuffer>();
3339N/A
4643N/A //Map of index keys to index buffers. Used to allocate sorted
4591N/A //index buffers to a index writer thread.
4591N/A private final
4643N/A Map<IndexKey, BlockingQueue<IndexBuffer>> indexKeyQueMap =
4643N/A new ConcurrentHashMap<IndexKey, BlockingQueue<IndexBuffer>>();
3339N/A
4591N/A //Map of DB containers to index managers. Used to start phase 2.
4643N/A private final List<IndexManager> indexMgrList =
4643N/A new LinkedList<IndexManager>();
3339N/A
4963N/A //Map of DB containers to DN-based index managers. Used to start phase 2.
4963N/A private final List<IndexManager> DNIndexMgrList =
4963N/A new LinkedList<IndexManager>();
4963N/A
4591N/A //Futures used to indicate when the index file writers are done flushing
4591N/A //their work queues and have exited. End of phase one.
4963N/A private final List<Future<?>> scratchFileWriterFutures;
4963N/A
4963N/A //List of index file writer tasks. Used to signal stopScratchFileWriters to
4963N/A //the index file writer tasks when the LDIF file has been done.
4963N/A private final List<ScratchFileWriterTask> scratchFileWriterList;
3598N/A
4643N/A
4643N/A //Map of DNs to Suffix objects.
4591N/A private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
3681N/A
4963N/A //Map of container ids to database containers.
4643N/A private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap =
4643N/A new ConcurrentHashMap<Integer, DatabaseContainer>();
4643N/A
4963N/A //Map of container ids to entry containers
4643N/A private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
4643N/A new ConcurrentHashMap<Integer, EntryContainer>();
4643N/A
4963N/A //Used to synchronize when a scratch file index writer is first setup.
4643N/A private final Object synObj = new Object();
4963N/A
4963N/A //Rebuld index manager used when rebuilding indexes.
4963N/A private final RebuildIndexManager rebuildManager;
4963N/A
4963N/A //Set to true if the backend was cleared.
4963N/A private boolean clearedBackend = false;
4963N/A
4963N/A //Used to shutdown import if an error occurs in phase one.
4963N/A private volatile boolean isPhaseOneCanceled = false;
4963N/A
4963N/A //Number of phase one buffers
4963N/A private int phaseOneBufferCount;
4765N/A
5007N/A
4765N/A static
4643N/A {
4643N/A if ((dnType = DirectoryServer.getAttributeType("dn")) == null)
4643N/A {
4643N/A dnType = DirectoryServer.getDefaultAttributeType("dn");
4643N/A }
4643N/A }
4643N/A
4963N/A //Rebuild-index instance.
4765N/A private
4765N/A Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
4765N/A EnvironmentConfig envConfig) throws IOException,
4765N/A InitializationException, JebException, ConfigException
4765N/A {
4963N/A importConfiguration = null;
4963N/A tmpEnv = null;
4963N/A threadCount = 1;
4963N/A rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
4765N/A indexCount = rebuildManager.getIndexCount();
4963N/A scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(indexCount);
4963N/A scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
4765N/A File parentDir;
4765N/A if(rebuildConfig.getTmpDirectory() == null)
4765N/A {
4963N/A parentDir = getFileForPath(DEFAULT_TMP_DIR);
4765N/A }
4765N/A else
4765N/A {
4765N/A parentDir = getFileForPath(rebuildConfig.getTmpDirectory());
4765N/A }
4765N/A tempDir = new File(parentDir, cfg.getBackendId());
5117N/A recursiveDelete(tempDir);
4765N/A if(!tempDir.exists() && !tempDir.mkdirs())
4765N/A {
4765N/A Message message =
4765N/A ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String.valueOf(tempDir));
4765N/A throw new IOException(message.toString());
4765N/A }
4765N/A skipDNValidation = true;
4765N/A if(envConfig != null)
4765N/A {
4765N/A initializeDBEnv(envConfig);
4765N/A }
4765N/A }
4765N/A
4765N/A
3339N/A /**
3339N/A * Create a new import job with the specified ldif import config.
3339N/A *
4649N/A * @param importConfiguration The LDIF import configuration.
4765N/A * @param localDBBackendCfg The local DB back-end configuration.
4765N/A * @param envConfig The JEB environment config.
4591N/A * @throws IOException If a problem occurs while opening the LDIF file for
4591N/A * reading.
4649N/A * @throws InitializationException If a problem occurs during initialization.
3339N/A */
4765N/A private Importer(LDIFImportConfig importConfiguration,
4765N/A LocalDBBackendCfg localDBBackendCfg,
4765N/A EnvironmentConfig envConfig) throws IOException,
4963N/A InitializationException, DatabaseException
4591N/A {
4963N/A rebuildManager = null;
4649N/A this.importConfiguration = importConfiguration;
4649N/A if(importConfiguration.getThreadCount() == 0)
4643N/A {
4643N/A threadCount = Runtime.getRuntime().availableProcessors() * 2;
4643N/A }
4643N/A else
4643N/A {
4649N/A threadCount = importConfiguration.getThreadCount();
4643N/A }
4765N/A indexCount = localDBBackendCfg.listLocalDBIndexes().length + 2;
4963N/A if(!importConfiguration.appendToExistingData()) {
4963N/A if(importConfiguration.clearBackend() ||
4963N/A localDBBackendCfg.getBaseDN().size() <= 1) {
4963N/A clearedBackend = true;
4963N/A }
4963N/A }
4963N/A scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(indexCount);
4963N/A scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
4591N/A File parentDir;
4649N/A if(importConfiguration.getTmpDirectory() == null)
4591N/A {
4963N/A parentDir = getFileForPath(DEFAULT_TMP_DIR);
3339N/A }
4591N/A else
4591N/A {
4649N/A parentDir = getFileForPath(importConfiguration.getTmpDirectory());
4591N/A }
4765N/A tempDir = new File(parentDir, localDBBackendCfg.getBackendId());
5117N/A recursiveDelete(tempDir);
4591N/A if(!tempDir.exists() && !tempDir.mkdirs())
4591N/A {
4649N/A Message message =
4649N/A ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String.valueOf(tempDir));
4649N/A throw new IOException(message.toString());
4591N/A }
4649N/A skipDNValidation = importConfiguration.getSkipDNValidation();
4963N/A initializeDBEnv(envConfig);
4963N/A //Set up temporary environment.
4963N/A if(!skipDNValidation)
3339N/A {
5117N/A File envPath = new File(tempDir, TMPENV_DIR);
4963N/A envPath.mkdirs();
4963N/A tmpEnv = new TmpEnv(envPath);
3339N/A }
4591N/A else
4591N/A {
4963N/A tmpEnv = null;
3339N/A }
3339N/A }
3339N/A
4963N/A
4765N/A /**
4765N/A * Return and import LDIF instance using the specified arguments.
4765N/A *
4765N/A * @param importCfg The import config to use.
4765N/A * @param localDBBackendCfg The local DB backend config to use.
4765N/A * @param envCfg The JEB environment config to use.
4765N/A * @return A import LDIF instance.
4765N/A *
4765N/A * @throws IOException If an I/O error occurs.
4765N/A * @throws InitializationException If the instance cannot be initialized.
4765N/A */
4765N/A public static
4765N/A Importer getInstance(LDIFImportConfig importCfg,
4765N/A LocalDBBackendCfg localDBBackendCfg,
4765N/A EnvironmentConfig envCfg)
4765N/A throws IOException, InitializationException
4765N/A {
4765N/A return new Importer(importCfg, localDBBackendCfg, envCfg);
4765N/A }
4765N/A
4765N/A
4765N/A /**
4765N/A * Return an import rebuild index instance using the specified arguments.
4765N/A *
4765N/A * @param rebuildCfg The rebuild config to use.
4765N/A * @param localDBBackendCfg The local DB backend config to use.
4765N/A * @param envCfg The JEB environment config to use.
4765N/A * @return An import rebuild index instance.
4765N/A *
4765N/A * @throws IOException If an I/O error occurs.
4765N/A * @throws InitializationException If the instance cannot be initialized.
4765N/A * @throws JebException If a JEB exception occurs.
4765N/A * @throws ConfigException If the instance cannot be configured.
4765N/A */
4765N/A public static synchronized
4765N/A Importer getInstance(RebuildConfig rebuildCfg,
4765N/A LocalDBBackendCfg localDBBackendCfg,
4765N/A EnvironmentConfig envCfg)
4765N/A throws IOException, InitializationException, JebException, ConfigException
4765N/A {
4765N/A return new Importer(rebuildCfg, localDBBackendCfg, envCfg);
4765N/A }
4765N/A
4963N/A
5007N/A private void adjustBufferSize(long availMem)
5007N/A {
5007N/A int oldThreadCount = threadCount;
5007N/A for(;threadCount > 0; threadCount--)
5007N/A {
5007N/A phaseOneBufferCount = 2 * (indexCount * threadCount);
5007N/A bufferSize = (int) (availMem / phaseOneBufferCount);
5007N/A if(bufferSize >= MIN_BUFFER_SIZE)
5007N/A {
5007N/A break;
5007N/A }
5007N/A }
5007N/A Message message =
5007N/A NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT.get(oldThreadCount, threadCount);
5007N/A logError(message);
5007N/A }
5007N/A
5007N/A
4963N/A private boolean getBufferSizes(long availMem)
4591N/A {
4963N/A boolean maxBuf = false;
5007N/A bufferSize = (int) (availMem/ phaseOneBufferCount);
4591N/A if(bufferSize >= MIN_BUFFER_SIZE)
3615N/A {
4591N/A if(bufferSize > MAX_BUFFER_SIZE)
3339N/A {
4591N/A bufferSize = MAX_BUFFER_SIZE;
4963N/A maxBuf = true;
3339N/A }
3339N/A }
4963N/A else if(bufferSize < MIN_BUFFER_SIZE)
3339N/A {
5007N/A adjustBufferSize(availMem);
3339N/A }
4963N/A return maxBuf;
3339N/A }
3339N/A
4963N/A
3339N/A /**
4591N/A * Return the suffix instance in the specified map that matches the specified
4591N/A * DN.
4591N/A *
4591N/A * @param dn The DN to search for.
4591N/A * @param map The map to search.
4591N/A * @return The suffix instance that matches the DN, or null if no match is
4591N/A * found.
4591N/A */
4591N/A public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
4591N/A {
4591N/A Suffix suffix = null;
4591N/A DN nodeDN = dn;
4591N/A
4591N/A while (suffix == null && nodeDN != null) {
4591N/A suffix = map.get(nodeDN);
4591N/A if (suffix == null)
4591N/A {
4591N/A nodeDN = nodeDN.getParentDNInSuffix();
4591N/A }
4591N/A }
4591N/A return suffix;
4591N/A }
4591N/A
4963N/A
4963N/A private long getTmpEnvironmentMemory(long availableMemoryImport)
4963N/A {
4963N/A int tmpMemPct = TMPENV_MEM_PCT;
4963N/A tmpEnvCacheSize = (availableMemoryImport * tmpMemPct) / 100;
4963N/A availableMemoryImport -= tmpEnvCacheSize;
4963N/A if(!clearedBackend)
4963N/A {
4963N/A long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
4963N/A tmpEnvCacheSize -= additionalDBCache;
4963N/A dbCacheSize += additionalDBCache;
4963N/A }
4963N/A return availableMemoryImport;
4963N/A }
4963N/A
4963N/A
4963N/A //Used for large heap sizes when the buffer max size has been identified. Any
4963N/A //extra memory can be given to the temporary environment in that case.
4963N/A private void adjustTmpEnvironmentMemory(long availableMemoryImport)
4963N/A {
4963N/A long additionalMem = availableMemoryImport -
5007N/A (phaseOneBufferCount * MAX_BUFFER_SIZE);
5007N/A if(additionalMem > 0)
4963N/A {
5007N/A tmpEnvCacheSize += additionalMem;
5007N/A if(!clearedBackend)
5007N/A {
5007N/A //The DN cache probably needs to be smaller and the DB cache bigger
5007N/A //because the dn2id is checked if the backend has not been cleared.
5007N/A long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
5007N/A tmpEnvCacheSize -= additionalDBCache;
5007N/A dbCacheSize += additionalDBCache;
5007N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A
5007N/A private long defaultMemoryCalc(long availMem)
5007N/A throws InitializationException
5007N/A {
5117N/A long bufMem;
5007N/A if(availMem < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
5007N/A {
5007N/A long minCacheSize = MIN_DB_CACHE_SIZE;
5007N/A if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) {
5007N/A minCacheSize = 500 *KB;
5007N/A }
5007N/A dbCacheSize = minCacheSize;
5007N/A tmpEnvCacheSize = minCacheSize;
5007N/A dbLogBufSize = 0;
5007N/A bufMem = availMem - 2 * minCacheSize;
5007N/A if(bufMem < 0 || (bufMem < (2 * indexCount) * MIN_BUFFER_SIZE)) {
5007N/A Message message =
5007N/A ERR_IMPORT_LDIF_LACK_MEM.get(availMem,
5007N/A ((2 * indexCount) * MIN_BUFFER_SIZE) + 2 * MIN_DB_CACHE_SIZE);
5007N/A throw new InitializationException(message);
5007N/A }
5007N/A }
5007N/A else
5007N/A {
5007N/A bufMem = getTmpEnvironmentMemory(availMem);
5007N/A }
5007N/A return bufMem;
5007N/A }
5007N/A
5007N/A private long skipDNValidationCalc(long availMem)
5007N/A throws InitializationException
5007N/A {
5007N/A long bufMem = availMem;
5007N/A if(availMem < (MIN_DB_CACHE_MEMORY))
5007N/A {
5007N/A long minCacheSize = MIN_DB_CACHE_SIZE;
5007N/A if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) {
5007N/A minCacheSize = 500 *KB;
5007N/A }
5007N/A dbCacheSize = minCacheSize;
5007N/A dbLogBufSize = 0;
5007N/A bufMem = availMem - minCacheSize;
5007N/A if(bufMem < 0 || (bufMem < (2 * indexCount) * MIN_BUFFER_SIZE)) {
5007N/A Message message =
5007N/A ERR_IMPORT_LDIF_LACK_MEM.get(availMem,
5007N/A ((2 * indexCount) * MIN_BUFFER_SIZE) + MIN_DB_CACHE_SIZE);
5007N/A throw new InitializationException(message);
5007N/A }
5007N/A }
5007N/A return bufMem;
5007N/A }
5007N/A
4591N/A /**
4591N/A * Calculate buffer sizes and initialize JEB properties based on memory.
4591N/A *
4591N/A * @param envConfig The environment config to use in the calculations.
3339N/A *
4591N/A * @throws InitializationException If a problem occurs during calculation.
4591N/A */
4765N/A private void initializeDBEnv(EnvironmentConfig envConfig)
4591N/A throws InitializationException
4591N/A {
4649N/A Message message;
4963N/A phaseOneBufferCount = 2 * (indexCount * threadCount);
4649N/A Runtime runTime = Runtime.getRuntime();
4963N/A long totFreeMemory = runTime.freeMemory() +
4963N/A (runTime.maxMemory() - runTime.totalMemory());
4963N/A int importMemPct = (100 - JVM_MEM_PCT);
4963N/A if(totFreeMemory <= SMALL_HEAP_SIZE)
4963N/A {
4963N/A importMemPct -= 15;
4963N/A }
4963N/A if(rebuildManager != null)
4963N/A {
4963N/A importMemPct -= 15;
4963N/A }
5117N/A long phaseOneBufferMemory;
4963N/A if(!skipDNValidation)
4963N/A {
5007N/A phaseOneBufferMemory =
5007N/A defaultMemoryCalc((totFreeMemory * importMemPct) / 100);
4963N/A }
5007N/A else
5007N/A {
5007N/A phaseOneBufferMemory =
5007N/A skipDNValidationCalc((totFreeMemory * importMemPct) / 100);
5007N/A }
5007N/A boolean maxBuffers = getBufferSizes(phaseOneBufferMemory);
5007N/A //Give any extra memory to the temp environment cache if there is any.
4963N/A if(!skipDNValidation && maxBuffers)
4963N/A {
5007N/A adjustTmpEnvironmentMemory(phaseOneBufferMemory);
4963N/A }
5007N/A message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(phaseOneBufferMemory,
4963N/A phaseOneBufferCount);
4649N/A logError(message);
4963N/A if(tmpEnvCacheSize > 0)
4649N/A {
4963N/A message = NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM.get(tmpEnvCacheSize);
4963N/A logError(message);
4649N/A }
4963N/A envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
4963N/A envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY,
5007N/A Long.toString(dbCacheSize));
4963N/A message = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize,
5007N/A bufferSize);
4963N/A logError(message);
5007N/A if(dbLogBufSize > 0)
5007N/A {
5007N/A envConfig.setConfigParam(EnvironmentConfig.LOG_TOTAL_BUFFER_BYTES,
5007N/A Long.toString(MAX_DB_LOG_SIZE));
5007N/A message = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(MAX_DB_LOG_SIZE);
5007N/A logError(message);
5007N/A }
4591N/A }
4591N/A
4591N/A
4963N/A private void initializeIndexBuffers()
4591N/A {
4963N/A for(int i = 0; i < phaseOneBufferCount; i++)
4591N/A {
4591N/A IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize);
4649N/A freeBufferQueue.add(b);
4591N/A }
4591N/A }
4591N/A
4591N/A
4649N/A private void initializeSuffixes() throws DatabaseException, JebException,
4643N/A ConfigException, InitializationException
4591N/A {
4643N/A for(EntryContainer ec : rootContainer.getEntryContainers())
4643N/A {
4643N/A Suffix suffix = getSuffix(ec);
4643N/A if(suffix != null)
4643N/A {
4643N/A dnSuffixMap.put(ec.getBaseDN(), suffix);
4963N/A generateIndexID(suffix);
4963N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A //Mainly used to support multiple suffixes. Each index in each suffix gets
4963N/A //an unique ID to identify which DB it needs to go to in phase two processing.
4963N/A private void generateIndexID(Suffix suffix)
4963N/A {
4963N/A for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
4963N/A suffix.getAttrIndexMap().entrySet()) {
4963N/A AttributeIndex attributeIndex = mapEntry.getValue();
4963N/A DatabaseContainer container;
4963N/A if((container=attributeIndex.getEqualityIndex()) != null) {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
4963N/A if((container=attributeIndex.getPresenceIndex()) != null) {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
4963N/A if((container=attributeIndex.getSubstringIndex()) != null) {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
4963N/A if((container=attributeIndex.getOrderingIndex()) != null) {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
4963N/A if((container=attributeIndex.getApproximateIndex()) != null) {
4963N/A int id = System.identityHashCode(container);
4963N/A idContainerMap.putIfAbsent(id, container);
4963N/A }
4963N/A Map<String,Collection<Index>> extensibleMap =
4963N/A attributeIndex.getExtensibleIndexes();
4963N/A if(!extensibleMap.isEmpty()) {
4963N/A Collection<Index> subIndexes =
4963N/A attributeIndex.getExtensibleIndexes().get(
4963N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
4963N/A if(subIndexes != null) {
4963N/A for(DatabaseContainer subIndex : subIndexes) {
4963N/A int id = System.identityHashCode(subIndex);
4963N/A idContainerMap.putIfAbsent(id, subIndex);
4963N/A }
4963N/A }
4963N/A Collection<Index> sharedIndexes =
4963N/A attributeIndex.getExtensibleIndexes().get(
4963N/A EXTENSIBLE_INDEXER_ID_SHARED);
4963N/A if(sharedIndexes !=null) {
4963N/A for(DatabaseContainer sharedIndex : sharedIndexes) {
4963N/A int id = System.identityHashCode(sharedIndex);
4963N/A idContainerMap.putIfAbsent(id, sharedIndex);
4963N/A }
4963N/A }
4643N/A }
4643N/A }
4591N/A }
4591N/A
4591N/A
4643N/A private Suffix getSuffix(EntryContainer entryContainer)
4643N/A throws DatabaseException, JebException, ConfigException,
4643N/A InitializationException {
4643N/A DN baseDN = entryContainer.getBaseDN();
4649N/A EntryContainer sourceEntryContainer = null;
4643N/A List<DN> includeBranches = new ArrayList<DN>();
4643N/A List<DN> excludeBranches = new ArrayList<DN>();
4643N/A
4649N/A if(!importConfiguration.appendToExistingData() &&
4649N/A !importConfiguration.clearBackend())
4643N/A {
4649N/A for(DN dn : importConfiguration.getExcludeBranches())
4643N/A {
4643N/A if(baseDN.equals(dn))
4643N/A {
4643N/A // This entire base DN was explicitly excluded. Skip.
4643N/A return null;
4643N/A }
4643N/A if(baseDN.isAncestorOf(dn))
4643N/A {
4643N/A excludeBranches.add(dn);
4643N/A }
4643N/A }
4643N/A
4649N/A if(!importConfiguration.getIncludeBranches().isEmpty())
4643N/A {
4649N/A for(DN dn : importConfiguration.getIncludeBranches())
4643N/A {
4643N/A if(baseDN.isAncestorOf(dn))
4643N/A {
4643N/A includeBranches.add(dn);
4643N/A }
4643N/A }
4643N/A
4643N/A if(includeBranches.isEmpty())
4643N/A {
4649N/A /*
4649N/A There are no branches in the explicitly defined include list under
4649N/A this base DN. Skip this base DN all together.
4649N/A */
4643N/A
4649N/A return null;
4643N/A }
4643N/A
4643N/A // Remove any overlapping include branches.
4643N/A Iterator<DN> includeBranchIterator = includeBranches.iterator();
4643N/A while(includeBranchIterator.hasNext())
4643N/A {
4643N/A DN includeDN = includeBranchIterator.next();
4643N/A boolean keep = true;
4643N/A for(DN dn : includeBranches)
4643N/A {
4643N/A if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
4643N/A {
4643N/A keep = false;
4643N/A break;
4643N/A }
4643N/A }
4643N/A if(!keep)
4643N/A {
4643N/A includeBranchIterator.remove();
4643N/A }
4643N/A }
4643N/A
4649N/A // Remove any exclude branches that are not are not under a include
4643N/A // branch since they will be migrated as part of the existing entries
4643N/A // outside of the include branches anyways.
4643N/A Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
4643N/A while(excludeBranchIterator.hasNext())
4643N/A {
4643N/A DN excludeDN = excludeBranchIterator.next();
4643N/A boolean keep = false;
4643N/A for(DN includeDN : includeBranches)
4643N/A {
4643N/A if(includeDN.isAncestorOf(excludeDN))
4643N/A {
4643N/A keep = true;
4643N/A break;
4643N/A }
4643N/A }
4643N/A if(!keep)
4643N/A {
4643N/A excludeBranchIterator.remove();
4643N/A }
4643N/A }
4643N/A
4643N/A if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
4643N/A includeBranches.get(0).equals(baseDN))
4643N/A {
4643N/A // This entire base DN is explicitly included in the import with
4643N/A // no exclude branches that we need to migrate. Just clear the entry
4643N/A // container.
4643N/A entryContainer.lock();
4643N/A entryContainer.clear();
4643N/A entryContainer.unlock();
4643N/A }
4643N/A else
4643N/A {
4643N/A // Create a temp entry container
4649N/A sourceEntryContainer = entryContainer;
4643N/A entryContainer =
4643N/A rootContainer.openEntryContainer(baseDN,
4643N/A baseDN.toNormalizedString() +
4643N/A "_importTmp");
4643N/A }
4643N/A }
4643N/A }
4649N/A return Suffix.createSuffixContext(entryContainer, sourceEntryContainer,
4643N/A includeBranches, excludeBranches);
4643N/A }
4643N/A
4643N/A
4765N/A /**
4765N/A * Rebuild the indexes using the specified rootcontainer.
4765N/A *
4765N/A * @param rootContainer The rootcontainer to rebuild indexes in.
4765N/A *
4765N/A * @throws ConfigException If a configuration error occurred.
4765N/A * @throws InitializationException If an initialization error occurred.
4765N/A * @throws IOException If an IO error occurred.
4765N/A * @throws JebException If the JEB database had an error.
4765N/A * @throws DatabaseException If a database error occurred.
4765N/A * @throws InterruptedException If an interrupted error occurred.
4765N/A * @throws ExecutionException If an execution error occurred.
4765N/A */
4765N/A public void
4765N/A rebuildIndexes(RootContainer rootContainer) throws ConfigException,
4765N/A InitializationException, IOException, JebException, DatabaseException,
4765N/A InterruptedException, ExecutionException
4765N/A {
4765N/A this.rootContainer = rootContainer;
4765N/A long startTime = System.currentTimeMillis();
4765N/A rebuildManager.initialize();
4765N/A rebuildManager.printStartMessage();
4765N/A rebuildManager.rebuldIndexes();
5117N/A recursiveDelete(tempDir);
4765N/A rebuildManager.printStopMessage(startTime);
4765N/A }
4765N/A
4591N/A
4591N/A /**
4649N/A * Import a LDIF using the specified root container.
4591N/A *
4591N/A * @param rootContainer The root container to use during the import.
4591N/A *
4591N/A * @return A LDIF result.
4591N/A * @throws ConfigException If the import failed because of an configuration
4591N/A * error.
4591N/A * @throws IOException If the import failed because of an IO error.
4591N/A * @throws InitializationException If the import failed because of an
4591N/A * initialization error.
4591N/A * @throws JebException If the import failed due to a database error.
4591N/A * @throws InterruptedException If the import failed due to an interrupted
4591N/A * error.
4591N/A * @throws ExecutionException If the import failed due to an execution error.
4643N/A * @throws DatabaseException If the import failed due to a database error.
3339N/A */
4591N/A public LDIFImportResult
4591N/A processImport(RootContainer rootContainer) throws ConfigException,
4643N/A InitializationException, IOException, JebException, DatabaseException,
4591N/A InterruptedException, ExecutionException
4591N/A {
4801N/A this.rootContainer = rootContainer;
4963N/A reader = new LDIFReader(importConfiguration, rootContainer,
4963N/A READER_WRITER_BUFFER_SIZE);
4660N/A try
4660N/A {
4660N/A Message message =
4660N/A NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
4660N/A BUILD_ID, REVISION_NUMBER);
4660N/A logError(message);
4660N/A message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount);
4660N/A logError(message);
4660N/A initializeSuffixes();
4660N/A long startTime = System.currentTimeMillis();
4963N/A phaseOne();
4963N/A long phaseOneFinishTime = System.currentTimeMillis();
4963N/A if(!skipDNValidation)
4963N/A {
4963N/A tmpEnv.shutdown();
4963N/A }
4963N/A if(isPhaseOneCanceled)
4963N/A {
4963N/A throw new InterruptedException("Import processing canceled.");
4963N/A }
4963N/A long phaseTwoTime = System.currentTimeMillis();
4963N/A phaseTwo();
4963N/A long phaseTwoFinishTime = System.currentTimeMillis();
4660N/A setIndexesTrusted();
4660N/A switchContainers();
5117N/A recursiveDelete(tempDir);
4660N/A long finishTime = System.currentTimeMillis();
4660N/A long importTime = (finishTime - startTime);
4660N/A float rate = 0;
4966N/A message = NOTE_JEB_IMPORT_PHASE_STATS.get(importTime/1000,
4966N/A (phaseOneFinishTime - startTime)/1000,
4966N/A (phaseTwoFinishTime - phaseTwoTime)/1000);
4963N/A logError(message);
4660N/A if (importTime > 0)
4660N/A rate = 1000f * reader.getEntriesRead() / importTime;
4765N/A message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
4765N/A importCount.get(), reader.getEntriesIgnored(),
4765N/A reader.getEntriesRejected(), migratedCount,
4765N/A importTime / 1000, rate);
4660N/A logError(message);
4649N/A }
4643N/A finally
4643N/A {
4643N/A reader.close();
4643N/A }
4591N/A return new LDIFImportResult(reader.getEntriesRead(), reader
4591N/A .getEntriesRejected(), reader.getEntriesIgnored());
4591N/A }
4591N/A
4591N/A
5117N/A private void recursiveDelete(File dir)
5117N/A {
5117N/A if(dir.listFiles() != null)
5117N/A {
5117N/A for(File f : dir.listFiles())
5117N/A {
5117N/A if(f.isDirectory())
5117N/A {
5117N/A recursiveDelete(f);
5117N/A }
5117N/A f.delete();
5117N/A }
5117N/A }
5117N/A dir.delete();
5117N/A }
5117N/A
5117N/A
4813N/A private void switchContainers()
4813N/A throws DatabaseException, JebException, InitializationException
4813N/A {
4643N/A
4643N/A for(Suffix suffix : dnSuffixMap.values()) {
4643N/A DN baseDN = suffix.getBaseDN();
4649N/A EntryContainer entryContainer =
4643N/A suffix.getSrcEntryContainer();
4649N/A if(entryContainer != null) {
4649N/A EntryContainer needRegisterContainer =
4643N/A rootContainer.unregisterEntryContainer(baseDN);
5117N/A
4649N/A needRegisterContainer.lock();
4649N/A needRegisterContainer.close();
4649N/A needRegisterContainer.delete();
4649N/A needRegisterContainer.unlock();
4643N/A EntryContainer newEC = suffix.getEntryContainer();
4643N/A newEC.lock();
4643N/A newEC.setDatabasePrefix(baseDN.toNormalizedString());
4643N/A newEC.unlock();
4643N/A rootContainer.registerEntryContainer(baseDN, newEC);
4643N/A }
4643N/A }
4643N/A }
4643N/A
4643N/A
4643N/A
4591N/A private void setIndexesTrusted() throws JebException
4591N/A {
4591N/A try {
4591N/A for(Suffix s : dnSuffixMap.values()) {
4591N/A s.setIndexesTrusted();
4591N/A }
4591N/A }
4591N/A catch (DatabaseException ex)
4591N/A {
4649N/A Message message =
4649N/A NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
4649N/A throw new JebException(message);
4591N/A }
4591N/A }
4591N/A
4591N/A
4963N/A private void phaseOne() throws InterruptedException, ExecutionException
4591N/A {
4963N/A initializeIndexBuffers();
4591N/A FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
4591N/A Timer timer = new Timer();
4591N/A timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
4963N/A scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
4963N/A bufferSortService = Executors.newFixedThreadPool(threadCount);
4643N/A ExecutorService execService = Executors.newFixedThreadPool(threadCount);
4643N/A List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
4643N/A tasks.add(new MigrateExistingTask());
4643N/A List<Future<Void>> results = execService.invokeAll(tasks);
4765N/A for (Future<Void> result : results) {
4765N/A if(!result.isDone()) {
4765N/A result.get();
4765N/A }
4765N/A }
4643N/A tasks.clear();
4643N/A results.clear();
4649N/A if (importConfiguration.appendToExistingData() &&
4649N/A importConfiguration.replaceExistingEntries())
4591N/A {
4643N/A for (int i = 0; i < threadCount; i++)
4643N/A {
4643N/A tasks.add(new AppendReplaceTask());
4643N/A }
4643N/A }
4643N/A else
4643N/A {
4643N/A for (int i = 0; i < threadCount; i++)
4643N/A {
4643N/A tasks.add(new ImportTask());
4643N/A }
4591N/A }
4643N/A results = execService.invokeAll(tasks);
4643N/A for (Future<Void> result : results)
4765N/A if(!result.isDone()) {
4765N/A result.get();
4765N/A }
4643N/A tasks.clear();
4643N/A results.clear();
4643N/A tasks.add(new MigrateExcludedTask());
4643N/A results = execService.invokeAll(tasks);
4643N/A for (Future<Void> result : results)
4765N/A if(!result.isDone()) {
4765N/A result.get();
4765N/A }
4963N/A stopScratchFileWriters();
4963N/A for (Future<?> result : scratchFileWriterFutures)
4591N/A {
4765N/A if(!result.isDone()) {
4765N/A result.get();
4765N/A }
4591N/A }
4963N/A //Try to clear as much memory as possible.
4963N/A scratchFileWriterList.clear();
4963N/A scratchFileWriterFutures.clear();
4718N/A indexKeyQueMap.clear();
4591N/A execService.shutdown();
4649N/A freeBufferQueue.clear();
4963N/A bufferSortService.shutdown();
4963N/A scratchFileWriterService.shutdown();
4591N/A timer.cancel();
4591N/A }
4591N/A
4591N/A
3339N/A
4963N/A private void phaseTwo() throws InterruptedException, JebException,
4963N/A ExecutionException
4591N/A {
4591N/A SecondPhaseProgressTask progress2Task =
4963N/A new SecondPhaseProgressTask(reader.getEntriesRead());
4591N/A Timer timer2 = new Timer();
4591N/A timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL);
4591N/A processIndexFiles();
4591N/A timer2.cancel();
4591N/A }
4591N/A
4591N/A
4963N/A private int getBufferCount(int dbThreads)
4591N/A {
4963N/A int buffers = 0;
4966N/A
4966N/A List<IndexManager> totList = new LinkedList<IndexManager>(DNIndexMgrList);
4966N/A totList.addAll(indexMgrList);
4966N/A Collections.sort(totList, Collections.reverseOrder());
4967N/A int limit = Math.min(dbThreads, totList.size());
4967N/A for(int i = 0; i < limit; i ++)
4963N/A {
4966N/A buffers += totList.get(i).getBufferList().size();
4963N/A }
4963N/A return buffers;
4963N/A }
4963N/A
4963N/A
4963N/A private void processIndexFiles() throws InterruptedException,
4963N/A JebException, ExecutionException
4963N/A {
4963N/A ExecutorService dbService;
4591N/A if(bufferCount.get() == 0)
4591N/A {
4591N/A return;
4591N/A }
4963N/A int dbThreads = Runtime.getRuntime().availableProcessors();
4963N/A if(dbThreads < 4)
4591N/A {
4963N/A dbThreads = 4;
4591N/A }
4963N/A int readAheadSize = cacheSizeFromFreeMemory(getBufferCount(dbThreads));
4963N/A List<Future<Void>> futures = new LinkedList<Future<Void>>();
4963N/A dbService = Executors.newFixedThreadPool(dbThreads);
4963N/A //Start DN processing first.
4963N/A for(IndexManager dnMgr : DNIndexMgrList)
4963N/A {
4963N/A futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize)));
4963N/A }
4963N/A for(IndexManager mgr : indexMgrList)
4591N/A {
4963N/A futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
4963N/A }
4963N/A for (Future<Void> result : futures)
4963N/A if(!result.isDone()) {
4963N/A result.get();
3339N/A }
4963N/A dbService.shutdown();
4963N/A }
4963N/A
4963N/A
4963N/A private int cacheSizeFromFreeMemory(int buffers)
4963N/A {
4963N/A Runtime runTime = Runtime.getRuntime();
4963N/A runTime.gc();
4963N/A runTime.gc();
4963N/A long freeMemory = runTime.freeMemory();
4963N/A long maxMemory = runTime.maxMemory();
4963N/A long totMemory = runTime.totalMemory();
4963N/A long totFreeMemory = (freeMemory + (maxMemory - totMemory));
4963N/A int importMemPct = (100 - JVM_MEM_PCT);
4963N/A //For very small heaps, give more memory to the JVM.
4963N/A if(totFreeMemory <= SMALL_HEAP_SIZE)
4963N/A {
4963N/A importMemPct -= 35;
4591N/A }
4963N/A long availableMemory = (totFreeMemory * importMemPct) / 100;
4963N/A int averageBufferSize = (int)(availableMemory /buffers);
4963N/A int cacheSize = Math.max(MIN_READ_AHEAD_CACHE_SIZE, averageBufferSize);
4963N/A //Cache size is never larger than the buffer size.
4591N/A if(cacheSize > bufferSize)
4591N/A {
4591N/A cacheSize = bufferSize;
4591N/A }
4649N/A Message message =
4963N/A NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(availableMemory,
4963N/A cacheSize, buffers);
4649N/A logError(message);
4591N/A return cacheSize;
4591N/A }
4591N/A
4963N/A
4963N/A private void stopScratchFileWriters()
4591N/A {
4649N/A IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
4963N/A for(ScratchFileWriterTask task : scratchFileWriterList)
4591N/A {
4649N/A task.queue.add(indexBuffer);
3339N/A }
3339N/A }
3339N/A
3339N/A
4660N/A /**
4643N/A * Task used to migrate excluded branch.
4591N/A */
4643N/A private final class MigrateExcludedTask extends ImportTask
4643N/A {
4643N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
4643N/A public Void call() throws Exception
4643N/A {
4643N/A for(Suffix suffix : dnSuffixMap.values()) {
4649N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
4649N/A if(entryContainer != null &&
4643N/A !suffix.getExcludeBranches().isEmpty()) {
4643N/A DatabaseEntry key = new DatabaseEntry();
4643N/A DatabaseEntry data = new DatabaseEntry();
4643N/A LockMode lockMode = LockMode.DEFAULT;
4643N/A OperationStatus status;
4643N/A Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
4643N/A "excluded", String.valueOf(suffix.getBaseDN()));
4643N/A logError(message);
4643N/A Cursor cursor =
4649N/A entryContainer.getDN2ID().openCursor(null,
4643N/A CursorConfig.READ_COMMITTED);
4649N/A Comparator<byte[]> comparator =
4649N/A entryContainer.getDN2ID().getComparator();
4643N/A try {
4643N/A for(DN excludedDN : suffix.getExcludeBranches()) {
4643N/A byte[] bytes =
4643N/A StaticUtils.getBytes(excludedDN.toNormalizedString());
4643N/A key.setData(bytes);
4643N/A status = cursor.getSearchKeyRange(key, data, lockMode);
4643N/A if(status == OperationStatus.SUCCESS &&
4643N/A Arrays.equals(key.getData(), bytes)) {
4643N/A // This is the base entry for a branch that was excluded in the
4643N/A // import so we must migrate all entries in this branch over to
4643N/A // the new entry container.
4660N/A byte[] end = StaticUtils.getBytes("," +
4660N/A excludedDN.toNormalizedString());
4643N/A end[0] = (byte) (end[0] + 1);
4643N/A
4643N/A while(status == OperationStatus.SUCCESS &&
4963N/A comparator.compare(key.getData(), end) < 0 &&
4963N/A !importConfiguration.isCancelled() &&
4963N/A !isPhaseOneCanceled) {
4643N/A EntryID id = new EntryID(data);
4649N/A Entry entry = entryContainer.getID2Entry().get(null,
4643N/A id, LockMode.DEFAULT);
4643N/A processEntry(entry, rootContainer.getNextEntryID(),
4643N/A suffix);
4643N/A migratedCount++;
4643N/A status = cursor.getNext(key, data, lockMode);
4643N/A }
4643N/A }
4643N/A }
4963N/A cursor.close();
4963N/A flushIndexBuffers();
4643N/A }
4660N/A catch (Exception e)
4660N/A {
4660N/A message =
4660N/A ERR_JEB_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR.get(e.getMessage());
4660N/A logError(message);
4963N/A isPhaseOneCanceled =true;
4660N/A throw e;
4660N/A }
4643N/A }
4643N/A }
4643N/A return null;
4643N/A }
4643N/A }
4643N/A
4643N/A
4643N/A /**
4643N/A * Task to migrate existing entries.
4643N/A */
4643N/A private final class MigrateExistingTask extends ImportTask
4643N/A {
4643N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
4643N/A public Void call() throws Exception
4643N/A {
4660N/A for(Suffix suffix : dnSuffixMap.values()) {
4660N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
4660N/A if(entryContainer != null &&
4660N/A !suffix.getIncludeBranches().isEmpty()) {
4660N/A DatabaseEntry key = new DatabaseEntry();
4660N/A DatabaseEntry data = new DatabaseEntry();
4660N/A LockMode lockMode = LockMode.DEFAULT;
4660N/A OperationStatus status;
4660N/A Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
4660N/A "existing", String.valueOf(suffix.getBaseDN()));
4660N/A logError(message);
4660N/A Cursor cursor =
4660N/A entryContainer.getDN2ID().openCursor(null,
4660N/A null);
4660N/A try {
4660N/A status = cursor.getFirst(key, data, lockMode);
4660N/A while(status == OperationStatus.SUCCESS &&
4963N/A !importConfiguration.isCancelled() && !isPhaseOneCanceled) {
4660N/A DN dn = DN.decode(ByteString.wrap(key.getData()));
4660N/A if(!suffix.getIncludeBranches().contains(dn)) {
4660N/A EntryID id = new EntryID(data);
4660N/A Entry entry =
4660N/A entryContainer.getID2Entry().get(null,
4660N/A id, LockMode.DEFAULT);
4660N/A processEntry(entry, rootContainer.getNextEntryID(),suffix);
4660N/A migratedCount++;
4660N/A status = cursor.getNext(key, data, lockMode);
4660N/A } else {
4660N/A // This is the base entry for a branch that will be included
4660N/A // in the import so we don't want to copy the branch to the
4660N/A // new entry container.
4643N/A
4660N/A /**
4660N/A * Advance the cursor to next entry at the same level in the
4660N/A * DIT
4660N/A * skipping all the entries in this branch.
4660N/A * Set the next starting value to a value of equal length but
4660N/A * slightly greater than the previous DN. Since keys are
4660N/A * compared in reverse order we must set the first byte
4660N/A * (the comma).
4660N/A * No possibility of overflow here.
4660N/A */
4660N/A byte[] begin =
4660N/A StaticUtils.getBytes("," + dn.toNormalizedString());
4660N/A begin[0] = (byte) (begin[0] + 1);
4660N/A key.setData(begin);
4660N/A status = cursor.getSearchKeyRange(key, data, lockMode);
4643N/A }
4643N/A }
4963N/A cursor.close();
4963N/A flushIndexBuffers();
4643N/A }
4660N/A catch(Exception e)
4660N/A {
4660N/A message =
4660N/A ERR_JEB_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR.get(e.getMessage());
4660N/A logError(message);
4963N/A isPhaseOneCanceled =true;
4660N/A throw e;
4660N/A }
4643N/A }
4660N/A }
4643N/A return null;
4643N/A }
4643N/A }
4643N/A
4643N/A /**
4963N/A * Task to perform append/replace processing.
4643N/A */
4643N/A private class AppendReplaceTask extends ImportTask
4643N/A {
4963N/A private final Set<byte[]> insertKeySet = new HashSet<byte[]>(),
4963N/A deleteKeySet = new HashSet<byte[]>();
4643N/A private final EntryInformation entryInfo = new EntryInformation();
4643N/A private Entry oldEntry;
4643N/A private EntryID entryID;
4591N/A
4963N/A
4591N/A /**
4591N/A * {@inheritDoc}
4591N/A */
4591N/A public Void call() throws Exception
4591N/A {
4660N/A try
4591N/A {
4660N/A while (true)
4591N/A {
4963N/A if (importConfiguration.isCancelled() || isPhaseOneCanceled)
4660N/A {
4660N/A IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
4660N/A freeBufferQueue.add(indexBuffer);
4660N/A return null;
4660N/A }
4660N/A oldEntry = null;
4660N/A Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
4660N/A if (entry == null)
4660N/A {
4660N/A break;
4660N/A }
4660N/A entryID = entryInfo.getEntryID();
4660N/A Suffix suffix = entryInfo.getSuffix();
4660N/A processEntry(entry, suffix);
4591N/A }
4660N/A flushIndexBuffers();
4643N/A }
4660N/A catch(Exception e)
4660N/A {
4660N/A Message message =
4660N/A ERR_JEB_IMPORT_LDIF_APPEND_REPLACE_TASK_ERR.get(e.getMessage());
4660N/A logError(message);
4963N/A isPhaseOneCanceled = true;
4660N/A throw e;
4660N/A }
4643N/A return null;
4643N/A }
4643N/A
4643N/A
4643N/A void processEntry(Entry entry, Suffix suffix)
4643N/A throws DatabaseException, ConfigException, DirectoryException,
4963N/A JebException, InterruptedException
4643N/A
4643N/A {
4643N/A DN entryDN = entry.getDN();
4643N/A DN2ID dn2id = suffix.getDN2ID();
4643N/A EntryID oldID = dn2id.get(null, entryDN, LockMode.DEFAULT);
4643N/A if(oldID != null)
4643N/A {
4643N/A oldEntry = suffix.getID2Entry().get(null, oldID, LockMode.DEFAULT);
4643N/A }
4643N/A if(oldEntry == null)
4643N/A {
4643N/A if(!skipDNValidation)
4591N/A {
4963N/A if(!dnSanityCheck(entryDN, entry, suffix))
4591N/A {
4591N/A suffix.removePending(entryDN);
4643N/A return;
4591N/A }
4591N/A suffix.removePending(entryDN);
4591N/A }
4591N/A else
4591N/A {
4591N/A processDN2ID(suffix, entryDN, entryID);
4591N/A suffix.removePending(entryDN);
4591N/A }
4643N/A }
4643N/A else
4643N/A {
4643N/A suffix.removePending(entryDN);
4643N/A entryID = oldID;
4643N/A }
4700N/A processDN2URI(suffix, oldEntry, entry);
4643N/A suffix.getID2Entry().put(null, entryID, entry);
4643N/A if(oldEntry == null)
4643N/A {
4591N/A processIndexes(suffix, entry, entryID);
4591N/A }
4643N/A else
4643N/A {
4643N/A processAllIndexes(suffix, entry, entryID);
4643N/A }
4643N/A }
4643N/A
4963N/A
4643N/A void
4649N/A processAllIndexes(Suffix suffix, Entry entry, EntryID entryID) throws
4963N/A DatabaseException, DirectoryException, JebException,
4963N/A ConfigException, InterruptedException
4643N/A {
4649N/A
4643N/A for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
4649N/A suffix.getAttrIndexMap().entrySet()) {
4649N/A AttributeType attributeType = mapEntry.getKey();
4643N/A AttributeIndex attributeIndex = mapEntry.getValue();
4643N/A Index index;
4643N/A if((index=attributeIndex.getEqualityIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.EQUALITY,
4963N/A index.getIndexEntryLimit()));
4643N/A }
4643N/A if((index=attributeIndex.getPresenceIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.PRESENCE,
4963N/A index.getIndexEntryLimit()));
4643N/A }
4643N/A if((index=attributeIndex.getSubstringIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.SUBSTRING,
4963N/A index.getIndexEntryLimit()));
4643N/A }
4643N/A if((index=attributeIndex.getOrderingIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.ORDERING,
4963N/A index.getIndexEntryLimit()));
4643N/A }
4643N/A if((index=attributeIndex.getApproximateIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.APPROXIMATE,
4963N/A index.getIndexEntryLimit()));
4643N/A }
4649N/A for(VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes()) {
4649N/A Transaction transaction = null;
4649N/A vlvIdx.addEntry(transaction, entryID, entry);
4643N/A }
4643N/A Map<String,Collection<Index>> extensibleMap =
4643N/A attributeIndex.getExtensibleIndexes();
4643N/A if(!extensibleMap.isEmpty()) {
4643N/A Collection<Index> subIndexes =
4643N/A attributeIndex.getExtensibleIndexes().get(
4643N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
4643N/A if(subIndexes != null) {
4643N/A for(Index subIndex: subIndexes) {
4649N/A processAttribute(subIndex, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.EX_SUBSTRING,
4963N/A subIndex.getIndexEntryLimit()));
4643N/A }
4643N/A }
4643N/A Collection<Index> sharedIndexes =
4643N/A attributeIndex.getExtensibleIndexes().get(
4643N/A EXTENSIBLE_INDEXER_ID_SHARED);
4643N/A if(sharedIndexes !=null) {
4643N/A for(Index sharedIndex:sharedIndexes) {
4649N/A processAttribute(sharedIndex, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.EX_SHARED,
4963N/A sharedIndex.getIndexEntryLimit()));
4643N/A }
4643N/A }
4643N/A }
4643N/A }
4643N/A }
4643N/A
4643N/A
4649N/A void processAttribute(Index index, Entry entry, EntryID entryID,
4643N/A IndexKey indexKey) throws DatabaseException,
4963N/A ConfigException, InterruptedException
4643N/A {
4643N/A if(oldEntry != null)
4643N/A {
4643N/A deleteKeySet.clear();
4643N/A index.indexer.indexEntry(oldEntry, deleteKeySet);
4643N/A for(byte[] delKey : deleteKeySet)
4643N/A {
4643N/A processKey(index, delKey, entryID, indexComparator, indexKey, false);
4643N/A }
4643N/A }
4643N/A insertKeySet.clear();
4643N/A index.indexer.indexEntry(entry, insertKeySet);
4643N/A for(byte[] key : insertKeySet)
4643N/A {
4643N/A processKey(index, key, entryID, indexComparator, indexKey, true);
4643N/A }
4643N/A }
4643N/A }
4643N/A
4643N/A
4643N/A /**
4963N/A * This task performs phase reading and processing of the entries read from
4963N/A * the LDIF file(s). This task is used if the append flag wasn't specified.
4643N/A */
4643N/A private class ImportTask implements Callable<Void>
4643N/A {
4963N/A private final Map<IndexKey, IndexBuffer> indexBufferMap =
4963N/A new HashMap<IndexKey, IndexBuffer>();
4643N/A private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
4643N/A private final EntryInformation entryInfo = new EntryInformation();
4963N/A private DatabaseEntry keyEntry = new DatabaseEntry(),
4963N/A valEntry = new DatabaseEntry();
4963N/A
4643N/A
4643N/A /**
4643N/A * {@inheritDoc}
4643N/A */
4643N/A public Void call() throws Exception
4643N/A {
4660N/A try
4643N/A {
4660N/A while (true)
4643N/A {
4963N/A if (importConfiguration.isCancelled() || isPhaseOneCanceled)
4660N/A {
4660N/A IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
4660N/A freeBufferQueue.add(indexBuffer);
4660N/A return null;
4660N/A }
4660N/A Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
4660N/A if (entry == null)
4660N/A {
4660N/A break;
4660N/A }
4660N/A EntryID entryID = entryInfo.getEntryID();
4660N/A Suffix suffix = entryInfo.getSuffix();
4660N/A processEntry(entry, entryID, suffix);
4643N/A }
4660N/A flushIndexBuffers();
4643N/A }
4660N/A catch (Exception e)
4660N/A {
4660N/A Message message =
4660N/A ERR_JEB_IMPORT_LDIF_IMPORT_TASK_ERR.get(e.getMessage());
4660N/A logError(message);
4963N/A isPhaseOneCanceled = true;
4660N/A throw e;
4660N/A }
4591N/A return null;
4591N/A }
4591N/A
4591N/A
4643N/A void processEntry(Entry entry, EntryID entryID, Suffix suffix)
4643N/A throws DatabaseException, ConfigException, DirectoryException,
4963N/A JebException, InterruptedException
4643N/A
4643N/A {
4643N/A DN entryDN = entry.getDN();
4643N/A if(!skipDNValidation)
4643N/A {
4963N/A if(!dnSanityCheck(entryDN, entry, suffix))
4643N/A {
4643N/A suffix.removePending(entryDN);
4643N/A return;
4643N/A }
4643N/A }
4963N/A suffix.removePending(entryDN);
4963N/A processDN2ID(suffix, entryDN, entryID);
4700N/A processDN2URI(suffix, null, entry);
4963N/A processIndexes(suffix, entry, entryID);
4643N/A suffix.getID2Entry().put(null, entryID, entry);
4823N/A importCount.getAndIncrement();
4643N/A }
4643N/A
4963N/A //Examine the DN for duplicates and missing parents.
4963N/A boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
4963N/A throws JebException, InterruptedException
4591N/A {
5117N/A //Perform parent checking.
5117N/A DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
5117N/A if (parentDN != null) {
5117N/A if (!suffix.isParentProcessed(parentDN, tmpEnv, clearedBackend)) {
5117N/A Message message =
5117N/A ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
5117N/A reader.rejectEntry(entry, message);
5117N/A return false;
5117N/A }
5117N/A }
4963N/A //If the backend was not cleared, then the dn2id needs to checked first
4963N/A //for DNs that might not exist in the DN cache. If the DN is not in
4963N/A //the suffixes dn2id DB, then the dn cache is used.
4963N/A if(!clearedBackend)
4591N/A {
4963N/A EntryID id = suffix.getDN2ID().get(null, entryDN, LockMode.DEFAULT);
4963N/A if(id != null || !tmpEnv.insert(entryDN, keyEntry, valEntry) )
4963N/A {
4963N/A Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
4963N/A reader.rejectEntry(entry, message);
4591N/A return false;
4591N/A }
4591N/A }
4963N/A else if(!tmpEnv.insert(entryDN, keyEntry, valEntry))
4591N/A {
4963N/A Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
4963N/A reader.rejectEntry(entry, message);
4963N/A return false;
4963N/A }
4591N/A return true;
4591N/A }
4591N/A
4591N/A
4643N/A void
4649N/A processIndexes(Suffix suffix, Entry entry, EntryID entryID) throws
4963N/A DatabaseException, DirectoryException, JebException,
4963N/A ConfigException, InterruptedException
4591N/A {
4591N/A for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
4649N/A suffix.getAttrIndexMap().entrySet()) {
4649N/A AttributeType attributeType = mapEntry.getKey();
4649N/A if(entry.hasAttribute(attributeType)) {
4591N/A AttributeIndex attributeIndex = mapEntry.getValue();
4591N/A Index index;
4591N/A if((index=attributeIndex.getEqualityIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.EQUALITY,
4963N/A index.getIndexEntryLimit()));
4591N/A }
4591N/A if((index=attributeIndex.getPresenceIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.PRESENCE,
4963N/A index.getIndexEntryLimit()));
4591N/A }
4591N/A if((index=attributeIndex.getSubstringIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.SUBSTRING,
4963N/A index.getIndexEntryLimit()));
4591N/A }
4591N/A if((index=attributeIndex.getOrderingIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.ORDERING,
4963N/A index.getIndexEntryLimit()));
4591N/A }
4591N/A if((index=attributeIndex.getApproximateIndex()) != null) {
4649N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.APPROXIMATE,
4963N/A index.getIndexEntryLimit()));
4591N/A }
4649N/A for(VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes()) {
4649N/A Transaction transaction = null;
4649N/A vlvIdx.addEntry(transaction, entryID, entry);
4591N/A }
4591N/A Map<String,Collection<Index>> extensibleMap =
4591N/A attributeIndex.getExtensibleIndexes();
4591N/A if(!extensibleMap.isEmpty()) {
4591N/A Collection<Index> subIndexes =
4591N/A attributeIndex.getExtensibleIndexes().get(
4591N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
4591N/A if(subIndexes != null) {
4591N/A for(Index subIndex: subIndexes) {
4649N/A processAttribute(subIndex, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.EX_SUBSTRING,
4963N/A subIndex.getIndexEntryLimit()));
4591N/A }
4591N/A }
4591N/A Collection<Index> sharedIndexes =
4591N/A attributeIndex.getExtensibleIndexes().get(
4591N/A EXTENSIBLE_INDEXER_ID_SHARED);
4591N/A if(sharedIndexes !=null) {
4591N/A for(Index sharedIndex:sharedIndexes) {
4649N/A processAttribute(sharedIndex, entry, entryID,
4963N/A new IndexKey(attributeType, ImportIndexType.EX_SHARED,
4963N/A sharedIndex.getIndexEntryLimit()));
4591N/A }
4591N/A }
4591N/A }
4591N/A }
4591N/A }
4591N/A }
4591N/A
4591N/A
4591N/A
4649N/A void processAttribute(Index index, Entry entry, EntryID entryID,
4643N/A IndexKey indexKey) throws DatabaseException,
4963N/A ConfigException, InterruptedException
4591N/A {
4591N/A insertKeySet.clear();
4591N/A index.indexer.indexEntry(entry, insertKeySet);
4591N/A for(byte[] key : insertKeySet)
4591N/A {
4643N/A processKey(index, key, entryID, indexComparator, indexKey, true);
4591N/A }
4591N/A }
4591N/A
4591N/A
4643N/A void flushIndexBuffers() throws InterruptedException,
4591N/A ExecutionException
4591N/A {
4643N/A Set<Map.Entry<IndexKey, IndexBuffer>> set = indexBufferMap.entrySet();
4963N/A Iterator<Map.Entry<IndexKey, IndexBuffer>> setIterator = set.iterator();
4963N/A while(setIterator.hasNext())
4591N/A {
4963N/A Map.Entry<IndexKey, IndexBuffer> e = setIterator.next();
4643N/A IndexKey indexKey = e.getKey();
4591N/A IndexBuffer indexBuffer = e.getValue();
4963N/A setIterator.remove();
4765N/A ImportIndexType indexType = indexKey.getIndexType();
4765N/A if(indexType.equals(ImportIndexType.DN))
4591N/A {
4591N/A indexBuffer.setComparator(dnComparator);
4591N/A }
4591N/A else
4591N/A {
4591N/A indexBuffer.setComparator(indexComparator);
4591N/A }
4643N/A indexBuffer.setIndexKey(indexKey);
4963N/A indexBuffer.setDiscard();
4963N/A Future<Void> future =
4963N/A bufferSortService.submit(new SortTask(indexBuffer));
4591N/A future.get();
4591N/A }
4591N/A }
4591N/A
4591N/A
4643N/A int
4643N/A processKey(DatabaseContainer container, byte[] key, EntryID entryID,
4643N/A IndexBuffer.ComparatorBuffer<byte[]> comparator, IndexKey indexKey,
4643N/A boolean insert)
4963N/A throws ConfigException, InterruptedException
4591N/A {
4591N/A IndexBuffer indexBuffer;
4643N/A if(!indexBufferMap.containsKey(indexKey))
4591N/A {
4591N/A indexBuffer = getNewIndexBuffer();
4643N/A indexBufferMap.put(indexKey, indexBuffer);
4591N/A }
4591N/A else
4591N/A {
4643N/A indexBuffer = indexBufferMap.get(indexKey);
4591N/A }
4963N/A if(!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
4591N/A {
4591N/A indexBuffer.setComparator(comparator);
4643N/A indexBuffer.setIndexKey(indexKey);
4963N/A bufferSortService.submit(new SortTask(indexBuffer));
4591N/A indexBuffer = getNewIndexBuffer();
4643N/A indexBufferMap.remove(indexKey);
4643N/A indexBufferMap.put(indexKey, indexBuffer);
4591N/A }
4643N/A int id = System.identityHashCode(container);
4643N/A indexBuffer.add(key, entryID, id, insert);
4643N/A return id;
4591N/A }
4591N/A
4591N/A
4963N/A IndexBuffer getNewIndexBuffer() throws ConfigException, InterruptedException
4591N/A {
4963N/A IndexBuffer indexBuffer = freeBufferQueue.take();
4963N/A if(indexBuffer == null)
4963N/A {
4963N/A Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
4963N/A "Index buffer processing error.");
4963N/A throw new InterruptedException(message.toString());
4963N/A }
4963N/A if(indexBuffer.isPoison())
4963N/A {
4963N/A Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
4963N/A "Cancel processing received.");
4963N/A throw new InterruptedException(message.toString());
4963N/A }
4591N/A return indexBuffer;
4591N/A }
4591N/A
4591N/A
4643N/A void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
4963N/A throws ConfigException, InterruptedException
4591N/A {
4591N/A DatabaseContainer dn2id = suffix.getDN2ID();
4591N/A byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
4643N/A int id = processKey(dn2id, dnBytes, entryID, dnComparator,
4963N/A new IndexKey(dnType, ImportIndexType.DN, 1), true);
4643N/A idECMap.putIfAbsent(id, suffix.getEntryContainer());
4591N/A }
4700N/A
4963N/A
4700N/A void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry)
4700N/A throws DatabaseException
4700N/A {
4700N/A DN2URI dn2uri = suffix.getDN2URI();
4700N/A if(oldEntry != null)
4700N/A {
4700N/A dn2uri.replaceEntry(null, oldEntry, newEntry);
4700N/A }
4700N/A else
4700N/A {
4700N/A dn2uri.addEntry(null, newEntry);
4700N/A }
4700N/A }
4591N/A }
4591N/A
4643N/A
4591N/A /**
4963N/A * This task reads sorted records from the temporary index scratch files,
4963N/A * processes the records and writes the results to the index database. The
4963N/A * DN index is treated differently then non-DN indexes.
4591N/A */
4963N/A private final class IndexDBWriteTask implements Callable<Void>
4643N/A {
4591N/A private final IndexManager indexMgr;
4591N/A private final DatabaseEntry dbKey, dbValue;
4591N/A private final int cacheSize;
4643N/A private final Map<Integer, DNState> dnStateMap =
4963N/A new HashMap<Integer, DNState>();
4643N/A private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
4591N/A
4963N/A
4963N/A public IndexDBWriteTask(IndexManager indexMgr, int cacheSize)
4591N/A {
4591N/A this.indexMgr = indexMgr;
4591N/A this.dbKey = new DatabaseEntry();
4591N/A this.dbValue = new DatabaseEntry();
4591N/A this.cacheSize = cacheSize;
4591N/A }
4591N/A
4963N/A
4649N/A private SortedSet<Buffer> initializeBuffers() throws IOException
4591N/A {
4643N/A SortedSet<Buffer> bufferSet = new TreeSet<Buffer>();
4643N/A for(Buffer b : indexMgr.getBufferList())
4591N/A {
4963N/A b.initializeCache(indexMgr, null, cacheSize);
4591N/A bufferSet.add(b);
4591N/A }
4736N/A indexMgr.getBufferList().clear();
4643N/A return bufferSet;
4643N/A }
4643N/A
4963N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
4643N/A public Void call() throws Exception
4643N/A {
4963N/A ByteBuffer cKey = null;
4736N/A ImportIDSet cInsertIDSet = new ImportIDSet(),
4736N/A cDeleteIDSet = new ImportIDSet();
4963N/A Thread.setDefaultUncaughtExceptionHandler(
4963N/A new DefaultExceptionHandler());
4963N/A indexMgr.setStarted();
4963N/A Message message =
4963N/A NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getFileName(),
4963N/A indexMgr.getBufferList().size());
4963N/A logError(message);
4643N/A Integer cIndexID = null;
4660N/A try
4591N/A {
4660N/A indexMgr.openIndexFile();
4660N/A SortedSet<Buffer> bufferSet = initializeBuffers();
4660N/A while(!bufferSet.isEmpty())
4591N/A {
4660N/A Buffer b;
4660N/A b = bufferSet.first();
4660N/A bufferSet.remove(b);
4660N/A if(cKey == null)
4591N/A {
4963N/A cKey = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
4643N/A cIndexID = b.getIndexID();
4963N/A cKey.clear();
4963N/A if(b.getKeyLen() > cKey.capacity())
4963N/A {
4963N/A cKey = ByteBuffer.allocate(b.getKeyLen());
4963N/A }
4963N/A cKey.flip();
4963N/A b.getKey(cKey);
4736N/A cInsertIDSet.merge(b.getInsertIDSet());
4736N/A cDeleteIDSet.merge(b.getDeleteIDSet());
4643N/A cInsertIDSet.setKey(cKey);
4643N/A cDeleteIDSet.setKey(cKey);
4591N/A }
4591N/A else
4591N/A {
4660N/A if(b.compare(cKey, cIndexID) != 0)
4660N/A {
4660N/A addToDB(cInsertIDSet, cDeleteIDSet, cIndexID);
4660N/A indexMgr.incrementKeyCount();
4660N/A cIndexID = b.getIndexID();
4963N/A cKey.clear();
4963N/A if(b.getKeyLen() > cKey.capacity())
4963N/A {
4963N/A cKey = ByteBuffer.allocate(b.getKeyLen());
4963N/A }
4963N/A cKey.flip();
4963N/A b.getKey(cKey);
4736N/A cInsertIDSet.clear(true);
4736N/A cDeleteIDSet.clear(true);
4736N/A cInsertIDSet.merge(b.getInsertIDSet());
4736N/A cDeleteIDSet.merge(b.getDeleteIDSet());
4660N/A cInsertIDSet.setKey(cKey);
4660N/A cDeleteIDSet.setKey(cKey);
4660N/A }
4660N/A else
4660N/A {
4660N/A cInsertIDSet.merge(b.getInsertIDSet());
4660N/A cDeleteIDSet.merge(b.getDeleteIDSet());
4660N/A }
4660N/A }
4660N/A if(b.hasMoreData())
4660N/A {
4660N/A b.getNextRecord();
4660N/A bufferSet.add(b);
4591N/A }
4591N/A }
4660N/A if(cKey != null)
4591N/A {
4660N/A addToDB(cInsertIDSet, cDeleteIDSet, cIndexID);
4591N/A }
4591N/A }
4660N/A catch (Exception e)
4591N/A {
4963N/A message =
4660N/A ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(indexMgr.getFileName(),
4963N/A e.getMessage());
4660N/A logError(message);
4736N/A e.printStackTrace();
4660N/A throw e;
4591N/A }
5117N/A finally
5117N/A {
5117N/A cleanUP();
5117N/A }
4591N/A return null;
4591N/A }
4591N/A
4591N/A
4591N/A private void cleanUP() throws DatabaseException, DirectoryException,
4813N/A IOException
4591N/A {
5117N/A try
4591N/A {
5117N/A if(indexMgr.isDN2ID())
4591N/A {
5117N/A for(DNState dnState : dnStateMap.values())
5117N/A {
5117N/A dnState.flush();
5117N/A }
5117N/A Message msg =
5117N/A NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount());
5117N/A logError(msg);
4591N/A }
5117N/A else
5117N/A {
5117N/A for(Index index : indexMap.values())
5117N/A {
5117N/A index.closeCursor();
5117N/A }
5117N/A Message message =
5117N/A NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr.getFileName());
5117N/A logError(message);
5117N/A }
4591N/A }
5117N/A finally
5117N/A {
5117N/A indexMgr.setDone();
5117N/A indexMgr.close();
5117N/A indexMgr.deleteIndexFile();
5117N/A }
4591N/A }
4591N/A
4963N/A
4963N/A private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
4963N/A int indexID) throws InterruptedException,
4963N/A DatabaseException, DirectoryException
4643N/A {
4643N/A if(!indexMgr.isDN2ID())
4643N/A {
4643N/A Index index;
4963N/A if((deleteSet.size() > 0) || (!deleteSet.isDefined()))
4643N/A {
4963N/A dbKey.setData(deleteSet.getKey().array(), 0,
4963N/A deleteSet.getKey().limit());
4643N/A index = (Index)idContainerMap.get(indexID);
4963N/A index.delete(dbKey, deleteSet, dbValue);
4643N/A if(!indexMap.containsKey(indexID))
4643N/A {
4643N/A indexMap.put(indexID, index);
4643N/A }
4643N/A }
4963N/A if((insertSet.size() > 0) || (!insertSet.isDefined()))
4643N/A {
4963N/A dbKey.setData(insertSet.getKey().array(), 0,
4963N/A insertSet.getKey().limit());
4643N/A index = (Index)idContainerMap.get(indexID);
4963N/A index.insert(dbKey, insertSet, dbValue);
4643N/A if(!indexMap.containsKey(indexID))
4643N/A {
4643N/A indexMap.put(indexID, index);
4643N/A }
4643N/A }
4643N/A }
4963N/A else
4643N/A {
4963N/A addDN2ID(insertSet, indexID);
4643N/A }
4643N/A }
4643N/A
4963N/A
4643N/A private void addDN2ID(ImportIDSet record, Integer indexID)
4591N/A throws DatabaseException, DirectoryException
4591N/A {
4643N/A DNState dnState;
4643N/A if(!dnStateMap.containsKey(indexID))
4591N/A {
4643N/A dnState = new DNState(idECMap.get(indexID));
4643N/A dnStateMap.put(indexID, dnState);
4591N/A }
4643N/A else
4591N/A {
4643N/A dnState = dnStateMap.get(indexID);
4591N/A }
4643N/A if(!dnState.checkParent(record))
4643N/A {
4643N/A return;
4643N/A }
4643N/A dnState.writeToDB();
4591N/A }
4591N/A
4591N/A
4649N/A /**
4643N/A * This class is used to by a index DB merge thread performing DN processing
4643N/A * to keep track of the state of individual DN2ID index processing.
4643N/A */
4643N/A class DNState
4591N/A {
4963N/A private final int DN_STATE_CACHE_SIZE = 64 * KB;
4963N/A
4736N/A private DN parentDN, lastDN;
4736N/A private EntryID parentID, lastID, entryID;
4736N/A private final DatabaseEntry DNKey, DNValue;
4643N/A private final TreeMap<DN, EntryID> parentIDMap =
4643N/A new TreeMap<DN, EntryID>();
4643N/A private final EntryContainer entryContainer;
4643N/A private final Map<byte[], ImportIDSet> id2childTree;
4643N/A private final Map<byte[], ImportIDSet> id2subtreeTree;
4736N/A private final int childLimit, subTreeLimit;
4736N/A private final boolean childDoCount, subTreeDoCount;
4643N/A
4963N/A
4643N/A DNState(EntryContainer entryContainer)
4591N/A {
4643N/A this.entryContainer = entryContainer;
4736N/A Comparator<byte[]> childComparator =
4736N/A entryContainer.getID2Children().getComparator();
4643N/A id2childTree = new TreeMap<byte[], ImportIDSet>(childComparator);
4736N/A childLimit = entryContainer.getID2Children().getIndexEntryLimit();
4736N/A childDoCount = entryContainer.getID2Children().getMaintainCount();
4736N/A Comparator<byte[]> subComparator =
4736N/A entryContainer.getID2Subtree().getComparator();
4736N/A subTreeLimit = entryContainer.getID2Subtree().getIndexEntryLimit();
4736N/A subTreeDoCount = entryContainer.getID2Subtree().getMaintainCount();
4643N/A id2subtreeTree = new TreeMap<byte[], ImportIDSet>(subComparator);
4736N/A DNKey = new DatabaseEntry();
4736N/A DNValue = new DatabaseEntry();
4591N/A }
4591N/A
4591N/A
4835N/A private boolean checkParent(ImportIDSet record) throws DirectoryException,
4835N/A DatabaseException
4591N/A {
4963N/A DN dn = DN.decode(new String(record.getKey().array(), 0 ,
4963N/A record.getKey().limit()));
4963N/A DNKey.setData(record.getKey().array(), 0 , record.getKey().limit());
4643N/A byte[] v = record.toDatabase();
4643N/A long v1 = JebFormat.entryIDFromDatabase(v);
4736N/A DNValue.setData(v);
4963N/A
4856N/A entryID = new EntryID(v1);
4835N/A //Bypass the cache for append data, lookup the parent in DN2ID and
4835N/A //return.
4835N/A if(importConfiguration != null &&
4835N/A importConfiguration.appendToExistingData())
4591N/A {
5017N/A parentDN = entryContainer.getParentWithinBase(dn);
5017N/A //If null is returned than this is a suffix DN.
5017N/A if(parentDN != null)
5017N/A {
5017N/A parentID =
5017N/A entryContainer.getDN2ID().get(null, parentDN, LockMode.DEFAULT);
5017N/A }
4591N/A }
4591N/A else
4591N/A {
4835N/A if(parentIDMap.isEmpty())
4656N/A {
4725N/A parentIDMap.put(dn, entryID);
4835N/A return true;
4835N/A }
4835N/A else if(lastDN != null && lastDN.isAncestorOf(dn))
4835N/A {
4835N/A parentIDMap.put(lastDN, lastID);
4835N/A parentDN = lastDN;
4835N/A parentID = lastID;
4643N/A lastDN = dn;
4643N/A lastID = entryID;
4835N/A return true;
4835N/A }
4835N/A else if(parentIDMap.lastKey().isAncestorOf(dn))
4835N/A {
4835N/A parentDN = parentIDMap.lastKey();
4835N/A parentID = parentIDMap.get(parentDN);
4835N/A lastDN = dn;
4835N/A lastID = entryID;
4835N/A return true;
4643N/A }
4643N/A else
4643N/A {
4835N/A DN newParentDN = entryContainer.getParentWithinBase(dn);
4835N/A if(parentIDMap.containsKey(newParentDN))
4835N/A {
4835N/A EntryID newParentID = parentIDMap.get(newParentDN);
4835N/A DN lastDN = parentIDMap.lastKey();
4835N/A while(!newParentDN.equals(lastDN)) {
4835N/A parentIDMap.remove(lastDN);
4835N/A lastDN = parentIDMap.lastKey();
4835N/A }
4835N/A parentIDMap.put(dn, entryID);
4835N/A parentDN = newParentDN;
4835N/A parentID = newParentID;
4835N/A lastDN = dn;
4835N/A lastID = entryID;
4835N/A }
4835N/A else
4835N/A {
4835N/A Message message =
4649N/A NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString());
4835N/A Entry e = new Entry(dn, null, null, null);
4835N/A reader.rejectEntry(e, message);
4835N/A return false;
4835N/A }
4643N/A }
4591N/A }
4643N/A return true;
4591N/A }
4643N/A
4591N/A
4656N/A private void id2child(EntryID childID)
4736N/A throws DatabaseException, DirectoryException
4591N/A {
4736N/A ImportIDSet idSet;
4736N/A if(!id2childTree.containsKey(parentID.getDatabaseEntry().getData()))
4736N/A {
4736N/A idSet = new ImportIDSet(1,childLimit, childDoCount);
4736N/A id2childTree.put(parentID.getDatabaseEntry().getData(), idSet);
4736N/A }
4736N/A else
4736N/A {
4736N/A idSet = id2childTree.get(parentID.getDatabaseEntry().getData());
4736N/A }
4736N/A idSet.addEntryID(childID);
4736N/A if(id2childTree.size() > DN_STATE_CACHE_SIZE)
4736N/A {
4736N/A flushMapToDB(id2childTree, entryContainer.getID2Children(), true);
4736N/A }
4591N/A }
4591N/A
4963N/A
4835N/A private EntryID getParentID(DN dn) throws DatabaseException
4835N/A {
4835N/A EntryID nodeID;
4835N/A //Bypass the cache for append data, lookup the parent DN in the DN2ID
4835N/A //db.
4835N/A if (importConfiguration != null &&
4835N/A importConfiguration.appendToExistingData())
4835N/A {
4835N/A nodeID = entryContainer.getDN2ID().get(null, dn, LockMode.DEFAULT);
4835N/A }
4835N/A else
4835N/A {
4835N/A nodeID = parentIDMap.get(dn);
4835N/A }
4835N/A return nodeID;
4835N/A }
4643N/A
4963N/A
4736N/A private void id2SubTree(EntryID childID)
4736N/A throws DatabaseException, DirectoryException
4591N/A {
4643N/A ImportIDSet idSet;
4643N/A if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
4643N/A {
4736N/A idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
4643N/A id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
4591N/A }
4591N/A else
4591N/A {
4643N/A idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
4643N/A }
4643N/A idSet.addEntryID(childID);
4643N/A for (DN dn = entryContainer.getParentWithinBase(parentDN); dn != null;
4643N/A dn = entryContainer.getParentWithinBase(dn))
4643N/A {
4835N/A EntryID nodeID = getParentID(dn);
5117N/A if(nodeID == null)
5117N/A {
5117N/A // We have a missing parent. Maybe parent checking was turned off?
5117N/A // Just ignore.
5117N/A break;
5117N/A }
4643N/A if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
4643N/A {
4736N/A idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
4643N/A id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
4643N/A }
4643N/A else
4643N/A {
4643N/A idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
4643N/A }
4643N/A idSet.addEntryID(childID);
4591N/A }
4736N/A if (id2subtreeTree.size() > DN_STATE_CACHE_SIZE)
4736N/A {
4736N/A flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), true);
4736N/A }
4591N/A }
4643N/A
4591N/A
4736N/A public void writeToDB() throws DatabaseException, DirectoryException
4591N/A {
4736N/A entryContainer.getDN2ID().putRaw(null, DNKey, DNValue);
4736N/A indexMgr.addTotDNCount(1);
4736N/A if(parentDN != null)
4736N/A {
4736N/A id2child(entryID);
4736N/A id2SubTree(entryID);
4736N/A }
4591N/A }
4591N/A
4963N/A
4736N/A private void flushMapToDB(Map<byte[], ImportIDSet> map, Index index,
4736N/A boolean clearMap)
4736N/A throws DatabaseException, DirectoryException
4736N/A {
4736N/A for(Map.Entry<byte[], ImportIDSet> e : map.entrySet())
4736N/A {
4736N/A byte[] key = e.getKey();
4736N/A ImportIDSet idSet = e.getValue();
4736N/A DNKey.setData(key);
4736N/A index.insert(DNKey, idSet, DNValue);
4736N/A }
4736N/A index.closeCursor();
4736N/A if(clearMap)
4736N/A {
4736N/A map.clear();
4736N/A }
4736N/A }
4591N/A
4963N/A
4643N/A public void flush() throws DatabaseException, DirectoryException
4643N/A {
4736N/A flushMapToDB(id2childTree, entryContainer.getID2Children(), false);
4736N/A flushMapToDB(id2subtreeTree, entryContainer.getID2Subtree(), false);
4643N/A }
4591N/A }
4591N/A }
4591N/A
4591N/A
4591N/A /**
4963N/A * This task writes the temporary scratch index files using the sorted
4963N/A * buffers read from a blocking queue private to each index.
4591N/A */
4963N/A private final class ScratchFileWriterTask implements Callable<Void>
4591N/A {
4963N/A private final int DRAIN_TO = 3;
4591N/A private final IndexManager indexMgr;
4649N/A private final BlockingQueue<IndexBuffer> queue;
4643N/A private final ByteArrayOutputStream insetByteStream =
4643N/A new ByteArrayOutputStream(2 * bufferSize);
4643N/A private final ByteArrayOutputStream deleteByteStream =
4591N/A new ByteArrayOutputStream(2 * bufferSize);
4963N/A private final byte[] tmpArray = new byte[8];
4963N/A private int insertKeyCount = 0, deleteKeyCount = 0;
4591N/A private final DataOutputStream dataStream;
4649N/A private long bufferCount = 0;
4591N/A private final File file;
4591N/A private final SortedSet<IndexBuffer> indexSortedSet;
4591N/A private boolean poisonSeen = false;
4963N/A
4963N/A
4963N/A public ScratchFileWriterTask(BlockingQueue<IndexBuffer> queue,
4963N/A IndexManager indexMgr) throws FileNotFoundException
4591N/A {
4649N/A this.queue = queue;
4591N/A file = indexMgr.getFile();
4591N/A this.indexMgr = indexMgr;
4591N/A BufferedOutputStream bufferedStream =
4963N/A new BufferedOutputStream(new FileOutputStream(file),
4963N/A READER_WRITER_BUFFER_SIZE);
4591N/A dataStream = new DataOutputStream(bufferedStream);
4591N/A indexSortedSet = new TreeSet<IndexBuffer>();
4591N/A }
4591N/A
4591N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
4963N/A public Void call() throws Exception
4591N/A {
4591N/A long offset = 0;
4591N/A List<IndexBuffer> l = new LinkedList<IndexBuffer>();
4591N/A try {
4591N/A while(true)
4591N/A {
4649N/A IndexBuffer indexBuffer = queue.poll();
4591N/A if(indexBuffer != null)
4591N/A {
4591N/A long beginOffset = offset;
4649N/A long bufferLen;
4649N/A if(!queue.isEmpty())
4591N/A {
4649N/A queue.drainTo(l, DRAIN_TO);
4591N/A l.add(indexBuffer);
4649N/A bufferLen = writeIndexBuffers(l);
4591N/A for(IndexBuffer id : l)
4591N/A {
4963N/A if(!id.isDiscard())
4963N/A {
4963N/A id.reset();
4963N/A freeBufferQueue.add(id);
4963N/A }
4591N/A }
4591N/A l.clear();
4591N/A }
4591N/A else
4591N/A {
4591N/A if(indexBuffer.isPoison())
4591N/A {
4591N/A break;
4591N/A }
4649N/A bufferLen = writeIndexBuffer(indexBuffer);
4963N/A if(!indexBuffer.isDiscard())
4963N/A {
4963N/A indexBuffer.reset();
4963N/A freeBufferQueue.add(indexBuffer);
4963N/A }
4644N/A }
4649N/A offset += bufferLen;
4649N/A indexMgr.addBuffer(new Buffer(beginOffset, offset, bufferCount));
4649N/A bufferCount++;
4649N/A Importer.this.bufferCount.incrementAndGet();
4643N/A if(poisonSeen)
4643N/A {
4643N/A break;
4643N/A }
4591N/A }
4591N/A }
4591N/A }
4963N/A catch (Exception e)
4660N/A {
4649N/A Message message =
4591N/A ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
4643N/A e.getMessage());
4649N/A logError(message);
4963N/A isPhaseOneCanceled = true;
4963N/A throw e;
4591N/A }
4963N/A finally
4963N/A {
4963N/A dataStream.close();
4963N/A indexMgr.setFileLength();
4963N/A }
4963N/A return null;
4591N/A }
4591N/A
4591N/A
4591N/A private long writeIndexBuffer(IndexBuffer indexBuffer) throws IOException
4591N/A {
4649N/A int numberKeys = indexBuffer.getNumberKeys();
4649N/A indexBuffer.setPosition(-1);
4649N/A long bufferLen = 0;
4963N/A insetByteStream.reset(); insertKeyCount = 0;
4963N/A deleteByteStream.reset(); deleteKeyCount = 0;
4649N/A for(int i = 0; i < numberKeys; i++)
4591N/A {
4649N/A if(indexBuffer.getPosition() == -1)
4591N/A {
4649N/A indexBuffer.setPosition(i);
4643N/A if(indexBuffer.isInsert(i))
4643N/A {
4963N/A indexBuffer.writeID(insetByteStream, i);
4963N/A insertKeyCount++;
4643N/A }
4643N/A else
4643N/A {
4963N/A indexBuffer.writeID(deleteByteStream, i);
4963N/A deleteKeyCount++;
4643N/A }
4591N/A continue;
4591N/A }
4591N/A if(!indexBuffer.compare(i))
4591N/A {
4963N/A bufferLen += writeRecord(indexBuffer);
4649N/A indexBuffer.setPosition(i);
4963N/A insetByteStream.reset();insertKeyCount = 0;
4963N/A deleteByteStream.reset();deleteKeyCount = 0;
4591N/A }
4643N/A if(indexBuffer.isInsert(i))
4643N/A {
4963N/A if(insertKeyCount++ <= indexMgr.getLimit())
4963N/A {
4963N/A indexBuffer.writeID(insetByteStream, i);
4963N/A }
4643N/A }
4643N/A else
4643N/A {
4963N/A indexBuffer.writeID(deleteByteStream, i);
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
4649N/A if(indexBuffer.getPosition() != -1)
4591N/A {
4963N/A bufferLen += writeRecord(indexBuffer);
4591N/A }
4649N/A return bufferLen;
4591N/A }
4591N/A
4591N/A
4591N/A private long writeIndexBuffers(List<IndexBuffer> buffers)
4591N/A throws IOException
4591N/A {
4591N/A long id = 0;
4649N/A long bufferLen = 0;
4963N/A insetByteStream.reset(); insertKeyCount = 0;
4963N/A deleteByteStream.reset(); deleteKeyCount = 0;
4591N/A for(IndexBuffer b : buffers)
4591N/A {
4591N/A if(b.isPoison())
4591N/A {
4591N/A poisonSeen = true;
4591N/A }
4591N/A else
4591N/A {
4649N/A b.setPosition(0);
4591N/A b.setID(id++);
4591N/A indexSortedSet.add(b);
4591N/A }
4591N/A }
4591N/A byte[] saveKey = null;
4643N/A int saveIndexID = 0;
4591N/A while(!indexSortedSet.isEmpty())
4591N/A {
4591N/A IndexBuffer b = indexSortedSet.first();
4591N/A indexSortedSet.remove(b);
4591N/A if(saveKey == null)
4591N/A {
4963N/A saveKey = b.getKey();
4643N/A saveIndexID = b.getIndexID();
4649N/A if(b.isInsert(b.getPosition()))
4643N/A {
4963N/A b.writeID(insetByteStream, b.getPosition());
4963N/A insertKeyCount++;
4643N/A }
4643N/A else
4643N/A {
4963N/A b.writeID(deleteByteStream, b.getPosition());
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
4591N/A else
4591N/A {
4643N/A if(!b.compare(saveKey, saveIndexID))
4591N/A {
4963N/A bufferLen += writeRecord(saveKey, saveIndexID);
4643N/A insetByteStream.reset();
4643N/A deleteByteStream.reset();
4963N/A insertKeyCount = 0;
4963N/A deleteKeyCount = 0;
4963N/A saveKey = b.getKey();
4643N/A saveIndexID = b.getIndexID();
4649N/A if(b.isInsert(b.getPosition()))
4643N/A {
4963N/A b.writeID(insetByteStream, b.getPosition());
4963N/A insertKeyCount++;
4643N/A }
4643N/A else
4643N/A {
4963N/A b.writeID(deleteByteStream, b.getPosition());
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
4591N/A else
4591N/A {
4649N/A if(b.isInsert(b.getPosition()))
4643N/A {
4963N/A if(insertKeyCount++ <= indexMgr.getLimit())
4963N/A {
4963N/A b.writeID(insetByteStream, b.getPosition());
4963N/A }
4643N/A }
4643N/A else
4643N/A {
4963N/A b.writeID(deleteByteStream, b.getPosition());
4963N/A deleteKeyCount++;
4643N/A }
4591N/A }
4591N/A }
4591N/A if(b.hasMoreData())
4591N/A {
4591N/A b.getNextRecord();
4591N/A indexSortedSet.add(b);
4591N/A }
4591N/A }
4591N/A if(saveKey != null)
4591N/A {
4963N/A bufferLen += writeRecord(saveKey, saveIndexID);
4591N/A }
4649N/A return bufferLen;
4591N/A }
4963N/A
4963N/A
4963N/A private int writeByteStreams() throws IOException
4963N/A {
4963N/A if(insertKeyCount > indexMgr.getLimit())
4963N/A {
4963N/A insertKeyCount = 1;
4963N/A insetByteStream.reset();
4963N/A PackedInteger.writeInt(tmpArray, 0, -1);
4963N/A insetByteStream.write(tmpArray, 0, 1);
4963N/A }
4963N/A int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
4963N/A PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
4963N/A dataStream.write(tmpArray, 0, insertSize);
4963N/A if(insetByteStream.size() > 0)
4963N/A {
4963N/A insetByteStream.writeTo(dataStream);
4963N/A }
4963N/A int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
4963N/A PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
4963N/A dataStream.write(tmpArray, 0, deleteSize);
4963N/A if(deleteByteStream.size() > 0)
4963N/A {
4963N/A deleteByteStream.writeTo(dataStream);
4963N/A }
4963N/A return insertSize + deleteSize;
4963N/A }
4963N/A
4963N/A
4963N/A private int writeHeader(int indexID, int keySize) throws IOException
4963N/A {
4963N/A dataStream.writeInt(indexID);
4963N/A int packedSize = PackedInteger.getWriteIntLength(keySize);
4963N/A PackedInteger.writeInt(tmpArray, 0, keySize);
4963N/A dataStream.write(tmpArray, 0, packedSize);
4963N/A return packedSize;
4963N/A }
4963N/A
4963N/A
4963N/A private int writeRecord(IndexBuffer b) throws IOException
4963N/A {
4963N/A int keySize = b.getKeySize();
4963N/A int packedSize = writeHeader(b.getIndexID(), keySize);
4963N/A b.writeKey(dataStream);
4963N/A packedSize += writeByteStreams();
4963N/A return (packedSize + keySize + insetByteStream.size() +
4963N/A deleteByteStream.size() + 4);
4963N/A }
4963N/A
4963N/A
4963N/A private int writeRecord(byte[] k, int indexID) throws IOException
4963N/A {
4963N/A int packedSize = writeHeader(indexID, k.length);
4963N/A dataStream.write(k);
4963N/A packedSize += writeByteStreams();
4963N/A return (packedSize + k.length + insetByteStream.size() +
4963N/A deleteByteStream.size() + 4);
4963N/A }
4591N/A }
4591N/A
4963N/A
4591N/A /**
4591N/A * This task main function is to sort the index buffers given to it from
4591N/A * the import tasks reading the LDIF file. It will also create a index
4591N/A * file writer task and corresponding queue if needed. The sorted index
4591N/A * buffers are put on the index file writer queues for writing to a temporary
4591N/A * file.
4591N/A */
4591N/A private final class SortTask implements Callable<Void>
4591N/A {
4591N/A
4591N/A private final IndexBuffer indexBuffer;
4591N/A
4591N/A public SortTask(IndexBuffer indexBuffer)
4591N/A {
4591N/A this.indexBuffer = indexBuffer;
4591N/A }
4591N/A
4591N/A /**
4591N/A * {@inheritDoc}
4591N/A */
4591N/A public Void call() throws Exception
4591N/A {
4765N/A if (importConfiguration != null &&
4963N/A importConfiguration.isCancelled() || isPhaseOneCanceled)
4591N/A {
4963N/A isPhaseOneCanceled =true;
4591N/A return null;
4591N/A }
4591N/A indexBuffer.sort();
4643N/A if(indexKeyQueMap.containsKey(indexBuffer.getIndexKey())) {
4591N/A BlockingQueue<IndexBuffer> q =
4643N/A indexKeyQueMap.get(indexBuffer.getIndexKey());
4591N/A q.add(indexBuffer);
4591N/A }
4591N/A else
4591N/A {
4643N/A createIndexWriterTask(indexBuffer.getIndexKey());
4643N/A BlockingQueue<IndexBuffer> q =
4643N/A indexKeyQueMap.get(indexBuffer.getIndexKey());
4591N/A q.add(indexBuffer);
4591N/A }
4591N/A return null;
4591N/A }
4591N/A
4643N/A private void createIndexWriterTask(IndexKey indexKey)
4643N/A throws FileNotFoundException
4591N/A {
4649N/A boolean isDN = false;
4643N/A synchronized(synObj)
4643N/A {
4643N/A if(indexKeyQueMap.containsKey(indexKey))
4591N/A {
4591N/A return;
4591N/A }
4765N/A if(indexKey.getIndexType().equals(ImportIndexType.DN))
4591N/A {
4649N/A isDN = true;
4591N/A }
4963N/A IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN,
4963N/A indexKey.getEntryLimit());
4963N/A if(isDN)
4963N/A {
4963N/A DNIndexMgrList.add(indexMgr);
4963N/A }
4963N/A else
4963N/A {
4963N/A indexMgrList.add(indexMgr);
4963N/A }
4591N/A BlockingQueue<IndexBuffer> newQue =
4963N/A new ArrayBlockingQueue<IndexBuffer>(phaseOneBufferCount);
4963N/A ScratchFileWriterTask indexWriter =
4963N/A new ScratchFileWriterTask(newQue, indexMgr);
4963N/A scratchFileWriterList.add(indexWriter);
4963N/A scratchFileWriterFutures.add(
4963N/A scratchFileWriterService.submit(indexWriter));
4643N/A indexKeyQueMap.put(indexKey, newQue);
4591N/A }
4591N/A }
4591N/A }
4591N/A
4591N/A /**
4591N/A * The buffer class is used to process a buffer from the temporary index files
4591N/A * during phase 2 processing.
4591N/A */
4591N/A private final class Buffer implements Comparable<Buffer>
4591N/A {
4591N/A private IndexManager indexMgr;
4591N/A private final long begin, end, id;
4591N/A private long offset;
4591N/A private ByteBuffer cache;
5117N/A private int limit;
4736N/A private ImportIDSet insertIDSet = null, deleteIDSet = null;
4643N/A private Integer indexID = null;
4643N/A private boolean doCount;
4963N/A private ByteBuffer keyBuf = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
4591N/A
4591N/A
4591N/A public Buffer(long begin, long end, long id)
4591N/A {
4591N/A this.begin = begin;
4591N/A this.end = end;
4591N/A this.offset = 0;
4591N/A this.id = id;
4591N/A }
4591N/A
4591N/A
4649N/A private void initializeCache(IndexManager indexMgr, ByteBuffer b,
4591N/A long cacheSize) throws IOException
4591N/A {
4591N/A this.indexMgr = indexMgr;
4591N/A if(b == null)
4591N/A {
4591N/A cache = ByteBuffer.allocate((int)cacheSize);
4591N/A }
4591N/A else
4591N/A {
4591N/A cache = b;
4591N/A }
4591N/A loadCache();
4591N/A cache.flip();
4963N/A keyBuf.flip();
4591N/A }
4591N/A
4591N/A
4591N/A private void loadCache() throws IOException
4591N/A {
4591N/A FileChannel fileChannel = indexMgr.getChannel();
4591N/A fileChannel.position(begin + offset);
4591N/A long leftToRead = end - (begin + offset);
4591N/A long bytesToRead;
4591N/A if(leftToRead < cache.remaining())
4591N/A {
4649N/A cache.limit((int) (cache.position() + leftToRead));
4591N/A bytesToRead = (int)leftToRead;
4591N/A }
4591N/A else
4591N/A {
4591N/A bytesToRead = Math.min((end - offset),cache.remaining());
4591N/A }
4591N/A int bytesRead = 0;
4591N/A while(bytesRead < bytesToRead)
4591N/A {
4591N/A bytesRead += fileChannel.read(cache);
4591N/A }
4591N/A offset += bytesRead;
4591N/A indexMgr.addBytesRead(bytesRead);
4591N/A }
4591N/A
4649N/A public boolean hasMoreData() throws IOException
4591N/A {
5117N/A boolean ret = ((begin + offset) >= end);
5117N/A return !(cache.remaining() == 0 && ret);
4591N/A }
4591N/A
4963N/A public int getKeyLen()
4963N/A {
4963N/A return keyBuf.limit();
4963N/A }
4963N/A
4963N/A public void getKey(ByteBuffer b)
4591N/A {
4963N/A keyBuf.get(b.array(), 0, keyBuf.limit());
4963N/A b.limit(keyBuf.limit());
4963N/A }
4963N/A
4963N/A ByteBuffer getKeyBuf()
4963N/A {
4963N/A return keyBuf;
4591N/A }
4591N/A
4643N/A public ImportIDSet getInsertIDSet()
4591N/A {
4643N/A return insertIDSet;
4643N/A }
4643N/A
4643N/A public ImportIDSet getDeleteIDSet()
4643N/A {
4643N/A return deleteIDSet;
4591N/A }
4591N/A
4649N/A public long getBufferID()
4591N/A {
4591N/A return id;
4591N/A }
4591N/A
4643N/A public Integer getIndexID()
4643N/A {
4643N/A if(indexID == null)
4643N/A {
4643N/A try {
4643N/A getNextRecord();
4643N/A } catch(IOException ex) {
4963N/A Message message = ERR_JEB_IO_ERROR.get(ex.getMessage());
4963N/A logError(message);
4963N/A ex.printStackTrace();
4963N/A System.exit(1);
4643N/A }
4643N/A }
4643N/A return indexID;
4643N/A }
4643N/A
4591N/A public void getNextRecord() throws IOException
4591N/A {
4643N/A getNextIndexID();
4649N/A getContainerParameters();
4591N/A getNextKey();
4643N/A getNextIDSet(true); //get insert ids
4643N/A getNextIDSet(false); //get delete ids
4643N/A }
4643N/A
4649N/A private void getContainerParameters()
4643N/A {
4643N/A limit = 1;
4643N/A doCount = false;
4643N/A if(!indexMgr.isDN2ID())
4643N/A {
4643N/A Index index = (Index) idContainerMap.get(indexID);
4643N/A limit = index.getIndexEntryLimit();
4643N/A doCount = index.getMaintainCount();
4736N/A if(insertIDSet == null)
4736N/A {
4736N/A insertIDSet = new ImportIDSet(128, limit, doCount);
4736N/A deleteIDSet = new ImportIDSet(128, limit, doCount);
4736N/A }
4643N/A }
4643N/A else
4643N/A {
4736N/A if(insertIDSet == null)
4736N/A {
4736N/A insertIDSet = new ImportIDSet(1, limit, doCount);
4736N/A deleteIDSet = new ImportIDSet(1, limit, doCount);
4736N/A }
4643N/A }
4591N/A }
4591N/A
4963N/A
4591N/A private int getInt() throws IOException
4591N/A {
4591N/A ensureData(4);
4591N/A return cache.getInt();
4591N/A }
4591N/A
4643N/A private void getNextIndexID() throws IOException, BufferUnderflowException
4643N/A {
4649N/A indexID = getInt();
4643N/A }
4643N/A
4591N/A private void getNextKey() throws IOException, BufferUnderflowException
4591N/A {
4963N/A ensureData(20);
4963N/A byte[] ba = cache.array();
4963N/A int p = cache.position();
4963N/A int len = PackedInteger.getReadIntLength(ba, p);
4963N/A int keyLen = PackedInteger.readInt(ba, p);
4963N/A cache.position(p + len);
4963N/A if(keyLen > keyBuf.capacity())
4963N/A {
4963N/A keyBuf = ByteBuffer.allocate(keyLen);
4963N/A }
4963N/A ensureData(keyLen);
4963N/A keyBuf.clear();
4963N/A cache.get(keyBuf.array(), 0, keyLen);
4963N/A keyBuf.limit(keyLen);
4591N/A }
4591N/A
4643N/A private void getNextIDSet(boolean insert)
4643N/A throws IOException, BufferUnderflowException
4591N/A {
4963N/A ensureData(20);
4963N/A int p = cache.position();
4963N/A byte[] ba = cache.array();
4963N/A int len = PackedInteger.getReadIntLength(ba, p);
4963N/A int keyCount = PackedInteger.readInt(ba, p);
4963N/A p += len;
4963N/A cache.position(p);
4643N/A if(insert)
4643N/A {
4963N/A insertIDSet.clear(false);
4643N/A }
4643N/A else
4643N/A {
4963N/A deleteIDSet.clear(false);
4643N/A }
4963N/A for(int k = 0; k < keyCount; k++)
4591N/A {
4963N/A if(ensureData(9))
4963N/A {
4963N/A p = cache.position();
4963N/A }
4963N/A len = PackedInteger.getReadLongLength(ba, p);
4963N/A long l = PackedInteger.readLong(ba, p);
4963N/A p += len;
4963N/A cache.position(p);
4643N/A if(insert)
4643N/A {
4643N/A insertIDSet.addEntryID(l);
4643N/A }
4643N/A else
4643N/A {
4643N/A deleteIDSet.addEntryID(l);
4643N/A }
4591N/A }
4591N/A }
4591N/A
4591N/A
4963N/A private boolean ensureData(int len) throws IOException
4591N/A {
4963N/A boolean ret = false;
4591N/A if(cache.remaining() == 0)
4591N/A {
4591N/A cache.clear();
4591N/A loadCache();
4591N/A cache.flip();
4963N/A ret = true;
4591N/A }
4591N/A else if(cache.remaining() < len)
4591N/A {
4591N/A cache.compact();
4591N/A loadCache();
4591N/A cache.flip();
4963N/A ret = true;
4591N/A }
4963N/A return ret;
4591N/A }
4591N/A
4643N/A
4963N/A private int compare(ByteBuffer cKey, Integer cIndexID)
4643N/A {
5117N/A int returnCode, rc;
4963N/A if(keyBuf.limit() == 0)
4643N/A {
4643N/A getIndexID();
4643N/A }
4963N/A if(indexMgr.isDN2ID())
4963N/A {
4963N/A rc = dnComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
4963N/A cKey.array(), cKey.limit());
4963N/A }
4963N/A else
4963N/A {
4963N/A rc = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
4963N/A cKey.array(), cKey.limit());
4963N/A }
4963N/A if(rc != 0) {
4649N/A returnCode = 1;
4643N/A }
4643N/A else
4643N/A {
4649N/A returnCode = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1;
4643N/A }
4649N/A return returnCode;
4643N/A }
4643N/A
4643N/A
4643N/A
4591N/A public int compareTo(Buffer o) {
4643N/A //used in remove.
4591N/A if(this.equals(o))
4591N/A {
4591N/A return 0;
4591N/A }
4963N/A if(keyBuf.limit() == 0) {
4643N/A getIndexID();
4643N/A }
4963N/A if(o.getKeyBuf().limit() == 0)
4643N/A {
4643N/A o.getIndexID();
4643N/A }
5117N/A int returnCode;
4963N/A byte[] oKey = o.getKeyBuf().array();
4963N/A int oLen = o.getKeyBuf().limit();
4963N/A if(indexMgr.isDN2ID())
4963N/A {
4963N/A returnCode = dnComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
4963N/A oKey, oLen);
4963N/A }
4963N/A else
4963N/A {
4963N/A returnCode = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
4963N/A oKey, oLen);
4963N/A }
4649N/A if(returnCode == 0)
4591N/A {
4643N/A if(indexID.intValue() == o.getIndexID().intValue())
4591N/A {
4643N/A if(insertIDSet.isDefined())
4643N/A {
4649N/A returnCode = -1;
4643N/A }
4643N/A else if(o.getInsertIDSet().isDefined())
4643N/A {
4649N/A returnCode = 1;
4643N/A }
4643N/A else if(insertIDSet.size() == o.getInsertIDSet().size())
4643N/A {
4649N/A returnCode = id > o.getBufferID() ? 1 : -1;
4643N/A }
4643N/A else
4643N/A {
4649N/A returnCode = insertIDSet.size() - o.getInsertIDSet().size();
4643N/A }
4591N/A }
4649N/A else if(indexID > o.getIndexID())
4591N/A {
4649N/A returnCode = 1;
4591N/A }
4591N/A else
4591N/A {
4649N/A returnCode = -1;
4591N/A }
4591N/A }
4649N/A return returnCode;
4591N/A }
4591N/A }
4591N/A
4963N/A
4591N/A /**
4963N/A * The index manager class has several functions:
4963N/A *
4963N/A * 1. It used to carry information about index processing created in phase
4963N/A * one to phase two.
4963N/A *
4963N/A * 2. It collects statistics about phase two processing for each index.
4963N/A *
4963N/A * 3. It manages opening and closing the scratch index files.
4591N/A */
4966N/A private final class IndexManager implements Comparable<IndexManager>
4591N/A {
4591N/A private final File file;
4649N/A private RandomAccessFile rFile = null;
4591N/A private final List<Buffer> bufferList = new LinkedList<Buffer>();
4591N/A private long fileLength, bytesRead = 0;
4963N/A private boolean done = false, started = false;
4591N/A private long totalDNS;
4591N/A private AtomicInteger keyCount = new AtomicInteger(0);
4649N/A private final String fileName;
4649N/A private final boolean isDN;
4963N/A private final int limit;
4963N/A
4963N/A
4963N/A IndexManager(String fileName, boolean isDN, int limit)
4591N/A {
4649N/A file = new File(tempDir, fileName);
4649N/A this.fileName = fileName;
4649N/A this.isDN = isDN;
4963N/A this.limit = limit;
4591N/A }
4591N/A
4963N/A
4963N/A void openIndexFile() throws FileNotFoundException
4591N/A {
4649N/A rFile = new RandomAccessFile(file, "r");
4591N/A }
4591N/A
4963N/A
4591N/A public FileChannel getChannel()
4591N/A {
4649N/A return rFile.getChannel();
4591N/A }
4591N/A
4963N/A
4591N/A public void addBuffer(Buffer o)
4591N/A {
4591N/A this.bufferList.add(o);
4591N/A }
4591N/A
4963N/A
4591N/A public List<Buffer> getBufferList()
4591N/A {
4591N/A return bufferList;
4591N/A }
4591N/A
4963N/A
4591N/A public File getFile()
4591N/A {
4591N/A return file;
4591N/A }
4591N/A
4963N/A
4649N/A public boolean deleteIndexFile()
4591N/A {
4963N/A return file.delete();
4591N/A }
4591N/A
4963N/A
4591N/A public void close() throws IOException
4591N/A {
4963N/A rFile.close();
4591N/A }
4591N/A
4963N/A
4591N/A public void setFileLength()
4591N/A {
4591N/A this.fileLength = file.length();
4591N/A }
4591N/A
4963N/A
4591N/A public void addBytesRead(int bytesRead)
4591N/A {
4591N/A this.bytesRead += bytesRead;
4591N/A }
4591N/A
4963N/A
4591N/A public void setDone()
4591N/A {
4591N/A this.done = true;
4591N/A }
4591N/A
4963N/A
4963N/A public void setStarted()
4963N/A {
4963N/A started = true;
4963N/A }
4963N/A
4963N/A
4591N/A public void addTotDNCount(int delta)
4591N/A {
4591N/A this.totalDNS += delta;
4591N/A }
4591N/A
4591N/A
4643N/A public long getDNCount()
4591N/A {
4591N/A return totalDNS;
4591N/A }
4591N/A
4963N/A
4643N/A public boolean isDN2ID()
4643N/A {
4649N/A return isDN;
4643N/A }
4591N/A
4963N/A
4591N/A public void printStats(long deltaTime)
4591N/A {
4963N/A if(!done && started)
4591N/A {
4591N/A float rate = 1000f * keyCount.getAndSet(0) / deltaTime;
4649N/A Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(fileName,
4963N/A (fileLength - bytesRead), rate);
4649N/A logError(message);
4591N/A }
4591N/A }
4591N/A
4963N/A
4649N/A public void incrementKeyCount()
4591N/A {
4591N/A keyCount.incrementAndGet();
4591N/A }
4643N/A
4963N/A
4649N/A public String getFileName()
4643N/A {
4649N/A return fileName;
4643N/A }
4963N/A
4963N/A
4963N/A public int getLimit()
4963N/A {
4963N/A return limit;
4963N/A }
4966N/A
4966N/A
4966N/A public int compareTo(IndexManager mgr)
4966N/A {
4966N/A if(bufferList.size() == mgr.getBufferList().size())
4966N/A {
4966N/A return 0;
4966N/A }
4966N/A else if (bufferList.size() < mgr.getBufferList().size())
4966N/A {
4966N/A return -1;
4966N/A }
4966N/A else
4966N/A {
4966N/A return 1;
4966N/A }
4966N/A }
4591N/A }
4591N/A
4765N/A
4765N/A /**
4963N/A * The rebuild index manager handles all rebuild index related processing.
4765N/A */
4963N/A class RebuildIndexManager extends ImportTask {
4963N/A
4963N/A //Rebuild index configuration.
4765N/A private final RebuildConfig rebuildConfig;
4963N/A
4963N/A //Local DB backend configuration.
4765N/A private final LocalDBBackendCfg cfg;
4963N/A
4963N/A //Map of index keys to indexes.
4765N/A private final Map<IndexKey, Index> indexMap =
4765N/A new LinkedHashMap<IndexKey, Index>();
4963N/A
4963N/A //Map of index keys to extensible indexes.
4765N/A private final Map<IndexKey, Collection<Index>> extensibleIndexMap =
4765N/A new LinkedHashMap<IndexKey, Collection<Index>>();
4963N/A
4963N/A //List of VLV indexes.
4895N/A private final List<VLVIndex> vlvIndexes = new LinkedList<VLVIndex>();
4963N/A
4963N/A //The DN2ID index.
4765N/A private DN2ID dn2id = null;
4963N/A
4963N/A //The DN2URI index.
4765N/A private DN2URI dn2uri = null;
4963N/A
4963N/A //Total entries to be processed.
4765N/A private long totalEntries =0;
4963N/A
4963N/A //Total entries processed.
4765N/A private final AtomicLong entriesProcessed = new AtomicLong(0);
4963N/A
4963N/A //The suffix instance.
4765N/A private Suffix suffix = null;
4963N/A
4963N/A //Set to true if the rebuild all flag was specified.
4765N/A private final boolean rebuildAll;
4963N/A
4963N/A //The entry container.
4963N/A private EntryContainer entryContainer;
4765N/A
4765N/A
4765N/A /**
4765N/A * Create an instance of the rebuild index manager using the specified
4765N/A * parameters.
4765N/A *
4765N/A * @param rebuildConfig The rebuild configuration to use.
4765N/A * @param cfg The local DB configuration to use.
4765N/A */
4963N/A public RebuildIndexManager(RebuildConfig rebuildConfig,
4963N/A LocalDBBackendCfg cfg)
4765N/A {
4765N/A this.rebuildConfig = rebuildConfig;
4765N/A this.cfg = cfg;
4963N/A rebuildAll = rebuildConfig.isRebuildAll();
4765N/A }
4765N/A
4963N/A
4765N/A /**
4963N/A * Initialize a rebuild index manager.
4765N/A *
4765N/A * @throws ConfigException If an configuration error occurred.
4765N/A * @throws InitializationException If an initialization error occurred.
4765N/A */
4765N/A public void initialize() throws ConfigException, InitializationException
4765N/A {
4963N/A entryContainer =
4963N/A rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
4963N/A suffix = Suffix.createSuffixContext(entryContainer, null, null, null);
4765N/A if(suffix == null)
4765N/A {
4765N/A Message msg = ERR_JEB_REBUILD_SUFFIX_ERROR.get(rebuildConfig.
4765N/A getBaseDN().toString());
4765N/A throw new InitializationException(msg);
4765N/A }
4765N/A }
4765N/A
4963N/A
4765N/A /**
4765N/A * Print start message.
4765N/A *
4765N/A * @throws DatabaseException If an database error occurred.
4765N/A */
4765N/A public void printStartMessage() throws DatabaseException
4765N/A {
4765N/A StringBuilder sb = new StringBuilder();
4765N/A List<String> rebuildList = rebuildConfig.getRebuildList();
4765N/A for(String index : rebuildList)
4765N/A {
4765N/A if(sb.length() > 0)
4765N/A {
4765N/A sb.append(", ");
4765N/A }
4765N/A sb.append(index);
4765N/A }
4765N/A totalEntries = suffix.getID2Entry().getRecordCount();
4765N/A Message message = NOTE_JEB_REBUILD_START.get(sb.toString(), totalEntries);
4765N/A if(rebuildAll) {
4765N/A message = NOTE_JEB_REBUILD_ALL_START.get(totalEntries);
4765N/A }
4765N/A logError(message);
4765N/A }
4765N/A
4963N/A
4765N/A /**
4765N/A * Print stop message.
4765N/A *
4765N/A * @param startTime The time the rebuild started.
4765N/A */
4765N/A public void printStopMessage(long startTime)
4963N/A {
4765N/A long finishTime = System.currentTimeMillis();
4765N/A long totalTime = (finishTime - startTime);
4765N/A float rate = 0;
4765N/A if (totalTime > 0)
4765N/A {
4765N/A rate = 1000f* entriesProcessed.get() / totalTime;
4765N/A }
4765N/A Message message =
4963N/A NOTE_JEB_REBUILD_FINAL_STATUS.get(entriesProcessed.get(),
4963N/A totalTime/1000, rate);
4963N/A logError(message);
4963N/A }
4765N/A
4765N/A
4765N/A /**
4765N/A * {@inheritDoc}
4765N/A */
4765N/A public Void call() throws Exception
4765N/A {
4963N/A ID2Entry id2entry = entryContainer.getID2Entry();
4765N/A Cursor cursor = id2entry.openCursor(null, CursorConfig.READ_COMMITTED);
4765N/A DatabaseEntry key = new DatabaseEntry();
4765N/A DatabaseEntry data = new DatabaseEntry();
4765N/A LockMode lockMode = LockMode.DEFAULT;
4765N/A OperationStatus status;
4765N/A try {
4963N/A for (status = cursor.getFirst(key, data, lockMode);
4963N/A status == OperationStatus.SUCCESS;
4963N/A status = cursor.getNext(key, data, lockMode))
4963N/A {
4963N/A if(isPhaseOneCanceled)
4963N/A {
4963N/A return null;
4963N/A }
4963N/A EntryID entryID = new EntryID(key);
4963N/A Entry entry = ID2Entry.entryFromDatabase(
4963N/A ByteString.wrap(data.getData()),
4963N/A entryContainer.getRootContainer().getCompressedSchema());
4963N/A processEntry(entry, entryID);
4963N/A entriesProcessed.getAndIncrement();
4963N/A }
4963N/A flushIndexBuffers();
4963N/A cursor.close();
4963N/A }
4963N/A catch (Exception e)
4765N/A {
4963N/A Message message =
4963N/A ERR_JEB_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR.get(e.getMessage());
4963N/A logError(message);
4963N/A isPhaseOneCanceled = true;
4963N/A throw e;
4765N/A }
4765N/A return null;
4963N/A }
4963N/A
4765N/A
4765N/A /**
4963N/A * Perform rebuild index processing.
4765N/A *
4765N/A * @throws DatabaseException If an database error occurred.
4765N/A * @throws InterruptedException If an interrupted error occurred.
4765N/A * @throws ExecutionException If an Excecution error occurred.
4765N/A * @throws JebException If an JEB error occurred.
4765N/A */
4765N/A public void rebuldIndexes() throws DatabaseException, InterruptedException,
4963N/A ExecutionException, JebException
4963N/A {
4963N/A phaseOne();
4963N/A if(isPhaseOneCanceled)
4963N/A {
4963N/A throw new InterruptedException("Rebuild Index canceled.");
4963N/A }
4963N/A phaseTwo();
4963N/A if(rebuildAll)
4963N/A {
4963N/A setAllIndexesTrusted();
4963N/A }
4963N/A else
4963N/A {
4963N/A setRebuildListIndexesTrusted();
4963N/A }
4963N/A }
4963N/A
4765N/A
4895N/A private void setRebuildListIndexesTrusted() throws JebException
4895N/A {
4895N/A try
4895N/A {
4895N/A if(dn2id != null)
4895N/A {
4895N/A EntryContainer ec = suffix.getEntryContainer();
4895N/A ec.getID2Children().setTrusted(null,true);
4895N/A ec.getID2Subtree().setTrusted(null, true);
4895N/A }
4895N/A if(!indexMap.isEmpty())
4895N/A {
4895N/A for(Map.Entry<IndexKey, Index> mapEntry : indexMap.entrySet()) {
4895N/A Index index = mapEntry.getValue();
4895N/A index.setTrusted(null, true);
4895N/A }
4895N/A }
4895N/A if(!vlvIndexes.isEmpty())
4895N/A {
4895N/A for(VLVIndex vlvIndex : vlvIndexes)
4895N/A {
4895N/A vlvIndex.setTrusted(null, true);
4895N/A }
4895N/A }
4895N/A if(!extensibleIndexMap.isEmpty())
4895N/A {
5117N/A for(Collection<Index> subIndexes : extensibleIndexMap.values())
5117N/A {
5117N/A if(subIndexes != null) {
5117N/A for(Index subIndex : subIndexes) {
5117N/A subIndex.setTrusted(null, true);
5117N/A }
4895N/A }
4895N/A }
4895N/A }
4895N/A }
4895N/A catch (DatabaseException ex)
4895N/A {
4895N/A Message message =
4895N/A NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
4895N/A throw new JebException(message);
4895N/A }
4895N/A }
4895N/A
4963N/A
4895N/A private void setAllIndexesTrusted() throws JebException
4963N/A {
4963N/A try {
4963N/A suffix.setIndexesTrusted();
4963N/A }
4963N/A catch (DatabaseException ex)
4963N/A {
4963N/A Message message =
4963N/A NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
4963N/A throw new JebException(message);
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A private void phaseOne() throws DatabaseException,
4963N/A InterruptedException, ExecutionException {
4963N/A if(rebuildAll)
4963N/A {
4963N/A clearAllIndexes();
4963N/A }
4963N/A else
4963N/A {
4963N/A clearRebuildListIndexes();
4963N/A }
4963N/A initializeIndexBuffers();
4963N/A RebuildFirstPhaseProgressTask progressTask =
4963N/A new RebuildFirstPhaseProgressTask();
4963N/A Timer timer = new Timer();
4963N/A timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
4963N/A scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
4963N/A bufferSortService = Executors.newFixedThreadPool(threadCount);
4963N/A ExecutorService rebuildIndexService =
4963N/A Executors.newFixedThreadPool(threadCount);
4963N/A List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
4963N/A for (int i = 0; i < threadCount; i++)
4963N/A {
4963N/A tasks.add(this);
4963N/A }
4963N/A List<Future<Void>> results = rebuildIndexService.invokeAll(tasks);
4963N/A for (Future<Void> result : results) {
4963N/A if(!result.isDone()) {
4963N/A result.get();
4963N/A }
4963N/A }
4963N/A stopScratchFileWriters();
4963N/A for (Future<?> result : scratchFileWriterFutures)
4963N/A {
4963N/A if(!result.isDone()) {
4963N/A result.get();
4963N/A }
4963N/A }
4963N/A //Try to clear as much memory as possible.
4963N/A tasks.clear();
4963N/A results.clear();
4963N/A rebuildIndexService.shutdown();
4963N/A freeBufferQueue.clear();
4963N/A bufferSortService.shutdown();
5016N/A scratchFileWriterService.shutdown();
4963N/A timer.cancel();
4963N/A }
4963N/A
4963N/A
4963N/A private void phaseTwo() throws InterruptedException, JebException,
4963N/A ExecutionException
4963N/A {
4963N/A SecondPhaseProgressTask progressTask =
4963N/A new SecondPhaseProgressTask(entriesProcessed.get());
4963N/A Timer timer2 = new Timer();
4963N/A timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
4963N/A processIndexFiles();
4963N/A timer2.cancel();
4963N/A }
4963N/A
4963N/A
4963N/A private int getIndexCount() throws ConfigException, JebException
4963N/A {
4963N/A int indexCount;
4963N/A if(!rebuildAll)
4963N/A {
4963N/A indexCount = getRebuildListIndexCount(cfg);
4963N/A }
4963N/A else
4963N/A {
4963N/A indexCount = getAllIndexesCount(cfg);
4963N/A }
4963N/A return indexCount;
4963N/A }
4963N/A
4963N/A
4963N/A private int getAllIndexesCount(LocalDBBackendCfg cfg)
4963N/A {
4963N/A int indexCount = cfg.listLocalDBIndexes().length;
4963N/A indexCount += cfg.listLocalDBVLVIndexes().length;
4963N/A //Add four for: DN, id2subtree, id2children and dn2uri.
4963N/A indexCount += 4;
4963N/A return indexCount;
4963N/A }
4963N/A
4963N/A
4963N/A private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
4963N/A throws JebException, ConfigException
4765N/A {
4963N/A int indexCount = 0;
4963N/A List<String> rebuildList = rebuildConfig.getRebuildList();
4963N/A if(!rebuildList.isEmpty())
4963N/A {
4963N/A for (String index : rebuildList)
4963N/A {
4963N/A String lowerName = index.toLowerCase();
4963N/A if (lowerName.equals("dn2id"))
4963N/A {
4963N/A indexCount += 3;
4963N/A }
4963N/A else if (lowerName.equals("dn2uri"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else if (lowerName.startsWith("vlv."))
4963N/A {
4963N/A if(lowerName.length() < 5)
4963N/A {
4963N/A Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
4963N/A throw new JebException(msg);
4963N/A }
4963N/A indexCount++;
4963N/A } else if(lowerName.equals("id2subtree") ||
4963N/A lowerName.equals("id2children"))
4963N/A {
4963N/A Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
4963N/A throw new JebException(msg);
4963N/A }
4963N/A else
4963N/A {
4963N/A String[] attrIndexParts = lowerName.split("\\.");
4963N/A if((attrIndexParts.length <= 0) || (attrIndexParts.length > 3))
4963N/A {
4963N/A Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
4963N/A throw new JebException(msg);
4963N/A }
4963N/A AttributeType attrType =
4963N/A DirectoryServer.getAttributeType(attrIndexParts[0]);
4963N/A if (attrType == null)
4963N/A {
4963N/A Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
4963N/A throw new JebException(msg);
4963N/A }
4963N/A if(attrIndexParts.length != 1)
4963N/A {
4963N/A if(attrIndexParts.length == 2)
4963N/A {
4963N/A if(attrIndexParts[1].equals("presence"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else if(attrIndexParts[1].equals("equality"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else if(attrIndexParts[1].equals("substring"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else if(attrIndexParts[1].equals("ordering"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else if(attrIndexParts[1].equals("approximate"))
4963N/A {
4963N/A indexCount++;
4963N/A } else {
4963N/A Message msg =
4963N/A ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
4963N/A throw new JebException(msg);
4963N/A }
4963N/A }
4963N/A else
4963N/A {
4963N/A boolean found = false;
4963N/A String s = attrIndexParts[1] + "." + attrIndexParts[2];
4963N/A for (String idx : cfg.listLocalDBIndexes())
4963N/A {
4963N/A LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
4963N/A if (indexCfg.getIndexType().
4963N/A contains(LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
4963N/A {
4963N/A Set<String> extensibleRules =
4963N/A indexCfg.getIndexExtensibleMatchingRule();
4963N/A for(String exRule : extensibleRules)
4963N/A {
4963N/A if(exRule.equalsIgnoreCase(s))
4963N/A {
4963N/A found = true;
4963N/A break;
4963N/A }
4963N/A }
4963N/A }
4963N/A if(found)
4963N/A {
4963N/A break;
4963N/A }
4963N/A }
4963N/A if(!found) {
4963N/A Message msg =
4963N/A ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
4963N/A throw new JebException(msg);
4963N/A }
4963N/A indexCount++;
4963N/A }
4963N/A }
4963N/A else
4963N/A {
4963N/A for (String idx : cfg.listLocalDBIndexes())
4963N/A {
4963N/A if(!idx.equalsIgnoreCase(index))
4963N/A {
4963N/A continue;
4963N/A }
4963N/A LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
4963N/A if(indexCfg.getIndexType().
4963N/A contains(LocalDBIndexCfgDefn.IndexType.EQUALITY))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A if(indexCfg.getIndexType().
4963N/A contains(LocalDBIndexCfgDefn.IndexType.ORDERING))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A if(indexCfg.getIndexType().
4963N/A contains(LocalDBIndexCfgDefn.IndexType.PRESENCE))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A if(indexCfg.getIndexType().
4963N/A contains(LocalDBIndexCfgDefn.IndexType.SUBSTRING))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A if(indexCfg.getIndexType().
4963N/A contains(LocalDBIndexCfgDefn.IndexType.APPROXIMATE))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A if (indexCfg.getIndexType().
4963N/A contains(LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
4963N/A {
4963N/A Set<String> extensibleRules =
4963N/A indexCfg.getIndexExtensibleMatchingRule();
4963N/A boolean shared = false;
4963N/A for(String exRule : extensibleRules)
4963N/A {
4963N/A if(exRule.endsWith(".sub"))
4963N/A {
4963N/A indexCount++;
4963N/A }
4963N/A else
4963N/A {
4963N/A if(!shared)
4963N/A {
4963N/A shared=true;
4963N/A indexCount++;
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A return indexCount;
4765N/A }
4963N/A
4963N/A
4963N/A private void clearRebuildListIndexes() throws DatabaseException
4963N/A {
4963N/A List<String> rebuildList = rebuildConfig.getRebuildList();
4963N/A if(!rebuildList.isEmpty())
4963N/A {
4963N/A for (String index : rebuildList)
4963N/A {
4963N/A String lowerName = index.toLowerCase();
4963N/A if (lowerName.equals("dn2id"))
4963N/A {
4963N/A clearDN2IDIndexes();
4963N/A }
4963N/A else if (lowerName.equals("dn2uri"))
4963N/A {
4963N/A clearDN2URI();
4963N/A }
4963N/A else if (lowerName.startsWith("vlv."))
4963N/A {
4963N/A clearVLVIndex(lowerName.substring(4));
4963N/A }
4963N/A else
4963N/A {
4963N/A String[] attrIndexParts = lowerName.split("\\.");
4963N/A AttributeType attrType =
4963N/A DirectoryServer.getAttributeType(attrIndexParts[0]);
4963N/A AttributeIndex attrIndex =
4963N/A entryContainer.getAttributeIndex(attrType);
4963N/A if(attrIndexParts.length != 1)
4963N/A {
4963N/A Index partialAttrIndex;
4963N/A if(attrIndexParts[1].equals("presence"))
4963N/A {
4963N/A partialAttrIndex = attrIndex.getPresenceIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.PRESENCE,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A else if(attrIndexParts[1].equals("equality"))
4963N/A {
4963N/A partialAttrIndex = attrIndex.getEqualityIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.EQUALITY,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A else if(attrIndexParts[1].equals("substring"))
4963N/A {
4963N/A partialAttrIndex = attrIndex.getSubstringIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.SUBSTRING,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A else if(attrIndexParts[1].equals("ordering"))
4963N/A {
4963N/A partialAttrIndex = attrIndex.getOrderingIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.ORDERING,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A else if(attrIndexParts[1].equals("approximate"))
4963N/A {
4963N/A partialAttrIndex = attrIndex.getApproximateIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.APPROXIMATE,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A else
4963N/A {
4963N/A String dbPart = "shared";
4963N/A if(attrIndexParts[2].startsWith("sub"))
4963N/A {
4963N/A dbPart = "substring";
4963N/A }
4963N/A StringBuilder nameBldr = new StringBuilder();
4963N/A nameBldr.append(entryContainer.getDatabasePrefix());
4963N/A nameBldr.append("_");
4963N/A nameBldr.append(attrIndexParts[0]);
4963N/A nameBldr.append(".");
4963N/A nameBldr.append(attrIndexParts[1]);
4963N/A nameBldr.append(".");
4963N/A nameBldr.append(dbPart);
4963N/A String indexName = nameBldr.toString();
4963N/A Map<String,Collection<Index>> extensibleMap =
4963N/A attrIndex.getExtensibleIndexes();
4963N/A if(!extensibleMap.isEmpty()) {
4963N/A Collection<Index> subIndexes =
4963N/A attrIndex.getExtensibleIndexes().get(
4963N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
4963N/A if(subIndexes != null) {
4963N/A for(Index subIndex : subIndexes) {
4963N/A String name = subIndex.getName();
4963N/A if(name.equalsIgnoreCase(indexName))
4963N/A {
4963N/A entryContainer.clearDatabase(subIndex);
4963N/A int id = System.identityHashCode(subIndex);
4963N/A idContainerMap.putIfAbsent(id, subIndex);
4963N/A Collection<Index> substring = new ArrayList<Index>();
4963N/A substring.add(subIndex);
4963N/A extensibleIndexMap.put(new IndexKey(attrType,
4963N/A ImportIndexType.EX_SUBSTRING, 0),substring);
4963N/A break;
4963N/A }
4963N/A }
4963N/A Collection<Index> sharedIndexes =
4963N/A attrIndex.getExtensibleIndexes().
4963N/A get(EXTENSIBLE_INDEXER_ID_SHARED);
4963N/A if(sharedIndexes !=null) {
4963N/A for(Index sharedIndex : sharedIndexes) {
4963N/A String name = sharedIndex.getName();
4963N/A if(name.equalsIgnoreCase(indexName))
4963N/A {
4963N/A entryContainer.clearDatabase(sharedIndex);
4963N/A Collection<Index> shared = new ArrayList<Index>();
4963N/A int id = System.identityHashCode(sharedIndex);
4963N/A idContainerMap.putIfAbsent(id, sharedIndex);
4963N/A shared.add(sharedIndex);
4963N/A extensibleIndexMap.put(new IndexKey(attrType,
4963N/A ImportIndexType.EX_SHARED, 0), shared);
4963N/A break;
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A else
4963N/A {
4963N/A clearAttributeIndexes(attrIndex, attrType);
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A private void clearAllIndexes() throws DatabaseException
4963N/A {
4963N/A for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
4963N/A suffix.getAttrIndexMap().entrySet()) {
4963N/A AttributeType attributeType = mapEntry.getKey();
4963N/A AttributeIndex attributeIndex = mapEntry.getValue();
4963N/A clearAttributeIndexes(attributeIndex, attributeType);
4963N/A }
4963N/A for(VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes()) {
4963N/A entryContainer.clearDatabase(vlvIndex);
4963N/A }
4963N/A clearDN2IDIndexes();
4963N/A if(entryContainer.getDN2URI() != null)
4963N/A {
4963N/A clearDN2URI();
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A private void clearVLVIndex(String name)
4963N/A throws DatabaseException
4963N/A {
4963N/A VLVIndex vlvIndex = entryContainer.getVLVIndex(name);
4963N/A entryContainer.clearDatabase(vlvIndex);
4963N/A vlvIndexes.add(vlvIndex);
4963N/A }
4963N/A
4963N/A
4963N/A private void clearDN2URI() throws DatabaseException
4963N/A {
4963N/A entryContainer.clearDatabase(entryContainer.getDN2URI());
4963N/A dn2uri = entryContainer.getDN2URI();
4963N/A }
4963N/A
4963N/A
4963N/A private void clearDN2IDIndexes() throws DatabaseException
4963N/A {
4963N/A entryContainer.clearDatabase(entryContainer.getDN2ID());
4963N/A entryContainer.clearDatabase(entryContainer.getID2Children());
4963N/A entryContainer.clearDatabase(entryContainer.getID2Subtree());
4963N/A dn2id = entryContainer.getDN2ID();
4963N/A }
4963N/A
4963N/A
4963N/A private void clearAttributeIndexes(AttributeIndex attrIndex,
4963N/A AttributeType attrType)
4963N/A throws DatabaseException
4963N/A {
4963N/A Index partialAttrIndex;
4963N/A if(attrIndex.getSubstringIndex() != null)
4963N/A {
4963N/A partialAttrIndex = attrIndex.getSubstringIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.SUBSTRING,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A if(attrIndex.getOrderingIndex() != null)
4963N/A {
4963N/A partialAttrIndex = attrIndex.getOrderingIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.ORDERING,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A if(attrIndex.getEqualityIndex() != null)
4963N/A {
4963N/A partialAttrIndex = attrIndex.getEqualityIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.EQUALITY,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A if(attrIndex.getPresenceIndex() != null)
4963N/A {
4963N/A partialAttrIndex = attrIndex.getPresenceIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.PRESENCE,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A
4963N/A }
4963N/A if(attrIndex.getApproximateIndex() != null)
4963N/A {
4963N/A partialAttrIndex = attrIndex.getApproximateIndex();
4963N/A int id = System.identityHashCode(partialAttrIndex);
4963N/A idContainerMap.putIfAbsent(id, partialAttrIndex);
4963N/A entryContainer.clearDatabase(partialAttrIndex);
4963N/A IndexKey indexKey =
4963N/A new IndexKey(attrType, ImportIndexType.APPROXIMATE,
4963N/A partialAttrIndex.getIndexEntryLimit());
4963N/A indexMap.put(indexKey, partialAttrIndex);
4963N/A }
4963N/A Map<String,Collection<Index>> extensibleMap =
4963N/A attrIndex.getExtensibleIndexes();
4963N/A if(!extensibleMap.isEmpty()) {
4963N/A Collection<Index> subIndexes =
4963N/A attrIndex.getExtensibleIndexes().get(
4963N/A EXTENSIBLE_INDEXER_ID_SUBSTRING);
4963N/A if(subIndexes != null) {
4963N/A for(Index subIndex : subIndexes) {
4963N/A entryContainer.clearDatabase(subIndex);
4963N/A int id = System.identityHashCode(subIndex);
4963N/A idContainerMap.putIfAbsent(id, subIndex);
4963N/A }
4963N/A extensibleIndexMap.put(new IndexKey(attrType,
4963N/A ImportIndexType.EX_SUBSTRING, 0), subIndexes);
4963N/A }
4963N/A Collection<Index> sharedIndexes =
4963N/A attrIndex.getExtensibleIndexes().
4963N/A get(EXTENSIBLE_INDEXER_ID_SHARED);
4963N/A if(sharedIndexes !=null) {
4963N/A for(Index sharedIndex : sharedIndexes) {
4963N/A entryContainer.clearDatabase(sharedIndex);
4963N/A int id = System.identityHashCode(sharedIndex);
4963N/A idContainerMap.putIfAbsent(id, sharedIndex);
4963N/A }
4963N/A extensibleIndexMap.put(new IndexKey(attrType,
4963N/A ImportIndexType.EX_SHARED, 0), sharedIndexes);
4963N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A private
4963N/A void processEntry(Entry entry, EntryID entryID) throws DatabaseException,
4963N/A ConfigException, DirectoryException, JebException,
4963N/A InterruptedException
4963N/A {
4963N/A if(dn2id != null)
4963N/A {
4765N/A processDN2ID(suffix, entry.getDN(), entryID);
4963N/A }
4963N/A if(dn2uri != null)
4963N/A {
4765N/A processDN2URI(suffix, null, entry);
4963N/A }
4963N/A processIndexes(entry, entryID);
4963N/A processExtensibleIndexes(entry, entryID);
4963N/A processVLVIndexes(entry, entryID);
4963N/A }
4963N/A
4963N/A
4963N/A private void processVLVIndexes(Entry entry, EntryID entryID)
4963N/A throws DatabaseException, JebException, DirectoryException
4963N/A {
4963N/A for(VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes()) {
4963N/A Transaction transaction = null;
4963N/A vlvIdx.addEntry(transaction, entryID, entry);
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A private
4963N/A void processExtensibleIndexes(Entry entry, EntryID entryID) throws
4963N/A DatabaseException, DirectoryException, JebException,
4963N/A ConfigException, InterruptedException
4963N/A {
4963N/A for(Map.Entry<IndexKey, Collection<Index>> mapEntry :
4963N/A this.extensibleIndexMap.entrySet()) {
4963N/A IndexKey key = mapEntry.getKey();
4963N/A AttributeType attrType = key.getAttributeType();
4963N/A if(entry.hasAttribute(attrType)) {
4963N/A Collection<Index> indexes = mapEntry.getValue();
4963N/A for(Index index : indexes) {
4765N/A processAttribute(index, entry, entryID, key);
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A private void
4963N/A processIndexes(Entry entry, EntryID entryID) throws
4963N/A DatabaseException, DirectoryException, JebException,
4963N/A ConfigException, InterruptedException
4963N/A {
4963N/A for(Map.Entry<IndexKey, Index> mapEntry :
4963N/A indexMap.entrySet()) {
4963N/A IndexKey key = mapEntry.getKey();
4963N/A AttributeType attrType = key.getAttributeType();
4963N/A if(entry.hasAttribute(attrType)) {
4963N/A ImportIndexType indexType = key.getIndexType();
4963N/A Index index = mapEntry.getValue();
4963N/A if(indexType == ImportIndexType.SUBSTRING)
4963N/A {
4963N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attrType, ImportIndexType.SUBSTRING,
4963N/A index.getIndexEntryLimit()));
4963N/A }
4963N/A else
4963N/A {
4765N/A processAttribute(index, entry, entryID,
4963N/A new IndexKey(attrType, indexType,
4963N/A index.getIndexEntryLimit()));
4963N/A }
4963N/A }
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A /**
4963N/A * Return the number of entries processed by the rebuild manager.
4963N/A *
4963N/A * @return The number of entries processed.
4963N/A */
4963N/A public long getEntriesProcess()
4963N/A {
4963N/A return this.entriesProcessed.get();
4963N/A }
4963N/A
4963N/A
4963N/A /**
4963N/A * Return the total number of entries to process by the rebuild manager.
4963N/A *
4963N/A * @return The total number for entries to process.
4963N/A */
4963N/A public long getTotEntries()
4963N/A {
4963N/A return this.totalEntries;
4963N/A }
4765N/A }
4765N/A
4765N/A /**
4963N/A * This class reports progress of rebuild index processing at fixed
4963N/A * intervals.
4963N/A */
4963N/A class RebuildFirstPhaseProgressTask extends TimerTask
4963N/A {
4963N/A /**
4963N/A * The number of records that had been processed at the time of the
4963N/A * previous progress report.
4963N/A */
4963N/A private long previousProcessed = 0;
4963N/A
4963N/A /**
4963N/A * The time in milliseconds of the previous progress report.
4963N/A */
4963N/A private long previousTime;
4963N/A
4963N/A /**
4963N/A * The environment statistics at the time of the previous report.
4963N/A */
4963N/A private EnvironmentStats prevEnvStats;
4963N/A
4963N/A /**
4963N/A * Create a new rebuild index progress task.
4963N/A *
4963N/A * @throws DatabaseException If an error occurred while accessing the JE
4963N/A * database.
4963N/A */
4963N/A public RebuildFirstPhaseProgressTask() throws DatabaseException
4963N/A {
4963N/A previousTime = System.currentTimeMillis();
4963N/A prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
4963N/A }
4765N/A
4765N/A /**
4963N/A * The action to be performed by this timer task.
4963N/A */
4963N/A public void run()
4963N/A {
4963N/A long latestTime = System.currentTimeMillis();
4963N/A long deltaTime = latestTime - previousTime;
4963N/A
4963N/A if (deltaTime == 0)
4963N/A {
4963N/A return;
4963N/A }
4963N/A long entriesProcessed = rebuildManager.getEntriesProcess();
4963N/A long deltaCount = (entriesProcessed - previousProcessed);
4963N/A float rate = 1000f*deltaCount / deltaTime;
4963N/A float completed = 0;
4963N/A if(rebuildManager.getTotEntries() > 0)
4963N/A {
4963N/A completed = 100f*entriesProcessed / rebuildManager.getTotEntries();
4963N/A }
4963N/A Message message = NOTE_JEB_REBUILD_PROGRESS_REPORT.get(completed,
4963N/A entriesProcessed, rebuildManager.getTotEntries(), rate);
4963N/A logError(message);
4963N/A try
4963N/A {
4963N/A Runtime runtime = Runtime.getRuntime();
4963N/A long freeMemory = runtime.freeMemory() / MB;
4963N/A EnvironmentStats envStats =
4963N/A rootContainer.getEnvironmentStats(new StatsConfig());
4963N/A long nCacheMiss =
4963N/A envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
4963N/A
4963N/A float cacheMissRate = 0;
4963N/A if (deltaCount > 0)
4963N/A {
4963N/A cacheMissRate = nCacheMiss/(float)deltaCount;
4963N/A }
4963N/A message = NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get(
4963N/A freeMemory, cacheMissRate);
4963N/A logError(message);
4963N/A prevEnvStats = envStats;
4963N/A }
4963N/A catch (DatabaseException e)
4963N/A {
4963N/A
4963N/A }
4963N/A previousProcessed = entriesProcessed;
4963N/A previousTime = latestTime;
4963N/A }
4963N/A }
4765N/A
4765N/A
4591N/A /**
4963N/A * This class reports progress of first phase of import processing at
4963N/A * fixed intervals.
3339N/A */
4591N/A private final class FirstPhaseProgressTask extends TimerTask
3339N/A {
3339N/A /**
3339N/A * The number of entries that had been read at the time of the
3339N/A * previous progress report.
3339N/A */
3339N/A private long previousCount = 0;
3339N/A
3339N/A /**
3339N/A * The time in milliseconds of the previous progress report.
3339N/A */
3339N/A private long previousTime;
3339N/A
3339N/A /**
3339N/A * The environment statistics at the time of the previous report.
3339N/A */
4649N/A private EnvironmentStats previousStats;
3339N/A
3339N/A
4591N/A // Determines if eviction has been detected.
3339N/A private boolean evicting = false;
3339N/A
4591N/A // Entry count when eviction was detected.
3339N/A private long evictionEntryCount = 0;
3339N/A
3339N/A
4963N/A /**
3339N/A * Create a new import progress task.
3339N/A */
4591N/A public FirstPhaseProgressTask()
3339N/A {
3339N/A previousTime = System.currentTimeMillis();
4591N/A try
4591N/A {
4649N/A previousStats =
4591N/A rootContainer.getEnvironmentStats(new StatsConfig());
4591N/A }
4591N/A catch (DatabaseException e)
4591N/A {
4591N/A throw new RuntimeException(e);
4591N/A }
3339N/A }
3339N/A
3339N/A
3339N/A
4963N/A /**
4963N/A * The action to be performed by this timer task.
4963N/A */
4963N/A @Override
4963N/A public void run()
4963N/A {
4963N/A long latestCount = reader.getEntriesRead() + 0;
4963N/A long deltaCount = (latestCount - previousCount);
4963N/A long latestTime = System.currentTimeMillis();
4963N/A long deltaTime = latestTime - previousTime;
4963N/A Message message;
4963N/A if (deltaTime == 0)
4963N/A {
4963N/A return;
4963N/A }
4963N/A long entriesRead = reader.getEntriesRead();
4963N/A long entriesIgnored = reader.getEntriesIgnored();
4963N/A long entriesRejected = reader.getEntriesRejected();
4963N/A float rate = 1000f * deltaCount / deltaTime;
4963N/A message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(entriesRead,
4963N/A entriesIgnored, entriesRejected, 0, rate);
4963N/A logError(message);
4963N/A try
4591N/A {
4963N/A Runtime runTime = Runtime.getRuntime();
4963N/A long freeMemory = runTime.freeMemory()/MB;
4963N/A EnvironmentStats environmentStats;
4963N/A
4963N/A //If first phase skip DN validation is specified use the root container
4963N/A //stats, else use the temporary environment stats.
4963N/A if(skipDNValidation)
4963N/A {
4963N/A environmentStats =
4963N/A rootContainer.getEnvironmentStats(new StatsConfig());
4963N/A }
4963N/A else
4963N/A {
4963N/A environmentStats = tmpEnv.getEnvironmentStats(new StatsConfig());
4963N/A }
4963N/A long nCacheMiss = environmentStats.getNCacheMiss() -
4963N/A previousStats.getNCacheMiss();
4963N/A
4963N/A float cacheMissRate = 0;
4963N/A if (deltaCount > 0)
4963N/A {
4963N/A cacheMissRate = nCacheMiss / (float) deltaCount;
4963N/A }
4963N/A message =
4963N/A NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
4963N/A cacheMissRate);
4963N/A logError(message);
4963N/A long evictPasses = environmentStats.getNEvictPasses();
4963N/A long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
4963N/A long evictBinsStrip = environmentStats.getNBINsStripped();
4963N/A long cleanerRuns = environmentStats.getNCleanerRuns();
4963N/A long cleanerDeletions = environmentStats.getNCleanerDeletions();
4963N/A long cleanerEntriesRead =
4963N/A environmentStats.getNCleanerEntriesRead();
4963N/A long cleanerINCleaned = environmentStats.getNINsCleaned();
4963N/A long checkPoints = environmentStats.getNCheckpoints();
4963N/A if (evictPasses != 0)
4963N/A {
4963N/A if (!evicting)
4649N/A {
4963N/A evicting = true;
4963N/A evictionEntryCount = reader.getEntriesRead();
4963N/A message =
4963N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED
4963N/A .get(evictionEntryCount);
4963N/A logError(message);
3339N/A }
4963N/A message =
4963N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
4963N/A evictPasses, evictNodes, evictBinsStrip);
4963N/A logError(message);
4963N/A }
4963N/A if (cleanerRuns != 0)
4963N/A {
4963N/A message =
4963N/A NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
4963N/A cleanerDeletions, cleanerEntriesRead,
4963N/A cleanerINCleaned);
4963N/A logError(message);
4963N/A }
4963N/A if (checkPoints > 1)
4963N/A {
4963N/A message =
4963N/A NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
4963N/A logError(message);
4963N/A }
4963N/A previousStats = environmentStats;
4591N/A }
4963N/A catch (DatabaseException e)
4963N/A {
4963N/A // Unlikely to happen and not critical.
4963N/A }
4963N/A previousCount = latestCount;
4963N/A previousTime = latestTime;
4963N/A }
3339N/A }
4591N/A
4591N/A
4591N/A
4591N/A /**
4963N/A * This class reports progress of the second phase of import processing at
4963N/A * fixed intervals.
4591N/A */
4765N/A class SecondPhaseProgressTask extends TimerTask
4591N/A {
4591N/A /**
4591N/A * The number of entries that had been read at the time of the
4591N/A * previous progress report.
4591N/A */
4591N/A private long previousCount = 0;
4591N/A
4591N/A /**
4591N/A * The time in milliseconds of the previous progress report.
4591N/A */
4591N/A private long previousTime;
4591N/A
4591N/A /**
4591N/A * The environment statistics at the time of the previous report.
4591N/A */
4649N/A private EnvironmentStats previousStats;
4591N/A
4591N/A // Determines if eviction has been detected.
4591N/A private boolean evicting = false;
4591N/A
4765N/A private long latestCount;
4591N/A
4649N/A /**
4591N/A * Create a new import progress task.
4963N/A *
4765N/A * @param latestCount The latest count of entries processed in phase one.
4591N/A */
4963N/A public SecondPhaseProgressTask (long latestCount)
4591N/A {
4591N/A previousTime = System.currentTimeMillis();
4765N/A this.latestCount = latestCount;
4591N/A try
4591N/A {
4649N/A previousStats =
4591N/A rootContainer.getEnvironmentStats(new StatsConfig());
4591N/A }
4591N/A catch (DatabaseException e)
4591N/A {
4591N/A throw new RuntimeException(e);
4591N/A }
4591N/A }
4591N/A
4591N/A
4591N/A /**
4591N/A * The action to be performed by this timer task.
4591N/A */
4591N/A @Override
4591N/A public void run()
4591N/A {
4591N/A long deltaCount = (latestCount - previousCount);
4591N/A long latestTime = System.currentTimeMillis();
4591N/A long deltaTime = latestTime - previousTime;
4591N/A Message message;
4591N/A if (deltaTime == 0)
4591N/A {
4591N/A return;
4591N/A }
4591N/A try
4591N/A {
4649N/A Runtime runTime = Runtime.getRuntime();
4649N/A long freeMemory = runTime.freeMemory() / MB;
4649N/A EnvironmentStats environmentStats =
4591N/A rootContainer.getEnvironmentStats(new StatsConfig());
4649N/A long nCacheMiss = environmentStats.getNCacheMiss() -
4649N/A previousStats.getNCacheMiss();
4591N/A
4591N/A float cacheMissRate = 0;
4591N/A if (deltaCount > 0)
4591N/A {
4591N/A cacheMissRate = nCacheMiss / (float) deltaCount;
4591N/A }
4591N/A message =
4591N/A NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
4591N/A cacheMissRate);
4591N/A logError(message);
4649N/A long evictPasses = environmentStats.getNEvictPasses();
4649N/A long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
4649N/A long evictBinsStrip = environmentStats.getNBINsStripped();
4649N/A long cleanerRuns = environmentStats.getNCleanerRuns();
4649N/A long cleanerDeletions = environmentStats.getNCleanerDeletions();
4649N/A long cleanerEntriesRead = environmentStats.getNCleanerEntriesRead();
4649N/A long cleanerINCleaned = environmentStats.getNINsCleaned();
4649N/A long checkPoints = environmentStats.getNCheckpoints();
4591N/A if (evictPasses != 0)
4591N/A {
4591N/A if (!evicting)
4591N/A {
4591N/A evicting = true;
4591N/A }
4591N/A message =
4591N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
4591N/A evictPasses, evictNodes, evictBinsStrip);
4591N/A logError(message);
4591N/A }
4591N/A if (cleanerRuns != 0)
4591N/A {
4591N/A message =
4591N/A NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
4591N/A cleanerDeletions, cleanerEntriesRead,
4591N/A cleanerINCleaned);
4591N/A logError(message);
4591N/A }
4591N/A if (checkPoints > 1)
4591N/A {
4591N/A message =
4591N/A NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
4591N/A logError(message);
4591N/A }
4649N/A previousStats = environmentStats;
4591N/A }
4591N/A catch (DatabaseException e)
4591N/A {
4591N/A // Unlikely to happen and not critical.
4591N/A }
4591N/A previousCount = latestCount;
4591N/A previousTime = latestTime;
4591N/A
4963N/A //Do DN index managers first.
4963N/A for(IndexManager indexMgrDN : DNIndexMgrList)
4963N/A {
4963N/A indexMgrDN.printStats(deltaTime);
4963N/A }
4963N/A //Do non-DN index managers.
4643N/A for(IndexManager indexMgr : indexMgrList)
4591N/A {
4591N/A indexMgr.printStats(deltaTime);
4591N/A }
4591N/A }
4591N/A }
4643N/A
4643N/A
4643N/A /**
4643N/A * A class to hold information about the entry determined by the LDIF reader.
4963N/A * Mainly the suffix the entry belongs under and the ID assigned to it by the
4963N/A * reader.
4643N/A *
4643N/A */
4643N/A public class EntryInformation
4643N/A {
4643N/A private EntryID entryID;
4643N/A private Suffix suffix;
4643N/A
4643N/A
4643N/A /**
4643N/A * Return the suffix associated with the entry.
4643N/A *
4643N/A * @return Entry's suffix instance;
4643N/A */
4643N/A public Suffix getSuffix()
4643N/A {
4643N/A return suffix;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Set the suffix instance associated with the entry.
4643N/A *
4643N/A * @param suffix The suffix associated with the entry.
4643N/A */
4643N/A public void setSuffix(Suffix suffix)
4643N/A {
4643N/A this.suffix = suffix;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Set the entry's ID.
4643N/A *
4643N/A * @param entryID The entry ID to set the entry ID to.
4643N/A */
4643N/A public void setEntryID(EntryID entryID)
4643N/A {
4643N/A this.entryID = entryID;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Return the entry ID associated with the entry.
4643N/A *
4643N/A * @return The entry ID associated with the entry.
4643N/A */
4643N/A public EntryID getEntryID()
4643N/A {
4643N/A return entryID;
4643N/A }
4643N/A }
4643N/A
4643N/A /**
4643N/A * This class defines the individual index type available.
4643N/A *
4643N/A */
4765N/A public enum ImportIndexType {
4643N/A /**
4643N/A * The DN index type.
4643N/A **/
4643N/A DN,
4643N/A
4643N/A /**
4643N/A * The equality index type.
4643N/A **/
4643N/A EQUALITY,
4643N/A
4643N/A /**
4643N/A * The presence index type.
4643N/A **/
4643N/A PRESENCE,
4643N/A
4643N/A /**
4649N/A * The sub-string index type.
4643N/A **/
4643N/A SUBSTRING,
4643N/A
4643N/A /**
4643N/A * The ordering index type.
4643N/A **/
4643N/A ORDERING,
4643N/A
4643N/A /**
4643N/A * The approximate index type.
4643N/A **/
4643N/A APPROXIMATE,
4643N/A
4643N/A /**
4649N/A * The extensible sub-string index type.
4643N/A **/
4643N/A EX_SUBSTRING,
4643N/A
4643N/A /**
4643N/A * The extensible shared index type.
4643N/A **/
4765N/A EX_SHARED,
4765N/A
4765N/A /**
4765N/A * The vlv index type.
4765N/A */
4765N/A VLV
4643N/A }
4643N/A
4643N/A /**
4649N/A * This class is used as an index key for hash maps that need to
4649N/A * process multiple suffix index elements into a single queue and/or maps
4649N/A * based on both attribute type and index type
4649N/A * (ie., cn.equality, sn.equality,...).
4643N/A */
4643N/A public class IndexKey {
4643N/A
4963N/A private final AttributeType attributeType;
4765N/A private final ImportIndexType indexType;
4963N/A private final int entryLimit;
4765N/A
4643N/A
4643N/A /**
4963N/A * Create index key instance using the specified attribute type, index type
4963N/A * and index entry limit.
4643N/A *
4963N/A * @param attributeType The attribute type.
4643N/A * @param indexType The index type.
4963N/A * @param entryLimit The entry limit for the index.
4643N/A */
4963N/A IndexKey(AttributeType attributeType, ImportIndexType indexType,
4963N/A int entryLimit)
4643N/A {
4963N/A this.attributeType = attributeType;
4643N/A this.indexType = indexType;
4963N/A this.entryLimit = entryLimit;
4643N/A }
4643N/A
4765N/A /**
4765N/A * An equals method that uses both the attribute type and the index type.
4963N/A * Only returns {@code true} if the attribute type and index type are
4963N/A * equal.
4765N/A *
4765N/A * @param obj the object to compare.
4963N/A * @return {@code true} if the objects are equal, or {@code false} if they
4963N/A * are not.
4765N/A */
4765N/A public boolean equals(Object obj)
4765N/A {
4765N/A if (obj instanceof IndexKey) {
4765N/A IndexKey oKey = (IndexKey) obj;
4963N/A if(attributeType.equals(oKey.getAttributeType()) &&
4963N/A indexType.equals(oKey.getIndexType()))
4765N/A {
4963N/A return true;
4765N/A }
4643N/A }
4963N/A return false;
4765N/A }
4643N/A
4643N/A /**
4649N/A * A hash code method that adds the hash codes of the attribute type and
4643N/A * index type and returns that value.
4643N/A *
4963N/A * @return The combined hash values of attribute type hash code and the
4963N/A * index type hash code.
4643N/A */
4643N/A public int hashCode()
4643N/A {
4963N/A return attributeType.hashCode() + indexType.hashCode();
4643N/A }
4643N/A
4643N/A /**
4643N/A * Return the attribute type.
4643N/A *
4643N/A * @return The attribute type.
4643N/A */
4963N/A public AttributeType getAttributeType()
4643N/A {
4963N/A return attributeType;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Return the index type.
4963N/A *
4643N/A * @return The index type.
4643N/A */
4765N/A public ImportIndexType getIndexType()
4643N/A {
4643N/A return indexType;
4643N/A }
4643N/A
4643N/A /**
4643N/A * Return the index key name, which is the attribute type primary name,
4643N/A * a period, and the index type name. Used for building file names and
4963N/A * progress output.
4643N/A *
4643N/A * @return The index key name.
4643N/A */
4643N/A public String getName()
4643N/A {
4963N/A return attributeType.getPrimaryName() + "." +
4765N/A StaticUtils.toLowerCase(indexType.name());
4643N/A }
4963N/A
4963N/A /**
4963N/A * Return the entry limit associated with the index.
4963N/A *
4963N/A * @return The entry limit.
4963N/A */
4963N/A public int getEntryLimit()
4963N/A {
4963N/A return entryLimit;
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A /**
4963N/A * The temporary enviroment will be shared when multiple suffixes are being
4963N/A * processed. This interface is used by those suffix instance to do parental
4963N/A * checking of the DN cache.
4963N/A */
4963N/A public static interface DNCache {
4963N/A
4963N/A /**
4963N/A * Returns {@code true} if the specified DN is contained in the DN cache,
4963N/A * or {@code false} otherwise.
4963N/A *
4963N/A * @param dn The DN to check the presence of.
4963N/A * @return {@code true} if the cache contains the DN, or {@code false} if it
4963N/A * is not.
4963N/A * @throws DatabaseException If an error occurs reading the database.
4963N/A */
4963N/A public boolean contains(DN dn) throws DatabaseException;
4963N/A }
4963N/A
4963N/A /**
4963N/A * Temporary environment used to check DN's when DN validation is performed
4963N/A * during phase one processing. It is deleted after phase one processing.
4963N/A */
4963N/A
4963N/A public final class TmpEnv implements DNCache
4963N/A {
4963N/A private String envPath;
4963N/A private Environment environment;
4963N/A private static final String DB_NAME = "dn_cache";
4963N/A private Database dnCache;
4963N/A
4963N/A /**
4963N/A * Create a temporary DB environment and database to be used as a cache of
4963N/A * DNs when DN validation is performed in phase one processing.
4963N/A *
4963N/A * @param envPath The file path to create the enviroment under.
4963N/A * @throws DatabaseException If an error occurs either creating the
4963N/A * environment or the DN database.
4963N/A */
4963N/A public TmpEnv(File envPath) throws DatabaseException
4963N/A {
4963N/A EnvironmentConfig envConfig = new EnvironmentConfig();
4963N/A envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
4963N/A envConfig.setReadOnly(false);
4963N/A envConfig.setAllowCreate(true);
4963N/A envConfig.setTransactional(false);
4963N/A envConfig.setConfigParam(EnvironmentConfig.ENV_IS_LOCKING, "true");
4963N/A envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CHECKPOINTER, "false");
4963N/A envConfig.setConfigParam(EnvironmentConfig.EVICTOR_LRU_ONLY, "false");
4963N/A envConfig.setConfigParam(EnvironmentConfig.EVICTOR_NODES_PER_SCAN, "128");
4963N/A envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY,
4963N/A Long.toString(tmpEnvCacheSize));
4963N/A DatabaseConfig dbConfig = new DatabaseConfig();
4963N/A dbConfig.setAllowCreate(true);
4963N/A dbConfig.setTransactional(false);
4963N/A dbConfig.setTemporary(true);
4963N/A environment = new Environment(envPath, envConfig);
4963N/A dnCache = environment.openDatabase(null, DB_NAME, dbConfig);
4963N/A this.envPath = envPath.getPath();
4963N/A }
4963N/A
4963N/A private static final long FNV_INIT = 0xcbf29ce484222325L;
4963N/A private static final long FNV_PRIME = 0x100000001b3L;
4963N/A
4963N/A //Hash the DN bytes. Uses the FNV-1a hash.
4963N/A private byte[] hashCode(byte[] b)
4963N/A {
4963N/A long hash = FNV_INIT;
5117N/A for (byte aB : b) {
5117N/A hash ^= aB;
4963N/A hash *= FNV_PRIME;
4963N/A }
4963N/A return JebFormat.entryIDToDatabase(hash);
4963N/A }
4963N/A
4963N/A /**
4963N/A * Shutdown the temporary environment.
4963N/A * @throws JebException If error occurs.
4963N/A */
4963N/A public void shutdown() throws JebException
4963N/A {
4963N/A dnCache.close();
4963N/A environment.close();
4963N/A EnvManager.removeFiles(envPath);
4963N/A }
4963N/A
4963N/A /**
4963N/A * Insert the specified DN into the DN cache. It will return {@code true} if
4963N/A * the DN does not already exist in the cache and was inserted, or
4963N/A * {@code false} if the DN exists already in the cache.
4963N/A *
4963N/A * @param dn The DN to insert in the cache.
4963N/A * @param val A database entry to use in the insert.
4963N/A * @param key A database entry to use in the insert.
4963N/A * @return {@code true} if the DN was inserted in the cache, or
4963N/A * {@code false} if the DN exists in the cache already and could
4963N/A * not be inserted.
4963N/A * @throws JebException If an error occurs accessing the database.
4963N/A */
4963N/A public boolean insert(DN dn, DatabaseEntry val, DatabaseEntry key)
4963N/A throws JebException
4963N/A {
4963N/A byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
4963N/A int len = PackedInteger.getWriteIntLength(dnBytes.length);
4963N/A byte[] dataBytes = new byte[dnBytes.length + len];
4963N/A int pos = PackedInteger.writeInt(dataBytes, 0, dnBytes.length);
4963N/A System.arraycopy(dnBytes, 0, dataBytes, pos, dnBytes.length);
4963N/A val.setData(dataBytes);
4963N/A key.setData(hashCode(dnBytes));
4963N/A return insert(key, val, dnBytes);
4963N/A }
4963N/A
4963N/A private boolean insert(DatabaseEntry key, DatabaseEntry val, byte[] dnBytes)
4963N/A throws JebException
4963N/A {
4963N/A boolean inserted = true;
4963N/A Cursor cursor = null;
4963N/A try
4963N/A {
4963N/A cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
4963N/A OperationStatus status = cursor.putNoOverwrite(key, val);
4963N/A if(status == OperationStatus.KEYEXIST)
4963N/A {
4963N/A DatabaseEntry dns = new DatabaseEntry();
4963N/A inserted = false;
4963N/A status = cursor.getSearchKey(key, dns, LockMode.RMW);
4963N/A if(status == OperationStatus.NOTFOUND)
4963N/A {
4963N/A Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
4963N/A "Search DN cache failed.");
4963N/A throw new JebException(message);
4963N/A }
4963N/A if(!isDNMatched(dns, dnBytes))
4963N/A {
4963N/A addDN(dns, cursor, dnBytes);
4963N/A inserted = true;
4963N/A }
4963N/A }
4963N/A }
4963N/A finally
4963N/A {
4963N/A if(cursor != null)
4963N/A {
4963N/A cursor.close();
4963N/A }
4963N/A }
4963N/A return inserted;
4963N/A }
4963N/A
4963N/A //Add the DN to the DNs as because of a hash collision.
4963N/A private void addDN(DatabaseEntry val, Cursor cursor,
4963N/A byte[] dnBytes) throws JebException
4963N/A {
4963N/A int pos = 0;
4963N/A byte[] bytes = val.getData();
4963N/A int pLen = PackedInteger.getWriteIntLength(dnBytes.length);
4963N/A int totLen = bytes.length + (pLen + dnBytes.length);
4963N/A byte[] newRec = new byte[totLen];
4963N/A System.arraycopy(bytes, 0, newRec, 0, bytes.length);
4963N/A pos = bytes.length;
4963N/A pos = PackedInteger.writeInt(newRec, pos, dnBytes.length);
4963N/A System.arraycopy(dnBytes, 0, newRec, pos, dnBytes.length);
4963N/A DatabaseEntry newVal = new DatabaseEntry(newRec);
4963N/A OperationStatus status = cursor.putCurrent(newVal);
4963N/A if(status != OperationStatus.SUCCESS)
4963N/A {
4963N/A Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
4963N/A "Add of DN to DN cache failed.");
4963N/A throw new JebException(message);
4963N/A }
4963N/A }
4963N/A
4963N/A //Return true if the specified DN is in the DNs saved as a result of hash
4963N/A //collisions.
4963N/A private boolean isDNMatched(DatabaseEntry dns, byte[] dnBytes)
4963N/A {
4963N/A int pos = 0, len = 0;
4963N/A byte[] bytes = dns.getData();
4963N/A while(pos < dns.getData().length)
4963N/A {
4963N/A int pLen = PackedInteger.getReadIntLength(bytes, pos);
4963N/A len = PackedInteger.readInt(bytes, pos);
4963N/A if(dnComparator.compare(bytes, pos + pLen, len, dnBytes,
4963N/A dnBytes.length) == 0)
4963N/A {
4963N/A return true;
4963N/A }
4963N/A pos += pLen + len;
4963N/A }
4963N/A return false;
4963N/A }
4963N/A
4963N/A /**
4963N/A * Check if the specified DN is contained in the temporary DN cache.
4963N/A *
4963N/A * @param dn A DN check for.
4965N/A * @return {@code true} if the specified DN is in the temporary DN cache,
4965N/A * or {@code false} if it is not.
4963N/A */
4963N/A public boolean contains(DN dn)
4963N/A {
4963N/A boolean dnExists = false;
4963N/A Cursor cursor = null;
4963N/A DatabaseEntry key = new DatabaseEntry();
4963N/A byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
4963N/A key.setData(hashCode(dnBytes));
4963N/A try {
4963N/A cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
4963N/A DatabaseEntry dns = new DatabaseEntry();
4963N/A OperationStatus status =
4963N/A cursor.getSearchKey(key, dns, LockMode.DEFAULT);
4963N/A if(status == OperationStatus.SUCCESS)
4963N/A {
4963N/A dnExists = isDNMatched(dns, dnBytes);
4963N/A }
4963N/A }
4963N/A finally
4963N/A {
4963N/A if(cursor != null)
4963N/A {
4963N/A cursor.close();
4963N/A }
4963N/A }
4963N/A return dnExists;
4963N/A }
4963N/A
4963N/A /**
4963N/A * Return temporary environment stats.
4963N/A *
4963N/A * @param statsConfig A stats configuration instance.
4963N/A *
4963N/A * @return Environment stats.
4963N/A * @throws DatabaseException If an error occurs retrieving the stats.
4963N/A */
4963N/A public EnvironmentStats getEnvironmentStats(StatsConfig statsConfig)
4963N/A throws DatabaseException
4963N/A {
4963N/A return environment.getStats(statsConfig);
4963N/A }
4963N/A }
4963N/A
4963N/A
4963N/A /**
4963N/A * Uncaught exception handler. Try and catch any uncaught exceptions, log
4963N/A * them and print a stack trace.
4963N/A */
4963N/A public
4963N/A class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler {
4963N/A
4963N/A /**
4963N/A * {@inheritDoc}
4963N/A */
4963N/A public void uncaughtException(Thread t, Throwable e) {
4963N/A Message message = ERR_JEB_IMPORT_UNCAUGHT_EXCEPTION.get(e.getMessage());
4963N/A logError(message);
4963N/A e.printStackTrace();
4963N/A System.exit(1);
4963N/A }
4643N/A }
3339N/A}