Commit 5915713

Reuse CRC when appending index components

pkolaczk committed Sep 12, 2024
1 parent b1382ea commit 5915713
Showing 3 changed files with 155 additions and 18 deletions.
@@ -29,6 +29,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;

import javax.annotation.Nullable;

src/java/org/apache/cassandra/index/sai/utils/IndexFileUtils.java (170 changes: 153 additions & 17 deletions)
@@ -21,14 +21,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.github.jbellis.jvector.disk.BufferedRandomAccessWriter;
import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -59,25 +62,25 @@ public class IndexFileUtils

private static final SequentialWriterOption writerOption = defaultWriterOption;

    /**
     * Remembers checksums of files so we don't have to recompute them from the beginning of the file whenever
     * appending to a file. Keeps each checksum together with the corresponding file length so we can detect
     * file changes such as truncation or external appends that bypass this code, so this is still safe.
     * Index files are append-only (immutable), so length checking is enough.
     */
private static final Cache<String, Guard<FileChecksum>> checksumCache = Caffeine.newBuilder()
.maximumSize(4096)
.build();

@VisibleForTesting
protected IndexFileUtils()
{}

    public IndexOutputWriter openOutput(File file, ByteOrder order, boolean append) throws IOException
    {
        assert writerOption.finishOnClose() : "IndexOutputWriter relies on close() to sync with disk.";

-       var checksumWriter = new IncrementalChecksumSequentialWriter(file);
-       IndexOutputWriter indexOutputWriter = new IndexOutputWriter(checksumWriter, order);
-       if (append)
-       {
-           // Got to recalculate the checksum for a file opened for append, otherwise the final checksum will be wrong.
-           // Checksum verification does not happen here, as it is not guaranteed that the file has the checksum/footer.
-           checksumWriter.recalculateChecksum();
-           indexOutputWriter.skipBytes(file.length());
-       }
-
-       return indexOutputWriter;
+       var checksumWriter = new IncrementalChecksumSequentialWriter(file, append);
+       return new IndexOutputWriter(checksumWriter, order);
    }
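
    // Illustrative sketch (not part of this commit): how a caller might exercise
    // the append path. The file name, the `indexFileUtils` instance, and the
    // writer calls are hypothetical, shown only to make the CRC reuse concrete.
    //
    //     File file = new File("terms-component.db");
    //     try (IndexOutputWriter out = indexFileUtils.openOutput(file, ByteOrder.LITTLE_ENDIAN, true))
    //     {
    //         out.writeInt(42);    // appended bytes extend the cached running CRC
    //     }
    //     // A later openOutput(file, order, true) can reuse the cached checksum
    //     // instead of re-reading the whole file in recalculateChecksum().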

public BufferedRandomAccessWriter openRandomAccessOutput(File file, boolean append) throws IOException
@@ -109,11 +112,46 @@ public interface ChecksumWriter

class IncrementalChecksumSequentialWriter extends SequentialWriter implements ChecksumWriter
{
-       private final CRC32 checksum = new CRC32();
-
-       IncrementalChecksumSequentialWriter(File file)
+       /** Protects the checksum so only one Writer can update it */
+       private Guard<FileChecksum> checksumGuard;
+       /** Current (running) checksum from the beginning of the file to the current position */
+       private FileChecksum checksum;
+       /** Remembers the checksum after closing this writer */
+       private long finalChecksum;
+
+       IncrementalChecksumSequentialWriter(File file, boolean append) throws IOException
{
super(file, writerOption);

while (checksum == null)
{
checksumGuard = checksumCache.get(file.path(), s -> new Guard<>(new FileChecksum()));
checksum = checksumGuard.tryLock();

if (checksum == null)
// If we're here this means some other Writer did not unlock the checksum object,
// so we can't use the same checksum safely, as there is a slight probability it
// is in active use. This is not necessarily a bug - e.g. it is also possible
// the other writer was interrupted and the client code simply forgot to close() it.
// Therefore, we'll get a new one, just to be safe.
checksumCache.invalidate(file.path());
}

if (append)
{
var fileLength = file.length();
skipBytes(fileLength);

// It is possible we didn't get a good checksum, because the cache has limited size.
// We could have gotten a fresh zero checksum. In this case we need to recalculate:
if (checksum.fileLength != fileLength)
recalculateChecksum();
}
else
{
// We might be overwriting an existing file
checksum.reset();
}
}

/**
@@ -148,6 +186,8 @@ public void recalculateChecksum() throws IOException
b = ch.read(buf);
}
}

assert checksum.fileLength == position();
}

@Override
@@ -204,7 +244,7 @@ public void writeLong(long v) throws IOException

public long getChecksum()
{
-           return checksum.getValue();
+           return checksum != null ? checksum.getValue() : finalChecksum;
}

// To avoid double-counting bytes in the checksum.
@@ -228,5 +268,101 @@ private void addTochecksum(long bytes, int count)
else
while (count > 0) checksum.update((int) (bytes >>> (8 * (origCount - count--))));
}
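
        // Illustrative check (not part of this commit): the little-endian branch
        // above feeds the CRC the least significant byte first, which matches
        // updating the CRC with the same long written through a little-endian
        // buffer. Plain JDK, self-contained:
        void crcLittleEndianDemo()
        {
            long v = 0x0123456789ABCDEFL;

            CRC32 byByte = new CRC32();
            for (int i = 0; i < 8; i++)
                byByte.update((int) (v >>> (8 * i)));    // CRC32 keeps only the low 8 bits

            CRC32 byBuffer = new CRC32();
            ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN);
            buf.putLong(v).flip();
            byBuffer.update(buf);

            assert byByte.getValue() == byBuffer.getValue();
        }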

@Override
public void close()
{
try
{
super.close();
}
finally
{
// Copy the checksum value to a field in order to make the checksum value available past close().
// Release the FileChecksum object so it can be used by another writer.
finalChecksum = checksum.getValue();
checksum = null;
checksumGuard.release();
}
}
}

    /**
     * A lightweight helper to guard against concurrent access to an object.
     * Used when we know the object should be owned by at most one owner at a time.
     */
static class Guard<T>
{
T inner;
AtomicBoolean locked = new AtomicBoolean(false);

public Guard(T inner)
{
this.inner = inner;
}

        /**
         * Locks the object and returns it.
         * If it was already locked, returns null.
         * @return the protected object, or null if already locked
         */
public T tryLock()
{
return locked.compareAndSet(false, true)
? inner
: null;
}

public void release()
{
locked.set(false);
}
}
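
    // Illustrative usage (not part of this commit) of the acquire/release
    // discipline Guard is designed for, mirroring the writer constructor and
    // close() above:
    static void guardUsageSketch()
    {
        Guard<FileChecksum> guard = new Guard<>(new FileChecksum());
        FileChecksum owned = guard.tryLock();
        if (owned == null)
        {
            // Held by another owner; the constructor above handles this by
            // invalidating the cache entry and retrying with a fresh Guard.
            return;
        }
        try
        {
            owned.update(new byte[]{ 1, 2, 3 });   // exclusive access while locked
        }
        finally
        {
            guard.release();                       // hand the object to the next owner
        }
    }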

    /**
     * Computes the checksum from the beginning of a file.
     * Keeps track of the number of bytes processed.
     * We need the byte count so we can invalidate the checksum if the file was appended to or truncated externally.
     */
static class FileChecksum
{
long fileLength = 0;
CRC32 checksum = new CRC32();

public void reset()
{
fileLength = 0;
checksum.reset();
}

public void update(int b)
{
fileLength += 1;
checksum.update(b);
}

public void update(byte[] b)
{
fileLength += b.length;
checksum.update(b);
}

public void update(byte[] b, int off, int len)
{
fileLength += len;
checksum.update(b, off, len);
}

public void update(ByteBuffer b)
{
fileLength += b.remaining();
checksum.update(b);
}

public long getValue()
{
return checksum.getValue();
}
}
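
    // Illustrative sketch (not part of this commit): fileLength is what makes a
    // cached checksum trustworthy. It is reusable only when it covers exactly the
    // bytes currently on disk; otherwise it must be rebuilt by streaming the file,
    // roughly like below (the committed recalculateChecksum() uses the writer's
    // own channel instead):
    static FileChecksum rebuildFromDisk(java.nio.file.Path path) throws IOException
    {
        FileChecksum fc = new FileChecksum();
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ))
        {
            ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
            while (ch.read(buf) != -1)
            {
                buf.flip();
                fc.update(buf);   // advances both the CRC and fc.fileLength
                buf.clear();
            }
        }
        return fc;
    }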

}
@@ -433,7 +433,7 @@ public final Throwable abort(Throwable accumulate)
}

@Override
-       public final void close()
+       public void close()
{
if (option.finishOnClose())
txnProxy.finish();
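Note: dropping `final` from close() in this file is what allows the new close() override in IncrementalChecksumSequentialWriter above, which captures the final checksum and releases its Guard.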

1 comment on commit 5915713

@cassci-bot

Build rejected: 5 NEW test failure(s) in 6 builds. Build 7 ran 17605 tests with 13 failures and 128 skipped.
Butler analysis done on ds-cassandra-pr-gate/cndb-10873-segment-checksum vs last 16 runs of ds-cassandra-build-nightly/main.
org.apache.cassandra.index.sai.cql.QueryWriteLifecycleTest.testWriteLifecycle[aa_CompoundKeyDataModel{primaryKey=p, c}]: test is constantly failing. No failures on upstream;
branch story: [FFFF] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.TinySegmentQueryWriteLifecycleTest.testWriteLifecycle[ca_CompositePartitionKeyDataModel{primaryKey=p1, p2}]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.types.collections.maps.MapEntriesIntTest.test[version=aa,dataset=map<int,int>,wide=true,scenario=MEMTABLE_QUERY]: test failed in the recent build. No failures on upstream;
branch story: [F+++] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.utils.binlog.BinLogTest.testTruncationReleasesLogSpace: test failed in the recent build. No failures on upstream;
branch story: [F+++] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.VectorSiftSmallTest.testMultiSegmentBuild: test is constantly failing. No failures on upstream;
branch story: [FFFFFF] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
butler comparison
