// OnDiskMergeImporter.java at revision d127499b88dfe2d53370025b5d1de485ac6786d5
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
* Copyright 2015 ForgeRock AS.
*/
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
// @Checkstyle:ignore
/**
 * Imports LDIF data contained in files into the database. Because of the B-Tree structure used in the backend, import
 * is faster when records are inserted in ascending key order, since this prevents the node locking/re-writing caused
 * by B-Tree inner node splits. This is why import is performed in two phases: the first phase encodes and sorts all
 * records while the second phase copies the sorted records into the database. Entries are read from an LDIF file by
 * the {@link ImportLDIFReader}. Each entry is then optionally validated and finally imported into a {@link Chunk} by
 * the {@link EntryContainer} using a {@link PhaseOneWriteableTransaction}. Once all entries have been processed,
 * {@link PhaseOneWriteableTransaction#getChunks()} returns all the chunks, which are then copied into the database
 * concurrently using tasks created by the {@link ImporterTaskFactory}.
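 * <p>
 * The overall flow can be sketched as follows (illustrative only; the source iteration and the
 * {@code importEntry}/{@code idGenerator} names are simplified, hypothetical stand-ins):
 *
 * <pre>
 * // Phase one: encode, optionally validate, and sort entries into per-tree chunks.
 * PhaseOneWriteableTransaction txn = new PhaseOneWriteableTransaction(chunkFactory);
 * for (Entry entry : entriesFromLdif)
 * {
 *   entryContainer.importEntry(txn, idGenerator.nextEntryID(), entry);
 * }
 * // Phase two: copy each sorted chunk into the database concurrently.
 * for (Map.Entry<TreeName, Chunk> treeChunk : txn.getChunks().entrySet())
 * {
 *   executor.submit(importStrategy.newPhaseTwoTask(treeChunk.getKey(), treeChunk.getValue(), reporter));
 * }
 * </pre>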
*/
final class OnDiskMergeImporter
{
/**
* Shim that allows properly constructing an {@link OnDiskMergeImporter} without polluting {@link ImportStrategy} and
 * {@link RootContainer} with this importer's inner workings.
*/
static class StrategyImpl implements ImportStrategy
{
  /** Small heap threshold used to give more memory to the JVM to avoid OOM errors. */
private final ServerContext serverContext;
private final RootContainer rootContainer;
private final PluggableBackendCfg backendCfg;
StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg)
{
this.serverContext = serverContext;
this.rootContainer = rootContainer;
this.backendCfg = backendCfg;
}
{
final long availableMemory = calculateAvailableMemory();
final int threadCount =
final int indexCount = getIndexCount();
final OnDiskMergeImporter importer;
final LDIFReaderSource source =
{
}
finally
{
}
logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
float rate = 0;
if (importTime > 0)
{
}
.getEntriesIgnored());
}
private int getIndexCount() throws ConfigException
{
{
{
}
else
{
}
}
return indexCount;
}
{
final long availableMemory = calculateAvailableMemory();
// Rebuild indexes
final OnDiskMergeImporter importer;
{
final long totalEntries = entryContainer.getID2Entry().getRecordCount(asWriteableTransaction(dbStorage));
final Set<String> indexesToRebuild = selectIndexesToRebuild(entryContainer, rebuildConfig, totalEntries);
{
return;
}
if (indexesToRebuild.isEmpty())
{
// Early exit in case there is no index to rebuild.
return;
}
{
new RebuildIndexStrategy(rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter,
new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, totalEntries));
}
}
finally
{
}
}
private static final Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig,
long totalEntries)
{
switch (rebuildConfig.getRebuildMode())
{
case ALL:
break;
case DEGRADED:
break;
case USER_DEFINED:
visitIndexes(entryContainer, visitOnlyAttributesOrIndexes(rebuildConfig.getRebuildList(), selector));
if (!rebuildConfig.isClearDegradedState())
{
logger.info(NOTE_REBUILD_START, Utils.joinAsString(", ", selector.getSelectedIndexNames()), totalEntries);
}
break;
default:
throw new UnsupportedOperationException("Unsupported rebuild mode " + rebuildConfig.getRebuildMode());
}
{
// Always rebuild id2childrencount with dn2id.
}
return selector.getSelectedIndexNames();
}
throws InitializationException
{
new File(getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR), backendCfg.getBackendId());
{
}
return tempDir;
}
private static int computeBufferSize(int nbBuffer, long availableMemory) throws InitializationException
{
if (BufferPool.supportOffHeap())
{
return MAX_BUFFER_SIZE;
}
if (bufferSize < MIN_BUFFER_SIZE)
{
// Not enough memory.
throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, nbBuffer * MIN_BUFFER_SIZE
+ REQUIRED_FREE_MEMORY));
}
return bufferSize;
}
/**
* Calculates the amount of available memory which can be used by this import, taking into account whether or not
* the import is running offline or online as a task.
*/
private long calculateAvailableMemory()
{
      // Call gc twice to ensure finalizers are run
      // and young-to-old generation references are properly collected.
final long totalAvailableMemory;
if (DirectoryServer.isRunning())
{
}
else
{
}
// Now take into account various fudge factors.
int importMemPct = 90;
if (totalAvailableMemory <= SMALL_HEAP_SIZE)
{
// Be pessimistic when memory is low.
importMemPct -= 25;
}
final long usedMemory = runtime.totalMemory() - runtime.freeMemory() + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
}
}
/** Source of LDAP {@link Entry}s to process. */
private interface Source
{
/** Process {@link Entry}s extracted from a {@link Source}. */
interface EntryProcessor
{
}
boolean isCancelled();
}
/** Extract LDAP {@link Entry}s from an LDIF file. */
private static final class LDIFReaderSource implements Source
{
private final LDIFImportConfig importConfig;
private final ImportLDIFReader reader;
private final ExecutorService executor;
private final int nbThreads;
LDIFReaderSource(RootContainer rootContainer, LDIFImportConfig importConfig, String threadNameTemplate,
int nbThreads) throws IOException
{
this.importConfig = importConfig;
this.entryContainers = new HashMap<>();
{
}
this.executor = Executors.newFixedThreadPool(nbThreads, newThreadFactory(null, threadNameTemplate, true));
}
{
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
try
{
for (int i = 0; i < nbThreads; i++)
{
{
{
{
{
}
try
{
}
catch (DirectoryException e)
{
}
catch (Exception e)
{
}
finally
{
}
}
return null;
}
});
}
}
finally
{
}
}
long getEntriesRead()
{
return reader.getEntriesRead();
}
long getEntriesIgnored()
{
return reader.getEntriesIgnored();
}
long getEntriesRejected()
{
return reader.getEntriesRejected();
}
public boolean isCancelled()
{
return importConfig.isCancelled();
}
    /** This class reports the progress of the first import phase at fixed intervals. */
private final class PhaseOneProgressReporter extends TimerTask
{
/** The number of entries that had been read at the time of the previous progress report. */
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
public PhaseOneProgressReporter()
{
}
/** The action to be performed by this timer task. */
public void run()
{
if (deltaTime == 0)
{
return;
}
}
}
}
/** Extract LDAP {@link Entry}s from an existing database. */
private static final class ID2EntrySource implements Source
{
private final EntryContainer entryContainer;
private final CompressedSchema schema;
private final ExecutorService executor;
private final long nbTotalEntries;
private volatile boolean interrupted;
ID2EntrySource(EntryContainer entryContainer, Importer importer, String threadNameTemplate, int nbThread,
long nbTotalEntries)
{
this.nbTotalEntries = nbTotalEntries;
this.entryContainer = entryContainer;
      // by default (unfortunately) the ThreadPoolExecutor will throw an exception when the queue is full.
this.executor =
new RejectedExecutionHandler()
{
{
// this will block if the queue is full
try
{
}
catch (InterruptedException e)
{
}
}
});
}
{
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
{
{
{
public void run()
{
try
{
}
catch (Exception e)
{
interrupted = true;
}
}
});
}
}
finally
{
}
// Forward exception if any
{
}
}
public boolean isCancelled()
{
return interrupted;
}
    /** This class reports the progress of the first import phase at fixed intervals. */
private final class PhaseOneProgressReporter extends TimerTask
{
/** The number of entries that had been read at the time of the previous progress report. */
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
public PhaseOneProgressReporter()
{
}
/** The action to be performed by this timer task. */
public void run()
{
final float progressPercent = nbTotalEntries > 0 ? Math.round((100f * entriesRead) / nbTotalEntries) : 0;
if (deltaTime == 0)
{
return;
}
}
}
}
/** Max size of phase one buffer. */
/** Min size of phase one buffer. */
/** DB cache size to use during import. */
/** Required free memory for this importer. */
/** LDIF reader. */
/** Map of DNs to Suffix objects. */
private final AbstractTwoPhaseImportStrategy importStrategy;
private final String phase2ThreadNameTemplate;
private long phaseOneTimeMs;
private long phaseTwoTimeMs;
private OnDiskMergeImporter(String phase2ThreadNameTemplate, AbstractTwoPhaseImportStrategy importStrategy)
{
this.importStrategy = importStrategy;
}
{
// Start phase one
{
public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
{
{
{
try
{
}
finally
{
}
}
}
}
});
if (source.isCancelled())
{
throw new InterruptedException("Import processing canceled.");
}
// Start phase two
{
{
tasks.add(importStrategy.newPhaseTwoTask(treeChunk.getKey(), treeChunk.getValue(), progressReporter));
}
}
// Finish import
{
}
}
public long getImportedCount()
{
return importedCount.get();
}
public long getPhaseOneTimeInMillis()
{
return phaseOneTimeMs;
}
public long getPhaseTwoTimeInMillis()
{
return phaseTwoTimeMs;
}
public long getTotalTimeInMillis()
{
return phaseOneTimeMs + phaseTwoTimeMs;
}
/** Create {@link Chunk} depending on the {@link TreeName}. */
private interface ChunkFactory
{
}
/** Provides default behavior for two phases strategies. */
private static abstract class AbstractTwoPhaseImportStrategy implements ChunkFactory
{
protected final BufferPool bufferPool;
AbstractTwoPhaseImportStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
{
{
}
this.bufferPool = bufferPool;
}
abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
{
}
abstract Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk chunk, PhaseTwoProgressReporter progressReporter);
{
}
{
{
}
}
{
}
{
}
{
return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), chunk,
}
{
{
{
{
// force flush
}
return null;
}
};
}
}
/**
 * No validation is performed: every {@link TreeName} (except id2entry) is imported into a dedicated
 * {@link ExternalSortChunk} before being copied into the {@link Importer}, while id2entry is directly copied into
 * the database through an {@link ImporterToChunkAdapter}.
*/
private static final class SortAndImportWithoutDNValidation extends AbstractTwoPhaseImportStrategy
{
SortAndImportWithoutDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
{
}
{
// No validation performed. All entries are considered valid.
}
{
if (isID2Entry(treeName))
{
}
return newExternalSortChunk(treeName);
}
{
if (isID2Entry(treeName))
{
return newFlushTask(chunk);
}
{
}
}
}
/**
 * This strategy performs two validations: it ensures that there is no duplicate entry (entry with the same DN) and
 * that each entry has an existing parent. To do so, dn2id is directly imported into the database in addition to
 * id2entry. The other trees are externally sorted before being imported into the database.
*/
private static final class SortAndImportWithDNValidation extends AbstractTwoPhaseImportStrategy implements
{
private static final int DN_CACHE_SIZE = 16;
SortAndImportWithDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
{
}
{
if (isID2Entry(treeName))
{
}
{
}
return newExternalSortChunk(treeName);
}
{
if (isID2Entry(treeName))
{
return newFlushTask(chunk);
}
{
}
}
public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
{
{
}
{
throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry));
}
}
{
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
}
  /** Import only a specific list of indexes, ignoring everything else. */
private static final class RebuildIndexStrategy extends AbstractTwoPhaseImportStrategy
{
{
{
}
}
{
}
{
}
{
{
return newExternalSortChunk(treeName);
}
// Ignore
return nullChunk();
}
public Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk chunk, PhaseTwoProgressReporter progressReporter)
{
{
{
}
}
// Do nothing (flush null chunk)
return newFlushTask(chunk);
}
public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
{
// No validation performed. All entries are considered valid.
}
}
{
final ExecutorService executor = Executors.newCachedThreadPool(newThreadFactory(null, threadNameTemplate, true));
try
{
{
}
}
finally
{
}
}
/**
 * A {@link WriteableTransaction} which delegates the storage of data to {@link Chunk}s created on demand for each
 * {@link TreeName} through the provided {@link ChunkFactory}. Once there is no more data to import, call
 * {@link #getChunks()} to get the resulting {@link Chunk}s containing the sorted data to import into the database.
 * {@link #put(TreeName, ByteSequence, ByteSequence)} is thread-safe. Since only one {@link Chunk} is created per
 * {@link TreeName}, the {@link Chunk#put(ByteSequence, ByteSequence)} method of the returned {@link Chunk} must be
 * thread-safe.
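 * <p>
 * A minimal sketch of the chunk-per-tree creation, assuming a {@code ConcurrentMap<TreeName, Chunk>}
 * backing field and a hypothetical {@code ChunkFactory.newChunk(TreeName)} factory method:
 *
 * <pre>
 * Chunk getOrCreateChunk(TreeName treeName) throws Exception
 * {
 *   Chunk existing = chunks.get(treeName);
 *   if (existing != null)
 *   {
 *     return existing;
 *   }
 *   Chunk newChunk = chunkFactory.newChunk(treeName);
 *   existing = chunks.putIfAbsent(treeName, newChunk);
 *   if (existing != null)
 *   {
 *     // Another thread was faster at creating a new chunk: discard ours.
 *     newChunk.delete();
 *     return existing;
 *   }
 *   return newChunk;
 * }
 * </pre>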
*/
private static final class PhaseOneWriteableTransaction implements WriteableTransaction
{
private final ChunkFactory chunkFactory;
{
this.chunkFactory = chunkFactory;
}
{
return chunks;
}
/**
     * Store a record into a {@link Chunk}, creating the chunk if none exists yet for the given treeName. This method
     * is thread-safe.
*/
{
try
{
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
{
if (alreadyExistingChunk != null)
{
return alreadyExistingChunk;
}
if (alreadyExistingChunk != null)
{
// Another thread was faster at creating a new chunk, close this one.
return alreadyExistingChunk;
}
return newChunk;
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
}
/**
 * A Chunk is a data store with an optional capacity limit. Chunks are typically used by first adding data to the
 * store using {@link #put(ByteSequence, ByteSequence)}; the data can later be read back sequentially after calling
 * {@link #flip()}.
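 * <p>
 * Typical lifecycle (sketch; {@code consume()} is a placeholder):
 *
 * <pre>
 * if (!chunk.put(key, value))
 * {
 *   // Chunk is full: flip it and spill its content, or roll over to a new chunk.
 * }
 * ...
 * try (SequentialCursor<ByteString, ByteString> cursor = chunk.flip())
 * {
 *   while (cursor.next())
 *   {
 *     consume(cursor.getKey(), cursor.getValue());
 *   }
 * }
 * </pre>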
*/
interface Chunk
{
/**
     * Add data to the storage. Whether this method is thread-safe is implementation-dependent.
*
* @return true if the data were added to the storage, false if the chunk is full.
*/
/**
     * Flip this chunk from write-only to read-only in order to get the previously stored data. This method must be
     * called only once; after flip() has been called, the chunk must not be written to anymore.
*
* @return a {@link MeteredCursor} to access the data
*/
/**
     * Return the size of the data contained in this chunk. This size is guaranteed to be consistent only if there are
     * no pending {@link #put(ByteSequence, ByteSequence)} operations.
*/
long size();
/**
     * While a chunk's memory and files are automatically garbage-collected/deleted at exit, this method can be called
     * to release them immediately.
*/
void delete();
}
/**
 * Stores and sorts data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an
 * unlimited amount of data. This class uses double-buffering: data are first stored in an
 * {@link InMemorySortedChunk} which, once full, is asynchronously sorted and copied into a
 * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}.
 * {@link #put(ByteSequence, ByteSequence)} is thread-safe.
 * This class is used in phase one. There is one {@link ExternalSortChunk} per
 * database tree, shared across all phase-one importer threads, in charge of storing/sorting records.
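 * <p>
 * The rolling mechanism can be sketched as follows (simplified; {@code sortAndSpillAsync} is a
 * hypothetical name for the asynchronous sort-and-copy step):
 *
 * <pre>
 * boolean put(ByteSequence key, ByteSequence value)
 * {
 *   Chunk current = currentChunk.get(); // thread-local InMemorySortedChunk
 *   if (!current.put(key, value))
 *   {
 *     sortAndSpillAsync(current);       // sort, de-duplicate and copy into a FileRegionChunk
 *     current = new InMemorySortedChunk(name, bufferPool);
 *     currentChunk.set(current);
 *     current.put(key, value);
 *   }
 *   return true;
 * }
 * </pre>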
*/
static final class ExternalSortChunk implements Chunk
{
/** Name reported by the {@link MeteredCursor} after {@link #flip()}. */
/** Provides buffer used to store and sort chunk of data. */
private final BufferPool bufferPool;
/** File containing the regions used to store the data. */
private final FileChannel channel;
/** Pointer to the next available region in the file, typically at end of file. */
    /** Collector used to reduce the number of duplicate keys during sort. */
/** Keep track of pending sorting tasks. */
/** Keep track of currently opened chunks. */
/** Keep track of the number of chunks created. */
/** Size approximation of data contained in this chunk. */
/** Active chunk for the current thread. */
{
protected Chunk initialValue()
{
return nullChunk();
}
};
ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector,
{
while (candidateChannel == null)
{
try
{
}
catch (FileAlreadyExistsException ignore)
{
// someone else got it
}
}
this.bufferPool = bufferPool;
this.deduplicator = collector;
this.file = candidateFile;
this.channel = candidateChannel;
}
{
{
}
return true;
}
{
{
}
try
{
return new CollectorCursor<>(
}
catch (ExecutionException | InterruptedException e)
{
throw new StorageRuntimeException(e);
}
}
public long size()
{
long activeSize = 0;
{
}
}
public void delete()
{
}
int getNbSortedChunks()
{
return nbSortedChunks.get();
}
{
{
{
/*
* NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key
* de-duplication performed by the CollectorCursor. Thanks to SPARSE_FILE option, the delta between size
* allocated and the size actually used is not wasted.
*/
{
}
return persistentChunk.flip();
}
});
}
/**
   * Stores data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during {@link #flip()} so
   * that they can be cursored in ascending order. Byte arrays are supplied through a {@link BufferPool}. To allow the
   * sort operation, data must be randomly accessible; to achieve this, the offset of each key/value record is stored
   * in the buffer. To maximize space utilization, the buffer content is split in two parts: one contains the record
   * offsets, the other contains the records themselves:
*
* <pre>
* ----------> offset writer direction ----------------> |<- free ->| <---- record writer direction ---
* +-----------------+-----------------+-----------------+----------+----------+----------+----------+
* | offset record 1 | offset record 2 | offset record n | .........| record n | record 2 | record 1 |
* +-----------------+-----------------+-----------------+----------+----------+----------+----------+
* </pre>
*
   * Each record is the concatenation of a key/value pair (lengths are encoded using the {@link PackedLong}
   * representation):
*
* <pre>
* +------------+--------------+--------------+----------------+
* | key length | key bytes... | value length | value bytes... |
* +------------+--------------+--------------+----------------+
* </pre>
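   *
   * A sketch of the {@code put()} bookkeeping (single-threaded view; {@code packedSize},
   * {@code writeRecordAt} and {@code writeIntAt} are hypothetical helpers):
   *
   * <pre>
   * int recordSize = packedSize(key.length()) + key.length()
   *     + packedSize(value.length()) + value.length();
   * if (indexPos + INT_SIZE > dataPos - recordSize)
   * {
   *   return false;                  // offset area and record area would overlap: chunk is full
   * }
   * dataPos -= recordSize;           // records grow downward from the end of the buffer
   * writeRecordAt(dataPos, key, value);
   * writeIntAt(indexPos, dataPos);   // offsets grow upward from the start of the buffer
   * indexPos += INT_SIZE;
   * </pre>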
*/
{
private final String metricName;
private final BufferPool bufferPool;
private long totalBytes;
private int indexPos;
private int dataPos;
private int nbRecords;
{
this.metricName = name;
this.bufferPool = bufferPool;
}
{
dataPos -= recordSize;
final int recordDataPos = dataPos;
final int recordIndexPos = indexPos;
{
// Chunk is full
return false;
}
nbRecords++;
totalBytes += recordSize;
// Write record offset
return true;
}
{
}
public long size()
{
return totalBytes;
}
{
{
{
}
{
}
{
return valueA;
}
public int size()
{
return nbRecords;
}
}, this);
return new InMemorySortedChunkCursor();
}
{
{
return 0;
}
// Compare Keys
}
public void delete()
{
}
/** Cursor of the in-memory chunk. */
{
private ByteString key;
private ByteString value;
private volatile long bytesRead;
private int indexOffset;
public boolean next()
{
if (bytesRead >= totalBytes)
{
return false;
}
indexOffset += INT_SIZE;
return true;
}
public boolean isDefined()
{
}
{
throwIfUndefined(this);
return key;
}
{
throwIfUndefined(this);
return value;
}
public void close()
{
}
public String getMetricName()
{
return metricName;
}
public long getNbBytesRead()
{
return bytesRead;
}
public long getNbBytesTotal()
{
return totalBytes;
}
}
}
/**
   * Stores data inside a region contained in a file. A region is delimited by an offset and a length. The region is
   * memory-mapped and data are appended to it until it is full. A region stores a
   * concatenation of key/value records (key & value sizes are stored using the {@link PackedLong} format):
*
* <pre>
* +------------+--------------+--------------+----------------+
* | key length | value length | key bytes... | value bytes... |
* +------------+--------------+--------------+----------------+
* </pre>
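   *
   * A minimal sketch of how such a region can be memory-mapped (assumed setup, not the exact code
   * used here; {@code tempFile} is illustrative):
   *
   * <pre>
   * FileChannel channel = FileChannel.open(tempFile.toPath(), StandardOpenOption.CREATE,
   *     StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SPARSE);
   * MappedByteBuffer region = channel.map(FileChannel.MapMode.READ_WRITE, startOffset, regionSize);
   * region.put(recordBytes); // appends at the region's current position
   * region.force();          // flush dirty pages before handing the region over
   * </pre>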
*/
static final class FileRegionChunk implements Chunk
{
private final String metricName;
private final FileChannel channel;
private final long startOffset;
private long size;
private MappedByteBuffer mmapBuffer;
{
{
}
};
{
this.metricName = name;
this.startOffset = startOffset;
if (size > 0)
{
/*
         * Make sure that the file is big enough to encapsulate this memory-mapped region. Thanks to SPARSE_FILE this
         * operation should be fast even for a big region.
*/
}
}
{
final int recordSize =
PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value
.length();
{
        // The region is full
return false;
}
try
{
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
return true;
}
public long size()
{
}
{
/*
       * We force the OS to write dirty pages now so that they don't accumulate. Indeed, a huge number of dirty pages
       * might cause the OS to freeze the producer of those dirty pages (this importer) while it swaps them out.
*/
mmapBuffer.force();
mmapBuffer = null;
try
{
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
public void delete()
{
// Nothing to do
}
/** Cursor through the specific memory-mapped file's region. */
{
private final ByteBuffer region;
{
public int read() throws IOException
{
}
};
{
}
public boolean next()
{
if (!region.hasRemaining())
{
return false;
}
final int keyLength;
final int valueLength;
try
{
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
final byte[] keyValueData = new byte[recordSize];
return true;
}
public boolean isDefined()
{
}
{
throwIfUndefined(this);
return key;
}
{
throwIfUndefined(this);
return value;
}
public void close()
{
}
public String getMetricName()
{
return metricName;
}
public long getNbBytesRead()
{
}
public long getNbBytesTotal()
{
}
}
}
  /** A cursor that de-duplicates records sharing the same key, reading from a sorted cursor. */
static final class CollectorCursor<A, K, V> implements MeteredCursor<K, V>
{
private final MeteredCursor<K, ? extends V> delegate;
private boolean isDefined;
private K key;
private V value;
{
{
}
}
public boolean next()
{
if (isDefined)
{
}
return isDefined;
}
private void accumulateValues()
{
throwIfUndefined(this);
do
{
}
      // The delegate is one step ahead: when delegate.isDefined() returns false, we still have to return true once more.
isDefined = true;
}
public boolean isDefined()
{
return isDefined;
}
public K getKey() throws NoSuchElementException
{
throwIfUndefined(this);
return key;
}
public V getValue() throws NoSuchElementException
{
throwIfUndefined(this);
return value;
}
public void close()
{
}
public String getMetricName()
{
return delegate.getMetricName();
}
public long getNbBytesRead()
{
return delegate.getNbBytesRead();
}
public long getNbBytesTotal()
{
return delegate.getNbBytesTotal();
}
}
  /**
   * Provides a globally sorted cursor over multiple sorted cursors (a k-way merge).
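   * The cursors are kept ordered by their current key and {@code next()} always advances the cursor
   * positioned on the lowest key; merging [1, 4] and [2, 3] therefore yields 1, 2, 3, 4. A sketch,
   * assuming a {@code NavigableSet} ordered by current key, as the field below suggests
   * ({@code consume()} is a placeholder):
   *
   * <pre>
   * while (!cursors.isEmpty())
   * {
   *   SequentialCursor<K, V> lowest = cursors.pollFirst();
   *   consume(lowest.getKey(), lowest.getValue());
   *   if (lowest.next())
   *   {
   *     cursors.add(lowest); // re-insert at its new position
   *   }
   *   else
   *   {
   *     lowest.close();
   *   }
   * }
   * </pre>
   */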
static final class CompositeCursor<K extends Comparable<? super K>, V> implements MeteredCursor<K, V>
{
    /** Contains the non-empty sorted cursors, ordered by their current key. */
private final String metricName;
private final long totalBytes;
private volatile long bytesRead;
private K key;
private V value;
{
this.metricName = metricName;
{
{
          // Never return 0. Otherwise both cursors are considered equal and only one of them is kept by this set.
}
});
long totalBytesSum = 0;
{
{
{
}
}
else
{
}
}
this.totalBytes = totalBytesSum;
}
/**
* Try to get the next record from the cursor containing the lowest entry. If it reaches the end of the lowest
* cursor, it calls the close method and begins reading from the next lowest cursor.
*/
public boolean next()
{
if (lowestCursor == null)
{
return false;
}
if (lowestCursor.next())
{
}
else
{
}
return true;
}
public boolean isDefined()
{
}
public K getKey() throws NoSuchElementException
{
throwIfUndefined(this);
return key;
}
public V getValue() throws NoSuchElementException
{
throwIfUndefined(this);
return value;
}
public void close()
{
}
public String getMetricName()
{
return metricName;
}
public long getNbBytesRead()
{
return bytesRead;
}
public long getNbBytesTotal()
{
return totalBytes;
}
}
}
{
}
/** Task to copy one {@link Chunk} into a database tree through an {@link Importer}. */
{
private final PhaseTwoProgressReporter reporter;
private final Importer destination;
ChunkCopierTask(PhaseTwoProgressReporter reporter, Chunk source, TreeName treeName, Importer destination)
{
this.destination = destination;
}
{
try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
{
}
return null;
}
}
private static void copyIntoChunk(SequentialCursor<ByteString, ByteString> source, Chunk destination)
{
{
}
}
/**
   * This task optionally copies the dn2id chunk into the database and takes advantage of cursoring through it to
   * compute the {@link ID2Count} index.
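   * <p>
   * Since dn2id is cursored in key (DN) order, every entry is visited after its parent, which allows
   * counting children with a simple visitor. A sketch ({@code visitor.visit()} is a hypothetical
   * stand-in for the {@link TreeVisitor} callbacks):
   *
   * <pre>
   * while (dn2idCursor.next())
   * {
   *   visitor.visit(dn2idCursor.getKey(), dn2idCursor.getValue()); // maintains ChildrenCount frames
   *   totalNumberOfEntries++;
   * }
   * </pre>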
*/
{
private final PhaseTwoProgressReporter reporter;
private final BufferPool bufferPool;
private final Chunk dn2IdSourceChunk;
private final Chunk dn2IdDestination;
DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool,
boolean dn2idAlreadyImported)
{
this.reporter = progressReporter;
this.bufferPool = bufferPool;
this.dn2IdSourceChunk = dn2IdChunk;
this.id2countCollector = id2countCollector;
}
{
final Chunk id2CountChunk =
long totalNumberOfEntries = 0;
final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
{
while (dn2idCursor.next())
{
}
}
// -1 because baseDN is not counted
return null;
}
/** TreeVisitor computing and importing the number of children per parent. */
{
{
}
{
return new ChildrenCount(parentID);
}
{
}
{
{
}
}
}
/** Keep track of the number of children during the dn2id visit. */
private static final class ChildrenCount
{
private final EntryID parentEntryID;
private long numberOfChildren;
{
this.parentEntryID = id;
}
}
}
{
return new ChunkToImporterAdapter(chunk);
}
/**
   * Delegates the storage of data to the {@link Importer}. This class has the same thread-safety as the supplied
   * importer.
*/
private static final class ImporterToChunkAdapter implements Chunk
{
{
}
{
return true;
}
{
}
public long size()
{
}
public void delete()
{
// Nothing to do
}
}
/**
* Delegates the {@link #put(TreeName, ByteSequence, ByteSequence)} method of {@link Importer} to a {@link Chunk}.
 * {@link #createTree(TreeName)} is a no-op; other methods throw {@link UnsupportedOperationException}. This class has
 * the same thread-safety as the supplied {@link Chunk}.
*/
private static final class ChunkToImporterAdapter implements Importer
{
{
}
{
try
{
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
public void close()
{
// nothing to do
}
}
/**
 * Writes records into a delegated {@link Chunk} after reordering them by key using a best-effort algorithm. This
 * class is intended to be used when records are produced in order but may reach the chunk slightly out of order
 * because of thread scheduling in a multi-threaded environment. Records are buffered and sorted before being written
 * to the delegated chunk; because of this buffering, records may be written into the chunk after some delay. All
 * entries are guaranteed to have been written into the chunk only once the flip() method has been called.
 * {@link #put(TreeName, ByteSequence, ByteSequence)} is thread-safe.
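 * <p>
 * A minimal sketch of the reordering buffer, assuming a synchronized {@code NavigableMap} is used
 * as the queue:
 *
 * <pre>
 * boolean put(ByteSequence key, ByteSequence value) throws Exception
 * {
 *   synchronized (pending)
 *   {
 *     pending.put(key.toByteString(), value.toByteString());
 *     if (pending.size() > queueSize)
 *     {
 *       // Mostly ascending: the smallest buffered key is persisted first.
 *       Map.Entry<ByteString, ByteString> smallest = pending.pollFirstEntry();
 *       delegate.put(smallest.getKey(), smallest.getValue());
 *     }
 *   }
 *   return true;
 * }
 * </pre>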
*/
private static final class MostlyOrderedChunk implements Chunk
{
/**
* Number of items to queue before writing them to the storage. This number must be at least equal to the number of
     * threads which will access the put() method. If underestimated, {@link #put(ByteSequence, ByteSequence)} might
     * lead to an unordered copy. If overestimated, extra memory is wasted.
*/
private static final int QUEUE_SIZE = 1024;
private final int queueSize;
{
this.queueSize = QUEUE_SIZE;
}
public void delete()
{
// Nothing to do
}
{
{
/*
         * Maximum size reached: take the record with the smallest key and persist it in the delegate chunk. This
         * ensures records are (mostly) inserted in ascending key order, which is the optimal insert order for B-trees.
*/
}
return true;
}
{
// Purge pending entries
{
}
}
public long size()
{
}
}
{
}
/** An empty Chunk which cannot store data. */
{
{
return false;
}
public long size()
{
return 0;
}
public void delete()
{
// Nothing to do
}
{
{
public boolean next()
{
return false;
}
public boolean isDefined()
{
return false;
}
{
throw new NoSuchElementException();
}
{
throw new NoSuchElementException();
}
public void close()
{
// nothing to do
}
public String getMetricName()
{
return NullChunk.class.getSimpleName();
}
public long getNbBytesRead()
{
return 0;
}
public long getNbBytesTotal()
{
return 0;
}
};
}
}
  /** Executor delegating the execution of tasks to the current thread. */
private static Executor sameThreadExecutor()
{
return new Executor()
{
{
}
};
}
/** Collect the results of asynchronous tasks. */
private static <K> List<K> waitTasksTermination(CompletionService<K> completionService, int nbTasks)
{
for (int i = 0; i < nbTasks; i++)
{
}
return results;
}
/** Regularly report progress statistics from the registered list of {@link ProgressMetric}. */
{
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE2_REPORTER_THREAD_NAME, true));
private ScheduledFuture<?> scheduledTask;
{
{
}
if (scheduledTask == null)
{
}
}
{
{
}
}
public synchronized void run()
{
if (deltaTime == 0)
{
return;
}
{
final int progressPercent = totalBytes > 0 ? Math.round((100f * newValue) / cursor.getNbBytesTotal()) : 0;
logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, cursor.getMetricName(), progressPercent, progressRemaining,
}
}
public synchronized void close()
{
}
}
/** Buffer used by {@link InMemorySortedChunk} to store and sort data. */
{
long readCompactUnsignedLong(int position);
int length();
}
/**
 * Pre-allocates and maintains a fixed number of re-usable {@code Buffer}s. This keeps heap memory consumption under
 * control and avoids the significant allocation cost incurred for huge objects.
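 * <p>
 * Typical usage (sketch; the constructor arguments and the {@code release()} name are assumed):
 *
 * <pre>
 * try (BufferPool pool = new BufferPool(nbBuffer, bufferSize))
 * {
 *   Buffer buffer = pool.get(); // blocks until a pre-allocated buffer becomes available
 *   try
 *   {
 *     // Fill and sort the buffer...
 *   }
 *   finally
 *   {
 *     pool.release(buffer);     // hand the buffer back for re-use instead of re-allocating
 *   }
 * }
 * </pre>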
*/
static final class BufferPool implements Closeable
{
private final int bufferSize;
private static final long BYTE_ARRAY_OFFSET;
static
{
try
{
field.setAccessible(true);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
private static boolean supportOffHeap()
{
}
{
this.bufferSize = bufferSize;
for (int i = 0; i < nbBuffer; i++)
{
}
}
public int getBufferSize()
{
return bufferSize;
}
{
try
{
}
catch (InterruptedException e)
{
throw new StorageRuntimeException(e);
}
}
{
try
{
}
catch (InterruptedException e)
{
throw new StorageRuntimeException(e);
}
}
{
{
get();
}
}
public void close()
{
{
}
}
/** Off-heap buffer using Unsafe memory access. */
private final class OffHeapBuffer implements Buffer
{
private final long address;
private final int size;
private int position;
{
public int read() throws IOException
{
}
};
{
{
}
};
private boolean closed;
OffHeapBuffer(int size)
{
}
{
}
{
}
{
try
{
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
public long readCompactUnsignedLong(final int position)
{
try
{
}
catch (IOException e)
{
throw new IllegalStateException(e);
}
}
{
{
}
}
public int length()
{
return size;
}
{
}
{
      for (int i = 0; i < len; i++)
{
if ( a != b )
{
return a - b;
}
}
}
public void close() throws IOException
{
if (!closed)
{
}
closed = true;
}
}
    /** Heap buffer backed by a {@link ByteBuffer}. */
private final class HeapBuffer implements Buffer
{
private final ByteBuffer buffer;
{
public void write(int b) throws IOException
{
}
};
{
public int read() throws IOException
{
}
};
HeapBuffer(int size)
{
}
{
}
{
}
{
try
{
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
public long readCompactUnsignedLong(final int position)
{
try
{
}
catch (IOException e)
{
throw new IllegalArgumentException(e);
}
}
{
}
public int length()
{
}
{
}
{
}
public void close()
{
// Nothing to do
}
}
}
  /** Extends {@link SequentialCursor} by providing metrics related to the cursor's progress. */
interface MeteredCursor<K, V> extends SequentialCursor<K, V>
{
long getNbBytesRead();
long getNbBytesTotal();
}
/** Add the cursor to the reporter and remove it once closed. */
private static <K, V> SequentialCursor<K, V> trackCursorProgress(final PhaseTwoProgressReporter reporter,
final MeteredCursor<K, V> cursor)
{
{
public void close()
{
}
};
}
{
{
throw new NoSuchElementException();
}
}
/**
   * Get a new {@link Collector} which can be used to merge encoded values. The type of the values to merge is deduced
   * from the {@link TreeName}.
*/
private static Collector<?, ByteString> newCollector(final EntryContainer entryContainer, final TreeName treeName)
{
{
// key conflicts == merge EntryIDSets
return new EntryIDSetsCollector(index);
}
else if (isID2ChildrenCount(treeName))
{
// key conflicts == sum values
}
{
// key conflicts == exception
return UniqueValueCollector.getInstance();
}
}
{
}
{
}
{
}
{
}
{
{
{
return true;
}
}
return false;
}
{
{
{
{
return index;
}
}
}
return null;
}
/**
 * A mutable reduction operation that accumulates input elements into a mutable result container, optionally
 * transforming the accumulated result into a final representation after all input elements have been processed.
 * Reduction operations can be performed either sequentially or in parallel. A Collector is specified by three
 * functions that work together to accumulate entries into a mutable result container and optionally perform a final
 * transform on the result. They are: creation of a new result container ({@code get()}), incorporation of a new data
 * element into a result container ({@code accept()}), and an optional final transform on the container
 * ({@code merge()}).
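 * <p>
 * For example, a collector summing plain {@code Long} values could be written as follows (an
 * illustrative sketch, assuming the {@code accept(A, R)} signature described above):
 *
 * <pre>
 * Collector<Long, Long> sum = new Collector<Long, Long>()
 * {
 *   public Long get()
 *   {
 *     return 0L; // new, empty accumulator
 *   }
 *
 *   public Long accept(Long resultContainer, Long value)
 *   {
 *     return resultContainer + value; // fold the new element into the accumulator
 *   }
 *
 *   public Long merge(Long resultContainer)
 *   {
 *     return resultContainer; // no final transform needed here
 *   }
 * };
 * </pre>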
*
* @param <A>
* Accumulator type
* @param <R>
* Result type
* @see java.util.stream.Collector
*/
interface Collector<A, R>
{
/**
     * Creates and returns a new mutable result container. Equivalent to {@code java.util.stream.Collector.supplier().get()}.
*/
A get();
/**
* Accepts two partial results and merges them. The combiner function may fold state from one argument into the
     * other and return that, or may return a new result container. Equivalent to
     * {@code java.util.stream.Collector.accumulator().accept(A, R)}.
*/
/**
     * Performs the final transformation from the intermediate accumulation type A to the final result type R.
     * Equivalent to {@code java.util.stream.Collector.finisher().apply(A)}.
*/
R merge(A resultContainer);
}
/** {@link Collector} that throws an exception if multiple values have to be merged. */
static final class UniqueValueCollector<V> implements Collector<V, V>
{
static <V> Collector<V, V> getInstance()
{
}
public V get()
{
return null;
}
{
if (previousValue != null)
{
throw new IllegalArgumentException("Cannot accept multiple values (current=" + previousValue + ", new=" + value
+ ")");
}
return value;
}
public V merge(V latestValue)
{
if (latestValue == null)
{
throw new IllegalArgumentException("No value to merge but expected one");
}
return latestValue;
}
}
/**
* {@link Collector} that accepts encoded {@link EntryIDSet} objects and produces a {@link ByteString} representing
* the merged {@link EntryIDSet}.
*/
{
private final DefaultIndex index;
private final int indexLimit;
{
}
{
      // LinkedList is used for its O(1) add method (while ArrayList is O(n) when resize is required).
return new LinkedList<>();
}
{
{
}
/*
       * else the EntryIDSet is above the index entry limit: discard additional values now to avoid blowing up memory,
       * then discard all entries in merge()
*/
return resultContainer;
}
{
{
}
{
// Avoids unnecessary decoding + encoding
}
}
{
final long[] entryIDs = new long[indexLimit];
// accumulate in array
int i = 0;
{
{
// above index entry limit
return EntryIDSet.newUndefinedSet();
}
{
}
}
}
}
/**
* {@link Collector} that accepts {@code long} values encoded into {@link ByteString} objects and produces a
* {@link ByteString} representing the sum of the supplied {@code long}s.
*/
{
{
}
{
return 0L;
}
{
}
{
}
}
{
}
/** Decorate {@link SequentialCursor} by providing progress information while cursoring. */
private static final class MeteredSequentialCursorDecorator extends
SequentialCursorDecorator<SequentialCursor<ByteString, ByteString>, ByteString, ByteString>implements
{
private final String metricName;
private final long totalSize;
private volatile long bytesRead;
private MeteredSequentialCursorDecorator(SequentialCursor<ByteString, ByteString> delegate, String metricName,
long totalSize)
{
super(delegate);
this.metricName = metricName;
}
public boolean next()
{
{
return true;
}
return false;
}
public long getNbBytesRead()
{
return bytesRead;
}
public String getMetricName()
{
return metricName;
}
public long getNbBytesTotal()
{
return totalSize;
}
}
  /** Helper allowing creation of {@link SequentialCursor} decorators without having to re-implement all methods. */
static abstract class SequentialCursorDecorator<D extends SequentialCursor<K, V>, K, V> implements
SequentialCursor<K, V>
{
protected final D delegate;
{
}
public boolean next()
{
}
public boolean isDefined()
{
}
public K getKey() throws NoSuchElementException
{
}
public V getValue() throws NoSuchElementException
{
}
public void close()
{
}
}
{
int nbVisited = 0;
{
{
nbVisited++;
}
}
{
nbVisited++;
}
nbVisited += 2;
return nbVisited;
}
  /** Visitor pattern allowing all types of indexes to be processed. */
private interface IndexVisitor
{
}
{
}
/** Update the trust state of the visited indexes. */
private static final class TrustModifier implements IndexVisitor
{
private final WriteableTransaction txn;
private final boolean trustValue;
{
this.trustValue = trustValue;
}
{
}
{
}
{
// System indexes don't have trust status
}
}
{
return new ClearDatabase(importer);
}
/** Delete & recreate the database of the visited indexes. */
private static final class ClearDatabase implements IndexVisitor
{
{
}
{
}
{
}
{
}
{
}
}
{
return new DegradedIndexFilter(delegate);
}
/** Visit indexes which are in a degraded state. */
private static final class DegradedIndexFilter implements IndexVisitor
{
private final IndexVisitor delegate;
{
}
{
{
}
}
{
{
}
}
{
// System indexes don't have trust status
}
}
/** Maintain a list containing the names of the visited indexes. */
private static final class SelectIndexName implements IndexVisitor
{
{
this.indexNames = new HashSet<>();
}
{
return indexNames;
}
{
}
{
}
{
}
{
}
}
private static final IndexVisitor visitOnlyIndexes(Collection<String> indexNames, IndexVisitor delegate)
{
}
private static final IndexVisitor visitOnlyAttributesOrIndexes(Collection<String> indexOrAttributeNames,
{
}
  /** Visit indexes only if their name matches one contained in a list. */
private static final class SpecificIndexFilter implements IndexVisitor
{
private final IndexVisitor delegate;
private final boolean includeAttributeNames;
{
{
}
}
{
      if (indexIncluded(index))
{
}
}
{
if (indexIncluded(index))
{
}
}
{
if (indexIncluded(index))
{
}
}
{
}
}
/**
 * Thread-safe fixed-size cache which, once full, removes the least recently accessed entry. Composition is used here
 * to ensure that only methods generating entry accesses in the LinkedHashMap are actually used; otherwise, the
 * least-recently-used property of the cache would not be respected.
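 * <p>
 * A sketch of the backing map (mirroring the constructor below; the {@code Boolean} value type is
 * illustrative):
 *
 * <pre>
 * Map<T, Boolean> cache = Collections.synchronizedMap(
 *     new LinkedHashMap<T, Boolean>(maxEntries + 1, 1.0f, true) // true = access-ordered
 *     {
 *       protected boolean removeEldestEntry(Map.Entry<T, Boolean> eldest)
 *       {
 *         return size() >= maxEntries; // evict once over capacity
 *       }
 *     });
 * cache.put(key, Boolean.TRUE); // add: may evict the least recently accessed key
 * cache.get(key);               // lookup: marks the key as most recently used
 * </pre>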
*/
private static final class LRUPresenceCache<T>
{
LRUPresenceCache(final int maxEntries)
{
      // +1 because the newly added entry is inserted before the least recently used one is removed.
{
{
return size() >= maxEntries;
}
});
}
{
}
{
}
}
{
return new ImporterToWriteableTransactionAdapter(importer);
}
  /** Adapter allowing an {@link Importer} to be used as a {@link WriteableTransaction}. */
private static final class ImporterToWriteableTransactionAdapter implements WriteableTransaction
{
{
}
{
}
{
}
{
{
return true;
}
return false;
}
{
}
{
long counter = 0;
{
{
counter++;
}
}
return counter;
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
{
throw new UnsupportedOperationException();
}
}
}