// OnDiskMergeStorageImporter.java revision 412ad6b800c1fbd15661110e21d836b888231cce
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2015 ForgeRock AS
*/
/**
* This class provides the engine that performs both importing of LDIF files and
* the rebuilding of indexes.
*/
final class OnDiskMergeStorageImporter
{
/**
 * Builds the exception thrown by every operation this importer does not support.
 *
 * @return a fresh {@link UnsupportedOperationException} carrying a fixed message
 */
private static UnsupportedOperationException notImplemented()
{
  final String message = "Not implemented";
  return new UnsupportedOperationException(message);
}
/** Data to put into id2entry tree. */
private static final class Id2EntryData
{
// NOTE(review): this class is truncated in this revision — field declarations,
// constructor and method signatures were elided; only brace skeletons remain.
{
}
{
}
{
// presumably the tail of a toString() body; references "suffix" and "entryID"
// fields that are not visible here — TODO confirm against the full source
return getClass().getSimpleName()
+ "(suffix=" + suffix
+ ", entryID=" + entryID
}
}
/** Runnable putting data into id2entry tree in batches. */
private final class Id2EntryPutTask implements Runnable
{
private static final int BATCH_SIZE = 100;
private volatile boolean moreData = true;
private final NavigableSet<Id2EntryData> dataToPut = new ConcurrentSkipListSet<>(new Comparator<Id2EntryData>()
{
{
}
});
{
}
{
if (enoughDataToPut())
{
synchronized (dataToPut)
{
}
}
}
public void run()
{
try
{
while (!isCanceled() && moreData)
{
if (enoughDataToPut())
{
}
else
{
synchronized (dataToPut)
{
if (moreData)
{
}
}
}
}
{
}
}
catch (Exception e)
{
logger.traceException(e);
}
}
private boolean enoughDataToPut()
{
}
{
{
{
int count = 0;
{
count++;
}
}
});
}
private void finishedWrites()
{
moreData = false;
synchronized (dataToPut)
{
}
}
{
{
}
{
}
return super.toString();
}
}
/**
* Represents an on-disk buffer file, accessed via memory mapped files.
* <p>
* Data to write is appended in-memory before being dumped into a {@link MappedByteBuffer}
* for writing to disk.
*/
private static final class Buffer
{
private final File bufferFile;
private final FileChannel fileChannel;
/** TODO JNR offer configuration for this. */
/**
* Maps {@link ByteSequence} keys to (conflicting) values.
* <p>
* This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
* {@link #bufferSize}.
*/
/** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
private int maximumExpectedSizeOnDisk;
private long totalBytes;
{
{
}
}
{
{
return "r";
}
{
return "rw";
}
}
{
{
}
{
}
}
private void flushToMappedByteBuffer() throws IOException
{
{
// FIXME JNR merge values before put
// Edit: Merging during phase one is slower than not merging at all,
// perhaps due to merging importIDSets for keys that will exceed index entry limits anyway?
{
}
}
{
}
byteBuffer.force();
}
{
// FIXME JNR when merging duplicate keys during phase one,
// maximumExpectedSizeOnDisk is an acceptable over approximation
return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), maximumExpectedSizeOnDisk);
}
{
}
{
}
{
// Need to do all of this because b.copyTo(byteBuffer) calls ByteBuffer.flip().
// Why does it do that?
b.copyTo(byteBuffer);
}
void flush() throws IOException
{
}
private void writeBufferIndexFile()
{
{
}
catch (FileNotFoundException e)
{
logger.traceException(e);
}
}
{
{
{
}
}
}
private void readBufferPositions() throws IOException
{
{
}
{
}
}
{
return getClass().getSimpleName()
}
}
/** A cursor performing the "merge" phase of the on-disk merge. */
private static final class MergingCursor<K, V> implements SequentialCursor<K, V>
{
private final MergingConsumer<V> merger;
private K key;
private V value;
private boolean isDefined;
{
}
public boolean next()
{
{
{
return isDefined = false;
}
return isDefined = true;
}
{
return isDefined = true;
}
else
{
// no more data to compute
return isDefined = false;
}
}
private void accumulateValues()
{
{
}
}
public boolean isDefined()
{
return isDefined;
}
/**
 * Returns the key of the record this cursor is positioned on.
 *
 * @return the current key
 * @throws NoSuchElementException
 *           if this cursor is not positioned on a record
 */
public K getKey() throws NoSuchElementException
{
  // Enforce the documented contract: the declared NoSuchElementException was
  // never thrown before because the definedness check was missing, unlike in
  // the sibling cursor implementations which guard key/value access.
  throwIfNotDefined();
  return key;
}
/**
 * Returns the (merged) value of the record this cursor is positioned on.
 *
 * @return the current value
 * @throws NoSuchElementException
 *           if this cursor is not positioned on a record
 */
public V getValue() throws NoSuchElementException
{
  // Enforce the documented contract: without this guard the declared
  // NoSuchElementException could never be thrown.
  throwIfNotDefined();
  return value;
}
/** Fails fast when this cursor is not positioned on a record. */
private void throwIfNotDefined()
{
  if (isDefined())
  {
    return;
  }
  throw new NoSuchElementException();
}
/** Closes this cursor; afterwards {@link #isDefined()} reports {@code false}. */
public void close()
{
  this.isDefined = false;
}
}
/** A cursor implementation keeping stats about reading progress. */
private static final class ProgressCursor<K, V> implements Cursor<K, V>
{
{
this.delegate = delegateCursor;
}
public String getBufferFileName()
{
}
private long getTotalBytes()
{
return buffer.totalBytes;
}
private int getBytesRead()
{
int count = 0;
{
}
return count;
}
public boolean next()
{
}
public boolean isDefined()
{
}
public K getKey() throws NoSuchElementException
{
}
public V getValue() throws NoSuchElementException
{
}
public void close()
{
}
{
}
{
}
/** Delegates directly to the underlying cursor. */
public boolean positionToLastKey()
{
return delegate.positionToLastKey();
}
public boolean positionToIndex(int index)
{
}
}
/** A cursor implementation aggregating several cursors and ordering them by their key value. */
private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
{
private static final byte UNINITIALIZED = 0;
private static final byte READY = 1;
private static final byte CLOSED = 2;
/**
* The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED}
*/
private byte state = UNINITIALIZED;
/**
* The cursors are sorted based on the key change of each cursor to consider the next change
* across all cursors.
*/
private final NavigableSet<SequentialCursor<K, V>> cursors = new TreeSet<>(new Comparator<SequentialCursor<K, V>>()
{
{
if (cmp == 0)
{
// Never return 0. Otherwise both cursors are considered equal
// and only one of them is kept by this set
}
return cmp;
}
});
{
{
{
}
}
}
public boolean next()
{
{
return false;
}
// If previous state was ready, then we must advance the first cursor
// To keep consistent the cursors' order in the SortedSet, it is necessary
// to remove the first cursor, then add it again after moving it forward.
if (state == UNINITIALIZED)
{
}
{
{
}
}
return isDefined();
}
public boolean isDefined()
{
}
public K getKey() throws NoSuchElementException
{
}
public V getValue() throws NoSuchElementException
{
}
/** Throws {@link NoSuchElementException} unless this cursor is positioned on a record. */
private void throwIfNotDefined()
{
  final boolean positioned = isDefined();
  if (!positioned)
  {
    throw new NoSuchElementException();
  }
}
public void close()
{
}
{
if (isDefined())
{
}
return "not defined";
}
{
throw notImplemented();
}
{
throw notImplemented();
}
/** Unsupported: this composite only supports sequential merge-ordered iteration. */
public boolean positionToLastKey()
{
throw notImplemented();
}
/** Unsupported: this composite only supports sequential merge-ordered iteration. */
public boolean positionToIndex(int index)
{
throw notImplemented();
}
}
/** A cursor implementation reading key/value pairs from memory mapped files, a.k.a {@link MappedByteBuffer}. */
{
private final ByteBuffer byteBuffer;
private final int startPos;
private final int endPos;
// TODO JNR build ByteSequence implementation reading from memory mapped files?
private final ByteStringBuilder valueBuffer = new ByteStringBuilder();//FIXME JNR bad: do zero copy?
private int currentPos;
private boolean isDefined;
{
this.byteBuffer = byteBuffer;
this.currentPos = startPos;
}
public boolean next()
{
isDefined = false;
if (currentPos >= endPos)
{
return isDefined = false;
}
return isDefined = true;
}
{
currentPos += INT_SIZE;
currentPos += length;
}
/** Returns whether this cursor is currently positioned on a record. */
public boolean isDefined()
{
return isDefined;
}
{
return keyBuffer.toByteString();
}
{
return valueBuffer.toByteString();
}
/** Ensures this cursor is positioned on a record before key/value access. */
private void throwIfNotDefined()
{
  final boolean defined = isDefined();
  if (defined)
  {
    return;
  }
  throw new NoSuchElementException();
}
public void close()
{
// nothing to do
}
/** Returns how many bytes this cursor has consumed from the buffer so far. */
public int getBytesRead()
{
  final int consumed = currentPos - startPos;
  return consumed;
}
{
if (isDefined())
{
return "<key=" + key + "(" + key.toHexString() + "), value=" + value + "(" + value.toHexString() + ")>";
}
return "not defined";
}
}
/** A storage using memory mapped files, a.k.a {@link MappedByteBuffer}. */
private static final class MemoryMappedStorage implements Storage
{
{
}
{
return new MemoryMappedBufferImporter(bufferDir);
}
{
throw notImplemented();
}
{
{
{
try
{
return buffer.openCursor();
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
{
throw notImplemented();
}
{
throw notImplemented();
}
});
}
{
throw notImplemented();
}
/** Unsupported: this scratch storage does not implement file removal. */
public void removeStorageFiles() throws StorageRuntimeException
{
throw notImplemented();
}
/** Unsupported: status reporting is not implemented for this scratch storage. */
public StorageStatus getStorageStatus()
{
throw notImplemented();
}
/** Unsupported: scratch data is transient, backup/restore is not implemented. */
public boolean supportsBackupAndRestore()
{
throw notImplemented();
}
// NOTE(review): a close() that throws is unusual — callers using try-with-resources
// would fail here; confirm this storage is never closed through the Storage interface.
public void close()
{
throw notImplemented();
}
{
throw notImplemented();
}
/** Unsupported: backups are not implemented for this scratch storage. */
public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
{
throw notImplemented();
}
{
throw notImplemented();
}
{
{
{
{
}
}
}
return results;
}
}
/** An importer using memory mapped files, a.k.a {@link MappedByteBuffer}. */
private static final class MemoryMappedBufferImporter implements Importer
{
{
}
{
try
{
}
catch (IOException e)
{
logger.traceException(e);
}
}
{
{
// Creates sub directories for each suffix
// FIXME JNR cannot directly use DN names as directory + file names
}
return buffer;
}
public void close()
{
try
{
{
}
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
{
throw notImplemented();
}
{
throw notImplemented();
}
{
throw notImplemented();
}
}
/**
* Shim that allows properly constructing an {@link OnDiskMergeStorageImporter} without polluting
* {@link ImportStrategy} and {@link RootContainer} with this importer inner workings.
*/
static final class StrategyImpl implements ImportStrategy
{
private final PluggableBackendCfg backendCfg;
{
this.backendCfg = backendCfg;
}
{
try
{
return new OnDiskMergeStorageImporter(rootContainer, importConfig, backendCfg, serverContext).processImport();
}
catch (DirectoryException | InitializationException e)
{
logger.traceException(e);
throw e;
}
catch (ConfigException e)
{
logger.traceException(e);
}
catch (Exception e)
{
logger.traceException(e);
throw new DirectoryException(getServerErrorResultCode(),
}
}
}
private static final int TIMER_INTERVAL = 10000;
/** Defaults for DB cache. */
/**
* Defaults for LDIF reader buffers, min memory required to import and default
* size for byte buffers.
*/
/** Max size of phase one buffer. */
/** Min size of phase one buffer. */
/** Small heap threshold used to give more memory to JVM to attempt OOM errors. */
/** Root container. */
private final RootContainer rootContainer;
/** Import configuration. */
private final LDIFImportConfig importCfg;
private final ServerContext serverContext;
/** LDIF reader. */
private ImportLDIFReader reader;
/** Phase one imported entries count. */
/** Migrated entry count. */
private int migratedCount;
/** Phase one buffer size in bytes. */
private int bufferSize;
/** Index count. */
private final int indexCount;
/** Thread count. */
private int threadCount;
/** Whether DN validation should be performed. If true, then it is performed during phase one. */
private final boolean validateDNs;
/** Temp scratch directory. */
/** Available memory at the start of the import. */
private long availableMemory;
/** Size in bytes of DB cache. */
private long dbCacheSize;
/** Map of DNs to Suffix objects. */
/** Set to true if the backend was cleared. */
private final boolean clearedBackend;
/** Used to shutdown import if an error occurs in phase one. */
private volatile boolean isCanceled;
/** Number of phase one buffers. */
private int phaseOneBufferCount;
{
this.rootContainer = rootContainer;
this.serverContext = serverContext;
{
}
else
{
}
// Determine the number of indexes.
// be careful: requires that a few data has been set
}
private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException
{
{
}
return tempDir;
}
/**
* Returns whether the backend must be cleared.
*
* @param importCfg
* the import configuration object
* @param backendCfg
* the backend configuration object
* @return true if the backend must be cleared, false otherwise
* @see #prepareSuffix(WriteableTransaction, EntryContainer) for per-suffix cleanups.
*/
{
return !importCfg.appendToExistingData()
/*
* Why do we clear when there is only one baseDN?
* any baseDN for which data is imported will be cleared anyway (see getSuffix()),
* so if there is only one baseDN for this backend, then clear it now.
*/
}
{
{
{
}
else
{
}
}
return indexes;
}
/**
* Calculate buffer sizes and initialize properties based on memory.
*
* @throws InitializationException
* If a problem occurs during calculation.
*/
private void computeMemoryRequirements() throws InitializationException
{
// Calculate amount of usable memory. This will need to take into account
// various fudge factors, including the number of IO buffers used by the
// scratch writers (1 per index).
{
}
// We need caching when doing DN validation
{
}
else
{
}
final int oldThreadCount = threadCount;
{
while (true)
{
// Scratch writers allocate 4 buffers per index as well.
// We need (2 * bufferSize) to fit in an int for the insertByteStream
// and deleteByteStream constructors.
if (bufferSize > MAX_BUFFER_SIZE)
{
if (validateDNs)
{
// The buffers are big enough: the memory is best used for the DN2ID temp DB
if (!clearedBackend)
{
}
}
break;
}
else if (bufferSize > MIN_BUFFER_SIZE)
{
// This is acceptable.
break;
}
else if (threadCount > 1)
{
// Retry using less threads.
threadCount--;
}
else
{
// Not enough memory.
}
}
}
if (oldThreadCount != threadCount)
{
}
}
/**
* Calculates the amount of available memory which can be used by this import,
* taking into account whether or not the import is running offline or online
* as a task.
*/
private void calculateAvailableMemory()
{
final long totalAvailableMemory;
if (DirectoryServer.isRunning())
{
}
else
{
}
// Now take into account various fudge factors.
int importMemPct = 90;
if (totalAvailableMemory <= SMALL_HEAP_SIZE)
{
// Be pessimistic when memory is low.
importMemPct -= 25;
}
}
private boolean isCanceled()
{
}
private void initializeSuffixes(WriteableTransaction txn) throws ConfigException, DirectoryException
{
{
{
}
}
}
throws ConfigException, DirectoryException
{
switch (importCommand.getSuffixImportStrategy())
{
case APPEND_OR_REPLACE:
return new Suffix(entryContainer);
case SKIP_SUFFIX:
return null;
case CLEAR_SUFFIX:
break;
case MERGE_DB_WITH_LDIF:
// Create a temp entry container
{
}
break;
case INCLUDE_EXCLUDE_BRANCHES:
break;
default:
throw new DirectoryException(getServerErrorResultCode(),
}
}
{
}
{
try {
try
{
}
catch (IOException ioe)
{
}
{
{
setIndexesTrusted(txn, false);
}
});
if (isCanceled())
{
throw new InterruptedException("Import processing canceled.");
}
if (isCanceled())
{
throw new InterruptedException("Import processing canceled.");
}
{
{
setIndexesTrusted(txn, true);
}
});
float rate = 0;
if (importTime > 0)
{
}
}
finally
{
}
}
private void switchEntryContainers(WriteableTransaction txn) throws StorageRuntimeException, InitializationException
{
{
{
replacement.lock();
}
}
}
private void setIndexesTrusted(WriteableTransaction txn, boolean trusted) throws StorageRuntimeException
{
try
{
{
if (trusted)
{
s.setIndexesTrusted(txn);
}
else
{
}
}
}
catch (StorageRuntimeException ex)
{
}
}
/**
* Reads all entries from id2entry, and:
* <ol>
* <li>compute how the entry is indexed for each index</li>
* <li>store the result of indexing entries into in-memory index buffers</li>
* <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
* The scratch files will be read by phaseTwo to perform on-disk merge</li>
* </ol>
* TODO JNR fix all javadocs
*/
{
{
execService.submit(new MigrateExistingEntriesTask(backendStorage, tmpImporter, id2EntryPutTask)).get();
{
{
}
}
}
finally
{
progressTask.run();
}
}
{
}
{
{
}
{
}
}
{
{
{
}
}
finally
{
progressTask.run();
}
}
{
{
{
SequentialCursor<ByteString, ByteString> cursor = new MergingCursor<>(cursor0, getMerger(treeName)))
{
{
}
}
return null;
}
{
{
// key conflicts == merge EntryIDSets
return new ImportIDSetsMerger(index);
}
{
// key conflicts == sum values
}
{
// key conflicts == exception
return new NoMultipleValuesConsumer<>();
}
}
{
{
{
return true;
}
}
return false;
}
{
{
{
{
return index;
}
}
}
return null;
}
});
}
/**
* Copies JDK 8 Consumer.
* <p>
* FIXME Remove once we move to Java 8.
*
* @see java.util.function.Consumer Consumer from JDK 8
*/
@FunctionalInterface
private static interface Consumer<T>
{
/**
 * Performs this operation on the given argument.
 *
 * @param t
 *          the input argument
 */
void accept(T t);
}
/**
* A merging consumer that merges the supplied values.
* <p>
* Sample usage:
*
* <pre>
* while (it.hasNext())
* {
* mergingConsumer.accept(it.next());
* }
* Object result = mergingConsumer.merge();
*
 * </pre>
*
* @param <T>
* the type of the arguments and the returned merged value
* @see java.util.function.Consumer Consumer from JDK 8
*/
private static interface MergingConsumer<T> extends Consumer<T>
{
/**
 * Returns a single value merged from every argument previously supplied to
 * {@link Consumer#accept(Object)}, and prepares this consumer for reuse.
 *
 * @return the merged value
 */
T merge();
}
/** {@link MergingConsumer} that throws an exception when given several values to accept. */
private static final class NoMultipleValuesConsumer<V> implements MergingConsumer<V>
{
private V first;
private boolean moreThanOne;
{
{
}
else
{
moreThanOne = true;
}
}
/**
 * Returns the single accepted value and resets this consumer for reuse.
 *
 * @return the only value supplied to {@code accept()} since the last merge
 * @throws IllegalArgumentException
 *           if more than one value was accepted
 */
public V merge()
{
// copy before cleaning state
final boolean mustThrow = moreThanOne;
final V mergedValue = first;
// clean up state: "first" must also be reset, otherwise a reused consumer
// whose merge() follows zero accept() calls returns the previous stale value
first = null;
moreThanOne = false;
if (mustThrow)
{
throw new IllegalArgumentException();
}
return mergedValue;
}
}
/**
* {@link MergingConsumer} that accepts {@link ByteSequence} objects
* and produces a {@link ByteSequence} representing the merged {@link ImportIDSet}.
*/
{
private final MatchingRuleIndex index;
private boolean aboveIndexEntryLimit;
{
}
{
if (!aboveIndexEntryLimit)
{
{
}
else
{
aboveIndexEntryLimit = true;
}
}
}
public ByteString merge()
{
try
{
if (aboveIndexEntryLimit)
{
}
{
// Avoids unnecessary decoding + encoding
}
}
finally
{
// reset state
aboveIndexEntryLimit = false;
}
}
{
// accumulate in array
int i = 0;
{
{
// above index entry limit
return EntryIDSet.newUndefinedSet();
}
{
}
}
// trim the array to the actual size
// due to how the entryIDSets are built, there should not be any duplicate entryIDs
}
}
/**
* {@link MergingConsumer} that accepts {@link ByteSequence} objects
* and produces a {@link ByteSequence} representing the added {@code long}s.
*/
{
private final ID2Count id2ChildrenCount;
private long count;
{
this.id2ChildrenCount = id2ChildrenCount;
}
{
}
public ByteString merge()
{
count = 0;
return result;
}
}
/** Task used to migrate excluded branch. */
private final class MigrateExcludedTask extends ImportTask
{
{
}
{
{
{
return null;
}
});
return null;
}
{
{
{
try
{
{
{
/*
* This is the base entry for a branch that was excluded in the
* import so we must migrate all entries in this branch over to
* the new entry container.
*/
while (success
&& !isCanceled())
{
}
}
}
}
catch (Exception e)
{
isCanceled = true;
throw e;
}
finally
{
}
}
}
}
}
/** Task to migrate existing entries. */
private final class MigrateExistingEntriesTask extends ImportTask
{
private MigrateExistingEntriesTask(final Storage storage, Importer importer, Id2EntryPutTask id2EntryPutTask)
{
}
{
{
{
return null;
}
});
return null;
}
{
{
{
try
{
while (success
&& !isCanceled())
{
{
}
else
{
/*
* This is the base entry for a branch that will be included
* in the import so we do not want to copy the branch to the
* new entry container.
*/
/*
* Advance the cursor to next entry at the same level in the DIT
* skipping all the entries in this branch.
*/
}
}
}
catch (Exception e)
{
isCanceled = true;
throw e;
}
finally
{
}
}
}
}
{
{
{
}
}
return includeBranches;
}
}
/**
* This task performs phase reading and processing of the entries read from
* the LDIF file(s). This task is used if the append flag wasn't specified.
*/
{
private final Id2EntryPutTask id2EntryPutTask;
{
this.id2EntryPutTask = id2EntryPutTask;
}
{
call0();
return null;
}
{
try
{
{
if (isCanceled())
{
return;
}
}
}
catch (Exception e)
{
isCanceled = true;
throw e;
}
}
{
try
{
{
return;
}
}
finally
{
}
if (!validateDNs)
{
}
}
/**
* Examine the DN for duplicates and missing parents.
*
* @return true if the import operation can proceed with the provided entry, false otherwise
*/
@SuppressWarnings("javadoc")
{
//Perform parent checking.
{
return false;
}
{
return false;
}
return true;
}
{
}
{
{
}
}
{
{
{
{
{
{
}
}
}
}
}
}
{
{
}
}
}
/** This class reports progress of first phase of import processing at fixed intervals. */
private final class FirstPhaseProgressTask extends TimerTask
{
/** The number of entries that had been read at the time of the previous progress report. */
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
public FirstPhaseProgressTask()
{
// nothing to initialize: previousCount/previousTime start from field defaults
// — TODO confirm, initialization statements may have been elided in this revision
}
/** The action to be performed by this timer task. */
public void run()
{
if (deltaTime == 0)
{
return;
}
}
}
/** This class reports progress of the second phase of import processing at fixed intervals. */
private class SecondPhaseProgressTask extends TimerTask
{
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
private SecondPhaseProgressTask()
{
}
{
if (cursor instanceof ProgressCursor)
{
}
}
/** The action to be performed by this timer task. */
public void run()
{
if (deltaTime == 0)
{
return;
}
for (Iterator<Map.Entry<ProgressCursor<?, ?>, Integer>> it = cursors.entrySet().iterator(); it.hasNext();)
{
{
}
}
}
{
if (lastBytesRead == tmpBytesRead)
{
return;
}
// Kilo and milli approximately cancel out.
logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, cursor.getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
}
}
/** Used to check DN's when DN validation is performed during phase one processing. */
private final class Dn2IdDnCache implements DNCache
{
{
}
{
try
{
{
{
}
});
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
{
try
{
{
{
}
});
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
public void close()
{
// Nothing to do
}
}
}