IndexMergeThread.java revision bedb386242727f98834c64397487f48d5eb6116c
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
/**
* A thread to merge a set of intermediate files from an index builder
* into an index database.
*/
public class IndexMergeThread extends DirectoryThread
{
/**
* The buffer size to use when reading data from disk.
*/
private static final int INPUT_STREAM_BUFFER_SIZE = 65536;
/**
* The configuration of the JE backend containing the index.
*/
/**
* The LDIF import configuration, which indicates whether we are
* appending to existing data.
*/
/**
* The indexer to generate and compare index keys.
*/
/**
* The index database being written.
*/
/**
* The index entry limit.
*/
int entryLimit;
/**
* The name of the index for use in file names and log messages.
*/
/**
* Indicates whether we are replacing existing data or not.
*/
private boolean replaceExisting = false;
/**
* A weak reference hash map used to cache byte arrays for holding DB keys.
*/
/**
* A file name filter to identify temporary files we have written.
*/
{
{
}
};
/**
* Create a new index merge thread.
* @param name The name of the index for use in file names and log messages.
* @param config The configuration of the JE backend containing the index.
* @param ldifImportConfig The LDIF import configuration, which indicates
* whether we are appending to existing data.
* @param index The index database to be written.
* @param entryLimit The configured index entry limit.
*/
{
super("Index Merge Thread " + name);
this.ldifImportConfig = ldifImportConfig;
this.entryLimit = entryLimit;
}
/**
* Run this thread.
*/
public void run()
{
try
{
merge();
}
catch (Exception e)
{
if (debugEnabled())
{
}
}
}
/**
* The merge phase builds the index from intermediate files written
* during entry processing. Each line of an intermediate file has data for
* one index key and the keys are in order. For each index key, the data from
* each intermediate file containing a line for that key must be merged and
* written to the index.
* @throws Exception If an error occurs.
*/
{
// An ordered map of the current input keys from each file.
// Open all the files.
{
return;
}
int msgID = MSGID_JEB_INDEX_MERGE_START;
byte[] mergedBytes = new byte[0];
try
{
{
// Open a reader for this file.
// Read a value from each file.
}
// Process the lowest input value until done.
try
{
while (true)
{
{
{
{
{
// Entry limit already exceeded.
break writeMergedValue;
}
}
}
{
}
if (replaceExisting)
{
{
}
}
{
}
else
{
}
{
arrayList = new LinkedList<byte[]>();
}
}
for (int r : mv.getReaders())
{
}
}
}
catch (NoSuchElementException e)
{
}
}
catch (Exception e)
{
if (debugEnabled())
{
}
throw e;
}
finally
{
// Close the readers.
{
for (MergeReader r : readers)
{
if (r != null)
{
r.dataInputStream.close();
}
}
}
// Delete all the files.
{
{
f.delete();
}
}
}
}
/**
* Reads the next line from one of the merge input files.
* @param inputs The ordered map of current input keys.
* @param readers The array of input readers.
* @param reader The index of the input reader we wish to read from.
* @throws IOException If an I/O error occurs while reading the input file.
*/
throws IOException
{
int len;
try
{
}
catch (EOFException e)
{
// End of file.
return;
}
byte[] keyBytes;
{
arrayList = new LinkedList<byte[]>();
}
{
}
else
{
}
if (replaceExisting)
{
}
// If this key is not yet in the ordered map then insert it,
// otherwise merge the data into the existing data for the key.
{
}
else
{
}
}
}