Importer.java revision f0a048d41a13eca4cba405da9403c2469ca3d1ea
0N/A/*
2581N/A * CDDL HEADER START
0N/A *
0N/A * The contents of this file are subject to the terms of the
0N/A * Common Development and Distribution License, Version 1.0 only
0N/A * (the "License"). You may not use this file except in compliance
2362N/A * with the License.
0N/A *
2362N/A * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
0N/A * or http://forgerock.org/license/CDDLv1.0.html.
0N/A * See the License for the specific language governing permissions
0N/A * and limitations under the License.
0N/A *
0N/A * When distributing Covered Code, include this CDDL HEADER in each
0N/A * file and include the License file at legal-notices/CDDLv1_0.txt.
0N/A * If applicable, add the following below this CDDL HEADER, with the
0N/A * fields enclosed by brackets "[]" replaced with your own identifying
0N/A * information:
0N/A * Portions Copyright [yyyy] [name of copyright owner]
0N/A *
2362N/A * CDDL HEADER END
2362N/A *
2362N/A *
0N/A * Copyright 2008-2010 Sun Microsystems, Inc.
0N/A * Portions Copyright 2011-2015 ForgeRock AS
0N/A */
0N/Apackage org.opends.server.backends.pluggable;
0N/A
0N/Aimport static org.opends.messages.BackendMessages.*;
0N/Aimport static org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType.*;
0N/Aimport static org.opends.server.backends.pluggable.EntryIDSet.*;
0N/Aimport static org.opends.server.backends.pluggable.SuffixContainer.*;
0N/Aimport static org.opends.server.util.DynamicConstants.*;
0N/Aimport static org.opends.server.util.ServerConstants.*;
0N/Aimport static org.opends.server.util.StaticUtils.*;
0N/A
0N/Aimport java.io.BufferedInputStream;
0N/Aimport java.io.BufferedOutputStream;
0N/Aimport java.io.ByteArrayOutputStream;
0N/Aimport java.io.Closeable;
0N/Aimport java.io.DataInputStream;
0N/Aimport java.io.DataOutputStream;
0N/Aimport java.io.File;
1945N/Aimport java.io.FileInputStream;
1945N/Aimport java.io.FileNotFoundException;
1945N/Aimport java.io.FileOutputStream;
1945N/Aimport java.io.IOException;
1945N/Aimport java.io.RandomAccessFile;
1945N/Aimport java.lang.reflect.InvocationHandler;
1945N/Aimport java.lang.reflect.Method;
0N/Aimport java.lang.reflect.Proxy;
0N/Aimport java.util.ArrayList;
0N/Aimport java.util.Arrays;
0N/Aimport java.util.Collection;
0N/Aimport java.util.Collections;
0N/Aimport java.util.HashMap;
0N/Aimport java.util.HashSet;
0N/Aimport java.util.Iterator;
0N/Aimport java.util.LinkedHashMap;
0N/Aimport java.util.LinkedList;
0N/Aimport java.util.List;
0N/Aimport java.util.Map;
0N/Aimport java.util.NavigableSet;
0N/Aimport java.util.Set;
0N/Aimport java.util.SortedSet;
0N/Aimport java.util.Timer;
0N/Aimport java.util.TimerTask;
0N/Aimport java.util.TreeMap;
0N/Aimport java.util.TreeSet;
0N/Aimport java.util.concurrent.ArrayBlockingQueue;
0N/Aimport java.util.concurrent.BlockingQueue;
0N/Aimport java.util.concurrent.Callable;
0N/Aimport java.util.concurrent.ConcurrentHashMap;
0N/Aimport java.util.concurrent.CopyOnWriteArrayList;
0N/Aimport java.util.concurrent.ExecutionException;
0N/Aimport java.util.concurrent.ExecutorService;
0N/Aimport java.util.concurrent.Executors;
0N/Aimport java.util.concurrent.Future;
0N/Aimport java.util.concurrent.LinkedBlockingQueue;
0N/Aimport java.util.concurrent.ScheduledThreadPoolExecutor;
0N/Aimport java.util.concurrent.Semaphore;
0N/Aimport java.util.concurrent.TimeUnit;
0N/Aimport java.util.concurrent.atomic.AtomicBoolean;
0N/Aimport java.util.concurrent.atomic.AtomicInteger;
0N/Aimport java.util.concurrent.atomic.AtomicLong;
0N/A
0N/Aimport org.forgerock.i18n.slf4j.LocalizedLogger;
0N/Aimport org.forgerock.opendj.config.server.ConfigException;
0N/Aimport org.forgerock.opendj.ldap.ByteSequence;
0N/Aimport org.forgerock.opendj.ldap.ByteSequenceReader;
0N/Aimport org.forgerock.opendj.ldap.ByteString;
0N/Aimport org.forgerock.opendj.ldap.ByteStringBuilder;
0N/Aimport org.forgerock.opendj.ldap.spi.IndexingOptions;
0N/Aimport org.forgerock.util.Utils;
0N/Aimport org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
2028N/Aimport org.opends.server.admin.std.server.BackendIndexCfg;
0N/Aimport org.opends.server.admin.std.server.PersistitBackendCfg;
0N/Aimport org.opends.server.admin.std.server.PluggableBackendCfg;
0N/Aimport org.opends.server.backends.RebuildConfig;
0N/Aimport org.opends.server.backends.RebuildConfig.RebuildMode;
0N/Aimport org.opends.server.backends.persistit.PersistItStorage;
0N/Aimport org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
0N/Aimport org.opends.server.backends.pluggable.spi.Cursor;
647N/Aimport org.opends.server.backends.pluggable.spi.ReadOperation;
0N/Aimport org.opends.server.backends.pluggable.spi.ReadableTransaction;
0N/Aimport org.opends.server.backends.pluggable.spi.Storage;
0N/Aimport org.opends.server.backends.pluggable.spi.StorageRuntimeException;
0N/Aimport org.opends.server.backends.pluggable.spi.TreeName;
0N/Aimport org.opends.server.backends.pluggable.spi.UpdateFunction;
0N/Aimport org.opends.server.backends.pluggable.spi.WriteOperation;
0N/Aimport org.opends.server.backends.pluggable.spi.WriteableTransaction;
0N/Aimport org.opends.server.core.DirectoryServer;
0N/Aimport org.opends.server.core.ServerContext;
0N/Aimport org.opends.server.types.AttributeType;
0N/Aimport org.opends.server.types.DN;
0N/Aimport org.opends.server.types.DirectoryException;
0N/Aimport org.opends.server.types.Entry;
0N/Aimport org.opends.server.types.InitializationException;
0N/Aimport org.opends.server.types.LDIFImportConfig;
0N/Aimport org.opends.server.types.LDIFImportResult;
0N/Aimport org.opends.server.util.Platform;
0N/A
0N/A/**
0N/A * This class provides the engine that performs both importing of LDIF files and
0N/A * the rebuilding of indexes.
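 * <p>
 * The work is done in two phases: phase one reads the entries and writes
 * sorted scratch files, and phase two merges the scratch files into the index
 * databases.
 * <p>
 * Illustrative usage sketch (assumes a configured root container, backend
 * configuration, LDIF import configuration and server context are available):
 * <pre>{@code
 * Importer importer = new Importer(rootContainer, ldifImportConfig, backendCfg, serverContext);
 * LDIFImportResult result = importer.processImport();
 * }</pre>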
0N/A */
0N/Afinal class Importer
0N/A{
0N/A private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
0N/A
0N/A private static final int TIMER_INTERVAL = 10000;
0N/A private static final String DEFAULT_TMP_DIR = "import-tmp";
0N/A private static final String DN_CACHE_DIR = "dn-cache";
0N/A
0N/A /** Defaults for DB cache. */
0N/A private static final int MAX_DB_CACHE_SIZE = 8 * MB;
0N/A private static final int MAX_DB_LOG_SIZE = 10 * MB;
0N/A private static final int MIN_DB_CACHE_SIZE = 4 * MB;
0N/A
  /**
   * Defaults for the LDIF reader buffers, the minimum memory required to run
   * an import, and the default size for byte buffers.
   */
0N/A private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
0N/A private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE + MAX_DB_LOG_SIZE;
0N/A
0N/A /** Max size of phase one buffer. */
0N/A private static final int MAX_BUFFER_SIZE = 2 * MB;
517N/A /** Min size of phase one buffer. */
0N/A private static final int MIN_BUFFER_SIZE = 4 * KB;
0N/A /** Min size of phase two read-ahead cache. */
0N/A private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;
  /** Small heap threshold below which less memory is claimed for the import, to help the JVM avoid OOM errors. */
0N/A private static final int SMALL_HEAP_SIZE = 256 * MB;
0N/A
0N/A /** The DN attribute type. */
0N/A private static final AttributeType DN_TYPE;
0N/A
0N/A /** Root container. */
0N/A private final RootContainer rootContainer;
0N/A /** Import configuration. */
0N/A private final LDIFImportConfig importCfg;
0N/A private final ServerContext serverContext;
0N/A
0N/A /** LDIF reader. */
0N/A private ImportLDIFReader reader;
0N/A /** Phase one buffer count. */
0N/A private final AtomicInteger bufferCount = new AtomicInteger(0);
0N/A /** Phase one imported entries count. */
0N/A private final AtomicLong importCount = new AtomicLong(0);
0N/A /** Migrated entry count. */
0N/A private int migratedCount;
0N/A
0N/A /** Phase one buffer size in bytes. */
0N/A private int bufferSize;
0N/A /** Index count. */
0N/A private final int indexCount;
0N/A /** Thread count. */
0N/A private int threadCount;
0N/A
0N/A /** Whether DN validation should be performed. If true, then it is performed during phase one. */
0N/A private final boolean validateDNs;
0N/A
0N/A /** Temp scratch directory. */
0N/A private final File tempDir;
0N/A /** DN cache used when DN validation is done in first phase. */
0N/A private final DNCache dnCache;
0N/A /** Size in bytes of DN cache. */
0N/A private long dnCacheSize;
0N/A /** Available memory at the start of the import. */
0N/A private long availableMemory;
0N/A /** Size in bytes of DB cache. */
0N/A private long dbCacheSize;
0N/A
2581N/A /** The executor service used for the buffer sort tasks. */
2581N/A private ExecutorService bufferSortService;
0N/A /** The executor service used for the scratch file processing tasks. */
0N/A private ExecutorService scratchFileWriterService;
0N/A
0N/A /** Queue of free index buffers -- used to re-cycle index buffers. */
0N/A private final BlockingQueue<IndexOutputBuffer> freeBufferQueue = new LinkedBlockingQueue<>();
0N/A
0N/A /**
0N/A * Map of index keys to index buffers. Used to allocate sorted index buffers
   * to an index writer thread.
16N/A */
0N/A private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap = new ConcurrentHashMap<>();
0N/A
0N/A /** Map of DB containers to index managers. Used to start phase 2. */
0N/A private final List<IndexManager> indexMgrList = new LinkedList<>();
0N/A /** Map of DB containers to DN-based index managers. Used to start phase 2. */
0N/A private final List<IndexManager> DNIndexMgrList = new LinkedList<>();
0N/A
0N/A /**
0N/A * Futures used to indicate when the index file writers are done flushing
0N/A * their work queues and have exited. End of phase one.
0N/A */
0N/A private final List<Future<Void>> scratchFileWriterFutures = new CopyOnWriteArrayList<>();
0N/A /**
0N/A * List of index file writer tasks. Used to signal stopScratchFileWriters to
   * the index file writer tasks once the LDIF file has been completely read.
0N/A */
0N/A private final List<ScratchFileWriterTask> scratchFileWriterList;
0N/A
0N/A /** Map of DNs to Suffix objects. */
0N/A private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<>();
0N/A /** Map of indexIDs to database containers. */
2581N/A private final ConcurrentHashMap<Integer, Index> indexIDToIndexMap = new ConcurrentHashMap<>();
2581N/A /** Map of indexIDs to entry containers. */
2581N/A private final ConcurrentHashMap<Integer, EntryContainer> indexIDToECMap = new ConcurrentHashMap<>();
2581N/A
2581N/A /** Used to synchronize when a scratch file index writer is first setup. */
2581N/A private final Object synObj = new Object();
2581N/A
0N/A /** Rebuild index manager used when rebuilding indexes. */
0N/A private final RebuildIndexManager rebuildManager;
0N/A
2581N/A /** Set to true if the backend was cleared. */
0N/A private final boolean clearedBackend;
0N/A
0N/A /** Used to shutdown import if an error occurs in phase one. */
0N/A private volatile boolean isCanceled;
0N/A
0N/A /** Number of phase one buffers. */
0N/A private int phaseOneBufferCount;
0N/A
0N/A static
0N/A {
0N/A AttributeType attrType = DirectoryServer.getAttributeType("dn");
0N/A if (attrType == null)
0N/A {
0N/A attrType = DirectoryServer.getDefaultAttributeType("dn");
0N/A }
0N/A DN_TYPE = attrType;
0N/A }
0N/A
0N/A /**
0N/A * Create a new import job with the specified rebuild index config.
0N/A *
   * @param rootContainer
   *          The root container.
   * @param rebuildConfig
   *          The rebuild index configuration.
   * @param cfg
   *          The pluggable backend configuration.
   * @param serverContext
   *          The server context.
0N/A * @throws InitializationException
0N/A * If a problem occurs during initialization.
0N/A * @throws StorageRuntimeException
0N/A * If an error occurred when opening the DB.
0N/A * @throws ConfigException
0N/A * If a problem occurs during initialization.
0N/A */
0N/A Importer(RootContainer rootContainer, RebuildConfig rebuildConfig, PluggableBackendCfg cfg,
0N/A ServerContext serverContext) throws InitializationException, StorageRuntimeException, ConfigException
0N/A {
0N/A this.rootContainer = rootContainer;
0N/A this.importCfg = null;
0N/A this.serverContext = serverContext;
0N/A this.threadCount = 1;
0N/A this.rebuildManager = new RebuildIndexManager(rootContainer.getStorage(), rebuildConfig, cfg);
0N/A this.indexCount = rebuildManager.getIndexCount();
0N/A this.clearedBackend = false;
0N/A this.scratchFileWriterList = new ArrayList<>(indexCount);
0N/A
0N/A this.tempDir = prepareTempDir(cfg, rebuildConfig.getTmpDirectory());
2028N/A computeMemoryRequirements();
2028N/A this.validateDNs = false;
2028N/A this.dnCache = null;
0N/A }
0N/A
0N/A /**
0N/A * Create a new import job with the specified ldif import config.
0N/A *
   * @param rootContainer
   *          The root container.
   * @param importCfg
   *          The LDIF import configuration.
   * @param backendCfg
   *          The pluggable backend configuration.
   * @param serverContext
   *          The server context.
0N/A * @throws InitializationException
0N/A * If a problem occurs during initialization.
0N/A * @throws ConfigException
0N/A * If a problem occurs reading the configuration.
0N/A * @throws StorageRuntimeException
0N/A * If an error occurred when opening the DB.
0N/A */
0N/A Importer(RootContainer rootContainer, LDIFImportConfig importCfg, PluggableBackendCfg backendCfg,
0N/A ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException
0N/A {
0N/A this.rootContainer = rootContainer;
2581N/A this.rebuildManager = null;
2581N/A this.importCfg = importCfg;
2581N/A this.serverContext = serverContext;
2581N/A
2581N/A if (importCfg.getThreadCount() == 0)
2581N/A {
2581N/A this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
2581N/A }
2581N/A else
2581N/A {
2581N/A this.threadCount = importCfg.getThreadCount();
2581N/A }
2581N/A
2581N/A // Determine the number of indexes.
2581N/A this.indexCount = getTotalIndexCount(backendCfg);
2581N/A
2581N/A this.clearedBackend = mustClearBackend(importCfg, backendCfg);
2581N/A this.scratchFileWriterList = new ArrayList<>(indexCount);
2581N/A
2581N/A validateDNs = !importCfg.getSkipDNValidation();
2581N/A this.tempDir = prepareTempDir(backendCfg, importCfg.getTmpDirectory());
    // Be careful: this requires that several fields have already been set.
647N/A computeMemoryRequirements();
647N/A
647N/A if (validateDNs)
2581N/A {
647N/A final File dnCachePath = new File(tempDir, DN_CACHE_DIR);
647N/A dnCachePath.mkdirs();
647N/A this.dnCache = new DNCacheImpl(dnCachePath);
2581N/A }
647N/A else
647N/A {
647N/A this.dnCache = null;
647N/A }
647N/A }
647N/A
2581N/A private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException
647N/A {
0N/A File parentDir = getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR);
0N/A File tempDir = new File(parentDir, backendCfg.getBackendId());
0N/A recursiveDelete(tempDir);
0N/A if (!tempDir.exists() && !tempDir.mkdirs())
0N/A {
0N/A throw new InitializationException(ERR_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
0N/A }
0N/A return tempDir;
0N/A }
0N/A
0N/A /**
0N/A * Returns whether the backend must be cleared.
0N/A *
0N/A * @param importCfg
0N/A * the import configuration object
0N/A * @param backendCfg
0N/A * the backend configuration object
0N/A * @return true if the backend must be cleared, false otherwise
0N/A * @see Importer#getSuffix(WriteableTransaction, EntryContainer) for per-suffix cleanups.
0N/A */
0N/A static boolean mustClearBackend(LDIFImportConfig importCfg, PluggableBackendCfg backendCfg)
0N/A {
0N/A return !importCfg.appendToExistingData()
517N/A && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
0N/A /*
0N/A * Why do we clear when there is only one baseDN?
0N/A * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
0N/A * so if there is only one baseDN for this backend, then clear it now.
0N/A */
647N/A }
0N/A
2581N/A private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException
0N/A {
0N/A int indexes = 2; // dn2id, dn2uri
0N/A for (String indexName : backendCfg.listBackendIndexes())
0N/A {
0N/A BackendIndexCfg index = backendCfg.getBackendIndex(indexName);
0N/A SortedSet<IndexType> types = index.getIndexType();
647N/A if (types.contains(IndexType.EXTENSIBLE))
2581N/A {
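        // The EXTENSIBLE type itself is replaced by one tree per extensible matching rule,
        // e.g. types {EQUALITY, EXTENSIBLE} with two extensible rules yield 1 + 2 = 3 indexes.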
0N/A indexes += types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
0N/A }
0N/A else
0N/A {
0N/A indexes += types.size();
0N/A }
0N/A }
0N/A return indexes;
0N/A }
0N/A
0N/A /**
0N/A * Return the suffix instance in the specified map that matches the specified
0N/A * DN.
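   * <p>
   * For example (hypothetical DNs), looking up
   * {@code uid=user.0,ou=People,dc=example,dc=com} in a map keyed on
   * {@code dc=example,dc=com} walks up the parent chain and returns the suffix
   * registered for {@code dc=example,dc=com}.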
0N/A *
0N/A * @param dn
0N/A * The DN to search for.
0N/A * @param map
0N/A * The map to search.
0N/A * @return The suffix instance that matches the DN, or null if no match is
0N/A * found.
0N/A */
0N/A public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
0N/A {
0N/A Suffix suffix = null;
0N/A DN nodeDN = dn;
0N/A
0N/A while (suffix == null && nodeDN != null)
0N/A {
0N/A suffix = map.get(nodeDN);
0N/A if (suffix == null)
0N/A {
0N/A nodeDN = nodeDN.getParentDNInSuffix();
0N/A }
0N/A }
0N/A return suffix;
0N/A }
0N/A
0N/A /**
0N/A * Calculate buffer sizes and initialize properties based on memory.
0N/A *
0N/A * @throws InitializationException
0N/A * If a problem occurs during calculation.
0N/A */
647N/A private void computeMemoryRequirements() throws InitializationException
0N/A {
647N/A // Calculate amount of usable memory. This will need to take into account
0N/A // various fudge factors, including the number of IO buffers used by the
647N/A // scratch writers (1 per index).
0N/A calculateAvailableMemory();
647N/A
647N/A final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
647N/A
647N/A // We need caching when doing DN validation or rebuilding indexes.
647N/A if (validateDNs || rebuildManager != null)
647N/A {
0N/A // DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
0N/A if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
647N/A {
0N/A dbCacheSize = 500 * KB;
0N/A dnCacheSize = 500 * KB;
0N/A }
647N/A else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
0N/A {
0N/A dbCacheSize = MIN_DB_CACHE_SIZE;
0N/A dnCacheSize = MIN_DB_CACHE_SIZE;
0N/A }
0N/A else if (!clearedBackend)
0N/A {
0N/A // Appending to existing data so reserve extra memory for the DB cache
0N/A // since it will be needed for dn2id queries.
0N/A dbCacheSize = usableMemory * 33 / 100;
2581N/A dnCacheSize = usableMemory * 33 / 100;
0N/A }
0N/A else
0N/A {
0N/A dbCacheSize = MAX_DB_CACHE_SIZE;
0N/A dnCacheSize = usableMemory * 66 / 100;
0N/A }
0N/A }
0N/A else
0N/A {
0N/A // No DN validation: calculate memory for DB cache and buffers.
0N/A
0N/A // No need for DN2ID cache.
0N/A dnCacheSize = 0;
0N/A
0N/A if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
0N/A {
0N/A dbCacheSize = 500 * KB;
0N/A }
0N/A else if (usableMemory < MIN_DB_CACHE_MEMORY)
0N/A {
0N/A dbCacheSize = MIN_DB_CACHE_SIZE;
0N/A }
0N/A else
0N/A {
0N/A // No need to differentiate between append/clear backend, since dn2id is
0N/A // not being queried.
0N/A dbCacheSize = MAX_DB_CACHE_SIZE;
0N/A }
0N/A }
0N/A
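    // The memory left after the DB and DN caches is divided among the phase one buffers:
    // 2 buffers per index per thread, plus 4 per index reserved for the scratch file writers.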
0N/A final long phaseOneBufferMemory = usableMemory - dbCacheSize - dnCacheSize;
0N/A final int oldThreadCount = threadCount;
0N/A if (indexCount != 0) // Avoid / by zero
0N/A {
0N/A while (true)
0N/A {
0N/A phaseOneBufferCount = 2 * indexCount * threadCount;
0N/A
0N/A // Scratch writers allocate 4 buffers per index as well.
0N/A final int totalPhaseOneBufferCount = phaseOneBufferCount + (4 * indexCount);
0N/A long longBufferSize = phaseOneBufferMemory / totalPhaseOneBufferCount;
0N/A // We need (2 * bufferSize) to fit in an int for the insertByteStream
0N/A // and deleteByteStream constructors.
0N/A bufferSize = (int) Math.min(longBufferSize, Integer.MAX_VALUE / 2);
0N/A
0N/A if (bufferSize > MAX_BUFFER_SIZE)
0N/A {
0N/A if (validateDNs)
0N/A {
0N/A // The buffers are big enough: the memory is best used for the DN2ID temp DB
0N/A bufferSize = MAX_BUFFER_SIZE;
0N/A
0N/A final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
0N/A if (!clearedBackend)
0N/A {
0N/A dbCacheSize += extraMemory / 2;
0N/A dnCacheSize += extraMemory / 2;
0N/A }
0N/A else
0N/A {
0N/A dnCacheSize += extraMemory;
0N/A }
0N/A }
0N/A
0N/A break;
0N/A }
0N/A else if (bufferSize > MIN_BUFFER_SIZE)
0N/A {
0N/A // This is acceptable.
0N/A break;
0N/A }
0N/A else if (threadCount > 1)
0N/A {
0N/A // Retry using less threads.
0N/A threadCount--;
0N/A }
0N/A else
0N/A {
0N/A // Not enough memory.
0N/A final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
0N/A throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(
0N/A usableMemory, minimumPhaseOneBufferMemory + dbCacheSize + dnCacheSize));
0N/A }
0N/A }
0N/A }
0N/A
0N/A if (oldThreadCount != threadCount)
0N/A {
0N/A logger.info(NOTE_IMPORT_ADJUST_THREAD_COUNT, oldThreadCount, threadCount);
0N/A }
0N/A
0N/A logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
0N/A if (dnCacheSize > 0)
0N/A {
0N/A logger.info(NOTE_IMPORT_LDIF_TMP_ENV_MEM, dnCacheSize);
0N/A }
0N/A logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
0N/A }
0N/A
0N/A /**
0N/A * Calculates the amount of available memory which can be used by this import,
0N/A * taking into account whether or not the import is running offline or online
0N/A * as a task.
0N/A */
0N/A private void calculateAvailableMemory()
0N/A {
0N/A final long totalAvailableMemory;
0N/A if (DirectoryServer.isRunning())
0N/A {
0N/A // Online import/rebuild.
0N/A final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
517N/A totalAvailableMemory = Math.max(availableMemory, 16 * MB);
0N/A }
0N/A else
0N/A {
0N/A // Offline import/rebuild.
0N/A totalAvailableMemory = Platform.getUsableMemoryForCaching();
0N/A }
0N/A
0N/A // Now take into account various fudge factors.
0N/A int importMemPct = 90;
517N/A if (totalAvailableMemory <= SMALL_HEAP_SIZE)
0N/A {
0N/A // Be pessimistic when memory is low.
0N/A importMemPct -= 25;
0N/A }
0N/A if (rebuildManager != null)
0N/A {
0N/A // Rebuild seems to require more overhead.
0N/A importMemPct -= 15;
0N/A }
0N/A
0N/A availableMemory = totalAvailableMemory * importMemPct / 100;
0N/A }
0N/A
0N/A private void initializeIndexBuffers()
517N/A {
0N/A for (int i = 0; i < phaseOneBufferCount; i++)
0N/A {
0N/A freeBufferQueue.add(new IndexOutputBuffer(bufferSize));
0N/A }
0N/A }
0N/A
0N/A private void initializeSuffixes(WriteableTransaction txn) throws StorageRuntimeException,
0N/A ConfigException
0N/A {
517N/A for (EntryContainer ec : rootContainer.getEntryContainers())
0N/A {
0N/A Suffix suffix = getSuffix(txn, ec);
0N/A if (suffix != null)
0N/A {
0N/A dnSuffixMap.put(ec.getBaseDN(), suffix);
0N/A generateIndexID(suffix);
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
   * Mainly used to support multiple suffixes. Each index in each suffix gets a
   * unique ID to identify which DB it needs to go to in phase two processing.
0N/A */
0N/A private void generateIndexID(Suffix suffix)
0N/A {
0N/A for (AttributeIndex attributeIndex : suffix.getAttrIndexMap().values())
0N/A {
0N/A for (Index index : attributeIndex.getNameToIndexes().values())
0N/A {
0N/A putInIdContainerMap(index);
0N/A }
0N/A }
0N/A }
0N/A
0N/A private void putInIdContainerMap(Index index)
0N/A {
0N/A if (index != null)
0N/A {
0N/A indexIDToIndexMap.putIfAbsent(getIndexID(index), index);
0N/A }
0N/A }
0N/A
0N/A private static int getIndexID(DatabaseContainer index)
0N/A {
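    // The identity hash code acts as a process-local index ID: it is only used to route
    // buffered records back to the right index within this import run; it is not a
    // stable on-disk identifier.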
0N/A return System.identityHashCode(index);
0N/A }
0N/A
0N/A private Suffix getSuffix(WriteableTransaction txn, EntryContainer entryContainer)
0N/A throws ConfigException
0N/A {
0N/A DN baseDN = entryContainer.getBaseDN();
0N/A EntryContainer sourceEntryContainer = null;
0N/A List<DN> includeBranches = new ArrayList<>();
0N/A List<DN> excludeBranches = new ArrayList<>();
0N/A
0N/A if (!importCfg.appendToExistingData()
0N/A && !importCfg.clearBackend())
0N/A {
0N/A for (DN dn : importCfg.getExcludeBranches())
0N/A {
0N/A if (baseDN.equals(dn))
0N/A {
0N/A // This entire base DN was explicitly excluded. Skip.
0N/A return null;
0N/A }
0N/A if (baseDN.isAncestorOf(dn))
0N/A {
0N/A excludeBranches.add(dn);
0N/A }
0N/A }
0N/A
0N/A if (!importCfg.getIncludeBranches().isEmpty())
0N/A {
0N/A for (DN dn : importCfg.getIncludeBranches())
0N/A {
0N/A if (baseDN.isAncestorOf(dn))
0N/A {
0N/A includeBranches.add(dn);
0N/A }
0N/A }
0N/A
0N/A if (includeBranches.isEmpty())
0N/A {
0N/A /*
0N/A * There are no branches in the explicitly defined include list under
0N/A * this base DN. Skip this base DN all together.
0N/A */
0N/A return null;
0N/A }
0N/A
0N/A // Remove any overlapping include branches.
0N/A Iterator<DN> includeBranchIterator = includeBranches.iterator();
0N/A while (includeBranchIterator.hasNext())
1308N/A {
1308N/A DN includeDN = includeBranchIterator.next();
0N/A if (!isAnyNotEqualAndAncestorOf(includeBranches, includeDN))
1308N/A {
1308N/A includeBranchIterator.remove();
1308N/A }
1308N/A }
0N/A
        // Remove any exclude branches that are not under an include branch,
        // since they will be migrated as part of the existing entries
        // outside of the include branches anyway.
0N/A Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
0N/A while (excludeBranchIterator.hasNext())
2028N/A {
2028N/A DN excludeDN = excludeBranchIterator.next();
2028N/A if (!isAnyAncestorOf(includeBranches, excludeDN))
2028N/A {
2028N/A excludeBranchIterator.remove();
2028N/A }
2028N/A }
2028N/A
2028N/A if (excludeBranches.isEmpty()
2028N/A && includeBranches.size() == 1
2028N/A && includeBranches.get(0).equals(baseDN))
2028N/A {
2028N/A // This entire base DN is explicitly included in the import with
2028N/A // no exclude branches that we need to migrate. Just clear the entry
2028N/A // container.
2028N/A clearSuffix(entryContainer);
2028N/A }
2028N/A else
2028N/A {
2028N/A // Create a temp entry container
2028N/A sourceEntryContainer = entryContainer;
2028N/A entryContainer = createEntryContainer(txn, baseDN);
2028N/A }
2028N/A }
2028N/A }
2028N/A return new Suffix(entryContainer, sourceEntryContainer, includeBranches, excludeBranches);
2028N/A }
2028N/A
2028N/A private EntryContainer createEntryContainer(WriteableTransaction txn, DN baseDN) throws ConfigException
2028N/A {
2028N/A try
2028N/A {
2028N/A DN tempDN = baseDN.child(DN.valueOf("dc=importTmp"));
2028N/A return rootContainer.openEntryContainer(tempDN, txn);
2028N/A }
2028N/A catch (DirectoryException e)
2028N/A {
2028N/A throw new ConfigException(e.getMessageObject());
0N/A }
0N/A }
0N/A
0N/A private static void clearSuffix(EntryContainer entryContainer)
0N/A {
0N/A entryContainer.lock();
0N/A entryContainer.clear();
0N/A entryContainer.unlock();
0N/A }
0N/A
0N/A private static boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
0N/A {
0N/A for (DN dn : dns)
0N/A {
0N/A if (!dn.equals(childDN) && dn.isAncestorOf(childDN))
0N/A {
0N/A return false;
0N/A }
0N/A }
0N/A return true;
0N/A }
0N/A
0N/A private static boolean isAnyAncestorOf(List<DN> dns, DN childDN)
0N/A {
0N/A for (DN dn : dns)
0N/A {
0N/A if (dn.isAncestorOf(childDN))
0N/A {
0N/A return true;
0N/A }
0N/A }
0N/A return false;
0N/A }
0N/A
0N/A /**
0N/A * Rebuild the indexes using the specified root container.
0N/A *
0N/A * @throws ConfigException
0N/A * If a configuration error occurred.
0N/A * @throws InitializationException
0N/A * If an initialization error occurred.
0N/A * @throws StorageRuntimeException
0N/A * If the database had an error.
0N/A * @throws InterruptedException
0N/A * If an interrupted error occurred.
0N/A * @throws ExecutionException
0N/A * If an execution error occurred.
0N/A */
0N/A public void rebuildIndexes() throws ConfigException, InitializationException, StorageRuntimeException,
0N/A InterruptedException, ExecutionException
0N/A {
0N/A try
0N/A {
0N/A if (rebuildManager.rebuildConfig.isClearDegradedState())
0N/A {
0N/A clearDegradedState();
0N/A }
0N/A else
0N/A {
0N/A rebuildIndexes0();
0N/A }
0N/A }
0N/A catch (Exception e)
0N/A {
0N/A logger.traceException(e);
0N/A }
0N/A }
0N/A
0N/A private void clearDegradedState() throws Exception
0N/A {
0N/A rootContainer.getStorage().write(new WriteOperation()
0N/A {
0N/A @Override
0N/A public void run(WriteableTransaction txn) throws Exception
0N/A {
0N/A final long startTime = System.currentTimeMillis();
0N/A rebuildManager.initialize();
0N/A rebuildManager.printStartMessage(txn);
0N/A rebuildManager.clearDegradedState(txn);
0N/A recursiveDelete(tempDir);
0N/A rebuildManager.printStopMessage(startTime);
0N/A }
0N/A });
0N/A }
0N/A
0N/A private void rebuildIndexes0() throws Exception
0N/A {
0N/A final long startTime = System.currentTimeMillis();
0N/A final Storage storage = rootContainer.getStorage();
0N/A storage.write(new WriteOperation()
0N/A {
0N/A @Override
0N/A public void run(WriteableTransaction txn) throws Exception
0N/A {
0N/A rebuildManager.initialize();
0N/A rebuildManager.printStartMessage(txn);
0N/A rebuildManager.preRebuildIndexes(txn);
0N/A }
0N/A });
0N/A
0N/A rebuildManager.rebuildIndexesPhaseOne();
0N/A rebuildManager.throwIfCancelled();
0N/A rebuildManager.rebuildIndexesPhaseTwo();
0N/A
0N/A storage.write(new WriteOperation()
0N/A {
0N/A @Override
0N/A public void run(WriteableTransaction txn) throws Exception
0N/A {
0N/A rebuildManager.postRebuildIndexes(txn);
0N/A }
0N/A });
0N/A recursiveDelete(tempDir);
0N/A rebuildManager.printStopMessage(startTime);
0N/A }
0N/A
0N/A /**
   * Import an LDIF using the specified root container.
0N/A *
   * @return An LDIF import result.
0N/A * @throws Exception
0N/A * If the import failed
0N/A */
0N/A public LDIFImportResult processImport() throws Exception
0N/A {
0N/A try {
0N/A try
0N/A {
0N/A reader = new ImportLDIFReader(importCfg, rootContainer);
0N/A }
0N/A catch (IOException ioe)
0N/A {
0N/A throw new InitializationException(ERR_IMPORT_LDIF_READER_IO_ERROR.get(), ioe);
0N/A }
0N/A
0N/A logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
0N/A logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
0N/A
0N/A final Storage storage = rootContainer.getStorage();
0N/A storage.write(new WriteOperation()
0N/A {
0N/A @Override
0N/A public void run(WriteableTransaction txn) throws Exception
0N/A {
0N/A initializeSuffixes(txn);
0N/A setIndexesTrusted(txn, false);
0N/A }
0N/A });
0N/A
0N/A final long startTime = System.currentTimeMillis();
0N/A importPhaseOne();
0N/A final long phaseOneFinishTime = System.currentTimeMillis();
0N/A if (validateDNs)
0N/A {
0N/A dnCache.close();
0N/A }
0N/A
0N/A if (isCanceled)
0N/A {
0N/A throw new InterruptedException("Import processing canceled.");
0N/A }
0N/A
0N/A final long phaseTwoTime = System.currentTimeMillis();
0N/A importPhaseTwo();
0N/A if (isCanceled)
0N/A {
0N/A throw new InterruptedException("Import processing canceled.");
0N/A }
0N/A final long phaseTwoFinishTime = System.currentTimeMillis();
0N/A
0N/A storage.write(new WriteOperation()
0N/A {
0N/A @Override
0N/A public void run(WriteableTransaction txn) throws Exception
0N/A {
0N/A setIndexesTrusted(txn, true);
0N/A switchEntryContainers(txn);
0N/A }
0N/A });
0N/A recursiveDelete(tempDir);
0N/A final long finishTime = System.currentTimeMillis();
0N/A final long importTime = finishTime - startTime;
0N/A logger.info(NOTE_IMPORT_PHASE_STATS, importTime / 1000,
0N/A (phaseOneFinishTime - startTime) / 1000,
0N/A (phaseTwoFinishTime - phaseTwoTime) / 1000);
0N/A float rate = 0;
517N/A if (importTime > 0)
0N/A {
0N/A rate = 1000f * reader.getEntriesRead() / importTime;
0N/A }
0N/A logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount.get(),
0N/A reader.getEntriesIgnored(), reader.getEntriesRejected(),
0N/A migratedCount, importTime / 1000, rate);
0N/A return new LDIFImportResult(reader.getEntriesRead(),
0N/A reader.getEntriesRejected(), reader.getEntriesIgnored());
0N/A }
0N/A finally
0N/A {
0N/A close(reader);
0N/A if (validateDNs)
0N/A {
0N/A close(dnCache);
0N/A }
0N/A }
0N/A }
0N/A
0N/A private void switchEntryContainers(WriteableTransaction txn) throws StorageRuntimeException, InitializationException
0N/A {
517N/A for (Suffix suffix : dnSuffixMap.values())
0N/A {
0N/A DN baseDN = suffix.getBaseDN();
0N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
0N/A if (entryContainer != null)
0N/A {
0N/A final EntryContainer toDelete = rootContainer.unregisterEntryContainer(baseDN);
0N/A toDelete.lock();
0N/A toDelete.close();
0N/A toDelete.delete(txn);
0N/A toDelete.unlock();
0N/A
0N/A final EntryContainer replacement = suffix.getEntryContainer();
0N/A replacement.lock();
0N/A replacement.setDatabasePrefix(baseDN.toNormalizedUrlSafeString());
0N/A replacement.unlock();
0N/A rootContainer.registerEntryContainer(baseDN, replacement);
1113N/A }
1113N/A }
1113N/A }
1113N/A
1139N/A private void setIndexesTrusted(WriteableTransaction txn, boolean trusted) throws StorageRuntimeException
1113N/A {
1113N/A try
1113N/A {
0N/A for (Suffix s : dnSuffixMap.values())
0N/A {
0N/A s.setIndexesTrusted(txn, trusted);
0N/A }
0N/A }
0N/A catch (StorageRuntimeException ex)
0N/A {
0N/A throw new StorageRuntimeException(NOTE_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()).toString());
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Reads all entries from id2entry, and:
0N/A * <ol>
0N/A * <li>compute how the entry is indexed for each index</li>
0N/A * <li>store the result of indexing entries into in-memory index buffers</li>
0N/A * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
   * The scratch files will be read by phase two to perform the on-disk merge</li>
0N/A * </ol>
0N/A */
0N/A private void importPhaseOne() throws Exception
0N/A {
0N/A initializeIndexBuffers();
0N/A
0N/A final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
0N/A scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
0N/A scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
0N/A bufferSortService = Executors.newFixedThreadPool(threadCount);
0N/A final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
0N/A
0N/A final Storage storage = rootContainer.getStorage();
0N/A execService.submit(new MigrateExistingTask(storage)).get();
0N/A
0N/A final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
0N/A if (importCfg.appendToExistingData()
0N/A && importCfg.replaceExistingEntries())
0N/A {
0N/A for (int i = 0; i < threadCount; i++)
0N/A {
0N/A tasks.add(new AppendReplaceTask(storage));
0N/A }
0N/A }
0N/A else
0N/A {
0N/A for (int i = 0; i < threadCount; i++)
0N/A {
0N/A tasks.add(new ImportTask(storage));
0N/A }
0N/A }
0N/A execService.invokeAll(tasks);
0N/A tasks.clear();
517N/A
0N/A execService.submit(new MigrateExcludedTask(storage)).get();
0N/A
0N/A stopScratchFileWriters();
0N/A getAll(scratchFileWriterFutures);
0N/A
0N/A shutdownAll(timerService, execService, bufferSortService, scratchFileWriterService);
0N/A
0N/A // Try to clear as much memory as possible.
0N/A clearAll(scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
0N/A indexKeyQueueMap.clear();
0N/A }
0N/A
0N/A private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
0N/A {
0N/A timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
0N/A }
0N/A
0N/A private static void shutdownAll(ExecutorService... executorServices) throws InterruptedException
0N/A {
0N/A for (ExecutorService executorService : executorServices)
0N/A {
0N/A executorService.shutdown();
0N/A }
0N/A for (ExecutorService executorService : executorServices)
0N/A {
0N/A executorService.awaitTermination(30, TimeUnit.SECONDS);
0N/A }
0N/A }
0N/A
0N/A private static void clearAll(Collection<?>... cols)
0N/A {
0N/A for (Collection<?> col : cols)
0N/A {
0N/A col.clear();
0N/A }
0N/A }
0N/A
0N/A private void importPhaseTwo() throws Exception
0N/A {
517N/A ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
0N/A scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
0N/A try
0N/A {
0N/A processIndexFiles();
0N/A }
0N/A finally
0N/A {
0N/A shutdownAll(timerService);
0N/A }
0N/A }
0N/A
0N/A /**
   * Performs the on-disk merge by reading several scratch files at once
   * and writing their ordered content into the target indexes.
0N/A */
2581N/A private void processIndexFiles() throws Exception
0N/A {
2581N/A if (bufferCount.get() == 0)
0N/A {
0N/A return;
0N/A }
0N/A int dbThreads = Runtime.getRuntime().availableProcessors();
0N/A if (dbThreads < 4)
2581N/A {
2581N/A dbThreads = 4;
2581N/A }
0N/A
0N/A // Calculate memory / buffer counts.
0N/A final long usableMemory = availableMemory - dbCacheSize;
0N/A int readAheadSize;
0N/A int buffers;
0N/A while (true)
0N/A {
0N/A final List<IndexManager> allIndexMgrs = new ArrayList<>(DNIndexMgrList);
0N/A allIndexMgrs.addAll(indexMgrList);
0N/A Collections.sort(allIndexMgrs, Collections.reverseOrder());
0N/A
0N/A buffers = 0;
0N/A final int limit = Math.min(dbThreads, allIndexMgrs.size());
0N/A for (int i = 0; i < limit; i++)
0N/A {
0N/A buffers += allIndexMgrs.get(i).numberOfBuffers;
0N/A }
0N/A
0N/A readAheadSize = (int) (usableMemory / buffers);
0N/A if (readAheadSize > bufferSize)
0N/A {
0N/A // Cache size is never larger than the buffer size.
0N/A readAheadSize = bufferSize;
0N/A break;
0N/A }
0N/A else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
0N/A {
0N/A // This is acceptable.
0N/A break;
0N/A }
0N/A else if (dbThreads > 1)
0N/A {
0N/A // Reduce thread count.
0N/A dbThreads--;
0N/A }
0N/A else
0N/A {
0N/A // Not enough memory - will need to do batching for the biggest indexes.
517N/A readAheadSize = MIN_READ_AHEAD_CACHE_SIZE;
0N/A buffers = (int) (usableMemory / readAheadSize);
0N/A
0N/A logger.warn(WARN_IMPORT_LDIF_LACK_MEM_PHASE_TWO, usableMemory);
0N/A break;
517N/A }
0N/A }
0N/A
0N/A // Ensure that there are minimum two threads available for parallel
0N/A // processing of smaller indexes.
0N/A dbThreads = Math.max(2, dbThreads);
0N/A
0N/A logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_MEM_REPORT, availableMemory, readAheadSize, buffers);
0N/A
0N/A // Start indexing tasks.
0N/A ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);
647N/A Semaphore permits = new Semaphore(buffers);
0N/A
0N/A // Start DN processing first.
0N/A Storage storage = rootContainer.getStorage();
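    // Close the regular storage handle while phase two writes through the bulk importer
    // returned by startImport(); the handle is reopened in the finally block below.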
0N/A storage.close();
0N/A try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
0N/A {
0N/A List<Future<Void>> futures = new LinkedList<>();
0N/A submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
0N/A submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
0N/A getAll(futures);
0N/A }
0N/A finally
0N/A {
0N/A storage.open();
647N/A }
0N/A
0N/A shutdownAll(dbService);
0N/A }
0N/A
0N/A private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
0N/A org.opends.server.backends.pluggable.spi.Importer importer,
0N/A ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
0N/A {
0N/A for (IndexManager indexMgr : indexMgrs)
0N/A {
0N/A futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
0N/A }
0N/A }
0N/A
0N/A private static <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
0N/A {
0N/A for (Future<?> result : futures)
0N/A {
0N/A result.get();
0N/A }
0N/A }
0N/A
0N/A private void stopScratchFileWriters()
0N/A {
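    // Enqueue one poison buffer per scratch file writer task to signal that no more
    // index buffers will arrive.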
0N/A final IndexOutputBuffer stopProcessing = IndexOutputBuffer.poison();
0N/A for (ScratchFileWriterTask task : scratchFileWriterList)
0N/A {
0N/A task.queue.add(stopProcessing);
647N/A }
0N/A }
0N/A
0N/A /** Task used to migrate excluded branch. */
0N/A private final class MigrateExcludedTask extends ImportTask
0N/A {
0N/A private MigrateExcludedTask(final Storage storage)
0N/A {
0N/A super(storage);
0N/A }
0N/A
0N/A @Override
0N/A void call0(WriteableTransaction txn) throws Exception
0N/A {
0N/A for (Suffix suffix : dnSuffixMap.values())
0N/A {
0N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
0N/A if (entryContainer != null && !suffix.getExcludeBranches().isEmpty())
0N/A {
0N/A logger.info(NOTE_IMPORT_MIGRATION_START, "excluded", suffix.getBaseDN());
0N/A Cursor<ByteString, ByteString> cursor = txn.openCursor(entryContainer.getDN2ID().getName());
0N/A try
0N/A {
0N/A for (DN excludedDN : suffix.getExcludeBranches())
0N/A {
0N/A final ByteString key = DnKeyFormat.dnToDNKey(excludedDN, suffix.getBaseDN().size());
0N/A boolean success = cursor.positionToKeyOrNext(key);
0N/A if (success && key.equals(cursor.getKey()))
0N/A {
0N/A // This is the base entry for a branch that was excluded in the
0N/A // import so we must migrate all entries in this branch over to
0N/A // the new entry container.
                ByteStringBuilder end = new ByteStringBuilder(key.length() + 1);
                end.append(key);
                end.append((byte) 0x01);
647N/A
0N/A while (success
0N/A && ByteSequence.COMPARATOR.compare(key, end) < 0
0N/A && !importCfg.isCancelled()
0N/A && !isCanceled)
0N/A {
0N/A EntryID id = new EntryID(cursor.getValue());
0N/A Entry entry = entryContainer.getID2Entry().get(txn, id);
0N/A processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
0N/A migratedCount++;
0N/A success = cursor.next();
0N/A }
647N/A }
647N/A }
0N/A flushIndexBuffers();
0N/A }
647N/A catch (Exception e)
0N/A {
0N/A logger.error(ERR_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e.getMessage());
647N/A isCanceled = true;
647N/A throw e;
647N/A }
0N/A finally
0N/A {
647N/A close(cursor);
0N/A }
647N/A }
647N/A }
647N/A }
647N/A }
647N/A
647N/A /** Task to migrate existing entries. */
0N/A private final class MigrateExistingTask extends ImportTask
0N/A {
0N/A private MigrateExistingTask(final Storage storage)
0N/A {
0N/A super(storage);
0N/A }
0N/A
0N/A @Override
647N/A void call0(WriteableTransaction txn) throws Exception
0N/A {
0N/A for (Suffix suffix : dnSuffixMap.values())
0N/A {
0N/A EntryContainer entryContainer = suffix.getSrcEntryContainer();
0N/A if (entryContainer != null && !suffix.getIncludeBranches().isEmpty())
0N/A {
0N/A logger.info(NOTE_IMPORT_MIGRATION_START, "existing", suffix.getBaseDN());
0N/A Cursor<ByteString, ByteString> cursor = txn.openCursor(entryContainer.getDN2ID().getName());
0N/A try
0N/A {
0N/A final List<ByteString> includeBranches = includeBranchesAsBytes(suffix);
0N/A boolean success = cursor.next();
0N/A while (success
0N/A && !importCfg.isCancelled()
0N/A && !isCanceled)
0N/A {
0N/A final ByteString key = cursor.getKey();
0N/A if (!includeBranches.contains(key))
0N/A {
0N/A EntryID id = new EntryID(key);
0N/A Entry entry = entryContainer.getID2Entry().get(txn, id);
0N/A processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
0N/A migratedCount++;
0N/A success = cursor.next();
0N/A }
0N/A else
0N/A {
0N/A // This is the base entry for a branch that will be included
0N/A // in the import so we don't want to copy the branch to the
0N/A // new entry container.
0N/A
0N/A /*
             * Advance the cursor to the next entry at the same level in the DIT
517N/A * skipping all the entries in this branch. Set the next
0N/A * starting value to a value of equal length but slightly
0N/A * greater than the previous DN. Since keys are compared in
0N/A * reverse order we must set the first byte (the comma). No
0N/A * possibility of overflow here.
0N/A */
0N/A ByteStringBuilder begin = new ByteStringBuilder(key.length() + 1);
0N/A begin.append(key);
0N/A begin.append((byte) 0x01);
0N/A success = cursor.positionToKeyOrNext(begin);
0N/A }
0N/A }
0N/A flushIndexBuffers();
0N/A }
0N/A catch (Exception e)
0N/A {
0N/A logger.error(ERR_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e.getMessage());
0N/A isCanceled = true;
0N/A throw e;
0N/A }
0N/A finally
0N/A {
0N/A close(cursor);
0N/A }
0N/A }
0N/A }
0N/A }
0N/A
0N/A private List<ByteString> includeBranchesAsBytes(Suffix suffix)
0N/A {
0N/A List<ByteString> includeBranches = new ArrayList<>(suffix.getIncludeBranches().size());
0N/A for (DN includeBranch : suffix.getIncludeBranches())
0N/A {
0N/A if (includeBranch.isDescendantOf(suffix.getBaseDN()))
0N/A {
0N/A includeBranches.add(DnKeyFormat.dnToDNKey(includeBranch, suffix.getBaseDN().size()));
0N/A }
0N/A }
0N/A return includeBranches;
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * Task to perform append/replace processing.
0N/A */
0N/A private class AppendReplaceTask extends ImportTask
0N/A {
0N/A public AppendReplaceTask(final Storage storage)
0N/A {
0N/A super(storage);
0N/A }
0N/A
0N/A private final Set<ByteString> insertKeySet = new HashSet<>();
0N/A private final Set<ByteString> deleteKeySet = new HashSet<>();
0N/A private final EntryInformation entryInfo = new EntryInformation();
0N/A private Entry oldEntry;
0N/A private EntryID entryID;
0N/A
0N/A @Override
0N/A void call0(WriteableTransaction txn) throws Exception
0N/A {
0N/A try
0N/A {
0N/A while (true)
0N/A {
0N/A if (importCfg.isCancelled() || isCanceled)
0N/A {
517N/A freeBufferQueue.add(IndexOutputBuffer.poison());
0N/A return;
0N/A }
0N/A oldEntry = null;
0N/A Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
0N/A if (entry == null)
0N/A {
0N/A break;
0N/A }
0N/A entryID = entryInfo.getEntryID();
0N/A Suffix suffix = entryInfo.getSuffix();
0N/A processEntry(txn, entry, suffix);
0N/A }
0N/A flushIndexBuffers();
0N/A }
0N/A catch (Exception e)
0N/A {
0N/A logger.error(ERR_IMPORT_LDIF_APPEND_REPLACE_TASK_ERR, e.getMessage());
0N/A isCanceled = true;
0N/A throw e;
0N/A }
0N/A }
0N/A
0N/A void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
0N/A throws DirectoryException, StorageRuntimeException, InterruptedException
0N/A {
0N/A DN entryDN = entry.getName();
0N/A
0N/A EntryID oldID = suffix.getDN2ID().get(txn, entryDN);
0N/A if (oldID != null)
0N/A {
0N/A oldEntry = suffix.getID2Entry().get(txn, oldID);
0N/A }
0N/A
0N/A if (oldEntry == null)
0N/A {
0N/A if (validateDNs && !dnSanityCheck(txn, entryDN, entry, suffix))
0N/A {
0N/A suffix.removePending(entryDN);
0N/A return;
0N/A }
0N/A suffix.removePending(entryDN);
0N/A processDN2ID(suffix, entryDN, entryID);
0N/A }
0N/A else
0N/A {
0N/A suffix.removePending(entryDN);
0N/A entryID = oldID;
0N/A }
0N/A
0N/A processDN2URI(txn, suffix, oldEntry, entry);
0N/A suffix.getID2Entry().put(txn, entryID, entry);
0N/A if (oldEntry != null)
0N/A {
0N/A processAllIndexes(suffix, entry, entryID);
0N/A }
0N/A else
0N/A {
0N/A processIndexes(suffix, entry, entryID);
0N/A }
0N/A processVLVIndexes(txn, suffix, entry, entryID);
0N/A importCount.getAndIncrement();
0N/A }
0N/A
0N/A void processAllIndexes(Suffix suffix, Entry entry, EntryID entryID) throws StorageRuntimeException,
0N/A InterruptedException
0N/A {
0N/A for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
0N/A {
0N/A fillIndexKey(mapEntry.getValue(), entry, mapEntry.getKey(), entryID);
0N/A }
0N/A }
0N/A
0N/A @Override
0N/A void processAttribute(MatchingRuleIndex index, Entry entry, EntryID entryID, IndexingOptions options,
0N/A IndexKey indexKey) throws StorageRuntimeException, InterruptedException
0N/A {
0N/A if (oldEntry != null)
0N/A {
0N/A deleteKeySet.clear();
0N/A index.indexEntry(oldEntry, deleteKeySet, options);
0N/A for (ByteString delKey : deleteKeySet)
0N/A {
0N/A processKey(index, delKey, entryID, indexKey, false);
0N/A }
0N/A }
0N/A insertKeySet.clear();
0N/A index.indexEntry(entry, insertKeySet, options);
0N/A for (ByteString key : insertKeySet)
0N/A {
0N/A processKey(index, key, entryID, indexKey, true);
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
   * This task performs phase one reading and processing of the entries read
   * from the LDIF file(s). It is used when the append flag is not specified.
0N/A */
0N/A private class ImportTask implements Callable<Void>
0N/A {
0N/A private final Storage storage;
0N/A private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<>();
0N/A private final Set<ByteString> insertKeySet = new HashSet<>();
647N/A private final EntryInformation entryInfo = new EntryInformation();
0N/A private final IndexKey dnIndexKey = new IndexKey(DN_TYPE, DN2ID_INDEX_NAME, 1);
0N/A
0N/A public ImportTask(final Storage storage)
0N/A {
0N/A this.storage = storage;
0N/A }
0N/A
0N/A /** {@inheritDoc} */
0N/A @Override
0N/A public final Void call() throws Exception
0N/A {
647N/A storage.write(new WriteOperation()
0N/A {
0N/A @Override
0N/A public void run(WriteableTransaction txn) throws Exception
0N/A {
0N/A call0(txn);
0N/A }
0N/A });
0N/A return null;
0N/A }
647N/A
0N/A void call0(WriteableTransaction txn) throws Exception
0N/A {
0N/A try
0N/A {
0N/A while (true)
0N/A {
0N/A if (importCfg.isCancelled() || isCanceled)
0N/A {
0N/A freeBufferQueue.add(IndexOutputBuffer.poison());
0N/A return;
0N/A }
0N/A Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
0N/A if (entry == null)
0N/A {
0N/A break;
0N/A }
0N/A EntryID entryID = entryInfo.getEntryID();
0N/A Suffix suffix = entryInfo.getSuffix();
0N/A processEntry(txn, entry, entryID, suffix);
0N/A }
0N/A flushIndexBuffers();
0N/A }
0N/A catch (Exception e)
0N/A {
0N/A logger.error(ERR_IMPORT_LDIF_IMPORT_TASK_ERR, e.getMessage());
0N/A isCanceled = true;
0N/A throw e;
0N/A }
0N/A }
0N/A
0N/A void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix)
0N/A throws DirectoryException, StorageRuntimeException, InterruptedException
0N/A {
0N/A DN entryDN = entry.getName();
0N/A if (validateDNs && !dnSanityCheck(txn, entryDN, entry, suffix))
0N/A {
0N/A suffix.removePending(entryDN);
0N/A return;
0N/A }
0N/A suffix.removePending(entryDN);
0N/A processDN2ID(suffix, entryDN, entryID);
0N/A processDN2URI(txn, suffix, null, entry);
0N/A processIndexes(suffix, entry, entryID);
0N/A processVLVIndexes(txn, suffix, entry, entryID);
0N/A suffix.getID2Entry().put(txn, entryID, entry);
0N/A importCount.getAndIncrement();
0N/A }
0N/A
    /**
     * Examine the DN for duplicates and missing parents.
     *
     * @return {@code true} if the entry passes the sanity checks, {@code false} if it was rejected
     */
0N/A boolean dnSanityCheck(WriteableTransaction txn, DN entryDN, Entry entry, Suffix suffix)
0N/A throws StorageRuntimeException, InterruptedException
0N/A {
0N/A //Perform parent checking.
0N/A DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
0N/A if (parentDN != null && !suffix.isParentProcessed(txn, parentDN, dnCache, clearedBackend))
0N/A {
0N/A reader.rejectEntry(entry, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN));
0N/A return false;
0N/A }
      // If the backend was not cleared, then dn2id needs to be checked first
      // for DNs that might not exist in the DN cache. If the DN is not in
      // the suffix's dn2id DB, then the DN cache is used.
0N/A if (!clearedBackend)
0N/A {
0N/A EntryID id = suffix.getDN2ID().get(txn, entryDN);
0N/A if (id != null || !dnCache.insert(entryDN))
0N/A {
0N/A reader.rejectEntry(entry, WARN_IMPORT_ENTRY_EXISTS.get());
0N/A return false;
0N/A }
0N/A }
0N/A else if (!dnCache.insert(entryDN))
0N/A {
0N/A reader.rejectEntry(entry, WARN_IMPORT_ENTRY_EXISTS.get());
0N/A return false;
0N/A }
0N/A return true;
0N/A }
0N/A
0N/A void processIndexes(Suffix suffix, Entry entry, EntryID entryID) throws StorageRuntimeException,
0N/A InterruptedException
0N/A {
0N/A for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
0N/A {
0N/A AttributeType attributeType = mapEntry.getKey();
0N/A if (entry.hasAttribute(attributeType))
0N/A {
0N/A fillIndexKey(mapEntry.getValue(), entry, attributeType, entryID);
0N/A }
0N/A }
0N/A }
0N/A
0N/A void fillIndexKey(AttributeIndex attrIndex, Entry entry, AttributeType attrType, EntryID entryID)
0N/A throws InterruptedException, StorageRuntimeException
0N/A {
0N/A final IndexingOptions options = attrIndex.getIndexingOptions();
0N/A
0N/A for (Map.Entry<String, MatchingRuleIndex> mapEntry : attrIndex.getNameToIndexes().entrySet())
0N/A {
0N/A processAttribute(mapEntry.getValue(), mapEntry.getKey(), entry, attrType, entryID, options);
0N/A }
0N/A }
0N/A
0N/A void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID)
0N/A throws DirectoryException
0N/A {
0N/A final EntryContainer entryContainer = suffix.getEntryContainer();
0N/A final IndexBuffer buffer = new IndexBuffer(entryContainer);
0N/A for (VLVIndex vlvIdx : entryContainer.getVLVIndexes())
0N/A {
0N/A vlvIdx.addEntry(buffer, entryID, entry);
0N/A }
0N/A buffer.flush(txn);
0N/A }
0N/A
0N/A private void processAttribute(MatchingRuleIndex index, String indexID, Entry entry,
0N/A AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException
0N/A {
0N/A if (index != null)
0N/A {
0N/A IndexKey indexKey = new IndexKey(attributeType, indexID, index.getIndexEntryLimit());
0N/A processAttribute(index, entry, entryID, options, indexKey);
0N/A }
0N/A }
0N/A
0N/A void processAttribute(MatchingRuleIndex index, Entry entry, EntryID entryID, IndexingOptions options,
0N/A IndexKey indexKey) throws StorageRuntimeException, InterruptedException
0N/A {
0N/A insertKeySet.clear();
0N/A index.indexEntry(entry, insertKeySet, options);
0N/A for (ByteString key : insertKeySet)
0N/A {
0N/A processKey(index, key, entryID, indexKey, true);
0N/A }
0N/A }
0N/A
0N/A void flushIndexBuffers() throws InterruptedException, ExecutionException
0N/A {
0N/A final ArrayList<Future<Void>> futures = new ArrayList<>();
0N/A for (IndexOutputBuffer indexBuffer : indexBufferMap.values())
0N/A {
0N/A indexBuffer.discard();
0N/A futures.add(bufferSortService.submit(new SortTask(indexBuffer)));
0N/A }
0N/A indexBufferMap.clear();
0N/A getAll(futures);
517N/A }
0N/A
0N/A int processKey(DatabaseContainer container, ByteString key, EntryID entryID,
0N/A IndexKey indexKey, boolean insert) throws InterruptedException
0N/A {
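      // Records accumulate in one output buffer per index key; when the current buffer
      // cannot hold the new record it is handed to the sort service and a replacement
      // is taken from the free buffer queue.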
0N/A int sizeNeeded = IndexOutputBuffer.getRequiredSize(key.length(), entryID.longValue());
0N/A IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
0N/A if (indexBuffer == null)
0N/A {
0N/A indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
0N/A indexBufferMap.put(indexKey, indexBuffer);
0N/A }
517N/A else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
0N/A {
0N/A // complete the current buffer...
0N/A bufferSortService.submit(new SortTask(indexBuffer));
0N/A // ... and get a new one
0N/A indexBuffer = getNewIndexBuffer(sizeNeeded, indexKey);
0N/A indexBufferMap.put(indexKey, indexBuffer);
0N/A }
517N/A int indexID = getIndexID(container);
0N/A indexBuffer.add(key, entryID, indexID, insert);
0N/A return indexID;
0N/A }
0N/A
0N/A IndexOutputBuffer getNewIndexBuffer(int size, IndexKey indexKey) throws InterruptedException
517N/A {
0N/A IndexOutputBuffer indexBuffer;
0N/A if (size > bufferSize)
0N/A {
0N/A indexBuffer = new IndexOutputBuffer(size);
0N/A indexBuffer.discard();
0N/A }
0N/A else
0N/A {
0N/A indexBuffer = freeBufferQueue.take();
517N/A if (indexBuffer == null)
0N/A {
0N/A throw new InterruptedException("Index buffer processing error.");
0N/A }
0N/A }
0N/A if (indexBuffer.isPoison())
0N/A {
0N/A throw new InterruptedException("Cancel processing received.");
0N/A }
0N/A indexBuffer.setIndexKey(indexKey);
0N/A return indexBuffer;
0N/A }
0N/A
0N/A void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
0N/A throws InterruptedException
0N/A {
0N/A DN2ID dn2id = suffix.getDN2ID();
0N/A ByteString dnBytes = DnKeyFormat.dnToDNKey(dn, suffix.getBaseDN().size());
0N/A int indexID = processKey(dn2id, dnBytes, entryID, dnIndexKey, true);
0N/A indexIDToECMap.putIfAbsent(indexID, suffix.getEntryContainer());
517N/A }
0N/A
0N/A void processDN2URI(WriteableTransaction txn, Suffix suffix, Entry oldEntry, Entry newEntry)
0N/A throws StorageRuntimeException
0N/A {
0N/A DN2URI dn2uri = suffix.getDN2URI();
0N/A if (oldEntry != null)
0N/A {
0N/A dn2uri.replaceEntry(txn, oldEntry, newEntry);
517N/A }
0N/A else
0N/A {
0N/A dn2uri.addEntry(txn, newEntry);
0N/A }
0N/A }
0N/A }
0N/A
0N/A /**
0N/A * This task reads sorted records from the temporary index scratch files,
0N/A * processes the records and writes the results to the index database. The DN
   * index is treated differently from non-DN indexes.
0N/A */
  private final class IndexDBWriteTask implements Callable<Void>
  {
    private final org.opends.server.backends.pluggable.spi.Importer importer;
    private final IndexManager indexMgr;
    private final int cacheSize;
    /** indexID => DNState map */
    private final Map<Integer, DNState> dnStateMap = new HashMap<>();
    private final Semaphore permits;
    private final int maxPermits;
    private final AtomicLong bytesRead = new AtomicLong();
    private long lastBytesRead;
    private final AtomicInteger keyCount = new AtomicInteger();
    private RandomAccessFile bufferFile;
    private DataInputStream bufferIndexFile;
    private int remainingBuffers;
    private volatile int totalBatches;
    private AtomicInteger batchNumber = new AtomicInteger();
    private int nextBufferID;
    private int ownedPermits;
    private volatile boolean isRunning;

    /**
     * Creates a new index DB writer.
     *
     * @param importer
     *          The importer
     * @param indexMgr
     *          The index manager.
     * @param permits
     *          The semaphore used for restricting the number of buffer allocations.
     * @param maxPermits
     *          The maximum number of buffers which can be allocated.
     * @param cacheSize
     *          The buffer cache size.
     */
    public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
        Semaphore permits, int maxPermits, int cacheSize)
    {
      this.importer = importer;
      this.indexMgr = indexMgr;
      this.permits = permits;
      this.maxPermits = maxPermits;
      this.cacheSize = cacheSize;
    }

    /**
     * Initializes this task.
     *
     * @throws IOException
     *           If an IO error occurred.
     */
    public void beginWriteTask() throws IOException
    {
      bufferFile = new RandomAccessFile(indexMgr.getBufferFile(), "r");
      bufferIndexFile =
          new DataInputStream(new BufferedInputStream(new FileInputStream(
              indexMgr.getBufferIndexFile())));

      remainingBuffers = indexMgr.getNumberOfBuffers();
      totalBatches = (remainingBuffers / maxPermits) + 1;
      batchNumber.set(0);
      nextBufferID = 0;
      ownedPermits = 0;

      logger.info(NOTE_IMPORT_LDIF_INDEX_STARTED, indexMgr.getBufferFileName(), remainingBuffers, totalBatches);

      indexMgr.setIndexDBWriteTask(this);
      isRunning = true;
    }

    /**
     * Returns the next batch of buffers to be processed, blocking until enough
     * buffer permits are available.
     *
     * @return The next batch of buffers, or {@code null} if there are no more
     *         buffers to be processed.
     * @throws Exception
     *           If an exception occurred.
     */
    public NavigableSet<IndexInputBuffer> getNextBufferBatch() throws Exception
    {
      // First release any previously acquired permits.
      if (ownedPermits > 0)
      {
        permits.release(ownedPermits);
        ownedPermits = 0;
      }

      // Block until we can either get enough permits for all buffers, or the
      // maximum number of permits.
      final int permitRequest = Math.min(remainingBuffers, maxPermits);
      if (permitRequest == 0)
      {
        // No more work to do.
        return null;
      }
      permits.acquire(permitRequest);

      // Update counters.
      ownedPermits = permitRequest;
      remainingBuffers -= permitRequest;
      batchNumber.incrementAndGet();

      // Create all the index buffers for the next batch.
      final NavigableSet<IndexInputBuffer> buffers = new TreeSet<>();
      for (int i = 0; i < permitRequest; i++)
      {
        final long bufferBegin = bufferIndexFile.readLong();
        final long bufferEnd = bufferIndexFile.readLong();
        buffers.add(
            new IndexInputBuffer(indexMgr, bufferFile.getChannel(),
                bufferBegin, bufferEnd, nextBufferID++, cacheSize));
      }

      return buffers;
    }

    /** Finishes this task. */
    private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
    {
      isRunning = false;

      // First release any previously acquired permits.
      if (ownedPermits > 0)
      {
        permits.release(ownedPermits);
        ownedPermits = 0;
      }

      try
      {
        if (indexMgr.isDN2ID())
        {
          for (DNState dnState : dnStateMap.values())
          {
            dnState.finalFlush(importer);
          }

          if (!isCanceled)
          {
            logger.info(NOTE_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
          }
        }
        else
        {
          if (!isCanceled)
          {
            logger.info(NOTE_IMPORT_LDIF_INDEX_CLOSE, indexMgr.getBufferFileName());
          }
        }
      }
      finally
      {
        close(bufferFile, bufferIndexFile);

        indexMgr.getBufferFile().delete();
        indexMgr.getBufferIndexFile().delete();
      }
    }

    /**
     * Print out progress stats.
     *
     * @param deltaTime
     *          The time since the last update.
     */
    public void printStats(long deltaTime)
    {
      if (isRunning)
      {
        final long bufferFileSize = indexMgr.getBufferFileSize();
        final long tmpBytesRead = bytesRead.get();
        final int currentBatch = batchNumber.get();

        final long bytesReadInterval = tmpBytesRead - lastBytesRead;
        final int bytesReadPercent =
            Math.round((100f * tmpBytesRead) / bufferFileSize);

        // Kilo and milli approximately cancel out.
        final long kiloBytesRate = bytesReadInterval / deltaTime;
        final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;

        logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, indexMgr.getBufferFileName(),
            bytesReadPercent, kiloBytesRemaining, kiloBytesRate, currentBatch, totalBatches);

        lastBytesRead = tmpBytesRead;
      }
    }

    /** {@inheritDoc} */
    @Override
    public Void call() throws Exception
    {
      call0(importer);
      return null;
    }

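    /**
     * Merges the sorted scratch file buffers and writes the result to the index database: buffers
     * are polled in key order, the entry IDs of records sharing the same key and index ID are
     * accumulated into insert and delete ID sets, and each completed set is written out whenever
     * the record changes.
     */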
    private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
    {
      if (isCanceled)
      {
        return;
      }

      ImportIDSet insertIDSet = null;
      ImportIDSet deleteIDSet = null;
      ImportRecord previousRecord = null;
      try
      {
        beginWriteTask();

        NavigableSet<IndexInputBuffer> bufferSet;
        while ((bufferSet = getNextBufferBatch()) != null)
        {
          if (isCanceled)
          {
            return;
          }

          while (!bufferSet.isEmpty())
          {
            IndexInputBuffer b = bufferSet.pollFirst();
            if (!b.currentRecord().equals(previousRecord))
            {
              if (previousRecord != null)
              {
                addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
              }

              // this is a new record
              final ImportRecord newRecord = b.currentRecord();
              insertIDSet = newImportIDSet(newRecord);
              deleteIDSet = newImportIDSet(newRecord);

              previousRecord = newRecord;
            }

            // merge all entryIds into the idSets
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);

            if (b.hasMoreData())
            {
              b.fetchNextRecord();
              bufferSet.add(b);
            }
          }

          if (previousRecord != null)
          {
            addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
          }
        }
      }
      catch (Exception e)
      {
        logger.error(ERR_IMPORT_LDIF_INDEX_WRITE_DB_ERR, indexMgr.getBufferFileName(), e.getMessage());
        throw e;
}
finally
{
endWriteTask(importer);
}
}
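    /**
     * Creates an empty import ID set for the given record: the DN2ID index uses an entry limit of
     * one, other indexes use their configured index entry limit.
     */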
private ImportIDSet newImportIDSet(ImportRecord record)
{
if (indexMgr.isDN2ID())
{
return new ImportIDSet(record.getKey(), newDefinedSet(), 1);
}
final Index index = indexIDToIndexMap.get(record.getIndexID());
return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
}
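    /**
     * Writes the accumulated insert and delete ID sets for a record to the index database,
     * delegating to {@link #addDN2ID} for the DN2ID index.
     */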
private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
ImportIDSet deleteSet) throws DirectoryException
{
keyCount.incrementAndGet();
if (indexMgr.isDN2ID())
{
addDN2ID(importer, indexID, insertSet);
}
else
{
if (!deleteSet.isDefined() || deleteSet.size() > 0)
{
final Index index = indexIDToIndexMap.get(indexID);
index.importRemove(importer, deleteSet);
}
if (!insertSet.isDefined() || insertSet.size() > 0)
{
final Index index = indexIDToIndexMap.get(indexID);
index.importPut(importer, insertSet);
}
}
}
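    /**
     * Writes a single DN record to the DN2ID index, after checking through the per-index
     * {@link DNState} that the parent DN has already been processed.
     */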
private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
throws DirectoryException
{
DNState dnState = dnStateMap.get(indexID);
if (dnState == null)
{
dnState = new DNState(indexIDToECMap.get(indexID));
dnStateMap.put(indexID, dnState);
}
if (dnState.checkParent(importer, idSet))
{
dnState.writeToDN2ID(importer, idSet.getKey());
}
}
private void addBytesRead(int bytesRead)
{
this.bytesRead.addAndGet(bytesRead);
}
/**
     * This class is used by an index DB merge thread performing DN processing
* to keep track of the state of individual DN2ID index processing.
*/
private final class DNState
{
private static final int DN_STATE_CACHE_SIZE = 64 * KB;
private final EntryContainer entryContainer;
private final TreeName dn2id;
private final TreeMap<ByteString, EntryID> parentIDMap = new TreeMap<>();
private final Map<EntryID, AtomicLong> id2childrenCountTree = new TreeMap<>();
private ByteSequence parentDN;
private final ByteStringBuilder lastDN = new ByteStringBuilder();
private EntryID parentID, lastID, entryID;
private long totalNbEntries;
private DNState(EntryContainer entryContainer)
{
this.entryContainer = entryContainer;
dn2id = entryContainer.getDN2ID().getName();
}
private ByteSequence getParent(ByteSequence dn)
{
int parentIndex = DnKeyFormat.findDNKeyParent(dn);
if (parentIndex < 0)
{
// This is the root or base DN
return null;
}
return dn.subSequence(0, parentIndex).toByteString();
}
/** Why do we still need this if we are checking parents in the first phase? */
boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
throws StorageRuntimeException
{
entryID = idSet.iterator().next();
parentDN = getParent(idSet.getKey());
if (bypassCacheForAppendMode())
{
// If null is returned then this is a suffix DN.
if (parentDN != null)
{
parentID = get(importer, dn2id, parentDN);
if (parentID == null)
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
return false;
}
}
}
else if (parentIDMap.isEmpty())
{
parentIDMap.put(idSet.getKey().toByteString(), entryID);
return true;
}
else if (lastID != null && lastDN.equals(parentDN))
{
parentIDMap.put(lastDN.toByteString(), lastID);
parentID = lastID;
lastDN.clear().append(idSet.getKey());
lastID = entryID;
return true;
}
else if (parentIDMap.lastKey().equals(parentDN))
{
parentID = parentIDMap.get(parentDN);
lastDN.clear().append(idSet.getKey());
lastID = entryID;
return true;
}
else if (parentIDMap.containsKey(parentDN))
{
EntryID newParentID = parentIDMap.get(parentDN);
ByteSequence key = parentIDMap.lastKey();
while (!parentDN.equals(key))
{
parentIDMap.remove(key);
key = parentIDMap.lastKey();
}
parentIDMap.put(idSet.getKey().toByteString(), entryID);
parentID = newParentID;
lastDN.clear().append(idSet.getKey());
lastID = entryID;
}
else
{
// We have a missing parent. Maybe parent checking was turned off?
// Just ignore.
parentID = null;
return false;
}
return true;
}
private AtomicLong getId2childrenCounter()
{
AtomicLong counter = id2childrenCountTree.get(parentID);
if (counter == null)
{
counter = new AtomicLong();
id2childrenCountTree.put(parentID, counter);
}
return counter;
}
/**
     * For append data, bypass the {@link #parentIDMap} cache, and look up the parent DN in the
* DN2ID index.
*/
private boolean bypassCacheForAppendMode()
{
return importCfg != null && importCfg.appendToExistingData();
}
private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
throws StorageRuntimeException
{
ByteString value = importer.read(dn2id, dn);
return value != null ? new EntryID(value) : null;
}
void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
throws DirectoryException
{
importer.put(dn2id, key, entryID.toByteString());
indexMgr.addTotDNCount(1);
if (parentID != null)
{
incrementChildrenCounter(importer);
}
}
private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
{
final AtomicLong counter = getId2childrenCounter();
counter.incrementAndGet();
if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
{
flush(importer);
}
}
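      /** Writes the cached children counters to the id2childrenCount tree and clears the cache. */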
private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
{
for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
{
final EntryID entryID = childrenCounter.getKey();
final long totalForEntryID = childrenCounter.getValue().get();
totalNbEntries += totalForEntryID;
entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
}
id2childrenCountTree.clear();
}
void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
{
flush(importer);
entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
}
}
}
/**
* This task writes the temporary scratch index files using the sorted buffers
* read from a blocking queue private to each index.
*/
private final class ScratchFileWriterTask implements Callable<Void>
{
private static final int DRAIN_TO = 3;
private final IndexManager indexMgr;
private final BlockingQueue<IndexOutputBuffer> queue;
/** Stream where to output insert ImportIDSet data. */
private final ByteArrayOutputStream insertByteStream = new ByteArrayOutputStream(2 * bufferSize);
private final DataOutputStream insertByteDataStream = new DataOutputStream(insertByteStream);
/** Stream where to output delete ImportIDSet data. */
private final ByteArrayOutputStream deleteByteStream = new ByteArrayOutputStream(2 * bufferSize);
private final DataOutputStream bufferStream;
private final DataOutputStream bufferIndexStream;
private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<>();
private int insertKeyCount, deleteKeyCount;
private int bufferCount;
private boolean poisonSeen;
public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
IndexManager indexMgr) throws FileNotFoundException
{
this.queue = queue;
this.indexMgr = indexMgr;
this.bufferStream = newDataOutputStream(indexMgr.getBufferFile());
this.bufferIndexStream = newDataOutputStream(indexMgr.getBufferIndexFile());
}
private DataOutputStream newDataOutputStream(File file) throws FileNotFoundException
{
return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), READER_WRITER_BUFFER_SIZE));
}
/** {@inheritDoc} */
@Override
public Void call() throws IOException, InterruptedException
{
long offset = 0;
List<IndexOutputBuffer> l = new LinkedList<>();
try
{
while (true)
{
final IndexOutputBuffer indexBuffer = queue.take();
long beginOffset = offset;
long bufferLen;
if (!queue.isEmpty())
{
queue.drainTo(l, DRAIN_TO);
l.add(indexBuffer);
bufferLen = writeIndexBuffers(l);
for (IndexOutputBuffer id : l)
{
if (!id.isDiscarded())
{
id.reset();
freeBufferQueue.add(id);
}
}
l.clear();
}
else
{
if (indexBuffer.isPoison())
{
break;
}
bufferLen = writeIndexBuffer(indexBuffer);
if (!indexBuffer.isDiscarded())
{
indexBuffer.reset();
freeBufferQueue.add(indexBuffer);
}
}
offset += bufferLen;
// Write buffer index information.
bufferIndexStream.writeLong(beginOffset);
bufferIndexStream.writeLong(offset);
bufferCount++;
Importer.this.bufferCount.incrementAndGet();
if (poisonSeen)
{
break;
}
}
return null;
}
catch (IOException e)
{
logger.error(ERR_IMPORT_LDIF_INDEX_FILEWRITER_ERR, indexMgr.getBufferFile().getAbsolutePath(), e.getMessage());
isCanceled = true;
throw e;
}
finally
{
close(bufferStream, bufferIndexStream);
indexMgr.setBufferInfo(bufferCount, indexMgr.getBufferFile().length());
}
}
private long writeIndexBuffer(IndexOutputBuffer indexBuffer) throws IOException
{
long bufferLen = 0;
final int numberKeys = indexBuffer.getNumberKeys();
for (int i = 0; i < numberKeys; i++)
{
if (i == 0)
{
// first record, initialize all
indexBuffer.setPosition(i);
resetStreams();
}
else if (!indexBuffer.sameKeyAndIndexID(i))
{
// this is a new record, save previous record ...
bufferLen += writeRecord(indexBuffer.currentRecord());
// ... and reinitialize all
indexBuffer.setPosition(i);
resetStreams();
}
appendNextEntryIDToStream(indexBuffer, i);
}
if (numberKeys > 0)
{
// save the last record
bufferLen += writeRecord(indexBuffer.currentRecord());
}
return bufferLen;
}
private long writeIndexBuffers(List<IndexOutputBuffer> buffers) throws IOException
{
resetStreams();
long bufferID = 0;
long bufferLen = 0;
for (IndexOutputBuffer b : buffers)
{
if (b.isPoison())
{
poisonSeen = true;
}
else
{
b.setPosition(0);
b.setBufferID(bufferID++);
indexSortedSet.add(b);
}
}
ImportRecord previousRecord = null;
while (!indexSortedSet.isEmpty())
{
final IndexOutputBuffer b = indexSortedSet.pollFirst();
if (!b.currentRecord().equals(previousRecord))
{
if (previousRecord != null)
{
bufferLen += writeRecord(previousRecord);
resetStreams();
}
// this is a new record
previousRecord = b.currentRecord();
}
appendNextEntryIDToStream(b, b.getPosition());
if (b.hasMoreData())
{
b.nextRecord();
indexSortedSet.add(b);
}
}
if (previousRecord != null)
{
bufferLen += writeRecord(previousRecord);
}
return bufferLen;
}
private void resetStreams()
{
insertByteStream.reset();
insertKeyCount = 0;
deleteByteStream.reset();
deleteKeyCount = 0;
}
private void appendNextEntryIDToStream(IndexOutputBuffer indexBuffer, int position)
{
if (indexBuffer.isInsertRecord(position))
{
if (insertKeyCount++ <= indexMgr.getIndexEntryLimit())
{
indexBuffer.writeEntryID(insertByteStream, position);
}
// else do not bother appending, this value will not be read.
// instead, a special value will be written to show the index entry limit is exceeded
}
else
{
indexBuffer.writeEntryID(deleteByteStream, position);
deleteKeyCount++;
}
}
private int writeByteStreams() throws IOException
{
if (insertKeyCount > indexMgr.getIndexEntryLimit())
{
// special handling when index entry limit has been exceeded
insertKeyCount = 1;
insertByteStream.reset();
insertByteDataStream.writeLong(IndexInputBuffer.UNDEFINED_SIZE);
}
int insertSize = INT_SIZE;
bufferStream.writeInt(insertKeyCount);
if (insertByteStream.size() > 0)
{
insertByteStream.writeTo(bufferStream);
}
int deleteSize = INT_SIZE;
bufferStream.writeInt(deleteKeyCount);
if (deleteByteStream.size() > 0)
{
deleteByteStream.writeTo(bufferStream);
}
return insertSize + insertByteStream.size() + deleteSize + deleteByteStream.size();
}
private int writeHeader(int indexID, int keySize) throws IOException
{
bufferStream.writeInt(indexID);
bufferStream.writeInt(keySize);
return 2 * INT_SIZE;
}
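      /**
       * Writes one record to the scratch file: a header holding the index ID and key length, the
       * key bytes, then the insert entry ID count and IDs followed by the delete entry ID count
       * and IDs (a special undefined-size marker replaces the insert IDs once the index entry
       * limit has been exceeded). Returns the total number of bytes written.
       */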
private int writeRecord(ImportRecord record) throws IOException
{
final ByteSequence key = record.getKey();
int keySize = key.length();
int headerSize = writeHeader(record.getIndexID(), keySize);
key.copyTo(bufferStream);
int bodySize = writeByteStreams();
return headerSize + keySize + bodySize;
}
/** {@inheritDoc} */
@Override
public String toString()
{
return getClass().getSimpleName() + "(" + indexMgr.getBufferFileName() + ": " + indexMgr.getBufferFile() + ")";
}
}
/**
   * This task's main function is to sort the index buffers given to it by the
   * import tasks reading the LDIF file. It will also create an index file writer
   * task and corresponding queue if needed. The sorted index buffers are put on
* the index file writer queues for writing to a temporary file.
*/
private final class SortTask implements Callable<Void>
{
private final IndexOutputBuffer indexBuffer;
public SortTask(IndexOutputBuffer indexBuffer)
{
this.indexBuffer = indexBuffer;
}
/** {@inheritDoc} */
@Override
public Void call() throws Exception
{
if ((importCfg != null && importCfg.isCancelled())
|| isCanceled)
{
isCanceled = true;
return null;
}
indexBuffer.sort();
final IndexKey indexKey = indexBuffer.getIndexKey();
if (!indexKeyQueueMap.containsKey(indexKey))
{
createIndexWriterTask(indexKey);
}
indexKeyQueueMap.get(indexKey).add(indexBuffer);
return null;
}
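    /**
     * Lazily creates, under {@code synObj}, the index manager, scratch file writer task and
     * blocking queue associated with the given index key, and submits the writer to the scratch
     * file writer service.
     */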
private void createIndexWriterTask(IndexKey indexKey) throws FileNotFoundException
{
synchronized (synObj)
{
if (indexKeyQueueMap.containsKey(indexKey))
{
return;
}
boolean isDN2ID = DN2ID_INDEX_NAME.equals(indexKey.getIndexID());
IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN2ID, indexKey.getEntryLimit());
if (isDN2ID)
{
DNIndexMgrList.add(indexMgr);
}
else
{
indexMgrList.add(indexMgr);
}
BlockingQueue<IndexOutputBuffer> newQueue = new ArrayBlockingQueue<>(phaseOneBufferCount);
ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr);
scratchFileWriterList.add(indexWriter);
scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter));
indexKeyQueueMap.put(indexKey, newQueue);
}
}
}
/**
* The index manager class has several functions:
* <ol>
* <li>It is used to carry information about index processing created in phase one to phase two</li>
* <li>It collects statistics about phase two processing for each index</li>
* <li>It manages opening and closing the scratch index files</li>
* </ol>
*/
final class IndexManager implements Comparable<IndexManager>
{
private final File bufferFile;
private final String bufferFileName;
private final File bufferIndexFile;
private final boolean isDN2ID;
private final int indexEntryLimit;
private int numberOfBuffers;
private long bufferFileSize;
private long totalDNs;
private volatile IndexDBWriteTask writer;
private IndexManager(String fileName, boolean isDN2ID, int indexEntryLimit)
{
this.bufferFileName = fileName;
this.bufferFile = new File(tempDir, bufferFileName);
this.bufferIndexFile = new File(tempDir, bufferFileName + ".index");
this.isDN2ID = isDN2ID;
this.indexEntryLimit = indexEntryLimit > 0 ? indexEntryLimit : Integer.MAX_VALUE;
}
private void setIndexDBWriteTask(IndexDBWriteTask writer)
{
this.writer = writer;
}
private File getBufferFile()
{
return bufferFile;
}
private long getBufferFileSize()
{
return bufferFileSize;
}
private File getBufferIndexFile()
{
return bufferIndexFile;
}
private void setBufferInfo(int numberOfBuffers, long bufferFileSize)
{
this.numberOfBuffers = numberOfBuffers;
this.bufferFileSize = bufferFileSize;
}
/**
* Updates the bytes read counter.
*
* @param bytesRead
* The number of bytes read.
*/
void addBytesRead(int bytesRead)
{
if (writer != null)
{
writer.addBytesRead(bytesRead);
}
}
private void addTotDNCount(int delta)
{
totalDNs += delta;
}
private long getDNCount()
{
return totalDNs;
}
private boolean isDN2ID()
{
return isDN2ID;
}
private void printStats(long deltaTime)
{
if (writer != null)
{
writer.printStats(deltaTime);
}
}
/**
* Returns the file name associated with this index manager.
*
* @return The file name associated with this index manager.
*/
String getBufferFileName()
{
return bufferFileName;
}
private int getIndexEntryLimit()
{
return indexEntryLimit;
}
/** {@inheritDoc} */
@Override
public int compareTo(IndexManager mgr)
{
return numberOfBuffers - mgr.numberOfBuffers;
}
private int getNumberOfBuffers()
{
return numberOfBuffers;
}
/** {@inheritDoc} */
@Override
public String toString()
{
return getClass().getSimpleName() + "(" + bufferFileName + ": " + bufferFile + ")";
}
}
/** The rebuild index manager handles all rebuild index related processing. */
private class RebuildIndexManager extends ImportTask
{
/** Rebuild index configuration. */
private final RebuildConfig rebuildConfig;
/** Local DB backend configuration. */
private final PluggableBackendCfg cfg;
/** Map of index keys to indexes. */
private final Map<IndexKey, MatchingRuleIndex> indexMap = new LinkedHashMap<>();
/** List of VLV indexes. */
private final List<VLVIndex> vlvIndexes = new LinkedList<>();
/** The suffix instance. */
private Suffix suffix;
/** The entry container. */
private EntryContainer entryContainer;
/** The DN2ID index. */
private DN2ID dn2id;
/** The DN2URI index. */
private DN2URI dn2uri;
/** Total entries to be processed. */
private long totalEntries;
/** Total entries processed. */
private final AtomicLong entriesProcessed = new AtomicLong(0);
RebuildIndexManager(Storage storage, RebuildConfig rebuildConfig, PluggableBackendCfg cfg)
{
super(storage);
this.rebuildConfig = rebuildConfig;
this.cfg = cfg;
}
void initialize() throws ConfigException, InitializationException
{
entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
suffix = new Suffix(entryContainer, null, null, null);
}
private void printStartMessage(WriteableTransaction txn) throws StorageRuntimeException
{
totalEntries = suffix.getID2Entry().getRecordCount(txn);
switch (rebuildConfig.getRebuildMode())
{
case ALL:
logger.info(NOTE_REBUILD_ALL_START, totalEntries);
break;
case DEGRADED:
logger.info(NOTE_REBUILD_DEGRADED_START, totalEntries);
break;
default:
if (!rebuildConfig.isClearDegradedState()
&& logger.isInfoEnabled())
{
String indexes = Utils.joinAsString(", ", rebuildConfig.getRebuildList());
logger.info(NOTE_REBUILD_START, indexes, totalEntries);
}
break;
}
}
void printStopMessage(long rebuildStartTime)
{
long finishTime = System.currentTimeMillis();
long totalTime = finishTime - rebuildStartTime;
float rate = 0;
if (totalTime > 0)
{
rate = 1000f * entriesProcessed.get() / totalTime;
}
if (!rebuildConfig.isClearDegradedState())
{
logger.info(NOTE_REBUILD_FINAL_STATUS, entriesProcessed.get(), totalTime / 1000, rate);
}
}
@Override
void call0(WriteableTransaction txn) throws Exception
{
ID2Entry id2entry = entryContainer.getID2Entry();
Cursor<ByteString, ByteString> cursor = txn.openCursor(id2entry.getName());
try
{
while (cursor.next())
{
if (isCanceled)
{
return;
}
EntryID entryID = new EntryID(cursor.getKey());
Entry entry =
ID2Entry.entryFromDatabase(cursor.getValue(),
entryContainer.getRootContainer().getCompressedSchema());
processEntry(txn, entry, entryID);
entriesProcessed.getAndIncrement();
}
flushIndexBuffers();
}
catch (Exception e)
{
logger.traceException(e);
logger.error(ERR_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR, stackTraceToSingleLineString(e));
isCanceled = true;
throw e;
}
finally
{
close(cursor);
}
}
private void clearDegradedState(WriteableTransaction txn)
{
setIndexesListsToBeRebuilt(txn);
logger.info(NOTE_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
postRebuildIndexes(txn);
}
private void preRebuildIndexes(WriteableTransaction txn)
{
setIndexesListsToBeRebuilt(txn);
setRebuildListIndexesTrusted(txn, false);
clearIndexesToBeRebuilt(txn);
}
private void throwIfCancelled() throws InterruptedException
{
if (isCanceled)
{
throw new InterruptedException("Rebuild Index canceled.");
}
}
private void postRebuildIndexes(WriteableTransaction txn)
{
setRebuildListIndexesTrusted(txn, true);
}
@SuppressWarnings("fallthrough")
private void setIndexesListsToBeRebuilt(WriteableTransaction txn) throws StorageRuntimeException
{
      // Depending on the rebuild mode, (re)build the lists of indexes.
final RebuildMode mode = rebuildConfig.getRebuildMode();
switch (mode)
{
case ALL:
rebuildIndexMap(txn, false);
// falls through
case DEGRADED:
if (mode == RebuildMode.ALL)
{
dn2id = entryContainer.getDN2ID();
}
if (mode == RebuildMode.ALL || entryContainer.getDN2URI() == null)
{
dn2uri = entryContainer.getDN2URI();
}
if (mode == RebuildMode.DEGRADED
|| entryContainer.getAttributeIndexes().isEmpty())
{
rebuildIndexMap(txn, true); // only degraded.
}
if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
{
vlvIndexes.addAll(entryContainer.getVLVIndexes());
}
break;
case USER_DEFINED:
        // false may be required if the user wants to rebuild a specific index.
rebuildIndexMap(txn, false);
break;
default:
break;
}
}
private void rebuildIndexMap(WriteableTransaction txn, boolean onlyDegraded)
{
      // rebuildList contains the user-selected indexes (in USER_DEFINED mode).
final List<String> rebuildList = rebuildConfig.getRebuildList();
for (final Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
{
final AttributeType attributeType = mapEntry.getKey();
final AttributeIndex attributeIndex = mapEntry.getValue();
if (rebuildConfig.getRebuildMode() == RebuildMode.ALL
|| rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
{
          // Get all existing indexes for the ALL and DEGRADED modes.
rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
}
else if (!rebuildList.isEmpty())
{
          // Get indexes for the user-defined index list.
for (final String index : rebuildList)
{
if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
{
rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
}
}
}
}
}
private void rebuildAttributeIndexes(WriteableTransaction txn, AttributeIndex attrIndex, AttributeType attrType,
boolean onlyDegraded) throws StorageRuntimeException
{
for (Map.Entry<String, MatchingRuleIndex> mapEntry : attrIndex.getNameToIndexes().entrySet())
{
fillIndexMap(txn, attrType, mapEntry.getValue(), mapEntry.getKey(), onlyDegraded);
}
}
private void fillIndexMap(WriteableTransaction txn, AttributeType attrType, MatchingRuleIndex index,
String importIndexID, boolean onlyDegraded)
{
if ((!onlyDegraded || !index.isTrusted())
&& (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0))
{
putInIdContainerMap(index);
final IndexKey key = new IndexKey(attrType, importIndexID, index.getIndexEntryLimit());
indexMap.put(key, index);
}
}
private void clearIndexesToBeRebuilt(WriteableTransaction txn) throws StorageRuntimeException
{
if (dn2uri != null)
{
entryContainer.clearDatabase(txn, entryContainer.getDN2URI());
}
if (dn2id != null)
{
entryContainer.clearDatabase(txn, entryContainer.getDN2ID());
entryContainer.clearDatabase(txn, entryContainer.getID2ChildrenCount());
}
for (Map.Entry<IndexKey, MatchingRuleIndex> mapEntry : indexMap.entrySet())
{
final Index index = mapEntry.getValue();
if (!index.isTrusted())
{
entryContainer.clearDatabase(txn, index);
}
}
for (final VLVIndex vlvIndex : entryContainer.getVLVIndexes())
{
if (!vlvIndex.isTrusted())
{
entryContainer.clearDatabase(txn, vlvIndex);
}
}
}
private void setRebuildListIndexesTrusted(WriteableTransaction txn, boolean trusted) throws StorageRuntimeException
{
try
{
setTrusted(txn, indexMap.values(), trusted);
for (VLVIndex vlvIndex : vlvIndexes)
{
vlvIndex.setTrusted(txn, trusted);
}
}
catch (StorageRuntimeException ex)
{
throw new StorageRuntimeException(NOTE_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()).toString());
}
}
private void setTrusted(WriteableTransaction txn, final Collection<MatchingRuleIndex> indexes, boolean trusted)
{
if (indexes != null && !indexes.isEmpty())
{
for (Index index : indexes)
{
index.setTrusted(txn, trusted);
}
}
}
/** @see Importer#importPhaseOne(WriteableTransaction) */
private void rebuildIndexesPhaseOne() throws StorageRuntimeException, InterruptedException,
ExecutionException
{
initializeIndexBuffers();
Timer timer = scheduleAtFixedRate(new RebuildFirstPhaseProgressTask());
scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
bufferSortService = Executors.newFixedThreadPool(threadCount);
ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount);
List<Callable<Void>> tasks = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++)
{
tasks.add(this);
}
List<Future<Void>> results = rebuildIndexService.invokeAll(tasks);
getAll(results);
stopScratchFileWriters();
getAll(scratchFileWriterFutures);
// Try to clear as much memory as possible.
shutdownAll(rebuildIndexService, bufferSortService, scratchFileWriterService);
timer.cancel();
clearAll(tasks, results, scratchFileWriterList, scratchFileWriterFutures, freeBufferQueue);
indexKeyQueueMap.clear();
}
private void rebuildIndexesPhaseTwo() throws Exception
{
final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
try
{
processIndexFiles();
}
finally
{
timer.cancel();
}
}
private Timer scheduleAtFixedRate(TimerTask task)
{
final Timer timer = new Timer();
timer.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL);
return timer;
}
private int getIndexCount() throws ConfigException, StorageRuntimeException,
InitializationException
{
switch (rebuildConfig.getRebuildMode())
{
case ALL:
return getTotalIndexCount(cfg);
case DEGRADED:
// FIXME: since the environment is not started we cannot determine which
// indexes are degraded. As a workaround, be conservative and assume all
// indexes need rebuilding.
return getTotalIndexCount(cfg);
default:
return getRebuildListIndexCount(cfg);
}
}
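    /**
     * Expands the user-supplied rebuild list into the number of individual indexes to rebuild,
     * validating that every listed index is actually configured for this backend.
     */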
private int getRebuildListIndexCount(PluggableBackendCfg cfg)
throws StorageRuntimeException, ConfigException, InitializationException
{
final List<String> rebuildList = rebuildConfig.getRebuildList();
if (rebuildList.isEmpty())
{
return 0;
}
int indexCount = 0;
for (String index : rebuildList)
{
final String lowerName = index.toLowerCase();
if (DN2ID_INDEX_NAME.equals(lowerName))
{
indexCount += 3;
}
else if ("dn2uri".equals(lowerName))
{
indexCount++;
}
else if (lowerName.startsWith("vlv."))
{
if (lowerName.length() < 5)
{
throw new StorageRuntimeException(ERR_VLV_INDEX_NOT_CONFIGURED.get(lowerName).toString());
}
indexCount++;
}
else if (ID2SUBTREE_INDEX_NAME.equals(lowerName)
|| ID2CHILDREN_INDEX_NAME.equals(lowerName))
{
throw attributeIndexNotConfigured(index);
}
else
{
final String[] attrIndexParts = lowerName.split("\\.");
if (attrIndexParts.length <= 0 || attrIndexParts.length > 3)
{
throw attributeIndexNotConfigured(index);
}
AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
if (attrType == null)
{
throw attributeIndexNotConfigured(index);
}
if (attrIndexParts.length != 1)
{
final String indexType = attrIndexParts[1];
if (attrIndexParts.length == 2)
{
if ("presence".equals(indexType)
|| "equality".equals(indexType)
|| "ordering".equals(indexType)
|| "substring".equals(indexType)
|| "approximate".equals(indexType))
{
indexCount++;
}
else
{
throw attributeIndexNotConfigured(index);
}
}
else // attrIndexParts.length == 3
{
if (!findExtensibleMatchingRule(cfg, indexType + "." + attrIndexParts[2]))
{
throw attributeIndexNotConfigured(index);
}
indexCount++;
}
}
else
{
boolean found = false;
for (final String idx : cfg.listBackendIndexes())
{
if (idx.equalsIgnoreCase(index))
{
found = true;
final BackendIndexCfg indexCfg = cfg.getBackendIndex(idx);
indexCount += getAttributeIndexCount(indexCfg.getIndexType(),
PRESENCE, EQUALITY, ORDERING, SUBSTRING, APPROXIMATE);
indexCount += getExtensibleIndexCount(indexCfg);
}
}
if (!found)
{
throw attributeIndexNotConfigured(index);
}
}
}
}
return indexCount;
}
private InitializationException attributeIndexNotConfigured(String index)
{
return new InitializationException(ERR_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index));
}
private boolean findExtensibleMatchingRule(PluggableBackendCfg cfg, String indexExRuleName) throws ConfigException
{
for (String idx : cfg.listBackendIndexes())
{
BackendIndexCfg indexCfg = cfg.getBackendIndex(idx);
if (indexCfg.getIndexType().contains(EXTENSIBLE))
{
for (String exRule : indexCfg.getIndexExtensibleMatchingRule())
{
if (exRule.equalsIgnoreCase(indexExRuleName))
{
return true;
}
}
}
}
return false;
}
private int getAttributeIndexCount(SortedSet<IndexType> indexTypes, IndexType... toFinds)
{
int result = 0;
for (IndexType toFind : toFinds)
{
if (indexTypes.contains(toFind))
{
result++;
}
}
return result;
}
private int getExtensibleIndexCount(BackendIndexCfg indexCfg)
{
int result = 0;
if (indexCfg.getIndexType().contains(EXTENSIBLE))
{
boolean shared = false;
for (final String exRule : indexCfg.getIndexExtensibleMatchingRule())
{
if (exRule.endsWith(".sub"))
{
result++;
}
else if (!shared)
{
shared = true;
result++;
}
}
}
return result;
}
private void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID)
throws DirectoryException, StorageRuntimeException, InterruptedException
{
if (dn2id != null)
{
processDN2ID(suffix, entry.getName(), entryID);
}
if (dn2uri != null)
{
processDN2URI(txn, suffix, null, entry);
}
processIndexes(entry, entryID);
processVLVIndexes(txn, entry, entryID);
}
private void processVLVIndexes(WriteableTransaction txn, Entry entry, EntryID entryID)
throws StorageRuntimeException, DirectoryException
{
final IndexBuffer buffer = new IndexBuffer(entryContainer);
for (VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes())
{
vlvIdx.addEntry(buffer, entryID, entry);
}
buffer.flush(txn);
}
private void processIndexes(Entry entry, EntryID entryID)
throws StorageRuntimeException, InterruptedException
{
for (Map.Entry<IndexKey, MatchingRuleIndex> mapEntry : indexMap.entrySet())
{
IndexKey key = mapEntry.getKey();
AttributeType attrType = key.getAttributeType();
if (entry.hasAttribute(attrType))
{
AttributeIndex attributeIndex = entryContainer.getAttributeIndex(attrType);
IndexingOptions options = attributeIndex.getIndexingOptions();
MatchingRuleIndex index = mapEntry.getValue();
processAttribute(index, entry, entryID, options, key);
}
}
}
/**
* Return the number of entries processed by the rebuild manager.
*
* @return The number of entries processed.
*/
long getEntriesProcessed()
{
return this.entriesProcessed.get();
}
/**
     * Return the total number of entries to be processed by the rebuild manager.
     *
     * @return The total number of entries to process.
*/
long getTotalEntries()
{
return this.totalEntries;
}
}
/**
* This class reports progress of rebuild index processing at fixed intervals.
*/
private class RebuildFirstPhaseProgressTask extends TimerTask
{
/**
* The number of records that had been processed at the time of the previous
* progress report.
*/
private long previousProcessed;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/**
* Create a new rebuild index progress task.
*
* @throws StorageRuntimeException
* If an error occurred while accessing the database.
*/
public RebuildFirstPhaseProgressTask() throws StorageRuntimeException
{
previousTime = System.currentTimeMillis();
}
/**
* The action to be performed by this timer task.
*/
@Override
public void run()
{
long latestTime = System.currentTimeMillis();
long deltaTime = latestTime - previousTime;
if (deltaTime == 0)
{
return;
}
long entriesProcessed = rebuildManager.getEntriesProcessed();
long deltaCount = entriesProcessed - previousProcessed;
float rate = 1000f * deltaCount / deltaTime;
float completed = 0;
if (rebuildManager.getTotalEntries() > 0)
{
completed = 100f * entriesProcessed / rebuildManager.getTotalEntries();
}
logger.info(NOTE_REBUILD_PROGRESS_REPORT, completed, entriesProcessed, rebuildManager.getTotalEntries(), rate);
previousProcessed = entriesProcessed;
previousTime = latestTime;
}
}
/**
   * This class reports progress of the first phase of import processing at fixed
* intervals.
*/
private final class FirstPhaseProgressTask extends TimerTask
{
/**
* The number of entries that had been read at the time of the previous
* progress report.
*/
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
public FirstPhaseProgressTask()
{
previousTime = System.currentTimeMillis();
}
/** The action to be performed by this timer task. */
@Override
public void run()
{
long entriesRead = reader.getEntriesRead();
long entriesIgnored = reader.getEntriesIgnored();
long entriesRejected = reader.getEntriesRejected();
long deltaCount = entriesRead - previousCount;
long latestTime = System.currentTimeMillis();
long deltaTime = latestTime - previousTime;
if (deltaTime == 0)
{
return;
}
float rate = 1000f * deltaCount / deltaTime;
logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
previousCount = entriesRead;
previousTime = latestTime;
}
}
/**
* This class reports progress of the second phase of import processing at
* fixed intervals.
*/
private class SecondPhaseProgressTask extends TimerTask
{
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
public SecondPhaseProgressTask()
{
previousTime = System.currentTimeMillis();
}
/** The action to be performed by this timer task. */
@Override
public void run()
{
long latestTime = System.currentTimeMillis();
long deltaTime = latestTime - previousTime;
if (deltaTime == 0)
{
return;
}
previousTime = latestTime;
//Do DN index managers first.
for (IndexManager indexMgrDN : DNIndexMgrList)
{
indexMgrDN.printStats(deltaTime);
}
//Do non-DN index managers.
for (IndexManager indexMgr : indexMgrList)
{
indexMgr.printStats(deltaTime);
}
}
}
/**
* A class to hold information about the entry determined by the LDIF reader.
* Mainly the suffix the entry belongs under and the ID assigned to it by the
* reader.
*/
public class EntryInformation
{
private EntryID entryID;
private Suffix suffix;
/**
* Return the suffix associated with the entry.
*
     * @return The entry's suffix instance.
*/
private Suffix getSuffix()
{
return suffix;
}
/**
* Set the suffix instance associated with the entry.
*
* @param suffix
* The suffix associated with the entry.
*/
public void setSuffix(Suffix suffix)
{
this.suffix = suffix;
}
/**
* Set the entry's ID.
*
* @param entryID
     *          The entry ID to associate with the entry.
*/
public void setEntryID(EntryID entryID)
{
this.entryID = entryID;
}
/**
* Return the entry ID associated with the entry.
*
* @return The entry ID associated with the entry.
*/
private EntryID getEntryID()
{
return entryID;
}
}
/**
* This class is used as an index key for hash maps that need to process multiple suffix index
   * elements into a single queue and/or maps based on both attribute type and index ID (e.g.,
   * cn.equality, sn.equality, ...).
*/
public static class IndexKey
{
private final AttributeType attributeType;
private final String indexID;
private final int entryLimit;
/**
* Create index key instance using the specified attribute type, index ID and index entry
* limit.
*
* @param attributeType
* The attribute type.
* @param indexID
* The index ID taken from the matching rule's indexer.
* @param entryLimit
* The entry limit for the index.
*/
private IndexKey(AttributeType attributeType, String indexID, int entryLimit)
{
this.attributeType = attributeType;
this.indexID = indexID;
this.entryLimit = entryLimit;
}
/**
* An equals method that uses both the attribute type and the index ID. Only returns
* {@code true} if the attribute type and index ID are equal.
*
* @param obj
* the object to compare.
* @return {@code true} if the objects are equal, or {@code false} if they are not.
*/
@Override
public boolean equals(Object obj)
{
if (obj instanceof IndexKey)
{
IndexKey oKey = (IndexKey) obj;
if (attributeType.equals(oKey.getAttributeType())
&& indexID.equals(oKey.getIndexID()))
{
return true;
}
}
return false;
}
/**
* A hash code method that adds the hash codes of the attribute type and index ID and returns
* that value.
*
* @return The combined hash values of attribute type hash code and the index ID hash code.
*/
@Override
public int hashCode()
{
return attributeType.hashCode() + indexID.hashCode();
}
/**
* Return the attribute type.
*
* @return The attribute type.
*/
private AttributeType getAttributeType()
{
return attributeType;
}
/**
* Return the index ID.
*
* @return The index ID.
*/
private String getIndexID()
{
return indexID;
}
/**
* Return the index key name, which is the attribute type primary name, a period, and the index
* ID name. Used for building file names and progress output.
*
* @return The index key name.
*/
private String getName()
{
return attributeType.getPrimaryName() + "." + indexID;
}
/**
* Return the entry limit associated with the index.
*
* @return The entry limit.
*/
private int getEntryLimit()
{
return entryLimit;
}
/** {@inheritDoc} */
@Override
public String toString()
{
return getClass().getSimpleName()
+ "(index=" + attributeType.getNameOrOID() + "." + indexID
+ ", entryLimit=" + entryLimit
+ ")";
}
}
/**
   * This interface is used by the suffix instances to do parental checking against the DN cache.
* <p>
* It will be shared when multiple suffixes are being processed.
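   * <p>
   * A minimal usage sketch (illustrative only, not actual pipeline code; {@code dnCache} and
   * {@code entry} are hypothetical local variables):
   *
   * <pre>{@code
   * if (!dnCache.insert(entry.getName()))
   * {
   *   // the DN is already present in the cache: treat the entry as a duplicate
   * }
   * }</pre>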
*/
public static interface DNCache extends Closeable
{
/**
* Insert the specified DN into the DN cache. It will return {@code true} if the DN does not
* already exist in the cache and was inserted, or {@code false} if the DN exists already in the
* cache.
*
* @param dn
* The DN to insert in the cache.
* @return {@code true} if the DN was inserted in the cache, or {@code false} if the DN exists
* in the cache already and could not be inserted.
* @throws StorageRuntimeException
* If an error occurs accessing the database.
*/
boolean insert(DN dn);
/**
* Returns whether the specified DN is contained in the DN cache.
*
* @param dn
* The DN to check the presence of.
* @return {@code true} if the cache contains the DN, or {@code false} if it
   *         does not.
* @throws StorageRuntimeException
* If an error occurs reading the database.
*/
boolean contains(DN dn) throws StorageRuntimeException;
/**
* Shuts the DN cache down.
*
* @throws StorageRuntimeException
   *           If an error occurs.
*/
@Override
void close();
}
/** Invocation handler for the {@link PluggableBackendCfg} proxy. */
private static final class BackendCfgHandler implements InvocationHandler
{
private final Map<String, Object> returnValues;
private BackendCfgHandler(final Map<String, Object> returnValues)
{
this.returnValues = returnValues;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
final String methodName = method.getName();
if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
{
// ignore calls to (add|remove)*ChangeListener() methods
return null;
}
final Object returnValue = returnValues.get(methodName);
if (returnValue != null)
{
return returnValue;
}
throw new IllegalArgumentException("Unhandled method call on proxy ("
+ BackendCfgHandler.class.getSimpleName()
+ ") for method (" + method
+ ") with arguments (" + Arrays.toString(args) + ")");
}
}
/**
   * Used to check DNs when DN validation is performed during phase one processing.
   * The cache is deleted after phase one processing completes.
*/
private final class DNCacheImpl implements DNCache
{
private static final String DB_NAME = "dn_cache";
private final TreeName dnCache = new TreeName("", DB_NAME);
private final Storage storage;
private DNCacheImpl(File dnCachePath) throws StorageRuntimeException
{
final Map<String, Object> returnValues = new HashMap<>();
returnValues.put("getDBDirectory", dnCachePath.getAbsolutePath());
returnValues.put("getBackendId", DB_NAME);
returnValues.put("getDBCacheSize", 0L);
returnValues.put("getDBCachePercent", 10);
returnValues.put("isDBTxnNoSync", true);
returnValues.put("getDBDirectoryPermissions", "700");
returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
try
{
returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importDNCache,cn=Backends,cn=config"));
storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
DirectoryServer.getInstance().getServerContext());
storage.open();
storage.write(new WriteOperation()
{
@Override
public void run(WriteableTransaction txn) throws Exception
{
txn.openTree(dnCache);
}
});
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
{
return (PersistitBackendCfg) Proxy.newProxyInstance(
getClass().getClassLoader(),
new Class<?>[] { PersistitBackendCfg.class },
new BackendCfgHandler(returnValues));
}
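    // Standard 64-bit FNV-1a parameters: offset basis and prime.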
private static final long FNV_INIT = 0xcbf29ce484222325L;
private static final long FNV_PRIME = 0x100000001b3L;
/** Hash the DN bytes. Uses the FNV-1a hash. */
private ByteString fnv1AHashCode(ByteString b)
{
long hash = FNV_INIT;
for (int i = 0; i < b.length(); i++)
{
hash ^= b.byteAt(i);
hash *= FNV_PRIME;
}
return ByteString.valueOf(hash);
}
@Override
public void close() throws StorageRuntimeException
{
try
{
storage.close();
}
finally
{
storage.removeStorageFiles();
}
}
@Override
public boolean insert(DN dn) throws StorageRuntimeException
{
// Use a compact representation for key
// and a reversible representation for value
final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
final ByteStringBuilder dnValue = new ByteStringBuilder().append(dn.toString());
return insert(key, dnValue);
}
private boolean insert(final ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException
{
final AtomicBoolean updateResult = new AtomicBoolean();
try
{
storage.write(new WriteOperation()
{
@Override
public void run(WriteableTransaction txn) throws Exception
{
updateResult.set(txn.update(dnCache, key, new UpdateFunction()
{
@Override
public ByteSequence computeNewValue(ByteSequence existingDns)
{
if (containsDN(existingDns, dn))
{
// no change
return existingDns;
}
else if (existingDns != null)
{
return addDN(existingDns, dn);
}
else
{
return singletonList(dn);
}
}
/** Add the DN to the DNs because of a hash collision. */
private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
{
final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
builder.append(dnList);
builder.append(dntoAdd.length());
builder.append(dntoAdd);
return builder;
}
            /** Create a single-element list containing the given DN. */
private ByteSequence singletonList(final ByteSequence dntoAdd)
{
final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
singleton.append(dntoAdd.length());
singleton.append(dntoAdd);
return singleton;
}
}));
}
});
}
catch (StorageRuntimeException e)
{
throw e;
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
return updateResult.get();
}
/** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
private boolean containsDN(ByteSequence existingDns, ByteStringBuilder dn)
{
if (existingDns != null && existingDns.length() > 0)
{
// TODO JNR remove call to toByteArray() on next line?
final byte[] existingDnsBytes = existingDns.toByteArray();
final ByteSequenceReader reader = existingDns.asReader();
int previousPos = 0;
while (reader.remaining() != 0)
{
int pLen = INT_SIZE;
int len = reader.getInt();
ImportRecord r1 = ImportRecord.from(ByteString.wrap(existingDnsBytes, previousPos + pLen, len), 0);
ImportRecord r2 = ImportRecord.from(dn, 0);
if (r1.equals(r2))
{
return true;
}
reader.skip(len);
previousPos = reader.position();
}
}
return false;
}
@Override
public boolean contains(final DN dn)
{
try
{
return storage.read(new ReadOperation<Boolean>()
{
@Override
public Boolean run(ReadableTransaction txn) throws Exception
{
final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
final ByteString existingDns = txn.read(dnCache, key);
if (existingDns != null)
{
final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString());
return containsDN(existingDns, dnBytes);
}
return false;
}
});
}
catch (StorageRuntimeException e)
{
throw e;
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
}
}