OnDiskMergeImporter.java revision dfe6772274d4642b2c332740ea6f65550b8cb855
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
* Copyright 2015 ForgeRock AS.
*/
package org.opends.server.backends.pluggable;
import static java.nio.channels.FileChannel.*;
import static org.forgerock.util.Utils.*;
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.StandardOpenOption;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.promise.PromiseImpl;
import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
import org.opends.server.admin.std.server.BackendIndexCfg;
import org.opends.server.admin.std.server.PluggableBackendCfg;
import org.opends.server.api.CompressedSchema;
import org.opends.server.backends.RebuildConfig;
import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
import org.opends.server.backends.pluggable.CursorTransformer.SequentialCursorAdapter;
import org.opends.server.backends.pluggable.DN2ID.TreeVisitor;
import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.SequentialCursor;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.UpdateFunction;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.util.Platform;
import com.forgerock.opendj.util.PackedLong;
// @Checkstyle:ignore
import sun.misc.Unsafe;
/**
* Imports LDIF data contained in files into the database. Because of the B-Tree structure used in the backend, import
* is faster when records are inserted in ascending order, since this prevents the node locking/re-writing caused by
* B-Tree inner node splits. This is why import is performed in two phases: the first phase encodes and sorts all
* records while the second phase copies the sorted records into the database. Entries are read from an LDIF file by
* the {@link ImportLDIFReader}. Then, each entry is optionally validated and finally imported into a {@link Chunk} by
* the {@link EntryContainer} using a {@link PhaseOneWriteableTransaction}. Once all entries have been processed,
* {@link PhaseOneWriteableTransaction#getChunks()} returns all the chunks, which are then copied into the database
* concurrently using tasks created by the {@link ImporterTaskFactory}.
*/
final class OnDiskMergeImporter
{
private static final String DEFAULT_TMP_DIR = "import-tmp";
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/**
* Shim that allows properly constructing an {@link OnDiskMergeImporter} without polluting {@link ImportStrategy} and
* {@link RootContainer} with this importer's inner workings.
*/
static class StrategyImpl implements ImportStrategy
{
private static final String PHASE1_REBUILDER_THREAD_NAME = "PHASE1-REBUILDER-%d";
private static final String PHASE2_REBUILDER_THREAD_NAME = "PHASE2-REBUILDER-%d";
private static final String PHASE1_IMPORTER_THREAD_NAME = "PHASE1-IMPORTER-%d";
private static final String PHASE2_IMPORTER_THREAD_NAME = "PHASE2-IMPORTER-%d";
private static final String SORTER_THREAD_NAME = "PHASE1-SORTER-%d";
/** Small heap threshold below which less memory is used for import buffers, leaving more to the JVM to avoid OOM errors. */
private static final int SMALL_HEAP_SIZE = 256 * MB;
private final ServerContext serverContext;
private final RootContainer rootContainer;
private final PluggableBackendCfg backendCfg;
StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg)
{
this.serverContext = serverContext;
this.rootContainer = rootContainer;
this.backendCfg = backendCfg;
}
@Override
public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
{
final long availableMemory = calculateAvailableMemory();
final int threadCount =
importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors()
: importConfig.getThreadCount();
final int indexCount = getIndexCount();
final int nbBuffer = threadCount * indexCount * 2;
logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
final int bufferSize = computeBufferSize(nbBuffer, availableMemory);
logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
final long startTime = System.currentTimeMillis();
final OnDiskMergeImporter importer;
final ExecutorService sorter = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
newThreadFactory(null, SORTER_THREAD_NAME, true));
final LDIFReaderSource source =
new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
try (final Importer dbStorage = rootContainer.getStorage().startImport();
final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
{
final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
? new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter)
: new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter);
importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy);
importer.doImport(source);
}
finally
{
sorter.shutdown();
}
logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis()
/ 1000, importer.getPhaseTwoTimeInMillis() / 1000);
final long importTime = System.currentTimeMillis() - startTime;
float rate = 0;
if (importTime > 0)
{
rate = 1000f * source.getEntriesRead() / importTime;
}
logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source
.getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate);
return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source
.getEntriesIgnored());
}
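/** Counts the number of index trees (dn2id and dn2uri included) configured for this backend; used to size the buffer pool. */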
private int getIndexCount() throws ConfigException
{
int indexCount = 2; // dn2id, dn2uri
for (String indexName : backendCfg.listBackendIndexes())
{
final BackendIndexCfg index = backendCfg.getBackendIndex(indexName);
final SortedSet<IndexType> types = index.getIndexType();
if (types.contains(IndexType.EXTENSIBLE))
{
indexCount += types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
}
else
{
indexCount += types.size();
}
}
indexCount += backendCfg.listBackendVLVIndexes().length;
return indexCount;
}
@Override
public void rebuildIndex(final RebuildConfig rebuildConfig) throws Exception
{
final long availableMemory = calculateAvailableMemory();
// Rebuild indexes
final OnDiskMergeImporter importer;
final ExecutorService sorter = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
newThreadFactory(null, SORTER_THREAD_NAME, true));
try (final Importer dbStorage = rootContainer.getStorage().startImport())
{
final EntryContainer entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
final long totalEntries =
entryContainer.getID2Entry().getRecordCount(new ImporterToWriteableTransactionAdapter(dbStorage));
final Set<String> indexesToRebuild = selectIndexesToRebuild(entryContainer, rebuildConfig, totalEntries);
if (rebuildConfig.isClearDegradedState())
{
visitIndexes(entryContainer, new SpecificIndexFilter(new TrustModifier(dbStorage, true), indexesToRebuild));
logger.info(NOTE_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
return;
}
final int threadCount = Runtime.getRuntime().availableProcessors();
final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
final int bufferSize = computeBufferSize(nbBuffer, availableMemory);
try (final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
{
final File tempDir = prepareTempDir(backendCfg, rebuildConfig.getTmpDirectory());
final AbstractTwoPhaseImportStrategy strategy =
new RebuildIndexStrategy(rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter,
indexesToRebuild);
importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy);
importer.doImport(
new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, totalEntries));
}
}
finally
{
sorter.shutdown();
}
final long totalTime = importer.getTotalTimeInMillis();
final float rate = totalTime > 0 ? 1000f * importer.getImportedCount() / totalTime : 0;
logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
}
private static final Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig,
long totalEntries)
{
final SelectIndexName selector = new SelectIndexName();
switch (rebuildConfig.getRebuildMode())
{
case ALL:
visitIndexes(entryContainer, selector);
logger.info(NOTE_REBUILD_ALL_START, totalEntries);
break;
case DEGRADED:
visitIndexes(entryContainer, new DegradedIndexFilter(selector));
logger.info(NOTE_REBUILD_DEGRADED_START, totalEntries);
break;
case USER_DEFINED:
visitIndexes(entryContainer, new SpecificIndexFilter(selector, rebuildConfig.getRebuildList()));
if (!rebuildConfig.isClearDegradedState())
{
logger.info(NOTE_REBUILD_START, Utils.joinAsString(", ", rebuildConfig.getRebuildList()), totalEntries);
}
break;
}
return selector.getSelectedIndexNames();
}
private static File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
throws InitializationException
{
final File tempDir =
new File(getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR), backendCfg.getBackendId());
recursiveDelete(tempDir);
if (!tempDir.exists() && !tempDir.mkdirs())
{
throw new InitializationException(ERR_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir));
}
return tempDir;
}
private static int computeBufferSize(int nbBuffer, long availableMemory) throws InitializationException
{
if (BufferPool.supportOffHeap())
{
return MAX_BUFFER_SIZE;
}
final int bufferSize = Math.min((int) (availableMemory / nbBuffer), MAX_BUFFER_SIZE);
if (bufferSize < MIN_BUFFER_SIZE)
{
// Not enough memory.
throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, nbBuffer * MIN_BUFFER_SIZE
+ REQUIRED_FREE_MEMORY));
}
return bufferSize;
}
/**
* Calculates the amount of available memory which can be used by this import, taking into account whether the import
* is running offline or online as a task.
*/
private long calculateAvailableMemory()
{
final Runtime runtime = Runtime.getRuntime();
// Call gc twice to ensure finalizers are run
// and young-to-old-gen references are properly collected.
runtime.gc();
runtime.gc();
final long totalAvailableMemory;
if (DirectoryServer.isRunning())
{
// Online import/rebuild.
final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory();
totalAvailableMemory = Math.max(availableMemory, 16 * MB);
}
else
{
// Offline import/rebuild.
totalAvailableMemory = Platform.getUsableMemoryForCaching();
}
// Now take into account various fudge factors.
int importMemPct = 90;
if (totalAvailableMemory <= SMALL_HEAP_SIZE)
{
// Be pessimistic when memory is low.
importMemPct -= 25;
}
final long usedMemory = runtime.totalMemory() - runtime.freeMemory() + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
return (totalAvailableMemory * importMemPct / 100) - usedMemory;
}
}
/** Source of LDAP {@link Entry}s to process. */
private interface Source
{
/** Process {@link Entry}s extracted from a {@link Source}. */
interface EntryProcessor
{
void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws Exception;
}
void processAllEntries(EntryProcessor processor) throws Exception;
boolean isCancelled();
}
/** Extract LDAP {@link Entry}s from an LDIF file. */
private static final class LDIFReaderSource implements Source
{
private static final String PHASE1_REPORTER_THREAD_NAME = "PHASE1-REPORTER-%d";
private final Map<DN, EntryContainer> entryContainers;
private final LDIFImportConfig importConfig;
private final ImportLDIFReader reader;
private final ExecutorService executor;
private final int nbThreads;
LDIFReaderSource(RootContainer rootContainer, LDIFImportConfig importConfig, String threadNameTemplate,
int nbThreads) throws IOException
{
this.importConfig = importConfig;
this.reader = new ImportLDIFReader(importConfig, rootContainer);
this.entryContainers = new HashMap<>();
for (EntryContainer container : rootContainer.getEntryContainers())
{
this.entryContainers.put(container.getBaseDN(), container);
}
this.nbThreads = nbThreads;
this.executor = Executors.newFixedThreadPool(nbThreads, newThreadFactory(null, threadNameTemplate, true));
}
@Override
public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
{
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS);
final CompletionService<Void> completion = new ExecutorCompletionService<>(executor);
try
{
for (int i = 0; i < nbThreads; i++)
{
completion.submit(new Callable<Void>()
{
@Override
public Void call() throws Exception
{
EntryInformation entryInfo;
while ((entryInfo = reader.readEntry(entryContainers)) != null && !importConfig.isCancelled())
{
final EntryContainer entryContainer = entryInfo.getEntryContainer();
final Entry entry = entryInfo.getEntry();
final DN entryDN = entry.getName();
final DN parentDN = entryContainer.getParentWithinBase(entryDN);
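// Make sure the parent entry has been fully processed before importing its children.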
if (parentDN != null)
{
reader.waitIfPending(parentDN);
}
try
{
entryProcessor.processEntry(entryContainer, entryInfo.getEntryID(), entry);
}
catch (DirectoryException e)
{
reader.rejectEntry(entry, e.getMessageObject());
}
catch (Exception e)
{
reader.rejectEntry(entry, ERR_EXECUTION_ERROR.get(e));
}
finally
{
reader.removePending(entry.getName());
}
}
return null;
}
});
}
waitTasksTermination(completion, nbThreads);
}
finally
{
scheduler.shutdown();
executor.shutdown();
}
}
long getEntriesRead()
{
return reader.getEntriesRead();
}
long getEntriesIgnored()
{
return reader.getEntriesIgnored();
}
long getEntriesRejected()
{
return reader.getEntriesRejected();
}
@Override
public boolean isCancelled()
{
return importConfig.isCancelled();
}
/** This class reports progress of the first phase of import processing at fixed intervals. */
private final class PhaseOneProgressReporter extends TimerTask
{
/** The number of entries that had been read at the time of the previous progress report. */
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
public PhaseOneProgressReporter()
{
previousTime = System.currentTimeMillis();
}
/** The action to be performed by this timer task. */
@Override
public void run()
{
long entriesRead = reader.getEntriesRead();
long entriesIgnored = reader.getEntriesIgnored();
long entriesRejected = reader.getEntriesRejected();
long deltaCount = entriesRead - previousCount;
long latestTime = System.currentTimeMillis();
long deltaTime = latestTime - previousTime;
if (deltaTime == 0)
{
return;
}
float rate = 1000f * deltaCount / deltaTime;
logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
previousCount = entriesRead;
previousTime = latestTime;
}
}
}
/** Extract LDAP {@link Entry}s from an existing database. */
private static final class ID2EntrySource implements Source
{
private static final String PHASE1_REPORTER_THREAD_NAME = "REPORTER-%d";
private final EntryContainer entryContainer;
private final CompressedSchema schema;
private final Importer importer;
private final ExecutorService executor;
private final long nbTotalEntries;
private final AtomicLong nbEntriesProcessed = new AtomicLong();
private volatile boolean interrupted;
ID2EntrySource(EntryContainer entryContainer, Importer importer, String threadNameTemplate, int nbThread,
long nbTotalEntries)
{
this.nbTotalEntries = nbTotalEntries;
this.entryContainer = entryContainer;
this.importer = importer;
this.schema = entryContainer.getRootContainer().getCompressedSchema();
// by default (unfortunately) the ThreadPoolExecutor will throw an exception when the queue is full.
this.executor =
new ThreadPoolExecutor(nbThread, nbThread, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(nbThread * 2),
newThreadFactory(null, threadNameTemplate, true),
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
// this will block if the queue is full
try
{
executor.getQueue().put(r);
}
catch (InterruptedException e)
{
// Preserve the interrupt status so that callers can observe the interruption.
Thread.currentThread().interrupt();
}
}
});
}
@Override
public void processAllEntries(final EntryProcessor entryProcessor) throws Exception
{
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true));
scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS);
final PromiseImpl<Void, Exception> promise = PromiseImpl.create();
try (final SequentialCursor<ByteString, ByteString> cursor =
importer.openCursor(entryContainer.getID2Entry().getName()))
{
while (cursor.next())
{
final ByteString key = cursor.getKey();
final ByteString value = cursor.getValue();
executor.submit(new Runnable()
{
@Override
public void run()
{
try
{
entryProcessor.processEntry(entryContainer,
new EntryID(key), ID2Entry.entryFromDatabase(value, schema));
nbEntriesProcessed.incrementAndGet();
}
catch (Exception e)
{
interrupted = true;
promise.handleException(e);
}
}
});
}
}
finally
{
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
scheduler.shutdown();
}
// Forward exception if any
if (promise.isDone())
{
promise.getOrThrow(0, TimeUnit.SECONDS);
}
}
@Override
public boolean isCancelled()
{
return interrupted;
}
/** This class reports progress of the first phase of import processing at fixed intervals. */
private final class PhaseOneProgressReporter extends TimerTask
{
/** The number of entries that had been read at the time of the previous progress report. */
private long previousCount;
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
public PhaseOneProgressReporter()
{
previousTime = System.currentTimeMillis();
}
/** The action to be performed by this timer task. */
@Override
public void run()
{
long entriesRead = nbEntriesProcessed.get();
long deltaCount = entriesRead - previousCount;
long latestTime = System.currentTimeMillis();
long deltaTime = latestTime - previousTime;
final float progressPercent = nbTotalEntries > 0 ? Math.round((100f * entriesRead) / nbTotalEntries) : 0;
if (deltaTime == 0)
{
return;
}
float rate = 1000f * deltaCount / deltaTime;
logger.info(NOTE_REBUILD_PROGRESS_REPORT, progressPercent, entriesRead, nbTotalEntries, rate);
previousCount = entriesRead;
previousTime = latestTime;
}
}
}
/** Max size of phase one buffer. */
private static final int MAX_BUFFER_SIZE = 2 * MB;
/** Min size of phase one buffer. */
private static final int MIN_BUFFER_SIZE = 4 * KB;
/** DB cache size to use during import. */
private static final int DB_CACHE_SIZE = 4 * MB;
/** Required free memory for this importer. */
private static final int REQUIRED_FREE_MEMORY = 50 * MB;
/** Strategy defining how records are chunked, sorted and copied into the database during the two import phases. */
private final AbstractTwoPhaseImportStrategy importStrategy;
private final String phase2ThreadNameTemplate;
private final AtomicLong importedCount = new AtomicLong();
private long phaseOneTimeMs;
private long phaseTwoTimeMs;
private OnDiskMergeImporter(String phase2ThreadNameTemplate, AbstractTwoPhaseImportStrategy importStrategy)
{
this.phase2ThreadNameTemplate = phase2ThreadNameTemplate;
this.importStrategy = importStrategy;
}
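/**
* Runs the two import phases: phase one encodes and sorts all records produced by the given source into chunks,
* phase two copies the sorted chunks into the database in parallel.
*/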
private void doImport(final Source source) throws Exception
{
final long phaseOneStartTime = System.currentTimeMillis();
final PhaseOneWriteableTransaction transaction = new PhaseOneWriteableTransaction(importStrategy);
importedCount.set(0);
final ConcurrentMap<EntryContainer, CountDownLatch> importedContainers = new ConcurrentHashMap<>();
// Start phase one
source.processAllEntries(new Source.EntryProcessor()
{
@Override
public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException,
InterruptedException
{
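// Ensure beforeImport() is run exactly once per entry container; other threads wait for it to complete.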
CountDownLatch latch = importedContainers.get(container);
if (latch == null)
{
final CountDownLatch newLatch = new CountDownLatch(1);
if (importedContainers.putIfAbsent(container, newLatch) == null)
{
try
{
importStrategy.beforeImport(container);
}
finally
{
newLatch.countDown();
}
}
latch = importedContainers.get(container);
}
latch.await();
importStrategy.validate(container, entryID, entry);
container.importEntry(transaction, entryID, entry);
importedCount.incrementAndGet();
}
});
phaseOneTimeMs = System.currentTimeMillis() - phaseOneStartTime;
if (source.isCancelled())
{
throw new InterruptedException("Import processing canceled.");
}
// Start phase two
final long phaseTwoStartTime = System.currentTimeMillis();
try (final PhaseTwoProgressReporter progressReporter = new PhaseTwoProgressReporter())
{
final List<Callable<Void>> tasks = new ArrayList<>();
final Set<String> importedBaseDNs = new HashSet<>();
for (Map.Entry<TreeName, Chunk> treeChunk : transaction.getChunks().entrySet())
{
importedBaseDNs.add(treeChunk.getKey().getBaseDN());
tasks.add(importStrategy.newPhaseTwoTask(treeChunk.getKey(), treeChunk.getValue(), progressReporter));
}
invokeParallel(phase2ThreadNameTemplate, tasks);
}
// Finish import
for(EntryContainer entryContainer : importedContainers.keySet())
{
importStrategy.afterImport(entryContainer);
}
phaseTwoTimeMs = System.currentTimeMillis() - phaseTwoStartTime;
}
public long getImportedCount()
{
return importedCount.get();
}
public long getPhaseOneTimeInMillis()
{
return phaseOneTimeMs;
}
public long getPhaseTwoTimeInMillis()
{
return phaseTwoTimeMs;
}
public long getTotalTimeInMillis()
{
return phaseOneTimeMs + phaseTwoTimeMs;
}
/** Creates a {@link Chunk} depending on the {@link TreeName}. */
private interface ChunkFactory
{
Chunk newChunk(TreeName treeName) throws Exception;
}
/** Provides default behavior for two-phase import strategies. */
private static abstract class AbstractTwoPhaseImportStrategy implements ChunkFactory
{
protected final Map<String, EntryContainer> entryContainers;
protected final Executor sorter;
protected final Importer importer;
protected final BufferPool bufferPool;
protected final File tempDir;
AbstractTwoPhaseImportStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
BufferPool bufferPool, Executor sorter)
{
this.entryContainers = new HashMap<>(entryContainers.size());
for (EntryContainer container : entryContainers)
{
this.entryContainers.put(container.getTreePrefix(), container);
}
this.importer = importer;
this.tempDir = tempDir;
this.bufferPool = bufferPool;
this.sorter = sorter;
}
abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
void beforeImport(EntryContainer entryContainer)
{
clearEntryContainerTrees(entryContainer);
visitIndexes(entryContainer, new TrustModifier(importer, false));
}
abstract Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk chunk, PhaseTwoProgressReporter progressReporter);
void afterImport(EntryContainer entryContainer)
{
visitIndexes(entryContainer, new TrustModifier(importer, true));
}
final void clearEntryContainerTrees(EntryContainer entryContainer)
{
for(Tree tree : entryContainer.listTrees())
{
importer.clearTree(tree.getName());
}
}
final Chunk newExternalSortChunk(TreeName treeName) throws Exception
{
return new ExternalSortChunk(tempDir, treeName.toString(), bufferPool,
newCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter);
}
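/** Returns a task which clears the destination tree and copies the sorted chunk into it. */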
final Callable<Void> newChunkCopierTask(TreeName treeName, final Chunk chunk,
PhaseTwoProgressReporter progressReporter)
{
return new CleanImportTask(progressReporter, chunk, treeName, importer);
}
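/**
* Returns a task which cursors the sorted dn2id chunk to compute the id2childrencount index, copying dn2id into the
* database at the same time unless it has already been imported.
*/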
final Callable<Void> newDN2IDImporterTask(TreeName treeName, final Chunk chunk,
PhaseTwoProgressReporter progressReporter, boolean dn2idAlreadyImported)
{
final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
final ID2Count id2count = entryContainer.getID2ChildrenCount();
return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), chunk,
id2count, newCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
}
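/** Returns a task which simply flips the given chunk, forcing any buffered data to be flushed. */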
static final Callable<Void> newFlushTask(final Chunk chunk)
{
return new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try (final MeteredCursor<ByteString, ByteString> unused = chunk.flip())
{
// force flush
}
return null;
}
};
}
}
/**
* No validation is performed; every {@link TreeName} (except id2entry) is imported into a dedicated
* {@link ExternalSortChunk} before being imported into the {@link Importer}. id2entry is directly copied into
* the database through an {@link ImporterToChunkAdapter}.
*/
private static final class SortAndImportWithoutDNValidation extends AbstractTwoPhaseImportStrategy
{
SortAndImportWithoutDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
BufferPool bufferPool, Executor sorter)
{
super(entryContainers, importer, tempDir, bufferPool, sorter);
}
@Override
public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry)
{
// No validation performed. All entries are considered valid.
}
@Override
public Chunk newChunk(TreeName treeName) throws Exception
{
if (isID2Entry(treeName))
{
importer.clearTree(treeName);
return new MostlyOrderedChunk(asChunk(treeName, importer));
}
return newExternalSortChunk(treeName);
}
@Override
public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk chunk,
PhaseTwoProgressReporter progressReporter)
{
if (isID2Entry(treeName))
{
return newFlushTask(chunk);
}
else if (isDN2ID(treeName))
{
return newDN2IDImporterTask(treeName, chunk, progressReporter, false);
}
return newChunkCopierTask(treeName, chunk, progressReporter);
}
}
/**
* This strategy performs two validations, ensuring that there is no duplicate entry (entry with the same DN) and that
* the given entry has an existing parent. To do so, dn2id is directly imported into the database in addition to
* id2entry. Other trees are externally sorted before being imported into the database.
*/
private static final class SortAndImportWithDNValidation extends AbstractTwoPhaseImportStrategy implements
ReadableTransaction
{
private static final int DN_CACHE_SIZE = 16;
private final LRUPresenceCache<DN> dnCache = new LRUPresenceCache<>(DN_CACHE_SIZE);
SortAndImportWithDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
BufferPool bufferPool, Executor sorter)
{
super(entryContainers, importer, tempDir, bufferPool, sorter);
}
@Override
public Chunk newChunk(TreeName treeName) throws Exception
{
if (isID2Entry(treeName))
{
importer.clearTree(treeName);
return new MostlyOrderedChunk(asChunk(treeName, importer));
}
else if (isDN2ID(treeName))
{
importer.clearTree(treeName);
return asChunk(treeName, importer);
}
return newExternalSortChunk(treeName);
}
@Override
public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk chunk,
PhaseTwoProgressReporter progressReporter)
{
if (isID2Entry(treeName))
{
return newFlushTask(chunk);
}
else if (isDN2ID(treeName))
{
return newDN2IDImporterTask(treeName, chunk, progressReporter, true);
}
return newChunkCopierTask(treeName, chunk, progressReporter);
}
@Override
public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
{
final DN2ID dn2Id = entryContainer.getDN2ID();
final DN entryDN = entry.getName();
final DN parentDN = entryContainer.getParentWithinBase(entryDN);
if (parentDN != null && !dnCache.contains(parentDN) && dn2Id.get(this, parentDN) == null)
{
throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN));
}
if (dn2Id.get(this, entryDN) != null)
{
throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry));
}
dnCache.add(entryDN);
}
@Override
public ByteString read(TreeName treeName, ByteSequence key)
{
return importer.read(treeName, key);
}
@Override
public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
{
throw new UnsupportedOperationException();
}
@Override
public long getRecordCount(TreeName treeName)
{
throw new UnsupportedOperationException();
}
}
/** Imports only a specific list of indexes while ignoring everything else. */
private static final class RebuildIndexStrategy extends AbstractTwoPhaseImportStrategy
{
private final Set<String> indexNames;
RebuildIndexStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir,
BufferPool bufferPool, Executor sorter, Collection<String> indexNames)
{
super(entryContainers, importer, tempDir, bufferPool, sorter);
this.indexNames = new HashSet<>(indexNames.size());
for(String indexName : indexNames)
{
this.indexNames.add(indexName.toLowerCase());
}
}
@Override
void beforeImport(EntryContainer entryContainer)
{
visitIndexes(entryContainer, new SpecificIndexFilter(new TrustModifier(importer, false), indexNames));
visitIndexes(entryContainer, new SpecificIndexFilter(new ClearDatabase(importer), indexNames));
}
@Override
void afterImport(EntryContainer entryContainer)
{
visitIndexes(entryContainer, new SpecificIndexFilter(new TrustModifier(importer, true), indexNames));
}
@Override
public Chunk newChunk(TreeName treeName) throws Exception
{
if (indexNames.contains(treeName.getIndexId().toLowerCase()))
{
return newExternalSortChunk(treeName);
}
// Ignore
return nullChunk();
}
@Override
public Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk chunk, PhaseTwoProgressReporter progressReporter)
{
if (indexNames.contains(treeName.getIndexId().toLowerCase()))
{
if (isDN2ID(treeName))
{
return newDN2IDImporterTask(treeName, chunk, progressReporter, false);
}
return newChunkCopierTask(treeName, chunk, progressReporter);
}
// Do nothing (flush null chunk)
return newFlushTask(chunk);
}
@Override
public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException
{
// No validation performed. All entries are considered valid.
}
}
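/** Submits all tasks to a dedicated thread pool, waits for their completion and returns their results. */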
private static <V> List<V> invokeParallel(String threadNameTemplate, Collection<Callable<V>> tasks)
throws InterruptedException, ExecutionException
{
final ExecutorService executor = Executors.newCachedThreadPool(newThreadFactory(null, threadNameTemplate, true));
try
{
final CompletionService<V> completionService = new ExecutorCompletionService<>(executor);
for (Callable<V> task : tasks)
{
completionService.submit(task);
}
return waitTasksTermination(completionService, tasks.size());
}
finally
{
executor.shutdown();
}
}
/**
* A {@link WriteableTransaction} which delegates the storage of data to {@link Chunk}s created on demand for each
* {@link TreeName} through the provided {@link ChunkFactory}. Once there is no more data to import, call
* {@link #getChunks()} to get the resulting {@link Chunk}s containing the sorted data to import into the database.
* {@link #put(TreeName, ByteSequence, ByteSequence)} is thread-safe. Since only one {@link Chunk} is created
* per {@link TreeName}, the {@link Chunk#put(ByteSequence, ByteSequence)} method of the returned {@link Chunk} must be
* thread-safe.
*/
private static final class PhaseOneWriteableTransaction implements WriteableTransaction
{
private final ConcurrentMap<TreeName, Chunk> chunks = new ConcurrentHashMap<>();
private final ChunkFactory chunkFactory;
PhaseOneWriteableTransaction(ChunkFactory chunkFactory)
{
this.chunkFactory = chunkFactory;
}
Map<TreeName, Chunk> getChunks()
{
return chunks;
}
/**
* Stores a record into a {@link Chunk}, creating one if none exists yet for the given treeName. This method is
* thread-safe.
*/
@Override
public void put(final TreeName treeName, ByteSequence key, ByteSequence value)
{
try
{
getOrCreateChunk(treeName).put(key, value);
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
private Chunk getOrCreateChunk(final TreeName treeName) throws Exception
{
Chunk alreadyExistingChunk = chunks.get(treeName);
if (alreadyExistingChunk != null)
{
return alreadyExistingChunk;
}
final Chunk newChunk = chunkFactory.newChunk(treeName);
alreadyExistingChunk = chunks.putIfAbsent(treeName, newChunk);
if (alreadyExistingChunk != null)
{
// Another thread was faster at creating a new chunk; delete this one.
newChunk.delete();
return alreadyExistingChunk;
}
return newChunk;
}
@Override
public ByteString read(TreeName treeName, ByteSequence key)
{
throw new UnsupportedOperationException();
}
@Override
public boolean update(TreeName treeName, ByteSequence key, UpdateFunction f)
{
throw new UnsupportedOperationException();
}
@Override
public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
{
throw new UnsupportedOperationException();
}
@Override
public long getRecordCount(TreeName treeName)
{
throw new UnsupportedOperationException();
}
@Override
public void openTree(TreeName name, boolean createOnDemand)
{
throw new UnsupportedOperationException();
}
@Override
public void deleteTree(TreeName name)
{
throw new UnsupportedOperationException();
}
@Override
public boolean delete(TreeName treeName, ByteSequence key)
{
throw new UnsupportedOperationException();
}
}
/**
* Chunk implementations are data stores with an optionally limited capacity. A Chunk is typically used by first
* adding data to the storage using {@link #put(ByteSequence, ByteSequence)}; later on, the data can be sequentially
* accessed using {@link #flip()}.
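*
* <p>
* A minimal usage sketch (assuming a single writer thread; {@code processRecord()} is a hypothetical consumer and
* bounded implementations may return {@code false} from put() once full):
* </p>
*
* <pre>
* Chunk chunk = ...;
* chunk.put(key1, value1);
* chunk.put(key2, value2);
* try (MeteredCursor<ByteString, ByteString> cursor = chunk.flip())
* {
*   while (cursor.next())
*   {
*     processRecord(cursor.getKey(), cursor.getValue());
*   }
* }
* </pre>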
*/
interface Chunk
{
/**
* Adds data to the storage. Whether or not this method is thread-safe is implementation dependent.
*
* @return true if the data was added to the storage, false if the chunk is full.
*/
boolean put(ByteSequence key, ByteSequence value);
/**
* Flips this chunk from write-only to read-only in order to read the previously stored data. This method must be
* called only once. After flip is called, the Chunk instance must not be used anymore.
*
* @return a {@link MeteredCursor} to access the data
*/
MeteredCursor<ByteString, ByteString> flip();
/**
* Returns the size of the data contained in this chunk. This size is guaranteed to be consistent only if there are no
* pending {@link #put(ByteSequence, ByteSequence)} operations.
*/
long size();
/**
* While a chunk's memory and files are automatically garbage collected/deleted at exit, this method can be called to
* release them immediately.
*/
void delete();
}
/**
* Stores and sorts data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an
* unlimited amount of data. This class uses double-buffering: data is first stored in an
* {@link InMemorySortedChunk} which, once full, is asynchronously sorted and copied into a
* {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}.
* {@link #put(ByteSequence, ByteSequence)} is thread-safe.
* This class is used in phase one. There is one {@link ExternalSortChunk} per
* database tree, shared across all phase-one importer threads, in charge of storing/sorting records.
*/
static final class ExternalSortChunk implements Chunk
{
/** Name reported by the {@link MeteredCursor} after {@link #flip()}. */
private final String name;
/** Provides buffer used to store and sort chunk of data. */
private final BufferPool bufferPool;
/** File containing the regions used to store the data. */
private final File file;
private final FileChannel channel;
/** Pointer to the next available region in the file, typically at end of file. */
private final AtomicLong filePosition = new AtomicLong();
/** Collector used to reduce the number of duplicate keys during the sort. */
private final Collector<?, ByteString> deduplicator;
/** Keep track of pending sorting tasks. */
private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
/** Keep track of currently opened chunks. */
private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>());
/** Keep track of the number of chunks created. */
private final AtomicInteger nbSortedChunks = new AtomicInteger();
/** Size approximation of data contained in this chunk. */
private final AtomicLong size = new AtomicLong();
/** Active chunk for the current thread. */
private final ThreadLocal<Chunk> currentChunk = new ThreadLocal<Chunk>()
{
@Override
protected Chunk initialValue()
{
return nullChunk();
}
};
ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector,
Executor sortExecutor) throws IOException
{
FileChannel candidateChannel = null;
File candidateFile = null;
while (candidateChannel == null)
{
candidateFile = new File(tempDir, (name + UUID.randomUUID()).replaceAll("\\W+", "_"));
try
{
candidateChannel =
open(candidateFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE);
candidateFile.deleteOnExit();
}
catch (FileAlreadyExistsException ignore)
{
// someone else got it
}
}
this.name = name;
this.bufferPool = bufferPool;
this.deduplicator = collector;
this.file = candidateFile;
this.channel = candidateChannel;
this.sorter = new ExecutorCompletionService<>(sortExecutor);
}
@Override
public boolean put(final ByteSequence key, final ByteSequence value)
{
final Chunk chunk = currentChunk.get();
if (!chunk.put(key, value))
{
sortAndAppendChunkAsync(chunk);
activeChunks.remove(chunk);
final Chunk newChunk = new InMemorySortedChunk(name, bufferPool);
activeChunks.add(newChunk);
currentChunk.set(newChunk);
newChunk.put(key, value);
}
return true;
}
@Override
public MeteredCursor<ByteString, ByteString> flip()
{
for (Chunk chunk : activeChunks)
{
sortAndAppendChunkAsync(chunk);
}
try
{
return new CollectorCursor<>(
new CompositeCursor<>(name, waitTasksTermination(sorter, nbSortedChunks.get())), deduplicator);
}
catch (ExecutionException | InterruptedException e)
{
throw new StorageRuntimeException(e);
}
}
@Override
public long size()
{
long activeSize = 0;
for (Chunk chunk : activeChunks)
{
activeSize += chunk.size();
}
return size.get() + activeSize;
}
@Override
public void delete()
{
closeSilently(channel);
file.delete();
}
int getNbSortedChunks()
{
return nbSortedChunks.get();
}
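/**
* Asynchronously sorts the given in-memory chunk, de-duplicates its keys and copies the result into a newly reserved
* region of the backing file.
*/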
private void sortAndAppendChunkAsync(final Chunk chunk)
{
size.addAndGet(chunk.size());
final long startOffset = filePosition.getAndAdd(chunk.size());
nbSortedChunks.incrementAndGet();
sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>()
{
@Override
public MeteredCursor<ByteString, ByteString> call() throws Exception
{
/*
* NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of the key
* de-duplication performed by the CollectorCursor. Thanks to the SPARSE_FILE option, the delta between the size
* allocated and the size actually used is not wasted.
*/
final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
try (final SequentialCursor<ByteString, ByteString> source =
new CollectorCursor<>(chunk.flip(), deduplicator))
{
copyIntoChunk(source, persistentChunk);
}
return persistentChunk.flip();
}
});
}
/**
* Stores data inside fixed-size byte arrays. Data stored in this chunk is sorted by key during flip() so that
* it can be cursored in ascending order. Byte arrays are supplied through a {@link BufferPool}. To allow the sort
* operation, data must be randomly accessible. To do so, the offset of each key/value record is stored in the buffer.
* To maximize space usage, the buffer content is split in two parts: one contains the record offsets, the other
* contains the records themselves:
*
* <pre>
* ----------> offset writer direction ----------------> |<- free ->| <---- record writer direction ---
* +-----------------+-----------------+-----------------+----------+----------+----------+----------+
* | offset record 1 | offset record 2 | offset record n | .........| record n | record 2 | record 1 |
* +-----------------+-----------------+-----------------+----------+----------+----------+----------+
* </pre>
*
* Each record is the concatenation of a key and a value (lengths are encoded using the {@link PackedLong}
* representation):
*
* <pre>
* +------------+--------------+--------------+----------------+
* | key length | key bytes... | value length | value bytes... |
* +------------+--------------+--------------+----------------+
* </pre>
*/
static final class InMemorySortedChunk implements Chunk, Comparator<Integer>
{
private static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
private final String metricName;
private final BufferPool bufferPool;
private final Buffer buffer;
private long totalBytes;
private int indexPos;
private int dataPos;
private int nbRecords;
InMemorySortedChunk(String name, BufferPool bufferPool)
{
this.metricName = name;
this.bufferPool = bufferPool;
this.buffer = bufferPool.get();
this.dataPos = buffer.length();
}
@Override
public boolean put(ByteSequence key, ByteSequence value)
{
final int keyHeaderSize = PackedLong.getEncodedSize(key.length());
final int valueHeaderSize = PackedLong.getEncodedSize(value.length());
final int keyRecordSize = keyHeaderSize + key.length();
final int recordSize = keyRecordSize + valueHeaderSize + value.length();
dataPos -= recordSize;
final int recordDataPos = dataPos;
final int recordIndexPos = indexPos;
indexPos += INT_SIZE;
if (indexPos > dataPos)
{
// Chunk is full
return false;
}
nbRecords++;
totalBytes += recordSize;
// Write record offset
buffer.writeInt(recordIndexPos, recordDataPos);
final int valuePos = writeDataAt(recordDataPos, key);
writeDataAt(valuePos, value);
return true;
}
private int writeDataAt(int offset, ByteSequence data)
{
final int headerSize = buffer.writeCompactUnsignedLong(offset, data.length());
buffer.writeByteSequence(offset + headerSize, data);
return offset + headerSize + data.length();
}
@Override
public long size()
{
return totalBytes;
}
@Override
public MeteredCursor<ByteString, ByteString> flip()
{
Collections.sort(new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
return getOffsetAtPosition(index * INT_SIZE);
}
private Integer getOffsetAtPosition(int pos)
{
return buffer.readInt(pos);
}
@Override
public Integer set(int index, Integer element)
{
final int pos = index * INT_SIZE;
final Integer valueA = getOffsetAtPosition(pos);
buffer.writeInt(pos, element);
return valueA;
}
@Override
public int size()
{
return nbRecords;
}
}, this);
return new InMemorySortedChunkCursor();
}
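/** Compares the keys of the two records located at the given buffer offsets; used to sort the offset index. */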
@Override
public int compare(Integer offsetA, Integer offsetB)
{
final int iOffsetA = offsetA.intValue();
final int iOffsetB = offsetB.intValue();
if (iOffsetA == iOffsetB)
{
return 0;
}
// Compare Keys
final int keyLengthA = (int) buffer.readCompactUnsignedLong(iOffsetA);
final int keyOffsetA = iOffsetA + PackedLong.getEncodedSize(keyLengthA);
final int keyLengthB = (int) buffer.readCompactUnsignedLong(iOffsetB);
final int keyOffsetB = iOffsetB + PackedLong.getEncodedSize(keyLengthB);
return buffer.compare(keyOffsetA, keyLengthA, keyOffsetB, keyLengthB);
}
@Override
public void delete()
{
bufferPool.release(buffer);
}
/** Cursor of the in-memory chunk. */
private final class InMemorySortedChunkCursor implements MeteredCursor<ByteString, ByteString>
{
private ByteString key;
private ByteString value;
private volatile long bytesRead;
private int indexOffset;
@Override
public boolean next()
{
if (bytesRead >= totalBytes)
{
key = value = null;
return false;
}
final int recordOffset = buffer.readInt(indexOffset);
final int keyLength = (int) buffer.readCompactUnsignedLong(recordOffset);
final int keyHeaderSize = PackedLong.getEncodedSize(keyLength);
key = buffer.readByteString(recordOffset + keyHeaderSize, keyLength);
final int valueOffset = recordOffset + keyHeaderSize + keyLength;
final int valueLength = (int) buffer.readCompactUnsignedLong(valueOffset);
final int valueHeaderSize = PackedLong.getEncodedSize(valueLength);
value = buffer.readByteString(valueOffset + valueHeaderSize, valueLength);
indexOffset += INT_SIZE;
bytesRead += keyHeaderSize + keyLength + valueHeaderSize + valueLength;
return true;
}
@Override
public boolean isDefined()
{
return key != null;
}
@Override
public ByteString getKey() throws NoSuchElementException
{
throwIfUndefined(this);
return key;
}
@Override
public ByteString getValue() throws NoSuchElementException
{
throwIfUndefined(this);
return value;
}
@Override
public void close()
{
key = value = null;
bufferPool.release(buffer);
}
@Override
public String getMetricName()
{
return metricName;
}
@Override
public long getNbBytesRead()
{
return bytesRead;
}
@Override
public long getNbBytesTotal()
{
return totalBytes;
}
}
}
/**
* Stores data inside a region contained in a file. A region is delimited by an offset and a length. The region is
* memory-mapped and data is appended to the memory-mapped region until it is full. The region stores a
* concatenation of key/value records (key and value sizes are stored using the {@link PackedLong} format):
*
* <pre>
* +------------+--------------+--------------+----------------+
* | key length | value length | key bytes... | value bytes... |
* +------------+--------------+--------------+----------------+
* </pre>
*/
static final class FileRegionChunk implements Chunk
{
private final String metricName;
private final FileChannel channel;
private final long startOffset;
private long size;
private MappedByteBuffer mmapBuffer;
private final OutputStream mmapBufferOS = new OutputStream()
{
@Override
public void write(int arg0) throws IOException
{
mmapBuffer.put((byte) arg0);
}
};
FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException
{
this.metricName = name;
this.channel = channel;
this.startOffset = startOffset;
if (size > 0)
{
/*
* Make sure that the file is big enough to contain this memory-mapped region. Thanks to SPARSE_FILE, this
* operation should be fast even for a big region.
*/
channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1);
}
this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
}
@Override
public boolean put(ByteSequence key, ByteSequence value)
{
final int recordSize =
PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value
.length();
if (mmapBuffer.remaining() < recordSize)
{
// The region is full
return false;
}
try
{
PackedLong.writeCompactUnsigned(mmapBufferOS, key.length());
PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
key.copyTo(mmapBuffer);
value.copyTo(mmapBuffer);
return true;
}
@Override
public long size()
{
return mmapBuffer == null ? size : mmapBuffer.position();
}
@Override
public MeteredCursor<ByteString, ByteString> flip()
{
size = mmapBuffer.position();
/*
* We force the OS to write dirty pages now so that they don't accumulate. Indeed, a huge number of dirty pages might
* cause the OS to freeze the producer of those dirty pages (this importer) while it is swapping out the pages.
*/
mmapBuffer.force();
mmapBuffer = null;
try
{
return new FileRegionChunkCursor(channel.map(MapMode.READ_ONLY, startOffset, size));
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
@Override
public void delete()
{
// Nothing to do
}
/** Cursor through the specific memory-mapped file's region. */
private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString>
{
private final ByteBuffer region;
private final InputStream asInputStream = new InputStream()
{
@Override
public int read() throws IOException
{
return region.get() & 0xFF;
}
};
private ByteString key, value;
FileRegionChunkCursor(MappedByteBuffer data)
{
this.region = data;
}
@Override
public boolean next()
{
if (!region.hasRemaining())
{
key = value = null;
return false;
}
final int keyLength;
final int valueLength;
try
{
keyLength = (int) PackedLong.readCompactUnsignedLong(asInputStream);
valueLength = (int) PackedLong.readCompactUnsignedLong(asInputStream);
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
final int recordSize = keyLength + valueLength;
final byte[] keyValueData = new byte[recordSize];
region.get(keyValueData, 0, recordSize);
key = ByteString.wrap(keyValueData, 0, keyLength);
value = ByteString.wrap(keyValueData, keyLength, valueLength);
return true;
}
@Override
public boolean isDefined()
{
return key != null;
}
@Override
public ByteString getKey() throws NoSuchElementException
{
throwIfUndefined(this);
return key;
}
@Override
public ByteString getValue() throws NoSuchElementException
{
throwIfUndefined(this);
return value;
}
@Override
public void close()
{
key = value = null;
}
@Override
public String getMetricName()
{
return metricName;
}
@Override
public long getNbBytesRead()
{
return region.position();
}
@Override
public long getNbBytesTotal()
{
return region.limit();
}
}
}
/** A cursor de-duplicating records having the same key from an underlying sorted cursor. */
static final class CollectorCursor<A, K, V> implements MeteredCursor<K, V>
{
private final MeteredCursor<K, ? extends V> delegate;
private final Collector<A, V> collector;
private boolean isDefined;
private K key;
private V value;
CollectorCursor(MeteredCursor<K, ? extends V> cursor, Collector<A, V> collector)
{
this.delegate = cursor;
this.collector = collector;
if (!delegate.isDefined())
{
delegate.next();
}
}
@Override
public boolean next()
{
isDefined = delegate.isDefined();
if (isDefined)
{
key = delegate.getKey();
accumulateValues();
}
return isDefined;
}
private void accumulateValues()
{
throwIfUndefined(this);
A resultContainer = collector.get();
do
{
resultContainer = collector.accept(resultContainer, delegate.getValue());
}
while (delegate.next() && key.equals(delegate.getKey()));
value = collector.merge(resultContainer);
// Delegate is one step ahead. When delegate.isDefined() returns false, we still have to return true once more.
isDefined = true;
}
@Override
public boolean isDefined()
{
return isDefined;
}
@Override
public K getKey() throws NoSuchElementException
{
throwIfUndefined(this);
return key;
}
@Override
public V getValue() throws NoSuchElementException
{
throwIfUndefined(this);
return value;
}
@Override
public void close()
{
key = null;
delegate.close();
}
@Override
public String getMetricName()
{
return delegate.getMetricName();
}
@Override
public long getNbBytesRead()
{
return delegate.getNbBytesRead();
}
@Override
public long getNbBytesTotal()
{
return delegate.getNbBytesTotal();
}
}
/** Provides a globally sorted cursor from multiple sorted cursors. */
static final class CompositeCursor<K extends Comparable<? super K>, V> implements MeteredCursor<K, V>
{
/** Contains the non-empty sorted cursors, ordered by their current key. */
private final NavigableSet<MeteredCursor<K, V>> orderedCursors;
private final String metricName;
private final long totalBytes;
private volatile long bytesRead;
private K key;
private V value;
CompositeCursor(String metricName, Collection<MeteredCursor<K, V>> cursors)
{
this.metricName = metricName;
this.orderedCursors = new TreeSet<>(new Comparator<MeteredCursor<K, V>>()
{
@Override
public int compare(MeteredCursor<K, V> o1, MeteredCursor<K, V> o2)
{
final int cmp = o1.getKey().compareTo(o2.getKey());
// Never return 0. Otherwise both cursors are considered equal and only one of them is kept by this set
return cmp == 0 ? Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)) : cmp;
}
});
long totalBytesSum = 0;
for (MeteredCursor<K, V> cursor : cursors)
{
long previousBytesRead = cursor.getNbBytesRead();
if (cursor.isDefined() || cursor.next())
{
if (orderedCursors.add(cursor))
{
bytesRead += (cursor.getNbBytesRead() - previousBytesRead);
totalBytesSum += cursor.getNbBytesTotal();
}
}
else
{
cursor.close();
}
}
this.totalBytes = totalBytesSum;
}
/**
* Tries to get the next record from the cursor containing the lowest entry. If it reaches the end of the lowest
* cursor, it closes that cursor and begins reading from the next lowest cursor.
*/
@Override
public boolean next()
{
final MeteredCursor<K, V> lowestCursor = orderedCursors.pollFirst();
if (lowestCursor == null)
{
key = null;
value = null;
return false;
}
key = lowestCursor.getKey();
value = lowestCursor.getValue();
long previousBytesRead = lowestCursor.getNbBytesRead();
if (lowestCursor.next())
{
bytesRead += (lowestCursor.getNbBytesRead() - previousBytesRead);
orderedCursors.add(lowestCursor);
}
else
{
lowestCursor.close();
}
return true;
}
@Override
public boolean isDefined()
{
return key != null;
}
@Override
public K getKey() throws NoSuchElementException
{
throwIfUndefined(this);
return key;
}
@Override
public V getValue() throws NoSuchElementException
{
throwIfUndefined(this);
return value;
}
@Override
public void close()
{
closeSilently(orderedCursors);
}
@Override
public String getMetricName()
{
return metricName;
}
@Override
public long getNbBytesRead()
{
return bytesRead;
}
@Override
public long getNbBytesTotal()
{
return totalBytes;
}
}
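/*
* Illustrative usage sketch of CompositeCursor (the chunks and the processing step are hypothetical; the actual
* callers live elsewhere in this class):
*
*   final Collection<MeteredCursor<ByteString, ByteString>> sortedRuns = new ArrayList<>();
*   sortedRuns.add(chunkA.flip());
*   sortedRuns.add(chunkB.flip());
*   try (CompositeCursor<ByteString, ByteString> merged = new CompositeCursor<>("id2entry", sortedRuns))
*   {
*     while (merged.next())
*     {
*       // Keys are visited in ascending order across all runs; equal keys coming from different runs
*       // appear as consecutive records, ready to be merged by a Collector.
*       process(merged.getKey(), merged.getValue());
*     }
*   }
*/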
}
private static Chunk asChunk(TreeName treeName, Importer importer)
{
return new ImporterToChunkAdapter(treeName, importer);
}
/**
* Task to copy one {@link Chunk} into a database tree through an {@link Importer}. The specified tree is cleared
* before receiving the data contained in the {@link Chunk}.
*/
private static final class CleanImportTask implements Callable<Void>
{
private final PhaseTwoProgressReporter reporter;
private final TreeName treeName;
private final Importer destination;
private final Chunk source;
CleanImportTask(PhaseTwoProgressReporter reporter, Chunk source, TreeName treeName, Importer destination)
{
this.source = source;
this.treeName = treeName;
this.destination = destination;
this.reporter = reporter;
}
@Override
public Void call()
{
destination.clearTree(treeName);
try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip()))
{
copyIntoChunk(sourceCursor, asChunk(treeName, destination));
}
return null;
}
}
private static void copyIntoChunk(SequentialCursor<ByteString, ByteString> source, Chunk destination)
{
while (source.next())
{
destination.put(source.getKey(), source.getValue());
}
}
/**
* This task optionally copies the dn2id chunk into the database and takes advantage of its cursoring to compute the
* {@link ID2Count} index.
*/
private static final class DN2IDImporterTask implements Callable<Void>
{
private final PhaseTwoProgressReporter reporter;
private final Importer importer;
private final File tempDir;
private final BufferPool bufferPool;
private final DN2ID dn2id;
private final ID2Count id2count;
private final Collector<?, ByteString> id2countCollector;
private final Chunk dn2IdSourceChunk;
private final Chunk dn2IdDestination;
DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool,
DN2ID dn2id, Chunk dn2IdChunk, ID2Count id2count, Collector<?, ByteString> id2countCollector,
boolean dn2idAlreadyImported)
{
this.reporter = progressReporter;
this.importer = importer;
this.tempDir = tempDir;
this.bufferPool = bufferPool;
this.dn2id = dn2id;
this.dn2IdSourceChunk = dn2IdChunk;
this.id2count = id2count;
this.id2countCollector = id2countCollector;
this.dn2IdDestination = dn2idAlreadyImported ? nullChunk() : asChunk(dn2id.getName(), importer);
}
@Override
public Void call() throws Exception
{
final Chunk id2CountChunk =
new ExternalSortChunk(tempDir, id2count.getName().toString(), bufferPool, id2countCollector,
sameThreadExecutor());
long totalNumberOfEntries = 0;
final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
try (final MeteredCursor<ByteString, ByteString> chunkCursor = dn2IdSourceChunk.flip();
final SequentialCursor<ByteString, ByteString> dn2idCursor =
dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor))
{
while (dn2idCursor.next())
{
dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue());
totalNumberOfEntries++;
}
}
// -1 because baseDN is not counted
id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries - 1));
new CleanImportTask(reporter, id2CountChunk, id2count.getName(), importer).call();
return null;
}
/** TreeVisitor computing and importing the number of children per parent. */
private final class ID2CountTreeVisitorImporter implements TreeVisitor<ChildrenCount>
{
private final Importer importer;
ID2CountTreeVisitorImporter(Importer importer)
{
this.importer = importer;
}
@Override
public ChildrenCount beginParent(EntryID parentID)
{
return new ChildrenCount(parentID);
}
@Override
public void onChild(ChildrenCount parent, EntryID childID)
{
parent.numberOfChildren++;
}
@Override
public void endParent(ChildrenCount parent)
{
if (parent.numberOfChildren > 0)
{
id2count.importPut(importer, parent.parentEntryID, parent.numberOfChildren);
}
}
}
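/*
* Schematic call sequence, for illustration only, of how dn2id.openCursor(...) drives this visitor for one parent
* entry with two children (the entry IDs are hypothetical):
*
*   final ChildrenCount parent = visitor.beginParent(parentID);
*   visitor.onChild(parent, firstChildID);   // numberOfChildren == 1
*   visitor.onChild(parent, secondChildID);  // numberOfChildren == 2
*   visitor.endParent(parent);               // writes the count 2 into id2count for parentID
*/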
/** Keep track of the number of children during the dn2id visit. */
private static final class ChildrenCount
{
private final EntryID parentEntryID;
private long numberOfChildren;
private ChildrenCount(EntryID id)
{
this.parentEntryID = id;
}
}
}
private static Importer asImporter(Chunk chunk)
{
return new ChunkToImporterAdapter(chunk);
}
/**
* Delegates the storage of data to the {@link Importer}. This class has the same thread-safety as the supplied
* importer.
*/
private static final class ImporterToChunkAdapter implements Chunk
{
private final TreeName treeName;
private final Importer importer;
private final AtomicLong size = new AtomicLong();
ImporterToChunkAdapter(TreeName treeName, Importer importer)
{
this.treeName = treeName;
this.importer = importer;
}
@Override
public boolean put(ByteSequence key, ByteSequence value)
{
importer.put(treeName, key, value);
size.addAndGet(key.length() + value.length());
return true;
}
@Override
public MeteredCursor<ByteString, ByteString> flip()
{
return asProgressCursor(importer.openCursor(treeName), treeName.toString(), size.get());
}
@Override
public long size()
{
return size.get();
}
@Override
public void delete()
{
// Nothing to do
}
}
/**
* Delegates the {@link #put(TreeName, ByteSequence, ByteSequence)} method of {@link Importer} to a {@link Chunk}.
* {@link #close()} is a no-op; the remaining methods throw {@link UnsupportedOperationException}. This class has the
* same thread-safety as the supplied {@link Chunk}.
*/
private static final class ChunkToImporterAdapter implements Importer
{
private final Chunk chunk;
ChunkToImporterAdapter(Chunk chunk)
{
this.chunk = chunk;
}
@Override
public void put(TreeName treeName, ByteSequence key, ByteSequence value)
{
try
{
chunk.put(key, value);
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
}
@Override
public void clearTree(TreeName treeName)
{
throw new UnsupportedOperationException();
}
@Override
public ByteString read(TreeName treeName, ByteSequence key)
{
throw new UnsupportedOperationException();
}
@Override
public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName)
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{
// nothing to do
}
}
/**
* Writes records into a delegated {@link Chunk} after re-ordering them by key using a best-effort algorithm. This
* class is intended for records which are produced in key order but may reach the chunk slightly out of order
* because of thread scheduling in a multi-threaded environment. Records are buffered and sorted before being written
* to the delegated chunk, so a record may reach the chunk with some delay. All records are guaranteed to have been
* written to the chunk once {@link #flip()} has been called. {@link #put(ByteSequence, ByteSequence)} is thread-safe.
*/
private static final class MostlyOrderedChunk implements Chunk
{
/**
* Number of records to buffer before writing them to the storage. This number must be at least equal to the number
* of threads calling {@link #put(ByteSequence, ByteSequence)}. If it is underestimated, records may be written out
* of order; if it is overestimated, extra memory is wasted.
*/
private static final int QUEUE_SIZE = 1024;
private final NavigableMap<ByteSequence, ByteSequence> pendingRecords = new TreeMap<>();
private final int queueSize;
private final Chunk delegate;
MostlyOrderedChunk(Chunk delegate)
{
this.delegate = delegate;
this.queueSize = QUEUE_SIZE;
}
@Override
public void delete()
{
// Nothing to do
}
@Override
public synchronized boolean put(ByteSequence key, ByteSequence value)
{
pendingRecords.put(key, value);
if (pendingRecords.size() == queueSize)
{
/*
* Maximum size reached: take the record with the smallest key and persist it in the delegate chunk. This ensures
* records are (mostly) inserted in ascending key order, which is the optimal insert order for B-trees.
*/
final Map.Entry<ByteSequence, ByteSequence> lowestEntry = pendingRecords.pollFirstEntry();
return delegate.put(lowestEntry.getKey(), lowestEntry.getValue());
}
return true;
}
@Override
public MeteredCursor<ByteString, ByteString> flip()
{
// Purge pending entries
for (Map.Entry<ByteSequence, ByteSequence> lowestEntry : pendingRecords.entrySet())
{
delegate.put(lowestEntry.getKey(), lowestEntry.getValue());
}
return delegate.flip();
}
@Override
public long size()
{
return delegate.size();
}
}
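/*
* Behavior sketch for MostlyOrderedChunk (illustrative; the delegate chunk, keys and values are hypothetical):
*
*   final Chunk ordered = new MostlyOrderedChunk(delegate);
*   ordered.put(key100, value100); // buffered in the TreeMap, nothing written yet
*   ordered.put(key099, value099); // buffered; the slightly out-of-order key is re-sorted in memory
*   // Once QUEUE_SIZE records are pending, each further put() flushes the record with the lowest key
*   // to the delegate, so the delegate sees keys in (mostly) ascending order.
*   ordered.flip();                // drains every pending record into the delegate before cursoring it
*/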
private static Chunk nullChunk()
{
return NullChunk.INSTANCE;
}
/** An empty Chunk which cannot store data. */
private static final class NullChunk implements Chunk
{
private static final Chunk INSTANCE = new NullChunk();
@Override
public boolean put(ByteSequence key, ByteSequence value)
{
return false;
}
@Override
public long size()
{
return 0;
}
@Override
public void delete()
{
// Nothing to do
}
@Override
public MeteredCursor<ByteString, ByteString> flip()
{
return new MeteredCursor<ByteString, ByteString>()
{
@Override
public boolean next()
{
return false;
}
@Override
public boolean isDefined()
{
return false;
}
@Override
public ByteString getKey() throws NoSuchElementException
{
throw new NoSuchElementException();
}
@Override
public ByteString getValue() throws NoSuchElementException
{
throw new NoSuchElementException();
}
@Override
public void close()
{
// nothing to do
}
@Override
public String getMetricName()
{
return NullChunk.class.getSimpleName();
}
@Override
public long getNbBytesRead()
{
return 0;
}
@Override
public long getNbBytesTotal()
{
return 0;
}
};
}
}
/** Returns an {@link Executor} which executes each task directly on the calling thread. */
private static Executor sameThreadExecutor()
{
return new Executor()
{
@Override
public void execute(Runnable command)
{
command.run();
}
};
}
/** Waits for the given number of asynchronous tasks to complete and collects their results. */
private static <K> List<K> waitTasksTermination(CompletionService<K> completionService, int nbTasks)
throws InterruptedException, ExecutionException
{
final List<K> results = new ArrayList<>(nbTasks);
for (int i = 0; i < nbTasks; i++)
{
results.add(completionService.take().get());
}
return results;
}
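/*
* Typical usage sketch (the executor and the task list are hypothetical):
*
*   final ExecutorService executor = Executors.newFixedThreadPool(4);
*   final CompletionService<Void> completionService = new ExecutorCompletionService<>(executor);
*   for (Callable<Void> task : tasks)
*   {
*     completionService.submit(task);
*   }
*   waitTasksTermination(completionService, tasks.size());
*/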
/** Regularly reports progress statistics for the registered {@link MeteredCursor} instances. */
private static final class PhaseTwoProgressReporter implements Runnable, Closeable
{
private static final String PHASE2_REPORTER_THREAD_NAME = "PHASE2-REPORTER-%d";
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE2_REPORTER_THREAD_NAME, true));
private final Map<MeteredCursor<?, ?>, Long> lastValues = new WeakHashMap<>();
private ScheduledFuture<?> scheduledTask;
private long lastRun = System.currentTimeMillis();
synchronized void addCursor(MeteredCursor<?, ?> cursor)
{
if (lastValues.put(cursor, 0L) == null)
{
logger.info(NOTE_IMPORT_LDIF_INDEX_STARTED, cursor.getMetricName(), 1, 1);
}
if (scheduledTask == null)
{
scheduledTask = scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.SECONDS);
}
}
synchronized void removeCursor(MeteredCursor<?, ?> cursor)
{
if (lastValues.remove(cursor) != null)
{
logger.info(NOTE_IMPORT_LDIF_INDEX_CLOSE, cursor.getMetricName());
}
}
@Override
public synchronized void run()
{
final long deltaTime = System.currentTimeMillis() - lastRun;
if (deltaTime == 0)
{
return;
}
for (Map.Entry<MeteredCursor<?, ?>, Long> metricLastValue : lastValues.entrySet())
{
final MeteredCursor<?, ?> cursor = metricLastValue.getKey();
final long newValue = cursor.getNbBytesRead();
final long totalBytes = cursor.getNbBytesTotal();
final long valueProgress = newValue - metricLastValue.getValue();
final int progressPercent = totalBytes > 0 ? Math.round((100f * newValue) / totalBytes) : 0;
final long progressRate = valueProgress / deltaTime;
final long progressRemaining = (totalBytes - newValue) / 1024;
logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, cursor.getMetricName(), progressPercent, progressRemaining,
progressRate, 1, 1);
lastValues.put(cursor, newValue);
}
lastRun = System.currentTimeMillis();
}
@Override
public synchronized void close()
{
scheduledTask = null;
scheduler.shutdown();
}
}
/** Buffer used by {@link InMemorySortedChunk} to store and sort data. */
private interface Buffer extends Closeable
{
void writeInt(int position, int value);
int readInt(int position);
long readCompactUnsignedLong(int position);
ByteString readByteString(int position, int length);
int writeCompactUnsignedLong(int position, long value);
void writeByteSequence(int position, ByteSequence data);
int length();
int compare(int offsetA, int lengthA, int offsetB, int lengthB);
}
/**
* Pre-allocates and maintains a fixed number of re-usable {@code Buffer}s. This keeps memory consumption under
* control and avoids the significant allocation cost incurred for very large objects.
*/
static final class BufferPool implements Closeable
{
private final BlockingQueue<Buffer> pool;
private final int bufferSize;
private static final Unsafe unsafe;
private static final long BYTE_ARRAY_OFFSET;
static
{
Unsafe unsafeInstance = null;
long byteArrayOffset = 0;
try
{
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafeInstance = (Unsafe) field.get(null);
byteArrayOffset = unsafeInstance.arrayBaseOffset(byte[].class);
}
catch (Exception e)
{
// Unsafe is not available on this JVM: fall back to heap buffers (see supportOffHeap()).
}
unsafe = unsafeInstance;
BYTE_ARRAY_OFFSET = byteArrayOffset;
}
private static boolean supportOffHeap()
{
return unsafe != null;
}
BufferPool(int nbBuffer, int bufferSize)
{
this.pool = new ArrayBlockingQueue<>(nbBuffer);
this.bufferSize = bufferSize;
for (int i = 0; i < nbBuffer; i++)
{
pool.offer(supportOffHeap() ? new OffHeapBuffer(bufferSize) : new HeapBuffer(bufferSize));
}
}
public int getBufferSize()
{
return bufferSize;
}
private Buffer get()
{
try
{
return pool.take();
}
catch (InterruptedException e)
{
throw new StorageRuntimeException(e);
}
}
private void release(Buffer buffer)
{
try
{
pool.put(buffer);
}
catch (InterruptedException e)
{
throw new StorageRuntimeException(e);
}
}
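/*
* Usage sketch (illustrative; the pool instance and the written offset are stand-ins): every buffer taken from the
* pool must be returned, typically with try/finally, otherwise the pool shrinks and other writers eventually block
* in get().
*
*   final Buffer buffer = bufferPool.get();
*   try
*   {
*     buffer.writeInt(0, recordOffset);
*   }
*   finally
*   {
*     bufferPool.release(buffer);
*   }
*/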
public void setSize(int size)
{
while (pool.size() > size)
{
// Permanently remove the extra buffer from the pool and free its resources.
closeSilently(get());
}
}
@Override
public void close()
{
Buffer buffer;
while ((buffer = pool.poll()) != null)
{
closeSilently(buffer);
}
}
/** Off-heap buffer using Unsafe memory access. */
private final class OffHeapBuffer implements Buffer
{
private final long address;
private final int size;
private int position;
private final InputStream asInputStream = new InputStream()
{
@Override
public int read() throws IOException
{
// Mask to honor the InputStream contract of returning a value in the range [0, 255]
return unsafe.getByte(address + position++) & 0xFF;
}
};
private final OutputStream asOutputStream = new OutputStream()
{
@Override
public void write(int value) throws IOException
{
unsafe.putByte(address + position++, (byte) (value & 0xFF));
}
};
private boolean closed;
OffHeapBuffer(int size)
{
this.size = size;
this.address = unsafe.allocateMemory(size);
}
@Override
public void writeInt(final int position, final int value)
{
unsafe.putInt(address + position, value);
}
@Override
public int readInt(final int position)
{
return unsafe.getInt(address + position);
}
@Override
public int writeCompactUnsignedLong(final int position, long value)
{
try
{
this.position = position;
return PackedLong.writeCompactUnsigned(asOutputStream, value);
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
@Override
public long readCompactUnsignedLong(final int position)
{
this.position = position;
try
{
return PackedLong.readCompactUnsignedLong(asInputStream);
}
catch (IOException e)
{
throw new IllegalStateException(e);
}
}
@Override
public void writeByteSequence(int position, ByteSequence data)
{
Reject.ifFalse(position + data.length() <= size);
long offset = address + position;
for(int i = 0 ; i < data.length() ; i++)
{
unsafe.putByte(offset++, data.byteAt(i));
}
}
@Override
public int length()
{
return size;
}
@Override
public ByteString readByteString(int position, int length)
{
Reject.ifFalse(position + length <= size);
final byte[] data = new byte[length];
unsafe.copyMemory(null, address + position, data, BYTE_ARRAY_OFFSET, length);
return ByteString.wrap(data);
}
@Override
public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
{
final int len = Math.min(lengthA, lengthB);
for(int i = 0 ; i < len ; i++)
{
final int a = unsafe.getByte(address + offsetA + i) & 0xFF;
final int b = unsafe.getByte(address + offsetB + i) & 0xFF;
if ( a != b )
{
return a - b;
}
}
return lengthA - lengthB;
}
@Override
public void close() throws IOException
{
if (!closed)
{
unsafe.freeMemory(address);
}
closed = true;
}
}
/** Heap buffer backed by a {@link ByteBuffer}. */
private final class HeapBuffer implements Buffer
{
private final ByteBuffer buffer;
private final OutputStream asOutputStream = new OutputStream()
{
@Override
public void write(int b) throws IOException
{
buffer.put((byte) (b & 0xFF));
}
};
private final InputStream asInputStream = new InputStream()
{
@Override
public int read() throws IOException
{
return buffer.get() & 0xFF;
}
};
HeapBuffer(int size)
{
this.buffer = ByteBuffer.allocate(size);
}
@Override
public void writeInt(final int position, final int value)
{
buffer.putInt(position, value);
}
@Override
public int readInt(final int position)
{
return buffer.getInt(position);
}
@Override
public int writeCompactUnsignedLong(final int position, long value)
{
buffer.position(position);
try
{
return PackedLong.writeCompactUnsigned(asOutputStream, value);
}
catch (IOException e)
{
throw new StorageRuntimeException(e);
}
}
@Override
public long readCompactUnsignedLong(final int position)
{
buffer.position(position);
try
{
return PackedLong.readCompactUnsignedLong(asInputStream);
}
catch (IOException e)
{
throw new IllegalArgumentException(e);
}
}
@Override
public void writeByteSequence(int position, ByteSequence data)
{
buffer.position(position);
data.copyTo(buffer);
}
@Override
public int length()
{
return buffer.capacity();
}
@Override
public ByteString readByteString(int position, int length)
{
return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
}
@Override
public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
{
return readByteString(offsetA, lengthA).compareTo(readByteString(offsetB, lengthB));
}
@Override
public void close()
{
// Nothing to do
}
}
}
/** Extends {@link SequentialCursor} by providing metrics related to the cursor's progress. */
interface MeteredCursor<K, V> extends SequentialCursor<K, V>
{
String getMetricName();
long getNbBytesRead();
long getNbBytesTotal();
}
/** Adds the cursor to the reporter and removes it when the returned cursor is closed. */
private static <K, V> SequentialCursor<K, V> trackCursorProgress(final PhaseTwoProgressReporter reporter,
final MeteredCursor<K, V> cursor)
{
reporter.addCursor(cursor);
return new SequentialCursorDecorator<MeteredCursor<K, V>, K, V>(cursor)
{
@Override
public void close()
{
reporter.removeCursor(cursor);
cursor.close();
}
};
}
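/*
* Usage sketch, mirroring how the phase-two tasks above consume chunks (the reporter and chunk are stand-ins):
*
*   try (SequentialCursor<ByteString, ByteString> cursor = trackCursorProgress(reporter, chunk.flip()))
*   {
*     while (cursor.next())
*     {
*       // the reporter logs this cursor's progress every 10 seconds until the cursor is closed
*     }
*   }
*/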
private static void throwIfUndefined(SequentialCursor<?, ?> cursor)
{
if (!cursor.isDefined())
{
throw new NoSuchElementException();
}
}
/**
* Gets a new {@link Collector} which can be used to merge encoded values. The type of the values to merge is deduced
* from the {@link TreeName}.
*/
private static Collector<?, ByteString> newCollector(final EntryContainer entryContainer, final TreeName treeName)
{
final DefaultIndex index = getIndex(entryContainer, treeName);
if (index != null)
{
// key conflicts == merge EntryIDSets
return new EntryIDSetsCollector(index);
}
else if (isID2ChildrenCount(treeName))
{
// key conflicts == sum values
return new AddLongCollector(entryContainer.getID2ChildrenCount());
}
else if (isDN2ID(treeName) || isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
{
// key conflicts == exception
return UniqueValueCollector.getInstance();
}
throw new IllegalArgumentException("Unknown tree: " + treeName);
}
private static boolean isDN2ID(TreeName treeName)
{
return SuffixContainer.DN2ID_INDEX_NAME.equals(treeName.getIndexId());
}
private static boolean isDN2URI(TreeName treeName)
{
return SuffixContainer.DN2URI_INDEX_NAME.equals(treeName.getIndexId());
}
private static boolean isID2Entry(TreeName treeName)
{
return SuffixContainer.ID2ENTRY_INDEX_NAME.equals(treeName.getIndexId());
}
private static boolean isID2ChildrenCount(TreeName treeName)
{
return SuffixContainer.ID2CHILDREN_COUNT_NAME.equals(treeName.getIndexId());
}
private static boolean isVLVIndex(EntryContainer entryContainer, TreeName treeName)
{
for (VLVIndex vlvIndex : entryContainer.getVLVIndexes())
{
if (treeName.equals(vlvIndex.getName()))
{
return true;
}
}
return false;
}
private static DefaultIndex getIndex(EntryContainer entryContainer, TreeName treeName)
{
for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
{
for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values())
{
if (treeName.equals(index.getName()))
{
return index;
}
}
}
return null;
}
/**
* A mutable reduction operation that accumulates input elements into a mutable result container, optionally
* transforming the accumulated result into a final representation after all input elements have been processed.
* Reduction operations can be performed either sequentially or in parallel. A Collector is specified by three
* functions that work together to accumulate entries into a mutable result container and optionally perform a final
* transform on the result: get() creates a new result container, accept() incorporates a new data element into a
* result container, and merge() performs an optional final transform on the container.
*
* @param <A>
* Accumulator type
* @param <R>
* Result type
* @see java.util.stream.Collector
*/
interface Collector<A, R>
{
/**
* Creates and returns a new mutable result container. Equivalent to java.util.stream.Collector.supplier().get()
*/
A get();
/**
* Incorporates a new value into a result container. The function may fold the value into the supplied container and
* return it, or may return a new result container. Equivalent to
* java.util.stream.Collector.accumulator().accept(A, R)
*/
A accept(A resultContainer, R value);
/**
* Performs the final transformation from the intermediate accumulation type A to the final result type R. Equivalent
* to java.util.stream.Collector.finisher().apply(A)
*/
R merge(A resultContainer);
}
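/*
* Purely illustrative Collector implementation (not used by the importer) showing the protocol: accumulate String
* values into a StringBuilder, then produce the concatenated String.
*
*   static final class JoiningCollector implements Collector<StringBuilder, String>
*   {
*     @Override
*     public StringBuilder get()
*     {
*       return new StringBuilder();
*     }
*
*     @Override
*     public StringBuilder accept(StringBuilder resultContainer, String value)
*     {
*       return resultContainer.append(value);
*     }
*
*     @Override
*     public String merge(StringBuilder resultContainer)
*     {
*       return resultContainer.toString();
*     }
*   }
*/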
/** {@link Collector} that throws an exception if multiple values have to be merged. */
static final class UniqueValueCollector<V> implements Collector<V, V>
{
private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>();
static <V> Collector<V, V> getInstance()
{
return (Collector<V, V>) INSTANCE;
}
@Override
public V get()
{
return null;
}
@Override
public V accept(V previousValue, V value)
{
if (previousValue != null)
{
throw new IllegalArgumentException("Cannot accept multiple values (current=" + previousValue + ", new=" + value
+ ")");
}
return value;
}
@Override
public V merge(V latestValue)
{
if (latestValue == null)
{
throw new IllegalArgumentException("No value to merge but expected one");
}
return latestValue;
}
}
/**
* {@link Collector} that accepts encoded {@link EntryIDSet} objects and produces a {@link ByteString} representing
* the merged {@link EntryIDSet}.
*/
static final class EntryIDSetsCollector implements Collector<Collection<ByteString>, ByteString>
{
private final DefaultIndex index;
private final int indexLimit;
EntryIDSetsCollector(DefaultIndex index)
{
this.index = index;
this.indexLimit = index.getIndexEntryLimit();
}
@Override
public Collection<ByteString> get()
{
// LinkedList is used for its O(1) add method (while ArrayList is O(n) when a resize is required).
return new LinkedList<>();
}
@Override
public Collection<ByteString> accept(Collection<ByteString> resultContainer, ByteString value)
{
if (resultContainer.size() < indexLimit)
{
resultContainer.add(value);
}
/*
* else the EntryIDSet is above the index entry limit: discard additional values now to avoid blowing up memory,
* then discard all of them in merge()
*/
return resultContainer;
}
@Override
public ByteString merge(Collection<ByteString> resultContainer)
{
if (resultContainer.size() >= indexLimit)
{
return index.toValue(EntryIDSet.newUndefinedSet());
}
else if (resultContainer.size() == 1)
{
// Avoids unnecessary decoding + encoding
return resultContainer.iterator().next();
}
return index.toValue(buildEntryIDSet(resultContainer));
}
private EntryIDSet buildEntryIDSet(Collection<ByteString> encodedIDSets)
{
final long[] entryIDs = new long[indexLimit];
// accumulate in array
int i = 0;
for (ByteString encodedIDSet : encodedIDSets)
{
final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), encodedIDSet);
if (!entryIDSet.isDefined() || i + entryIDSet.size() >= indexLimit)
{
// above index entry limit
return EntryIDSet.newUndefinedSet();
}
for (EntryID entryID : entryIDSet)
{
entryIDs[i++] = entryID.longValue();
}
}
Arrays.sort(entryIDs, 0, i);
return EntryIDSet.newDefinedSet(Arrays.copyOf(entryIDs, i));
}
}
/**
* {@link Collector} that accepts {@code long} values encoded into {@link ByteString} objects and produces a
* {@link ByteString} representing the sum of the supplied {@code long}s.
*/
static final class AddLongCollector implements Collector<Long, ByteString>
{
private final ID2Count id2count;
AddLongCollector(ID2Count id2count)
{
this.id2count = id2count;
}
@Override
public Long get()
{
return 0L;
}
@Override
public Long accept(Long resultContainer, ByteString value)
{
return resultContainer + id2count.fromValue(value);
}
@Override
public ByteString merge(Long resultContainer)
{
return id2count.toValue(resultContainer);
}
}
private static MeteredCursor<ByteString, ByteString> asProgressCursor(
SequentialCursor<ByteString, ByteString> delegate, String metricName, long totalSize)
{
return new MeteredSequentialCursorDecorator(delegate, metricName, totalSize);
}
/** Decorate {@link SequentialCursor} by providing progress information while cursoring. */
private static final class MeteredSequentialCursorDecorator extends
SequentialCursorDecorator<SequentialCursor<ByteString, ByteString>, ByteString, ByteString> implements
MeteredCursor<ByteString, ByteString>
{
private final String metricName;
private final long totalSize;
private volatile long bytesRead;
private MeteredSequentialCursorDecorator(SequentialCursor<ByteString, ByteString> delegate, String metricName,
long totalSize)
{
super(delegate);
this.metricName = metricName;
this.totalSize = totalSize;
}
@Override
public boolean next()
{
if (delegate.next())
{
bytesRead += delegate.getKey().length() + delegate.getValue().length();
return true;
}
return false;
}
@Override
public long getNbBytesRead()
{
return bytesRead;
}
@Override
public String getMetricName()
{
return metricName;
}
@Override
public long getNbBytesTotal()
{
return totalSize;
}
}
/** Helper allowing {@link SequentialCursor} decorators to be created without re-implementing every method. */
static abstract class SequentialCursorDecorator<D extends SequentialCursor<K, V>, K, V> implements
SequentialCursor<K, V>
{
protected final D delegate;
SequentialCursorDecorator(D delegate)
{
this.delegate = delegate;
}
@Override
public boolean next()
{
return delegate.next();
}
@Override
public boolean isDefined()
{
return delegate.isDefined();
}
@Override
public K getKey() throws NoSuchElementException
{
return delegate.getKey();
}
@Override
public V getValue() throws NoSuchElementException
{
return delegate.getValue();
}
@Override
public void close()
{
delegate.close();
}
}
private static int visitIndexes(final EntryContainer entryContainer, IndexVisitor visitor)
{
int nbVisited = 0;
for (AttributeIndex attribute : entryContainer.getAttributeIndexes())
{
for (MatchingRuleIndex index : attribute.getNameToIndexes().values())
{
visitor.visitIndex(index);
visitor.visitIndexTree(index);
nbVisited++;
}
}
for (VLVIndex index : entryContainer.getVLVIndexes())
{
visitor.visitIndex(index);
visitor.visitIndexTree(index);
nbVisited++;
}
visitor.visitIndexTree(entryContainer.getDN2ID());
visitor.visitIndexTree(entryContainer.getDN2URI());
nbVisited += 2;
return nbVisited;
}
/** Visitor pattern allowing all types of indexes to be processed. */
private static abstract class IndexVisitor
{
void visitIndex(DefaultIndex index)
{
}
void visitIndex(VLVIndex index)
{
}
void visitSystemIndex(Tree index)
{
}
void visitIndexTree(Tree tree)
{
}
}
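/*
* The concrete visitors below are designed to be composed. A hypothetical composition clearing the trees of every
* currently degraded index would look like:
*
*   visitIndexes(entryContainer, new DegradedIndexFilter(new ClearDatabase(importer)));
*/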
/** Update the trust state of the visited indexes. */
private static final class TrustModifier extends IndexVisitor
{
private final WriteableTransaction txn;
private final boolean trustValue;
TrustModifier(Importer importer, boolean trustValue)
{
this.txn = new ImporterToWriteableTransactionAdapter(importer);
this.trustValue = trustValue;
}
@Override
public void visitIndex(DefaultIndex index)
{
index.setTrusted(txn, trustValue);
}
@Override
public void visitIndex(VLVIndex index)
{
index.setTrusted(txn, trustValue);
}
}
/** Delete & recreate the database of the visited indexes. */
private static final class ClearDatabase extends IndexVisitor
{
private final Importer importer;
ClearDatabase(Importer importer)
{
this.importer = importer;
}
@Override
public void visitIndexTree(Tree index)
{
importer.clearTree(index.getName());
}
}
/** Visit indexes which are in a degraded state. */
private static final class DegradedIndexFilter extends IndexVisitor
{
private final IndexVisitor delegate;
DegradedIndexFilter(IndexVisitor delegate)
{
this.delegate = delegate;
}
@Override
public void visitIndex(DefaultIndex index)
{
if (!index.isTrusted())
{
delegate.visitIndexTree(index);
delegate.visitIndex(index);
}
}
@Override
public void visitIndex(VLVIndex index)
{
if (!index.isTrusted())
{
delegate.visitIndexTree(index);
delegate.visitIndex(index);
}
}
}
/** Collects the names of the visited indexes. */
private static final class SelectIndexName extends IndexVisitor
{
private final Set<String> indexNames;
SelectIndexName()
{
this.indexNames = new HashSet<>();
}
public Set<String> getSelectedIndexNames()
{
return indexNames;
}
@Override
public void visitIndexTree(Tree index)
{
indexNames.add(index.getName().getIndexId());
}
}
/** Forwards visits only for indexes whose name matches one of the supplied index names. */
private static final class SpecificIndexFilter extends IndexVisitor
{
private final IndexVisitor delegate;
private final Collection<String> indexNames;
SpecificIndexFilter(IndexVisitor delegate, Collection<String> indexNames)
{
this.delegate = delegate;
this.indexNames = new HashSet<>(indexNames.size());
for(String indexName : indexNames)
{
this.indexNames.add(indexName.toLowerCase());
}
}
@Override
public void visitIndex(DefaultIndex index)
{
if (indexNames.contains(index.getName().getIndexId()))
{
delegate.visitIndex(index);
}
}
@Override
public void visitIndex(VLVIndex index)
{
if (indexNames.contains(index.getName().getIndexId()))
{
delegate.visitIndex(index);
}
}
@Override
public void visitSystemIndex(Tree index)
{
if (indexNames.contains(index.getName().getIndexId()))
{
delegate.visitSystemIndex(index);
}
}
@Override
public void visitIndexTree(Tree index)
{
if (indexNames.contains(index.getName().getIndexId()))
{
delegate.visitIndexTree(index);
}
}
}
/**
* Thread-safe fixed-size cache which, once full, removes the least recently accessed entry. Composition is used here
* to ensure that only methods which record an entry access in the LinkedHashMap are exposed. Otherwise, the
* least-recently-used property of the cache would not be respected.
*/
private static final class LRUPresenceCache<T>
{
private final Map<T, Object> cache;
LRUPresenceCache(final int maxEntries)
{
// +1 because the new entry is inserted before the least recently used one is removed.
this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
{
@Override
protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
{
return size() >= maxEntries;
}
});
}
public boolean contains(T object)
{
return cache.get(object) != null;
}
public void add(T object)
{
cache.put(object, null);
}
}
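/*
* Usage sketch (illustrative; the cache size and the key are stand-ins):
*
*   final LRUPresenceCache<ByteString> recentlySeen = new LRUPresenceCache<>(1000);
*   if (!recentlySeen.contains(key))
*   {
*     recentlySeen.add(key);
*     // first occurrence of this key among the most recently accessed ones
*   }
*/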
/** Adapter allowing an {@link Importer} to be used as a {@link WriteableTransaction}. */
private static final class ImporterToWriteableTransactionAdapter implements WriteableTransaction
{
private final Importer importer;
ImporterToWriteableTransactionAdapter(Importer importer)
{
this.importer = importer;
}
@Override
public ByteString read(TreeName treeName, ByteSequence key)
{
return importer.read(treeName, key);
}
@Override
public void put(TreeName treeName, ByteSequence key, ByteSequence value)
{
importer.put(treeName, key, value);
}
@Override
public boolean update(TreeName treeName, ByteSequence key, UpdateFunction f)
{
final ByteString value = importer.read(treeName, key);
final ByteSequence newValue = f.computeNewValue(value);
Reject.ifNull(newValue, "Importer cannot delete records.");
if (!Objects.equals(value, newValue))
{
importer.put(treeName, key, newValue);
return true;
}
return false;
}
@Override
public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
{
return new SequentialCursorAdapter<>(importer.openCursor(treeName));
}
@Override
public long getRecordCount(TreeName treeName)
{
long counter = 0;
try (final SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(treeName))
{
while (cursor.next())
{
counter++;
}
}
return counter;
}
@Override
public void openTree(TreeName name, boolean createOnDemand)
{
throw new UnsupportedOperationException();
}
@Override
public void deleteTree(TreeName name)
{
throw new UnsupportedOperationException();
}
@Override
public boolean delete(TreeName treeName, ByteSequence key)
{
throw new UnsupportedOperationException();
}
}
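/*
* Sketch of an update() through the adapter above, assuming UpdateFunction exposes the computeNewValue(ByteSequence)
* method used in update() (the default value chosen here is hypothetical):
*
*   final WriteableTransaction txn = new ImporterToWriteableTransactionAdapter(importer);
*   txn.update(treeName, key, new UpdateFunction()
*   {
*     @Override
*     public ByteSequence computeNewValue(ByteSequence oldValue)
*     {
*       // this adapter cannot delete records, so never return null
*       return oldValue != null ? oldValue : ByteString.empty();
*     }
*   });
*/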
}