CNDB-8187 Add metrics about time spent writing SAI (compaction and flush) #1329

Draft: wants to merge 9 commits into base: main
@@ -29,6 +29,7 @@
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.google.common.collect.Maps;
@@ -57,6 +58,7 @@
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ApproximateTime;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UUIDGen;
@@ -134,6 +136,7 @@ private String logMessage(String message) {
private boolean indexSSTable(SSTableReader sstable, Set<StorageAttachedIndex> indexes)
{
logger.debug(logMessage("Starting index build on {}"), sstable.descriptor);
long startTimeNanos = ApproximateTime.nanoTime();

CountDownLatch perSSTableFileLock = null;
StorageAttachedIndexWriter indexWriter = null;
@@ -162,7 +165,7 @@ private boolean indexSSTable(SSTableReader sstable, Set<StorageAttachedIndex> in
prepareForRebuild(indexDescriptor.perIndexComponents(index.getIndexContext()), replacedComponents);

long keyCount = SSTableReader.getApproximateKeyCount(Set.of(sstable));
indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly);
indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly, group.table().metric);

long previousKeyPosition = 0;
indexWriter.begin();
@@ -220,7 +223,9 @@ private boolean indexSSTable(SSTableReader sstable, Set<StorageAttachedIndex> in

completeSSTable(txn, indexWriter, sstable, indexes, perSSTableFileLock, replacedComponents);
}
logger.debug("Completed indexing sstable {}", sstable.descriptor);
long timeTaken = ApproximateTime.nanoTime() - startTimeNanos;
group.table().metric.storageAttachedIndexRebuildTime.update(timeTaken);
logger.debug("Completed indexing sstable {} in {} seconds", sstable.descriptor, TimeUnit.NANOSECONDS.toSeconds(timeTaken));

return false;
}
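For reference, the measurement added above reduces to a simple pattern: take one coarse timestamp before the build, compute the delta after completeSSTable, and push the raw nanosecond value into a histogram. Below is a minimal sketch of that pattern, assuming the Codahale Histogram class that Cassandra's metrics wrap; the class name RebuildTimingSketch and the Runnable stand-in are illustrative, not part of the PR:

```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;

// Sketch of the timing pattern indexSSTable() adopts: one timestamp before
// the work, one delta afterwards, published to a histogram in raw nanoseconds.
// System.nanoTime() stands in for ApproximateTime.nanoTime(), which trades
// some precision for a cheaper clock read.
public final class RebuildTimingSketch
{
    private final Histogram rebuildTime = new Histogram(new ExponentiallyDecayingReservoir());

    public void indexSSTable(Runnable buildWork)
    {
        long startTimeNanos = System.nanoTime();
        buildWork.run(); // iterate the sstable and feed every row to the index writers

        long timeTaken = System.nanoTime() - startTimeNanos;
        rebuildTime.update(timeTaken); // histogram stores raw nanoseconds, as in the PR
        System.out.printf("Completed indexing in %d seconds%n",
                          TimeUnit.NANOSECONDS.toSeconds(timeTaken));
    }
}
```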
@@ -243,7 +243,7 @@ public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNew
IndexDescriptor indexDescriptor = IndexDescriptor.empty(descriptor);
try
{
return new StorageAttachedIndexWriter(indexDescriptor, tableMetadata, indices, tracker, keyCount);
return new StorageAttachedIndexWriter(indexDescriptor, tableMetadata, indices, tracker, keyCount, baseCfs.metric);
}
catch (Throwable t)
{
@@ -43,7 +43,9 @@
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ApproximateTime;
import org.apache.cassandra.utils.Throwables;

/**
@@ -61,28 +63,32 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final RowMapping rowMapping;
private final OperationType opType;
private final TableMetrics tableMetrics;

private DecoratedKey currentKey;
private boolean tokenOffsetWriterCompleted = false;
private boolean aborted = false;

private long sstableRowId = 0;
private long totalTimeSpent = 0;

public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
TableMetadata tableMetadata,
Collection<StorageAttachedIndex> indices,
LifecycleNewTracker lifecycleNewTracker,
long keyCount) throws IOException
long keyCount,
TableMetrics tableMetrics) throws IOException
{
this(indexDescriptor, tableMetadata, indices, lifecycleNewTracker, keyCount, false);
this(indexDescriptor, tableMetadata, indices, lifecycleNewTracker, keyCount, false, tableMetrics);
}

public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
TableMetadata tableMetadata,
Collection<StorageAttachedIndex> indices,
LifecycleNewTracker lifecycleNewTracker,
long keyCount,
boolean perIndexComponentsOnly) throws IOException
boolean perIndexComponentsOnly,
TableMetrics tableMetrics) throws IOException
{
// We always write at the latest version (though what that version is can be configured for specific cases)
var onDiskFormat = Version.latest().onDiskFormat();
@@ -106,6 +112,7 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor,
this.perSSTableWriter = perIndexComponentsOnly
? PerSSTableWriter.NONE
: onDiskFormat.newPerSSTableWriter(indexDescriptor);
this.tableMetrics = tableMetrics;
}

@Override
@@ -187,78 +194,86 @@ public void staticRow(Row staticRow, long position)
@Override
public void complete(SSTable sstable)
{
if (aborted) return;

long start = stopwatch.elapsed(TimeUnit.MILLISECONDS);

logger.trace(indexDescriptor.logMessage("Completed partition iteration for index flush for SSTable {}. Elapsed time: {} ms"),
indexDescriptor.descriptor,
start);

long startComplete = ApproximateTime.nanoTime();
try
{
perSSTableWriter.complete(stopwatch);
tokenOffsetWriterCompleted = true;
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
logger.trace(indexDescriptor.logMessage("Completed per-SSTable write for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."),
indexDescriptor.descriptor,
elapsed - start,
elapsed);
if (aborted) return;

start = elapsed;
long start = stopwatch.elapsed(TimeUnit.MILLISECONDS);

rowMapping.complete();
logger.trace(indexDescriptor.logMessage("Completed partition iteration for index flush for SSTable {}. Elapsed time: {} ms"),
indexDescriptor.descriptor,
start);

for (PerIndexWriter perIndexWriter : perIndexWriters)
try
{
perIndexWriter.complete(stopwatch);

// The handling of components when we flush/compact is a tad backward: instead of registering the
// components as we write them, all the components are collected beforehand in `SSTableWriter#create`,
// which means this is a superset of possible components, but if any components are not written for
// those reasons, this needs to be fixed afterward. One case for SAI components, for instance, is empty
// indexes: if a particular sstable has nothing indexed for a particular index, then only the completion
// marker for that index is kept on disk but no other components, so we need to remove the components
// that were "optimistically" added (and more generally, future index implementation may have some
// components that are only optionally present based on specific conditions).
// Note 1: for index build/rebuild on existing sstable, `SSTableWriter#create` is not used, and instead
// we only register the components actually written (see `StorageAttachedIndexBuilder#completeSSTable`).
// Note 2: as hinted above, an alternative here would be to change the whole handling of components,
// registering components only as they are effectively written. This is a larger refactor, with some
// subtleties involved, so it is left as potential future work.
if (opType == OperationType.FLUSH || opType == OperationType.COMPACTION)
perSSTableWriter.complete(stopwatch);
tokenOffsetWriterCompleted = true;
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
logger.trace(indexDescriptor.logMessage("Completed per-SSTable write for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."),
indexDescriptor.descriptor,
elapsed - start,
elapsed);

start = elapsed;

rowMapping.complete();

for (PerIndexWriter perIndexWriter : perIndexWriters)
{
var writtenComponents = perIndexWriter.writtenComponents().allAsCustomComponents();
var registeredComponents = IndexDescriptor.perIndexComponentsForNewlyFlushedSSTable(perIndexWriter.indexContext());
var toRemove = Sets.difference(registeredComponents, writtenComponents);
if (!toRemove.isEmpty())
perIndexWriter.complete(stopwatch);

// The handling of components when we flush/compact is a tad backward: instead of registering the
// components as we write them, all the components are collected beforehand in `SSTableWriter#create`,
// which means this is a superset of possible components, but if any components are not written for
// those reasons, this needs to be fixed afterward. One case for SAI components, for instance, is empty
// indexes: if a particular sstable has nothing indexed for a particular index, then only the completion
// marker for that index is kept on disk but no other components, so we need to remove the components
// that were "optimistically" added (and more generally, future index implementation may have some
// components that are only optionally present based on specific conditions).
// Note 1: for index build/rebuild on existing sstable, `SSTableWriter#create` is not used, and instead
// we only register the components actually written (see `StorageAttachedIndexBuilder#completeSSTable`).
// Note 2: as hinted above, an alternative here would be to change the whole handling of components,
// registering components only as they are effectively written. This is a larger refactor, with some
// subtleties involved, so it is left as potential future work.
if (opType == OperationType.FLUSH || opType == OperationType.COMPACTION)
{
if (logger.isTraceEnabled())
var writtenComponents = perIndexWriter.writtenComponents().allAsCustomComponents();
var registeredComponents = IndexDescriptor.perIndexComponentsForNewlyFlushedSSTable(perIndexWriter.indexContext());
var toRemove = Sets.difference(registeredComponents, writtenComponents);
if (!toRemove.isEmpty())
{
logger.trace(indexDescriptor.logMessage("Removing optimistically added but not written components from TOC of SSTable {} for index {}"),
indexDescriptor.descriptor,
perIndexWriter.indexContext().getIndexName());
if (logger.isTraceEnabled())
{
logger.trace(indexDescriptor.logMessage("Removing optimistically added but not written components from TOC of SSTable {} for index {}"),
indexDescriptor.descriptor,
perIndexWriter.indexContext().getIndexName());
}

// During flush, this happens as we finalize the sstable and before its size is tracked, so not
// passing a tracker is correct and intended (there is nothing to update in the tracker).
sstable.unregisterComponents(toRemove, null);
}

// During flush, this happens as we finalize the sstable and before its size is tracked, so not
// passing a tracker is correct and intended (there is nothing to update in the tracker).
sstable.unregisterComponents(toRemove, null);
}
}

elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
logger.trace(indexDescriptor.logMessage("Completed per-index writes for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."),
indexDescriptor.descriptor,
elapsed - start,
elapsed);
}
catch (Throwable t)
{
logger.error(indexDescriptor.logMessage("Failed to complete an index build"), t);
abort(t, true);
// fail compaction task or index build task if SAI failed
throw Throwables.unchecked(t);
}
elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
logger.trace(indexDescriptor.logMessage("Completed per-index writes for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."),
indexDescriptor.descriptor,
elapsed - start,
elapsed);
}
catch (Throwable t)
finally
{
logger.error(indexDescriptor.logMessage("Failed to complete an index build"), t);
abort(t, true);
// fail compaction task or index build task if SAI failed
throw Throwables.unchecked(t);
totalTimeSpent += (ApproximateTime.nanoTime() - startComplete);
tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent, opType);
}
}

@@ -315,6 +330,7 @@ public void abort(Throwable accumulator, boolean fromIndex)

private void addRow(Row row) throws IOException, TrieSpaceExhaustedException
{
long now = ApproximateTime.nanoTime();
PrimaryKey primaryKey = primaryKeyFactory.create(currentKey, row.clustering());
perSSTableWriter.nextRow(primaryKey);
rowMapping.add(primaryKey, sstableRowId);
@@ -324,5 +340,7 @@ private void addRow(Row row) throws IOException, TrieSpaceExhaustedException
w.addRow(primaryKey, row, sstableRowId);
}
sstableRowId++;

totalTimeSpent += (ApproximateTime.nanoTime() - now);
}
}
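Taken together, the writer changes implement an accumulate-then-publish scheme: addRow() adds a per-row delta to totalTimeSpent while data is being indexed, and complete() adds the completion phase's delta in a finally block before publishing the total once, keyed by the operation type, so the metric is updated even when completion fails and the exception is rethrown. A condensed sketch under those assumptions; TimedWriterSketch, the Runnable arguments, and the LongConsumer callback are illustrative stand-ins, not the PR's API:

```java
import java.util.function.LongConsumer;

// Condensed sketch of StorageAttachedIndexWriter's accounting: per-row time
// accumulates during the write, completion time is added in a finally block,
// and the total is published exactly once per sstable, success or failure.
public final class TimedWriterSketch
{
    private final LongConsumer publishNanos; // stands in for tableMetrics.updateStorageAttachedIndexWritingTime(nanos, opType)
    private long totalTimeSpent = 0;

    public TimedWriterSketch(LongConsumer publishNanos)
    {
        this.publishNanos = publishNanos;
    }

    public void addRow(Runnable indexRow)
    {
        long now = System.nanoTime();
        indexRow.run();                            // index one row across all per-index writers
        totalTimeSpent += System.nanoTime() - now; // count only time spent inside SAI
    }

    public void complete(Runnable completionWork)
    {
        long startComplete = System.nanoTime();
        try
        {
            completionWork.run(); // per-sstable and per-index completion
        }
        finally
        {
            totalTimeSpent += System.nanoTime() - startComplete;
            publishNanos.accept(totalTimeSpent); // single histogram update, even on failure
        }
    }
}
```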
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -81,6 +81,18 @@ public class KeyspaceMetrics
public final Histogram sstablesPerReadHistogram;
/** Tombstones scanned in queries on this Keyspace */
public final Histogram tombstoneScannedHistogram;
/** Time spent flushing memtables */
public final Histogram flushTime;
/** Time spent building storage-attached indexes on existing SSTables */
public final Histogram storageAttachedIndexRebuildTime;

/** Time spent writing SAI */
public final Histogram storageAttachedIndexWritingTimeForIndexBuild;
public final Histogram storageAttachedIndexWritingTimeForCompaction;
public final Histogram storageAttachedIndexWritingTimeForFlush;
public final Histogram storageAttachedIndexWritingTimeForOther;

/** Time spent writing SSTables during compaction */
public final Histogram compactionTime;

[Review comment thread on the compactionTime metric]

Reviewer: CNDB has CompactionTaskMetrics#duration

Author: That's another thing; I initially thought I had to go there, but these metrics are more low-level. CompactionTaskMetrics#duration is more about the overall "task", including etcd and other stuff. I want this metric because it is comparable with the other storageAttachedIndexWritingTimeForCompaction.

https://github.com/riptano/cndb/blob/46688e90e2444bde0261a3d499ab6f237a6a306d/cndb-tasks/src/main/java/com/datastax/cndb/compactor/CompactionTaskMetrics.java#L206

/** Shadowed keys scan metrics **/
public final Histogram shadowedKeysScannedHistogram;
@@ -218,6 +230,13 @@ public KeyspaceMetrics(final Keyspace ks)
// create histograms for TableMetrics to replicate updates to
sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true);
tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false);
flushTime = createKeyspaceHistogram("FlushTime", false);
storageAttachedIndexRebuildTime = createKeyspaceHistogram("StorageAttachedIndexRebuildTime", false);
storageAttachedIndexWritingTimeForIndexBuild = createKeyspaceHistogram("StorageAttachedIndexWritingTimeIndexBuild", false);
storageAttachedIndexWritingTimeForCompaction = createKeyspaceHistogram("StorageAttachedIndexWritingTimeCompaction", false);
storageAttachedIndexWritingTimeForFlush = createKeyspaceHistogram("StorageAttachedIndexWritingTimeFlush", false);
storageAttachedIndexWritingTimeForOther = createKeyspaceHistogram("StorageAttachedIndexWritingTimeOther", false);
compactionTime = createKeyspaceHistogram("CompactionTime", false);
shadowedKeysScannedHistogram = createKeyspaceHistogram("ShadowedKeysScannedHistogram", false);
shadowedKeysLoopsHistogram = createKeyspaceHistogram("ShadowedKeysLoopsHistogram", false);
liveScannedHistogram = createKeyspaceHistogram("LiveScannedHistogram", false);
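
The TableMetrics side of updateStorageAttachedIndexWritingTime is not part of the hunks shown here, so the following is an assumption inferred from the four histograms above and the call site in StorageAttachedIndexWriter: a method that routes the recorded nanoseconds to the histogram matching the operation type. The class and field names below are hypothetical; only the histogram names mirror the KeyspaceMetrics additions:

```java
import com.codahale.metrics.Histogram;
import org.apache.cassandra.db.compaction.OperationType;

// Hypothetical sketch of the dispatch implied by
// tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent, opType);
// the real TableMetrics change is not shown in this diff.
public class WritingTimeDispatchSketch
{
    private final Histogram forIndexBuild;
    private final Histogram forCompaction;
    private final Histogram forFlush;
    private final Histogram forOther;

    public WritingTimeDispatchSketch(Histogram forIndexBuild, Histogram forCompaction,
                                     Histogram forFlush, Histogram forOther)
    {
        this.forIndexBuild = forIndexBuild;
        this.forCompaction = forCompaction;
        this.forFlush = forFlush;
        this.forOther = forOther;
    }

    public void updateStorageAttachedIndexWritingTime(long nanos, OperationType opType)
    {
        switch (opType)
        {
            case INDEX_BUILD: forIndexBuild.update(nanos); break;
            case COMPACTION:  forCompaction.update(nanos); break;
            case FLUSH:       forFlush.update(nanos);      break;
            default:          forOther.update(nanos);      break;
        }
    }
}
```

Assuming createKeyspaceHistogram follows the usual keyspace metric naming, these histograms would surface over JMX as org.apache.cassandra.metrics:type=Keyspace,keyspace=<name>,name=StorageAttachedIndexWritingTimeFlush and so on, though that is inferred rather than shown in this diff.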