ID2Count.java revision a069e2643d6be5b34309179220b2777d42df13fc
/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
* or http://forgerock.org/license/CDDLv1.0.html.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at legal-notices/CDDLv1_0.txt.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
* Copyright 2015 ForgeRock AS
*/
package org.opends.server.backends.pluggable;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.forgerock.util.Function;
import org.forgerock.util.Reject;
import org.forgerock.util.promise.NeverThrowsException;
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.UpdateFunction;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import com.forgerock.opendj.util.PackedLong;
/**
* Store a counter associated to a key. Counter value is sharded amongst multiple keys to allow concurrent
* update without contention (at the price of a slower read).
*/
final class ID2Count extends AbstractTree
{
/**
* Must be a power of 2 @see <a href="http://en.wikipedia.org/wiki/Modulo_operation#Performance_issues">Performance
* issues</a>
*/
private static final long SHARD_COUNT = 4096;
private static final int LONG_SIZE = Long.SIZE / Byte.SIZE;
private static final EntryID TOTAL_COUNT_ENTRY_ID = new EntryID(PackedLong.COMPACTED_MAX_VALUE);
ID2Count(TreeName name)
{
super(name);
}
Cursor<EntryID, Long> openCursor(ReadableTransaction txn) {
return CursorTransformer.transformKeysAndValues(txn.openCursor(getName()),
new Function<ByteString, EntryID, Exception>()
{
@Override
public EntryID apply(ByteString value) throws Exception
{
return new EntryID(value.asReader().getCompactUnsigned());
}
}, new CursorTransformer.ValueTransformer<ByteString, ByteString, Long, NeverThrowsException>()
{
@Override
public Long transform(ByteString key, ByteString value) throws NeverThrowsException
{
return fromValue(value);
}
});
}
/**
* Add a value to the counter associated to the given key
* @param txn Database transaction
* @param entryID The entryID identifying to the counter
* @param delta The value to add. Can be negative to decrease counter value.
*/
void addDelta(WriteableTransaction txn, EntryID entryID, final long delta)
{
Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow.");
addToCounter(txn, entryID, delta);
addToCounter(txn, TOTAL_COUNT_ENTRY_ID, delta);
}
private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta)
{
final ByteSequence shardedKey = getShardedKey(entryID);
txn.update(getName(), shardedKey, new UpdateFunction()
{
@Override
public ByteSequence computeNewValue(ByteSequence oldValue)
{
final long currentValue = oldValue == null ? 0 : oldValue.asReader().getLong();
final long newValue = currentValue + delta;
return newValue == 0 ? null : toValue(newValue);
}
});
}
void importPut(Importer importer, EntryID entryID, long total)
{
Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow.");
importPut0(importer, entryID, total);
}
void importPutTotalCount(Importer importer, long total)
{
importPut0(importer, TOTAL_COUNT_ENTRY_ID, total);
}
private void importPut0(Importer importer, EntryID entryID, final long delta)
{
Reject.ifNull(importer, "importer must not be null");
if (delta != 0)
{
final ByteSequence shardedKey = getShardedKey(entryID);
importer.put(getName(), shardedKey, toValue(delta));
}
}
private ByteSequence getShardedKey(EntryID entryID)
{
final long bucket = Thread.currentThread().getId() & (SHARD_COUNT - 1);
return getKeyFromEntryIDAndBucket(entryID, bucket);
}
ByteString toValue(final long count)
{
Reject.ifFalse(count != 0, "count must be != 0");
return ByteString.valueOf(count);
}
long fromValue(ByteString value)
{
Reject.ifNull(value, "value must not be null");
return value.toLong();
}
/**
* Get the counter value for the specified key
* @param txn The transaction
* @param entryID The entryID identifying to the counter
* @return Value of the counter. 0 if no counter is associated yet.
*/
long getCount(ReadableTransaction txn, EntryID entryID)
{
long counterValue = 0;
try(final Cursor<EntryID, Long> cursor = openCursor(txn)) {
cursor.positionToKeyOrNext(getKeyFromEntryID(entryID));
while (cursor.isDefined() && cursor.getKey().equals(entryID))
{
counterValue += cursor.getValue().longValue();
cursor.next();
}
}
return counterValue;
}
private static ByteSequence getKeyFromEntryID(EntryID entryID) {
return new ByteStringBuilder(LONG_SIZE).appendCompactUnsigned(entryID.longValue());
}
private static ByteSequence getKeyFromEntryIDAndBucket(EntryID entryID, long bucket) {
return new ByteStringBuilder(LONG_SIZE + LONG_SIZE).appendCompactUnsigned(entryID.longValue())
.appendCompactUnsigned(bucket);
}
/**
* Get the total counter value. The total counter maintain the sum of all
* the counter contained in this tree.
* @param txn The transaction
* @return Sum of all the counter contained in this tree
*/
long getTotalCount(ReadableTransaction txn)
{
return getCount(txn, TOTAL_COUNT_ENTRY_ID);
}
/**
* Delete the counter associated to the given key
* @param txn The transaction
* @param entryID The entryID identifying the counter
* @return Value of the counter before it's deletion.
*/
long deleteCount(WriteableTransaction txn, EntryID entryID)
{
long counterValue = 0;
try(final Cursor<ByteString, ByteString> cursor = txn.openCursor(getName())) {
final ByteSequence encodedEntryID = getKeyFromEntryID(entryID);
if (cursor.positionToKeyOrNext(encodedEntryID)) {
while (cursor.getKey().startsWith(encodedEntryID))
{
counterValue += cursor.getValue().asReader().getLong();
txn.delete(getName(), cursor.getKey());
cursor.next();
}
}
}
addToCounter(txn, TOTAL_COUNT_ENTRY_ID, -counterValue);
return counterValue;
}
}