Importer.java revision 3598
3339N/A/*
3339N/A * CDDL HEADER START
3339N/A *
3339N/A * The contents of this file are subject to the terms of the
3339N/A * Common Development and Distribution License, Version 1.0 only
3339N/A * (the "License"). You may not use this file except in compliance
3339N/A * with the License.
3339N/A *
3339N/A * You can obtain a copy of the license at
3339N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE
3339N/A * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
3339N/A * See the License for the specific language governing permissions
3339N/A * and limitations under the License.
3339N/A *
3339N/A * When distributing Covered Code, include this CDDL HEADER in each
3339N/A * file and include the License file at
3339N/A * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
3339N/A * add the following below this CDDL HEADER, with the fields enclosed
3339N/A * by brackets "[]" replaced with your own identifying information:
3339N/A * Portions Copyright [yyyy] [name of copyright owner]
3339N/A *
3339N/A * CDDL HEADER END
3339N/A *
3339N/A *
3339N/A * Copyright 2008 Sun Microsystems, Inc.
3339N/A */
3339N/A
3339N/Apackage org.opends.server.backends.jeb.importLDIF;
3339N/A
3339N/Aimport org.opends.server.types.*;
3339N/Aimport org.opends.server.loggers.debug.DebugTracer;
3339N/Aimport static org.opends.server.loggers.debug.DebugLogger.getTracer;
3339N/Aimport static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
3339N/Aimport static org.opends.server.loggers.ErrorLogger.logError;
3339N/Aimport org.opends.server.admin.std.server.LocalDBBackendCfg;
3339N/Aimport org.opends.server.util.LDIFReader;
3339N/Aimport org.opends.server.util.StaticUtils;
3339N/Aimport org.opends.server.util.LDIFException;
3339N/Aimport org.opends.server.util.RuntimeInformation;
3339N/Aimport static org.opends.server.util.DynamicConstants.BUILD_ID;
3339N/Aimport static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
3339N/Aimport org.opends.server.config.ConfigException;
3339N/Aimport org.opends.server.core.DirectoryServer;
3339N/Aimport org.opends.server.backends.jeb.*;
3339N/Aimport org.opends.server.protocols.asn1.ASN1OctetString;
3339N/Aimport org.opends.messages.Message;
3339N/Aimport org.opends.messages.JebMessages;
3339N/Aimport static org.opends.messages.JebMessages.*;
3339N/Aimport java.util.concurrent.CopyOnWriteArrayList;
3339N/Aimport java.util.concurrent.LinkedBlockingQueue;
3339N/Aimport java.util.concurrent.TimeUnit;
3339N/Aimport java.util.*;
3339N/Aimport java.io.IOException;
3339N/A
3339N/Aimport com.sleepycat.je.*;
3339N/A
3339N/A/**
3339N/A * Performs a LDIF import.
3339N/A */
3339N/A
3339N/Apublic class Importer implements Thread.UncaughtExceptionHandler {
3339N/A
3339N/A
3339N/A /**
3339N/A * The tracer object for the debug logger.
3339N/A */
3339N/A private static final DebugTracer TRACER = getTracer();
3339N/A
3339N/A /**
3339N/A * The JE backend configuration.
3339N/A */
3339N/A private LocalDBBackendCfg config;
3339N/A
3339N/A /**
3339N/A * The root container used for this import job.
3339N/A */
3339N/A private RootContainer rootContainer;
3339N/A
3339N/A /**
3339N/A * The LDIF import configuration.
3339N/A */
3339N/A private LDIFImportConfig ldifImportConfig;
3339N/A
3339N/A /**
3339N/A * The LDIF reader.
3339N/A */
3339N/A private LDIFReader reader;
3339N/A
3339N/A /**
3339N/A * Map of base DNs to their import context.
3339N/A */
3339N/A private LinkedHashMap<DN, DNContext> importMap =
3339N/A new LinkedHashMap<DN, DNContext>();
3339N/A
3339N/A
3339N/A /**
3339N/A * The number of entries migrated.
3339N/A */
3339N/A private int migratedCount;
3339N/A
3339N/A /**
3339N/A * The number of entries imported.
3339N/A */
3339N/A private int importedCount;
3339N/A
3339N/A /**
3339N/A * The number of milliseconds between job progress reports.
3339N/A */
3339N/A private long progressInterval = 10000;
3339N/A
3339N/A /**
3339N/A * The progress report timer.
3339N/A */
3339N/A private Timer timer;
3339N/A
3339N/A //Thread array.
3339N/A private CopyOnWriteArrayList<WorkThread> threads;
3339N/A
3339N/A //Progress task.
3339N/A private ProgressTask pTask;
3339N/A
3339N/A //Number of entries import before checking if cleaning is needed after
3339N/A //eviction has been detected.
3339N/A private static final int entryCleanInterval = 250000;
3339N/A
3339N/A //Minimum buffer amount to give to a buffer manager.
3339N/A private static final long minBuffer = 1024 * 1024;
3339N/A
3339N/A //Total available memory for the buffer managers.
3339N/A private long totalAvailBufferMemory = 0;
3339N/A
3339N/A //Memory size to be used for the DB cache in string format.
3339N/A private String dbCacheSizeStr;
3339N/A
3339N/A //Used to do an initial clean after eviction has been detected.
3339N/A private boolean firstClean=false;
3339N/A
3339N/A //A thread threw an Runtime exception stop the import.
3339N/A private boolean unCaughtExceptionThrown = false;
3339N/A
3598N/A //Set to true if substring indexes are defined.
3598N/A private boolean hasSubIndexes = false;
3598N/A
/**
 * Builds an import job for the given LDIF import configuration and works
 * out the memory limits used by the job.
 *
 * @param ldifImportConfig The LDIF import config.
 * @param hasSubIndexes <CODE>True</CODE> if substring indexes are defined.
 */
public Importer(LDIFImportConfig ldifImportConfig, boolean hasSubIndexes)
{
  this.hasSubIndexes = hasSubIndexes;
  this.threads = new CopyOnWriteArrayList<WorkThread>();
  this.ldifImportConfig = ldifImportConfig;
  // Must run after hasSubIndexes has been assigned: the limits depend on it.
  calcMemoryLimits();
}
3339N/A
3339N/A /**
3339N/A * Start the worker threads.
3339N/A *
3339N/A * @throws DatabaseException If a DB problem occurs.
3339N/A */
3339N/A private void startWorkerThreads()
3339N/A throws DatabaseException {
3339N/A
3339N/A int importThreadCount = config.getImportThreadCount();
3339N/A //Figure out how much buffer memory to give to each context.
3339N/A int contextCount = importMap.size();
3339N/A long memoryPerContext = totalAvailBufferMemory / contextCount;
3339N/A //Below min, use the min value.
3339N/A if(memoryPerContext < minBuffer) {
3339N/A Message msg =
3363N/A NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
3339N/A minBuffer);
3339N/A logError(msg);
3339N/A memoryPerContext = minBuffer;
3339N/A }
3339N/A // Create one set of worker threads/buffer managers for each base DN.
3339N/A for (DNContext context : importMap.values()) {
3598N/A BufferManager bufferManager =
3598N/A new BufferManager(memoryPerContext, importThreadCount);
3339N/A context.setBufferManager(bufferManager);
3339N/A for (int i = 0; i < importThreadCount; i++) {
3339N/A WorkThread t = new WorkThread(context.getWorkQueue(), i,
3598N/A bufferManager, rootContainer);
3339N/A t.setUncaughtExceptionHandler(this);
3339N/A threads.add(t);
3339N/A t.start();
3339N/A }
3339N/A }
3339N/A // Start a timer for the progress report.
3339N/A timer = new Timer();
3339N/A TimerTask progressTask = new ProgressTask();
3339N/A //Used to get at extra functionality such as eviction detected.
3339N/A pTask = (ProgressTask) progressTask;
3339N/A timer.scheduleAtFixedRate(progressTask, progressInterval,
3339N/A progressInterval);
3339N/A
3339N/A }
3339N/A
3339N/A
3339N/A /**
3339N/A * Import a ldif using the specified root container.
3339N/A *
3339N/A * @param rootContainer The root container.
3339N/A * @return A LDIF result.
3339N/A * @throws DatabaseException If a DB error occurs.
3339N/A * @throws IOException If a IO error occurs.
3339N/A * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs.
3339N/A * @throws DirectoryException If a directory error occurs.
3339N/A * @throws ConfigException If a configuration has an error.
3339N/A */
3339N/A public LDIFImportResult processImport(RootContainer rootContainer)
3339N/A throws DatabaseException, IOException, JebException, DirectoryException,
3339N/A ConfigException {
3339N/A
3339N/A // Create an LDIF reader. Throws an exception if the file does not exist.
3339N/A reader = new LDIFReader(ldifImportConfig);
3339N/A this.rootContainer = rootContainer;
3339N/A this.config = rootContainer.getConfiguration();
3339N/A
3339N/A Message message;
3339N/A long startTime;
3339N/A try {
3339N/A int importThreadCount = config.getImportThreadCount();
3363N/A message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
3339N/A BUILD_ID, REVISION_NUMBER);
3339N/A logError(message);
3384N/A message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
3339N/A logError(message);
3339N/A RuntimeInformation.logInfo();
3339N/A for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {
3339N/A DNContext DNContext = getImportContext(entryContainer);
3339N/A if(DNContext != null) {
3339N/A importMap.put(entryContainer.getBaseDN(), DNContext);
3339N/A }
3339N/A }
3339N/A // Make a note of the time we started.
3339N/A startTime = System.currentTimeMillis();
3339N/A startWorkerThreads();
3339N/A try {
3339N/A importedCount = 0;
3339N/A migratedCount = 0;
3339N/A migrateExistingEntries();
3339N/A processLDIF();
3339N/A migrateExcludedEntries();
3339N/A } finally {
3339N/A if(!unCaughtExceptionThrown) {
3339N/A cleanUp();
3339N/A switchContainers();
3339N/A }
3339N/A }
3339N/A }
3339N/A finally {
3339N/A reader.close();
3339N/A }
3339N/A importProlog(startTime);
3339N/A return new LDIFImportResult(reader.getEntriesRead(),
3339N/A reader.getEntriesRejected(),
3339N/A reader.getEntriesIgnored());
3339N/A }
3339N/A
3339N/A /**
3339N/A * Switch containers if the migrated entries were written to the temporary
3339N/A * container.
3339N/A *
3339N/A * @throws DatabaseException If a DB problem occurs.
3339N/A * @throws JebException If a JEB problem occurs.
3339N/A */
3339N/A private void switchContainers() throws DatabaseException, JebException {
3339N/A
3339N/A for(DNContext importContext : importMap.values()) {
3339N/A DN baseDN = importContext.getBaseDN();
3339N/A EntryContainer srcEntryContainer =
3339N/A importContext.getSrcEntryContainer();
3339N/A if(srcEntryContainer != null) {
3339N/A if (debugEnabled()) {
3339N/A TRACER.debugInfo("Deleteing old entry container for base DN " +
3339N/A "%s and renaming temp entry container", baseDN);
3339N/A }
3339N/A EntryContainer unregEC =
3339N/A rootContainer.unregisterEntryContainer(baseDN);
3339N/A //Make sure the unregistered EC for the base DN is the same as
3339N/A //the one in the import context.
3339N/A if(unregEC != srcEntryContainer) {
3339N/A if(debugEnabled()) {
3339N/A TRACER.debugInfo("Current entry container used for base DN " +
3339N/A "%s is not the same as the source entry container used " +
3339N/A "during the migration process.", baseDN);
3339N/A }
3339N/A rootContainer.registerEntryContainer(baseDN, unregEC);
3339N/A continue;
3339N/A }
3339N/A srcEntryContainer.lock();
3339N/A srcEntryContainer.delete();
3339N/A srcEntryContainer.unlock();
3339N/A EntryContainer newEC = importContext.getEntryContainer();
3339N/A newEC.lock();
3339N/A newEC.setDatabasePrefix(baseDN.toNormalizedString());
3339N/A newEC.unlock();
3339N/A rootContainer.registerEntryContainer(baseDN, newEC);
3339N/A }
3339N/A }
3339N/A }
3339N/A
3339N/A /**
3339N/A * Create and log messages at the end of the successful import.
3339N/A *
3339N/A * @param startTime The time the import started.
3339N/A */
3339N/A private void importProlog(long startTime) {
3339N/A Message message;
3339N/A long finishTime = System.currentTimeMillis();
3339N/A long importTime = (finishTime - startTime);
3339N/A
3339N/A float rate = 0;
3339N/A if (importTime > 0)
3339N/A {
3339N/A rate = 1000f*importedCount / importTime;
3339N/A }
3339N/A
3363N/A message = NOTE_JEB_IMPORT_FINAL_STATUS.
3339N/A get(reader.getEntriesRead(), importedCount,
3339N/A reader.getEntriesIgnored(), reader.getEntriesRejected(),
3339N/A migratedCount, importTime/1000, rate);
3339N/A logError(message);
3339N/A
3363N/A message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
3339N/A getEntryLimitExceededCount());
3339N/A logError(message);
3339N/A
3339N/A }
3339N/A
3339N/A
3339N/A /**
3339N/A * Run the cleaner if it is needed.
3339N/A *
3339N/A * @param entriesRead The number of entries read so far.
3339N/A * @param evictEntryNumber The number of entries to run the cleaner after
3339N/A * being read.
3339N/A * @throws DatabaseException If a DB problem occurs.
3339N/A */
3339N/A private void
3339N/A runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
3339N/A throws DatabaseException {
3339N/A if(!firstClean || (entriesRead % evictEntryNumber) == 0) {
3339N/A //Make sure work queue is empty before starting.
3339N/A drainWorkQueue();
3363N/A Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get();
3339N/A runCleaner(msg);
3339N/A if(!firstClean) {
3339N/A firstClean=true;
3339N/A }
3339N/A }
3339N/A }
3339N/A
3339N/A /**
3339N/A * Run the cleaner, pausing the task thread output.
3339N/A *
3339N/A * @param header Message to be printed before cleaning.
3339N/A * @throws DatabaseException If a DB problem occurs.
3339N/A */
3339N/A private void runCleaner(Message header) throws DatabaseException {
3339N/A Message msg;
3339N/A long startTime = System.currentTimeMillis();
3339N/A //Need to force a checkpoint.
3339N/A rootContainer.importForceCheckPoint();
3339N/A logError(header);
3339N/A pTask.setPause(true);
3339N/A //Actually clean the files.
3339N/A int cleaned = rootContainer.cleanedLogFiles();
3339N/A //This checkpoint removes the files if any were cleaned.
3339N/A if(cleaned > 0) {
3363N/A msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
3339N/A logError(msg);
3339N/A rootContainer.importForceCheckPoint();
3339N/A }
3339N/A pTask.setPause(false);
3339N/A long finishTime = System.currentTimeMillis();
3339N/A long cleanTime = (finishTime - startTime) / 1000;
3363N/A msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
3339N/A logError(msg);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Process a LDIF reader.
3339N/A *
3339N/A * @throws JebException If a JEB problem occurs.
3339N/A * @throws DatabaseException If a DB problem occurs.
3339N/A * @throws IOException If an IO exception occurs.
3339N/A */
3339N/A private void
3339N/A processLDIF() throws JebException, DatabaseException, IOException {
3363N/A Message message = NOTE_JEB_IMPORT_LDIF_START.get();
3339N/A logError(message);
3339N/A do {
3339N/A if (ldifImportConfig.isCancelled()) {
3339N/A break;
3339N/A }
3339N/A if(threads.size() <= 0) {
3339N/A message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
3339N/A throw new JebException(message);
3339N/A }
3339N/A if(unCaughtExceptionThrown) {
3339N/A abortImport();
3339N/A }
3339N/A try {
3339N/A // Read the next entry.
3339N/A Entry entry = reader.readEntry();
3339N/A // Check for end of file.
3339N/A if (entry == null) {
3363N/A message = NOTE_JEB_IMPORT_LDIF_END.get();
3339N/A logError(message);
3339N/A
3339N/A break;
3339N/A }
3339N/A // Route it according to base DN.
3339N/A DNContext DNContext = getImportConfig(entry.getDN());
3339N/A processEntry(DNContext, entry);
3339N/A //If the progress task has noticed eviction proceeding, start running
3339N/A //the cleaner.
3339N/A if(pTask.isEvicting()) {
3339N/A runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
3339N/A }
3339N/A } catch (LDIFException e) {
3339N/A if (debugEnabled()) {
3339N/A TRACER.debugCaught(DebugLogLevel.ERROR, e);
3339N/A }
3339N/A } catch (DirectoryException e) {
3339N/A if (debugEnabled()) {
3339N/A TRACER.debugCaught(DebugLogLevel.ERROR, e);
3339N/A }
3339N/A } catch (DatabaseException e) {
3339N/A if (debugEnabled()) {
3339N/A TRACER.debugCaught(DebugLogLevel.ERROR, e);
3339N/A }
3339N/A }
3339N/A } while (true);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Process an entry using the specified import context.
3339N/A *
3339N/A * @param DNContext The import context.
3339N/A * @param entry The entry to process.
3339N/A */
3339N/A private void processEntry(DNContext DNContext, Entry entry) {
3339N/A //Add this DN to the pending map.
3339N/A DNContext.addPending(entry.getDN());
3339N/A addEntryQueue(DNContext, entry);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Add work item to specified import context's queue.
3339N/A * @param context The import context.
3339N/A * @param item The work item to add.
3339N/A * @return <CODE>True</CODE> if the the work item was added to the queue.
3339N/A */
3339N/A private boolean
3339N/A addQueue(DNContext context, WorkElement item) {
3339N/A try {
3339N/A while(!context.getWorkQueue().offer(item, 1000,
3339N/A TimeUnit.MILLISECONDS)) {
3339N/A if(threads.size() <= 0) {
3339N/A // All worker threads died. We must stop now.
3339N/A return false;
3339N/A }
3339N/A }
3339N/A } catch (InterruptedException e) {
3339N/A if (debugEnabled()) {
3339N/A TRACER.debugCaught(DebugLogLevel.ERROR, e);
3339N/A }
3339N/A }
3339N/A return true;
3339N/A }
3339N/A
3339N/A
3339N/A /**
3339N/A * Wait until the work queue is empty.
3339N/A */
3339N/A private void drainWorkQueue() {
3339N/A if(threads.size() > 0) {
3339N/A for (DNContext context : importMap.values()) {
3339N/A while (context.getWorkQueue().size() > 0) {
3339N/A try {
3339N/A Thread.sleep(100);
3339N/A } catch (Exception e) {
3339N/A // No action needed.
3339N/A }
3339N/A }
3339N/A }
3339N/A }
3339N/A }
3339N/A
3339N/A private void abortImport() throws JebException {
3339N/A //Stop work threads telling them to skip substring flush.
3430N/A stopWorkThreads(false);
3339N/A timer.cancel();
3339N/A Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
3339N/A throw new JebException(message);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Stop work threads.
3339N/A *
3339N/A * @param abort <CODE>True</CODE> if stop work threads was called from an
3339N/A * abort.
3339N/A * @throws JebException if a Jeb error occurs.
3339N/A */
3339N/A private void
3430N/A stopWorkThreads(boolean abort) throws JebException {
3339N/A for (WorkThread t : threads) {
3339N/A t.stopProcessing();
3339N/A }
3339N/A // Wait for each thread to stop.
3339N/A for (WorkThread t : threads) {
3339N/A try {
3339N/A if(!abort && unCaughtExceptionThrown) {
3339N/A timer.cancel();
3339N/A Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
3339N/A throw new JebException(message);
3339N/A }
3339N/A t.join();
3339N/A importedCount += t.getImportedCount();
3339N/A } catch (InterruptedException ie) {
3339N/A // No action needed?
3339N/A }
3339N/A }
3339N/A }
3339N/A
3339N/A /**
3339N/A * Clean up after a successful import.
3339N/A *
3339N/A * @throws DatabaseException If a DB error occurs.
3339N/A * @throws JebException If a Jeb error occurs.
3339N/A */
3339N/A private void cleanUp() throws DatabaseException, JebException {
3339N/A Message msg;
3339N/A //Drain the work queue.
3339N/A drainWorkQueue();
3430N/A pTask.setPause(true);
3430N/A long startTime = System.currentTimeMillis();
3430N/A stopWorkThreads(true);
3430N/A //Flush the buffer managers.
3339N/A for(DNContext context : importMap.values()) {
3339N/A context.getBufferManager().prepareFlush();
3430N/A context.getBufferManager().flushAll();
3339N/A }
3339N/A long finishTime = System.currentTimeMillis();
3339N/A long flushTime = (finishTime - startTime) / 1000;
3363N/A msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
3339N/A logError(msg);
3339N/A timer.cancel();
3339N/A for(DNContext context : importMap.values()) {
3339N/A context.setIndexesTrusted();
3339N/A }
3363N/A msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
3339N/A //Run the cleaner.
3339N/A runCleaner(msg);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Uncaught exception handler.
3339N/A *
3339N/A * @param t The thread working when the exception was thrown.
3339N/A * @param e The exception.
3339N/A */
3339N/A public void uncaughtException(Thread t, Throwable e) {
3339N/A unCaughtExceptionThrown = true;
3339N/A threads.remove(t);
3339N/A Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
3339N/A t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
3339N/A logError(msg);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Get the entry limit exceeded counts from the indexes.
3339N/A *
3339N/A * @return Count of the index with entry limit exceeded values.
3339N/A */
3339N/A private int getEntryLimitExceededCount() {
3339N/A int count = 0;
3339N/A for (DNContext ic : importMap.values())
3339N/A {
3339N/A count += ic.getEntryContainer().getEntryLimitExceededCount();
3339N/A }
3339N/A return count;
3339N/A }
3339N/A
3339N/A /**
3339N/A * Return an import context related to the specified DN.
3339N/A * @param dn The dn.
3339N/A * @return An import context.
3339N/A * @throws DirectoryException If an directory error occurs.
3339N/A */
3339N/A private DNContext getImportConfig(DN dn) throws DirectoryException {
3339N/A DNContext DNContext = null;
3339N/A DN nodeDN = dn;
3339N/A
3339N/A while (DNContext == null && nodeDN != null) {
3339N/A DNContext = importMap.get(nodeDN);
3339N/A if (DNContext == null)
3339N/A {
3339N/A nodeDN = nodeDN.getParentDNInSuffix();
3339N/A }
3339N/A }
3339N/A
3339N/A if (nodeDN == null) {
3339N/A // The entry should not have been given to this backend.
3339N/A Message message =
3339N/A JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
3339N/A throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
3339N/A }
3339N/A
3339N/A return DNContext;
3339N/A }
3339N/A
3339N/A /**
3339N/A * Creates an import context for the specified entry container.
3339N/A *
3339N/A * @param entryContainer The entry container.
3339N/A * @return Import context to use during import.
3339N/A * @throws DatabaseException If a database error occurs.
3339N/A * @throws JebException If a JEB error occurs.
3339N/A * @throws ConfigException If a configuration contains error.
3339N/A */
3339N/A private DNContext getImportContext(EntryContainer entryContainer)
3339N/A throws DatabaseException, JebException, ConfigException {
3339N/A DN baseDN = entryContainer.getBaseDN();
3339N/A EntryContainer srcEntryContainer = null;
3339N/A List<DN> includeBranches = new ArrayList<DN>();
3339N/A List<DN> excludeBranches = new ArrayList<DN>();
3339N/A
3339N/A if(!ldifImportConfig.appendToExistingData() &&
3339N/A !ldifImportConfig.clearBackend())
3339N/A {
3339N/A for(DN dn : ldifImportConfig.getExcludeBranches())
3339N/A {
3339N/A if(baseDN.equals(dn))
3339N/A {
3339N/A // This entire base DN was explicitly excluded. Skip.
3339N/A return null;
3339N/A }
3339N/A if(baseDN.isAncestorOf(dn))
3339N/A {
3339N/A excludeBranches.add(dn);
3339N/A }
3339N/A }
3339N/A
3339N/A if(!ldifImportConfig.getIncludeBranches().isEmpty())
3339N/A {
3339N/A for(DN dn : ldifImportConfig.getIncludeBranches())
3339N/A {
3339N/A if(baseDN.isAncestorOf(dn))
3339N/A {
3339N/A includeBranches.add(dn);
3339N/A }
3339N/A }
3339N/A
3339N/A if(includeBranches.isEmpty())
3339N/A {
3339N/A // There are no branches in the explicitly defined include list under
3339N/A // this base DN. Skip this base DN alltogether.
3339N/A
3339N/A return null;
3339N/A }
3339N/A
3339N/A // Remove any overlapping include branches.
3339N/A Iterator<DN> includeBranchIterator = includeBranches.iterator();
3339N/A while(includeBranchIterator.hasNext())
3339N/A {
3339N/A DN includeDN = includeBranchIterator.next();
3339N/A boolean keep = true;
3339N/A for(DN dn : includeBranches)
3339N/A {
3339N/A if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
3339N/A {
3339N/A keep = false;
3339N/A break;
3339N/A }
3339N/A }
3339N/A if(!keep)
3339N/A {
3339N/A includeBranchIterator.remove();
3339N/A }
3339N/A }
3339N/A
3339N/A // Remvoe any exclude branches that are not are not under a include
3339N/A // branch since they will be migrated as part of the existing entries
3339N/A // outside of the include branches anyways.
3339N/A Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
3339N/A while(excludeBranchIterator.hasNext())
3339N/A {
3339N/A DN excludeDN = excludeBranchIterator.next();
3339N/A boolean keep = false;
3339N/A for(DN includeDN : includeBranches)
3339N/A {
3339N/A if(includeDN.isAncestorOf(excludeDN))
3339N/A {
3339N/A keep = true;
3339N/A break;
3339N/A }
3339N/A }
3339N/A if(!keep)
3339N/A {
3339N/A excludeBranchIterator.remove();
3339N/A }
3339N/A }
3339N/A
3339N/A if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
3339N/A includeBranches.get(0).equals(baseDN))
3339N/A {
3339N/A // This entire base DN is explicitly included in the import with
3339N/A // no exclude branches that we need to migrate. Just clear the entry
3339N/A // container.
3339N/A entryContainer.lock();
3339N/A entryContainer.clear();
3339N/A entryContainer.unlock();
3339N/A }
3339N/A else
3339N/A {
3339N/A // Create a temp entry container
3339N/A srcEntryContainer = entryContainer;
3339N/A entryContainer =
3339N/A rootContainer.openEntryContainer(baseDN,
3339N/A baseDN.toNormalizedString() +
3339N/A "_importTmp");
3339N/A }
3339N/A }
3339N/A }
3339N/A
3339N/A // Create an import context.
3339N/A DNContext DNContext = new DNContext();
3339N/A DNContext.setConfig(config);
3339N/A DNContext.setLDIFImportConfig(this.ldifImportConfig);
3339N/A DNContext.setLDIFReader(reader);
3339N/A
3339N/A DNContext.setBaseDN(baseDN);
3339N/A DNContext.setEntryContainer(entryContainer);
3339N/A DNContext.setSrcEntryContainer(srcEntryContainer);
3339N/A
3339N/A //Create queue.
3339N/A LinkedBlockingQueue<WorkElement> works =
3339N/A new LinkedBlockingQueue<WorkElement>
3339N/A (config.getImportQueueSize());
3339N/A DNContext.setWorkQueue(works);
3339N/A
3339N/A // Set the include and exclude branches
3339N/A DNContext.setIncludeBranches(includeBranches);
3339N/A DNContext.setExcludeBranches(excludeBranches);
3339N/A
3339N/A return DNContext;
3339N/A }
3339N/A
3339N/A /**
3339N/A * Add specified context and entry to the work queue.
3339N/A *
3339N/A * @param context The context related to the entry DN.
3339N/A * @param entry The entry to work on.
3339N/A * @return <CODE>True</CODE> if the element was added to the work queue.
3339N/A */
3339N/A private boolean
3339N/A addEntryQueue(DNContext context, Entry entry) {
3339N/A WorkElement element =
3339N/A WorkElement.decode(entry, context);
3339N/A return addQueue(context, element);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Calculate the memory usage for the substring buffer and the DB cache.
3339N/A */
3339N/A private void calcMemoryLimits() {
3339N/A Message msg;
3339N/A Runtime runtime = Runtime.getRuntime();
3339N/A long freeMemory = runtime.freeMemory();
3339N/A long maxMemory = runtime.maxMemory();
3339N/A long totMemory = runtime.totalMemory();
3339N/A long totFreeMemory = (freeMemory + (maxMemory - totMemory));
3598N/A long dbCacheLimit = (totFreeMemory * 45) / 100;
3598N/A //If there are now substring indexes defined, set the DB cache
3598N/A //size to 60% and take a minimal substring buffer.
3598N/A if(!hasSubIndexes) {
3598N/A dbCacheLimit = (totFreeMemory * 60) / 100;
3598N/A }
3339N/A dbCacheSizeStr = Long.toString(dbCacheLimit);
3339N/A totalAvailBufferMemory = (totFreeMemory * 10) / 100;
3339N/A if(totalAvailBufferMemory < (10 * minBuffer)) {
3339N/A msg =
3363N/A NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
3339N/A (10 * minBuffer));
3339N/A logError(msg);
3339N/A totalAvailBufferMemory = (10 * minBuffer);
3598N/A } else if(!hasSubIndexes) {
3598N/A totalAvailBufferMemory = (10 * minBuffer);
3339N/A }
3363N/A msg=NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
3339N/A totalAvailBufferMemory);
3339N/A logError(msg);
3339N/A }
3339N/A
3339N/A /**
3339N/A * Return the string representation of the DB cache size.
3339N/A *
3339N/A * @return DB cache size string.
3339N/A */
3339N/A public String getDBCacheSize() {
3339N/A return dbCacheSizeStr;
3339N/A }
3339N/A
3339N/A /**
3339N/A * Migrate any existing entries.
3339N/A *
3339N/A * @throws JebException If a JEB error occurs.
3339N/A * @throws DatabaseException If a DB error occurs.
3339N/A * @throws DirectoryException If a directory error occurs.
3339N/A */
3339N/A private void migrateExistingEntries()
3339N/A throws JebException, DatabaseException, DirectoryException {
3339N/A for(DNContext context : importMap.values()) {
3339N/A EntryContainer srcEntryContainer = context.getSrcEntryContainer();
3339N/A if(srcEntryContainer != null &&
3339N/A !context.getIncludeBranches().isEmpty()) {
3339N/A DatabaseEntry key = new DatabaseEntry();
3339N/A DatabaseEntry data = new DatabaseEntry();
3339N/A LockMode lockMode = LockMode.DEFAULT;
3339N/A OperationStatus status;
3363N/A Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
3339N/A "existing", String.valueOf(context.getBaseDN()));
3339N/A logError(message);
3339N/A Cursor cursor =
3339N/A srcEntryContainer.getDN2ID().openCursor(null,
3339N/A CursorConfig.READ_COMMITTED);
3339N/A try {
3339N/A status = cursor.getFirst(key, data, lockMode);
3339N/A while(status == OperationStatus.SUCCESS &&
3339N/A !ldifImportConfig.isCancelled()) {
3339N/A if(threads.size() <= 0) {
3339N/A message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
3339N/A throw new JebException(message);
3339N/A }
3339N/A DN dn = DN.decode(new ASN1OctetString(key.getData()));
3339N/A if(!context.getIncludeBranches().contains(dn)) {
3339N/A EntryID id = new EntryID(data);
3538N/A Entry entry =
3538N/A srcEntryContainer.getID2Entry().get(null,
3538N/A id, LockMode.DEFAULT);
3339N/A processEntry(context, entry);
3339N/A migratedCount++;
3339N/A status = cursor.getNext(key, data, lockMode);
3339N/A } else {
3339N/A // This is the base entry for a branch that will be included
3339N/A // in the import so we don't want to copy the branch to the new
3339N/A // entry container.
3339N/A
3339N/A /**
3339N/A * Advance the cursor to next entry at the same level in the DIT
3339N/A * skipping all the entries in this branch.
3339N/A * Set the next starting value to a value of equal length but
3339N/A * slightly greater than the previous DN. Since keys are compared
3339N/A * in reverse order we must set the first byte (the comma).
3339N/A * No possibility of overflow here.
3339N/A */
3339N/A byte[] begin =
3339N/A StaticUtils.getBytes("," + dn.toNormalizedString());
3339N/A begin[0] = (byte) (begin[0] + 1);
3339N/A key.setData(begin);
3339N/A status = cursor.getSearchKeyRange(key, data, lockMode);
3339N/A }
3339N/A }
3339N/A } finally {
3339N/A cursor.close();
3339N/A }
3339N/A }
3339N/A }
3339N/A }
3339N/A
3339N/A
3339N/A /**
3339N/A * Migrate excluded entries.
3339N/A *
3339N/A * @throws JebException If a JEB error occurs.
3339N/A * @throws DatabaseException If a DB error occurs.
3339N/A * @throws DirectoryException If a directory error occurs.
3339N/A */
3339N/A private void migrateExcludedEntries()
3339N/A throws JebException, DatabaseException, DirectoryException {
3339N/A for(DNContext importContext : importMap.values()) {
3339N/A EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
3339N/A if(srcEntryContainer != null &&
3339N/A !importContext.getExcludeBranches().isEmpty()) {
3339N/A DatabaseEntry key = new DatabaseEntry();
3339N/A DatabaseEntry data = new DatabaseEntry();
3339N/A LockMode lockMode = LockMode.DEFAULT;
3339N/A OperationStatus status;
3363N/A Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
3339N/A "excluded", String.valueOf(importContext.getBaseDN()));
3339N/A logError(message);
3339N/A Cursor cursor =
3339N/A srcEntryContainer.getDN2ID().openCursor(null,
3339N/A CursorConfig.READ_COMMITTED);
3339N/A Comparator<byte[]> dn2idComparator =
3339N/A srcEntryContainer.getDN2ID().getComparator();
3339N/A try {
3339N/A for(DN excludedDN : importContext.getExcludeBranches()) {
3339N/A byte[] suffix =
3339N/A StaticUtils.getBytes(excludedDN.toNormalizedString());
3339N/A key.setData(suffix);
3339N/A status = cursor.getSearchKeyRange(key, data, lockMode);
3339N/A if(status == OperationStatus.SUCCESS &&
3339N/A Arrays.equals(key.getData(), suffix)) {
3339N/A // This is the base entry for a branch that was excluded in the
3339N/A // import so we must migrate all entries in this branch over to
3339N/A // the new entry container.
3339N/A byte[] end =
3339N/A StaticUtils.getBytes("," + excludedDN.toNormalizedString());
3339N/A end[0] = (byte) (end[0] + 1);
3339N/A
3339N/A while(status == OperationStatus.SUCCESS &&
3339N/A dn2idComparator.compare(key.getData(), end) < 0 &&
3339N/A !ldifImportConfig.isCancelled()) {
3339N/A if(threads.size() <= 0) {
3339N/A message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
3339N/A throw new JebException(message);
3339N/A }
3339N/A EntryID id = new EntryID(data);
3538N/A Entry entry = srcEntryContainer.getID2Entry().get(null,
3538N/A id, LockMode.DEFAULT);
3339N/A processEntry(importContext, entry);
3339N/A migratedCount++;
3339N/A status = cursor.getNext(key, data, lockMode);
3339N/A }
3339N/A }
3339N/A }
3339N/A }
3339N/A finally
3339N/A {
3339N/A cursor.close();
3339N/A }
3339N/A }
3339N/A }
3339N/A }
3339N/A
3339N/A
3339N/A /**
3339N/A * This class reports progress of the import job at fixed intervals.
3339N/A */
3339N/A private final class ProgressTask extends TimerTask
3339N/A {
3339N/A /**
3339N/A * The number of entries that had been read at the time of the
3339N/A * previous progress report.
3339N/A */
3339N/A private long previousCount = 0;
3339N/A
3339N/A /**
3339N/A * The time in milliseconds of the previous progress report.
3339N/A */
3339N/A private long previousTime;
3339N/A
3339N/A /**
3339N/A * The environment statistics at the time of the previous report.
3339N/A */
3339N/A private EnvironmentStats prevEnvStats;
3339N/A
3339N/A /**
3339N/A * The number of bytes in a megabyte.
3339N/A * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
3339N/A */
3339N/A public static final int bytesPerMegabyte = 1024*1024;
3339N/A
3339N/A //Determines if the ldif is being read.
3339N/A private boolean ldifRead = false;
3339N/A
3339N/A //Determines if eviction has been detected.
3339N/A private boolean evicting = false;
3339N/A
3339N/A //Entry count when eviction was detected.
3339N/A private long evictionEntryCount = 0;
3339N/A
3339N/A //Suspend output.
3339N/A private boolean pause = false;
3339N/A
3339N/A /**
3339N/A * Create a new import progress task.
3339N/A * @throws DatabaseException If an error occurs in the JE database.
3339N/A */
3339N/A public ProgressTask() throws DatabaseException
3339N/A {
3339N/A previousTime = System.currentTimeMillis();
3339N/A prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
3339N/A }
3339N/A
3339N/A /**
3339N/A * Return if reading the LDIF file.
3339N/A */
3339N/A public void ldifRead() {
3339N/A ldifRead=true;
3339N/A }
3339N/A
3339N/A /**
3339N/A * Return value of evicting flag.
3339N/A *
3339N/A * @return <CODE>True</CODE> if eviction is detected.
3339N/A */
3339N/A public boolean isEvicting() {
3339N/A return evicting;
3339N/A }
3339N/A
3339N/A /**
3339N/A * Return count of entries when eviction was detected.
3339N/A *
3339N/A * @return The entry count when eviction was detected.
3339N/A */
3339N/A public long getEvictionEntryCount() {
3339N/A return evictionEntryCount;
3339N/A }
3339N/A
3339N/A /**
3339N/A * Suspend output if true.
3339N/A *
3339N/A * @param v The value to set the suspend value to.
3339N/A */
3339N/A public void setPause(boolean v) {
3339N/A pause=v;
3339N/A }
3339N/A
3339N/A /**
3339N/A * The action to be performed by this timer task.
3339N/A */
3339N/A public void run() {
3339N/A long latestCount = reader.getEntriesRead() + 0;
3339N/A long deltaCount = (latestCount - previousCount);
3339N/A long latestTime = System.currentTimeMillis();
3339N/A long deltaTime = latestTime - previousTime;
3339N/A Message message;
3339N/A if (deltaTime == 0) {
3339N/A return;
3339N/A }
3339N/A if(pause) {
3339N/A return;
3339N/A }
3339N/A if(!ldifRead) {
3339N/A long numRead = reader.getEntriesRead();
3339N/A long numIgnored = reader.getEntriesIgnored();
3339N/A long numRejected = reader.getEntriesRejected();
3339N/A float rate = 1000f*deltaCount / deltaTime;
3363N/A message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(
3339N/A numRead, numIgnored, numRejected, 0, rate);
3339N/A logError(message);
3339N/A }
3339N/A try
3339N/A {
3339N/A Runtime runtime = Runtime.getRuntime();
3339N/A long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
3339N/A EnvironmentStats envStats =
3339N/A rootContainer.getEnvironmentStats(new StatsConfig());
3339N/A long nCacheMiss =
3339N/A envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
3339N/A
3339N/A float cacheMissRate = 0;
3339N/A if (deltaCount > 0) {
3339N/A cacheMissRate = nCacheMiss/(float)deltaCount;
3339N/A }
3372N/A message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
3339N/A freeMemory, cacheMissRate);
3339N/A logError(message);
3339N/A long evictPasses = envStats.getNEvictPasses();
3339N/A long evictNodes = envStats.getNNodesExplicitlyEvicted();
3339N/A long evictBinsStrip = envStats.getNBINsStripped();
3339N/A int cleanerRuns = envStats.getNCleanerRuns();
3339N/A int cleanerDeletions = envStats.getNCleanerDeletions();
3339N/A int cleanerEntriesRead = envStats.getNCleanerEntriesRead();
3339N/A int cleanerINCleaned = envStats.getNINsCleaned();
3339N/A int checkPoints = envStats.getNCheckpoints();
3339N/A if(evictPasses != 0) {
3339N/A if(!evicting) {
3339N/A evicting=true;
3339N/A if(!ldifRead) {
3339N/A evictionEntryCount=reader.getEntriesRead();
3339N/A message =
3363N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
3339N/A logError(message);
3339N/A }
3339N/A }
3339N/A message =
3363N/A NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
3339N/A evictNodes, evictBinsStrip);
3339N/A logError(message);
3339N/A }
3339N/A if(cleanerRuns != 0) {
3363N/A message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
3339N/A cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
3339N/A logError(message);
3339N/A }
3339N/A if(checkPoints > 1) {
3363N/A message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
3339N/A logError(message);
3339N/A }
3339N/A prevEnvStats = envStats;
3339N/A } catch (DatabaseException e) {
3339N/A // Unlikely to happen and not critical.
3339N/A }
3339N/A previousCount = latestCount;
3339N/A previousTime = latestTime;
3339N/A }
3339N/A }
3339N/A}
3339N/A