/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2014-2015 ForgeRock AS.
*/
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Pair;
import org.forgerock.util.Reject;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.util.StaticUtils;
/**
* A log file, containing part of a {@code Log}. The log file may be:
* <ul>
* <li>write-enabled : allowing to append key-value records and read records
* from cursors,</li>
* <li>read-only : allowing to read records from cursors.</li>
* </ul>
* <p>
* A log file is NOT intended to be used directly, but only has part of a
* {@link Log}.
*
* @param <K>
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
*/
final class LogFile<K extends Comparable<K>, V> implements Closeable
{
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/** The file containing the records. */
private final File logfile;
/** The pool to obtain a reader on the log. */
private final LogReaderPool<K, V> readerPool;
/**
* The writer on the log file, which may be {@code null} if log file is not
* write-enabled.
*/
private final BlockLogWriter<K, V> writer;
/** Indicates if log is enabled for write. */
private final boolean isWriteEnabled;
/** Lock used to ensure write atomicity. */
private final Lock exclusiveLock;
/** Lock used to ensure that log file is in a consistent state when reading it. */
private final Lock sharedLock;
private Record<K, V> newestRecord;
/**
* Creates a new log file.
*
* @param logFilePath
* Path of the log file.
* @param parser
* Parser of records.
* @param isWriteEnabled
* {@code true} if this changelog is write-enabled, {@code false}
* otherwise.
* @throws ChangelogException
* If a problem occurs during initialization.
*/
private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
throws ChangelogException
{
Reject.ifNull(logFilePath, parser);
this.logfile = logFilePath;
this.isWriteEnabled = isWriteEnabled;
createLogFileIfNotExists();
if (isWriteEnabled)
{
ensureLogFileIsValid(parser);
writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser);
}
else
{
writer = null;
}
readerPool = new LogReaderPool<>(logfile, parser);
final ReadWriteLock rwLock = new ReentrantReadWriteLock();
exclusiveLock = rwLock.writeLock();
sharedLock = rwLock.readLock();
}
/**
* Creates a read-only log file with the provided root path and record parser.
*
* @param <K>
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
* @param logFilePath
* Path of the log file.
* @param parser
* Parser of records.
* @return a read-only log file
* @throws ChangelogException
* If a problem occurs during initialization.
*/
static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
final RecordParser<K, V> parser) throws ChangelogException
{
return new LogFile<>(logFilePath, parser, false);
}
/**
* Creates a write-enabled log file that appends records to the end of file,
* with the provided root path and record parser.
*
* @param <K>
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
* @param logFilePath
* Path of the log file.
* @param parser
* Parser of records.
* @return a write-enabled log file
* @throws ChangelogException
* If a problem occurs during initialization.
*/
static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
final RecordParser<K, V> parser) throws ChangelogException
{
return new LogFile<>(logFilePath, parser, true);
}
/**
* Returns the file containing the records.
*
* @return the file
*/
File getFile()
{
return logfile;
}
private void checkLogIsEnabledForWrite() throws ChangelogException
{
if (!isWriteEnabled)
{
throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
}
}
private void createLogFileIfNotExists() throws ChangelogException
{
try
{
if (!logfile.exists())
{
logfile.createNewFile();
}
}
catch (IOException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE.get(logfile.getPath()), e);
}
}
/**
* Ensure that log file is not corrupted, by checking it is valid and cleaning
* the end of file if necessary, to remove a partially written record.
* <p>
* If log file is cleaned to remove a partially written record, then a message
* is logged for information.
*
* @throws ChangelogException
* If an error occurs or if log file is corrupted and can't be
* cleaned
*/
private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
{
try (final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
{
final long lastValidPosition = reader.checkLogIsValid();
if (lastValidPosition != -1)
{
// truncate the file to point where last valid record has been read
readerWriter.setLength(lastValidPosition);
logger.error(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath()));
}
}
catch (IOException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
logfile.getPath(),
StaticUtils.stackTraceToSingleLineString(e)));
}
}
/**
* Add the provided record at the end of this log.
* <p>
* In order to ensure that record is written out of buffers and persisted
* to file system, it is necessary to explicitely call the
* {@code syncToFileSystem()} method.
*
* @param record
* The record to add.
* @throws ChangelogException
* If the record can't be added to the log.
*/
void append(final Record<K, V> record) throws ChangelogException
{
checkLogIsEnabledForWrite();
exclusiveLock.lock();
try
{
writer.write(record);
newestRecord = record;
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Dump this log file as a text file, intended for debugging purpose only.
*
* @param dumpFile
* File that will contains log records using a human-readable format
* @throws ChangelogException
* If an error occurs during dump
*/
void dumpAsTextFile(File dumpFile) throws ChangelogException
{
try (final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
final DBCursor<Record<K, V>> cursor = getCursor())
{
while (cursor.getRecord() != null)
{
Record<K, V> record = cursor.getRecord();
textWriter.write("key=" + record.getKey());
textWriter.write(" | ");
textWriter.write("value=" + record.getValue());
textWriter.write('\n');
cursor.next();
}
}
catch (IOException e)
{
// No I18N needed, used for debugging purpose only
throw new ChangelogException(
LocalizableMessage.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile),
e);
}
}
/**
* Synchronize all records added with the file system, ensuring that records
* are effectively persisted.
* <p>
* After a successful call to this method, it is guaranteed that all records
* added to the log are persisted to the file system.
*
* @throws ChangelogException
* If the synchronization fails.
*/
void syncToFileSystem() throws ChangelogException
{
checkLogIsEnabledForWrite();
sharedLock.lock();
try
{
writer.sync();
}
catch (Exception e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
}
finally
{
sharedLock.unlock();
}
}
/**
* Returns a cursor that allows to retrieve the records from this log,
* starting at the first position.
*
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
LogFileCursor<K, V> getCursor() throws ChangelogException
{
return new LogFileCursor<>(this);
}
/**
* Returns a cursor initialized to the provided record and position in file.
*
* @param record
* The initial record this cursor points on
* @param position
* The file position this cursor points on
* @return the cursor
* @throws ChangelogException
* If a problem occurs while creating the cursor.
*/
LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
{
return new LogFileCursor<>(this, record, position);
}
/**
* Returns the oldest (first) record from this log.
*
* @return the oldest record, which may be {@code null} if there is no record
* in the log.
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
Record<K, V> getOldestRecord() throws ChangelogException
{
try (final DBCursor<Record<K, V>> cursor = getCursor())
{
return cursor.next() ? cursor.getRecord() : null;
}
}
/**
* Returns the newest (last) record from this log.
*
* @return the newest record, which may be {@code null}
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
Record<K, V> getNewestRecord() throws ChangelogException
{
if (newestRecord == null)
{
try (BlockLogReader<K, V> reader = getReader())
{
sharedLock.lock();
try
{
newestRecord = reader.getNewestRecord();
}
finally
{
sharedLock.unlock();
}
}
catch (IOException ioe)
{
throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
}
}
return newestRecord;
}
/**
* Returns the number of records in the log.
*
* @return the number of records
* @throws ChangelogException
* If an error occurs.
*/
long getNumberOfRecords() throws ChangelogException
{
// TODO : need a more efficient way to retrieve it
try (final DBCursor<Record<K, V>> cursor = getCursor())
{
long counter = 0L;
while (cursor.next())
{
counter++;
}
return counter;
}
}
/** {@inheritDoc} */
@Override
public void close()
{
if (isWriteEnabled)
{
try
{
syncToFileSystem();
}
catch (ChangelogException e)
{
logger.traceException(e);
}
writer.close();
}
readerPool.shutdown();
}
/**
* Delete this log file (file is physically removed). Should be called only
* when log file is closed.
*
* @throws ChangelogException
* If log file can't be deleted.
*/
void delete() throws ChangelogException
{
exclusiveLock.lock();
try
{
final boolean isDeleted = logfile.delete();
if (!isDeleted)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
}
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Return the size of this log file in bytes.
*
* @return the size of log file
*/
long getSizeInBytes()
{
return writer.getBytesWritten();
}
/** The path of this log file as a String. */
private String getPath()
{
return logfile.getPath();
}
/**
* Returns a reader for this log.
* <p>
* Assumes that calling methods ensure that log is not closed.
*/
private BlockLogReader<K, V> getReader() throws ChangelogException
{
return readerPool.get();
}
/** Release the provided reader. */
private void releaseReader(BlockLogReader<K, V> reader) {
readerPool.release(reader);
}
/** {@inheritDoc} */
@Override
public int hashCode()
{
return logfile.hashCode();
}
/** {@inheritDoc} */
@Override
public boolean equals(Object that)
{
if (this == that)
{
return true;
}
if (!(that instanceof LogFile))
{
return false;
}
final LogFile<?, ?> other = (LogFile<?, ?>) that;
return logfile.equals(other.logfile);
}
@Override
public String toString()
{
return "LogFile [logfile=" + logfile + ", isWriteEnabled=" + isWriteEnabled + "]";
}
/** Implements a repositionable cursor on the log file. */
static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
{
/** The underlying log on which entries are read. */
private final LogFile<K, V> logFile;
/** To read the records. */
private final BlockLogReader<K, V> reader;
/** The current available record, may be {@code null}. */
private Record<K,V> currentRecord;
/**
* The initial record when starting from a given key. It must be
* stored because it is read in advance.
*/
private Record<K,V> initialRecord;
/**
* Creates a cursor on the provided log.
*
* @param logFile
* The log on which the cursor read records.
* @throws ChangelogException
* If an error occurs when creating the cursor.
*/
private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
{
this.logFile = logFile;
this.reader = logFile.getReader();
}
/**
* Creates a cursor on the provided log, initialised to the provided record and
* pointing to the provided file position.
* <p>
* Note: there is no check to ensure that provided record and file position are
* consistent. It is the responsibility of the caller of this method.
*/
private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
{
this.logFile = logFile;
this.reader = logFile.getReader();
this.currentRecord = record;
logFile.sharedLock.lock();
try
{
reader.seekToPosition(filePosition);
}
finally
{
logFile.sharedLock.unlock();
}
}
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
if (initialRecord != null)
{
// initial record is used only once
currentRecord = initialRecord;
initialRecord = null;
return true;
}
logFile.sharedLock.lock();
try
{
currentRecord = reader.readRecord();
}
finally
{
logFile.sharedLock.unlock();
}
return currentRecord != null;
}
/** {@inheritDoc} */
@Override
public Record<K,V> getRecord()
{
return currentRecord;
}
/** {@inheritDoc} */
@Override
public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
throws ChangelogException {
final Pair<Boolean, Record<K, V>> result;
logFile.sharedLock.lock();
try
{
result = reader.seekToRecord(key, match, pos);
}
finally
{
logFile.sharedLock.unlock();
}
final boolean found = result.getFirst();
initialRecord = found ? result.getSecond() : null;
return found;
}
/** {@inheritDoc} */
@Override
public void close()
{
logFile.releaseReader(reader);
}
/**
* Returns the file position this cursor is pointing at.
*
* @return the position of reader on the log file
* @throws ChangelogException
* If an error occurs.
*/
long getFilePosition() throws ChangelogException
{
logFile.sharedLock.lock();
try
{
return reader.getFilePosition();
}
finally
{
logFile.sharedLock.unlock();
}
}
/** {@inheritDoc} */
@Override
public String toString()
{
return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
}
}
}