From d5990941de77c1d2df36c115b1412dfaf780d1a3 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 12:29:19 +0200 Subject: [PATCH 1/9] CNDB-8187 Add metrics about time spent writing sstables (compaction and flush) and support tracking SAI build time --- .../index/sai/StorageAttachedIndexBuilder.java | 11 +++++++++-- .../apache/cassandra/io/sstable/SSTableWatcher.java | 8 ++++++++ .../org/apache/cassandra/metrics/KeyspaceMetrics.java | 7 +++++++ .../org/apache/cassandra/metrics/TableMetrics.java | 10 ++++++++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index c7eed3e6ee67..0f0814e98615 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -57,6 +57,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ApproximateTime; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; @@ -133,7 +134,7 @@ private String logMessage(String message) { */ private boolean indexSSTable(SSTableReader sstable, Set indexes) { - logger.debug(logMessage("Starting index build on {}"), sstable.descriptor); + logger.debug(logMessage("Starting index build on {} with {} indexes"), sstable.descriptor, indexes.size()); CountDownLatch perSSTableFileLock = null; StorageAttachedIndexWriter indexWriter = null; @@ -149,6 +150,8 @@ private boolean indexSSTable(SSTableReader sstable, Set in IndexDescriptor indexDescriptor = group.descriptorFor(sstable); Set replacedComponents = new HashSet<>(); + Throwable errorOccurred = null; + long indexBuildStartTimeNanos = ApproximateTime.nanoTime(); try (RandomAccessReader dataFile = sstable.openDataReader(); LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.INDEX_BUILD, tracker.metadata)) @@ -220,12 +223,14 @@ private boolean indexSSTable(SSTableReader sstable, Set in completeSSTable(txn, indexWriter, sstable, indexes, perSSTableFileLock, replacedComponents); } - logger.debug("Completed indexing sstable {}", sstable.descriptor); + logger.debug("Completed indexing sstable {} with {} indexes", sstable.descriptor, indexes.size()); return false; } catch (Throwable t) { + errorOccurred = t; + if (indexWriter != null) { indexWriter.abort(t, true); @@ -259,6 +264,8 @@ else if (t instanceof CompactionInterruptedException) } finally { + SSTableWatcher.instance.onIndonIndexBuildCompleted(sstable, indexes, indexBuildStartTimeNanos, errorOccurred); + ref.release(); // release current lock in case of error if (perSSTableFileLock != null) diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java index 2cbcad441fb3..b58e6c5a5b3c 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java @@ -20,6 +20,7 @@ import java.util.Set; +import org.apache.cassandra.index.Index; import org.apache.cassandra.index.sai.disk.format.IndexComponents; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.FBUtilities; @@ -63,6 +64,13 @@ default void onIndexBuild(SSTableReader sstable) { } + /** + * Called when an index built is completed. This is currenly used only for metrics. + */ + default void onIndonIndexBuildCompleted(SSTableReader sstable, Set indexes, long indexBuildStartTimeNanos, Throwable errorOccurred) + { + } + /** * Called when an index is dropped on index components affected by that drop. *

diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index f3279d103f9f..e3d9e7d652f3 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -81,6 +81,11 @@ public class KeyspaceMetrics public final Histogram sstablesPerReadHistogram; /** Tombstones scanned in queries on this Keyspace */ public final Histogram tombstoneScannedHistogram; + /** Time spent flushing memtables */ + public final Histogram flushTime; + + /** Time spent writing memtables during compaction */ + public final Histogram compactionTime; /** Shadowed keys scan metrics **/ public final Histogram shadowedKeysScannedHistogram; @@ -218,6 +223,8 @@ public KeyspaceMetrics(final Keyspace ks) // create histograms for TableMetrics to replicate updates to sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true); tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false); + flushTime = createKeyspaceHistogram("FlushTime", false); + compactionTime = createKeyspaceHistogram("CompactionTime", false); shadowedKeysScannedHistogram = createKeyspaceHistogram("ShadowedKeysScannedHistogram", false); shadowedKeysLoopsHistogram = createKeyspaceHistogram("ShadowedKeysLoopsHistogram", false); liveScannedHistogram = createKeyspaceHistogram("LiveScannedHistogram", false); diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 77e6fa0b5ae5..35dcf0f4775a 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -208,6 +208,8 @@ public String asCQLString() public final MovingAverage flushSegmentCount; /** The average duration per 1Kb of data flushed, in nanoseconds. */ public final MovingAverage flushTimePerKb; + /** Time spent in flushing memtables */ + public final TableHistogram flushTime; /** Total number of bytes inserted into memtables since server [re]start. */ public final Counter bytesInserted; /** Total number of bytes written by compaction since server [re]start */ @@ -216,6 +218,8 @@ public String asCQLString() public final Counter compactionBytesRead; /** The average duration per 1Kb of data compacted, in nanoseconds. */ public final MovingAverage compactionTimePerKb; + /** Time spent in writing sstables during compaction */ + public final TableHistogram compactionTime; /** Estimate of number of pending compactions for this table */ public final Gauge pendingCompactions; /** Number of SSTables on disk for this CF */ @@ -725,11 +729,13 @@ public Long getValue() flushSizeOnDisk = ExpMovingAverage.decayBy1000(); flushSegmentCount = ExpMovingAverage.decayBy1000(); flushTimePerKb = ExpMovingAverage.decayBy100(); + flushTime = createTableHistogram("FlushTime", cfs.getKeyspaceMetrics().flushTime, false); bytesInserted = createTableCounter("BytesInserted"); compactionBytesWritten = createTableCounter("CompactionBytesWritten"); compactionBytesRead = createTableCounter("CompactionBytesRead"); compactionTimePerKb = ExpMovingAverage.decayBy100(); + compactionTime = createTableHistogram("CompactionTime", cfs.getKeyspaceMetrics().compactionTime, false); pendingCompactions = createTableGauge("PendingCompactions", () -> cfs.getCompactionStrategy().getEstimatedRemainingTasks()); liveSSTableCount = createTableGauge("LiveSSTableCount", () -> cfs.getLiveSSTables().size()); oldVersionSSTableCount = createTableGauge("OldVersionSSTableCount", new Gauge() @@ -1098,6 +1104,7 @@ public void incBytesFlushed(long inputSize, long outputSize, long elapsedNanos) flushSize.update(outputSize); // this assumes that at least 1 Kb was flushed, which should always be the case, then rounds down flushTimePerKb.update(elapsedNanos / (double) Math.max(1, inputSize / 1024L)); + flushTime.update(elapsedNanos); } public void incBytesCompacted(long inputDiskSize, long outputDiskSize, long elapsedNanos) @@ -1106,7 +1113,10 @@ public void incBytesCompacted(long inputDiskSize, long outputDiskSize, long elap compactionBytesWritten.inc(outputDiskSize); // only update compactionTimePerKb when there are non-expired sstables (inputDiskSize > 0) if (inputDiskSize > 0) + { compactionTimePerKb.update(1024.0 * elapsedNanos / inputDiskSize); + compactionTime.update(elapsedNanos); + } } public void updateSSTableIterated(int count, int intersectingCount, long elapsedNanos) From c9ed4f276897cc0bd66d915203a8e1881be6a6f6 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 12:32:05 +0200 Subject: [PATCH 2/9] Fix typo --- .../apache/cassandra/index/sai/StorageAttachedIndexBuilder.java | 2 +- src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index 0f0814e98615..026db06045ee 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -264,7 +264,7 @@ else if (t instanceof CompactionInterruptedException) } finally { - SSTableWatcher.instance.onIndonIndexBuildCompleted(sstable, indexes, indexBuildStartTimeNanos, errorOccurred); + SSTableWatcher.instance.onIndexBuildCompleted(sstable, indexes, indexBuildStartTimeNanos, errorOccurred); ref.release(); // release current lock in case of error diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java index b58e6c5a5b3c..156851f881af 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java @@ -67,7 +67,7 @@ default void onIndexBuild(SSTableReader sstable) /** * Called when an index built is completed. This is currenly used only for metrics. */ - default void onIndonIndexBuildCompleted(SSTableReader sstable, Set indexes, long indexBuildStartTimeNanos, Throwable errorOccurred) + default void onIndexBuildCompleted(SSTableReader sstable, Set indexes, long indexBuildStartTimeNanos, Throwable errorOccurred) { } From 67b8884160c4114305c58858e986c054c5da4ec2 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 13:18:09 +0200 Subject: [PATCH 3/9] Hook into SAI writer --- .../cassandra/db/lifecycle/Tracker.java | 1 + .../sai/StorageAttachedIndexBuilder.java | 4 +--- .../index/sai/StorageAttachedIndexGroup.java | 2 +- .../sai/disk/StorageAttachedIndexWriter.java | 19 ++++++++++++++++--- .../cassandra/io/sstable/SSTableWatcher.java | 7 ------- .../sstable/format/SSTableFlushObserver.java | 7 +++++++ .../io/sstable/format/SSTableWriter.java | 1 + .../cassandra/metrics/KeyspaceMetrics.java | 4 ++++ .../cassandra/metrics/TableMetrics.java | 7 +++++++ 9 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 7cee374bbcd1..e7293b996c6f 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -49,6 +49,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; import org.apache.cassandra.notifications.InitialSSTableAddedNotification; diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index 026db06045ee..273cedade4e6 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -165,7 +165,7 @@ private boolean indexSSTable(SSTableReader sstable, Set in prepareForRebuild(indexDescriptor.perIndexComponents(index.getIndexContext()), replacedComponents); long keyCount = SSTableReader.getApproximateKeyCount(Set.of(sstable)); - indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly); + indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly, group.table().metric); long previousKeyPosition = 0; indexWriter.begin(); @@ -264,8 +264,6 @@ else if (t instanceof CompactionInterruptedException) } finally { - SSTableWatcher.instance.onIndexBuildCompleted(sstable, indexes, indexBuildStartTimeNanos, errorOccurred); - ref.release(); // release current lock in case of error if (perSSTableFileLock != null) diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java index c2962753b859..d10f61473f38 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java @@ -243,7 +243,7 @@ public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNew IndexDescriptor indexDescriptor = IndexDescriptor.empty(descriptor); try { - return new StorageAttachedIndexWriter(indexDescriptor, tableMetadata, indices, tracker, keyCount); + return new StorageAttachedIndexWriter(indexDescriptor, tableMetadata, indices, tracker, keyCount, baseCfs.metric); } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index 416f8e615122..6ef725c94575 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -43,7 +43,9 @@ import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableFlushObserver; +import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ApproximateTime; import org.apache.cassandra.utils.Throwables; /** @@ -61,20 +63,23 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final RowMapping rowMapping; private final OperationType opType; + private final TableMetrics tableMetrics; private DecoratedKey currentKey; private boolean tokenOffsetWriterCompleted = false; private boolean aborted = false; private long sstableRowId = 0; + private long totalTimeSpent = 0; public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor, TableMetadata tableMetadata, Collection indices, LifecycleNewTracker lifecycleNewTracker, - long keyCount) throws IOException + long keyCount, + TableMetrics tableMetrics) throws IOException { - this(indexDescriptor, tableMetadata, indices, lifecycleNewTracker, keyCount, false); + this(indexDescriptor, tableMetadata, indices, lifecycleNewTracker, keyCount, false, tableMetrics); } public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor, @@ -82,7 +87,8 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor, Collection indices, LifecycleNewTracker lifecycleNewTracker, long keyCount, - boolean perIndexComponentsOnly) throws IOException + boolean perIndexComponentsOnly, + TableMetrics tableMetrics) throws IOException { // We always write at the latest version (through what that version is can be configured for specific cases) var onDiskFormat = Version.latest().onDiskFormat(); @@ -106,6 +112,7 @@ public StorageAttachedIndexWriter(IndexDescriptor indexDescriptor, this.perSSTableWriter = perIndexComponentsOnly ? PerSSTableWriter.NONE : onDiskFormat.newPerSSTableWriter(indexDescriptor); + this.tableMetrics = tableMetrics; } @Override @@ -187,6 +194,9 @@ public void staticRow(Row staticRow, long position) @Override public void complete(SSTable sstable) { + + tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent); + if (aborted) return; long start = stopwatch.elapsed(TimeUnit.MILLISECONDS); @@ -315,6 +325,7 @@ public void abort(Throwable accumulator, boolean fromIndex) private void addRow(Row row) throws IOException, TrieSpaceExhaustedException { + long now = ApproximateTime.nanoTime(); PrimaryKey primaryKey = primaryKeyFactory.create(currentKey, row.clustering()); perSSTableWriter.nextRow(primaryKey); rowMapping.add(primaryKey, sstableRowId); @@ -324,5 +335,7 @@ private void addRow(Row row) throws IOException, TrieSpaceExhaustedException w.addRow(primaryKey, row, sstableRowId); } sstableRowId++; + + totalTimeSpent += (ApproximateTime.nanoTime() - now); } } diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java index 156851f881af..244379caf03b 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java @@ -64,13 +64,6 @@ default void onIndexBuild(SSTableReader sstable) { } - /** - * Called when an index built is completed. This is currenly used only for metrics. - */ - default void onIndexBuildCompleted(SSTableReader sstable, Set indexes, long indexBuildStartTimeNanos, Throwable errorOccurred) - { - } - /** * Called when an index is dropped on index components affected by that drop. *

diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java index e292736af5b1..3d7296a01da8 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java @@ -23,6 +23,7 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.metrics.TableMetrics; /** * Observer for events in the lifecycle of writing out an sstable. @@ -110,4 +111,10 @@ default void nextUnfilteredCluster(Unfiltered unfiltered, long position) * Clean up resources on error. There should be no side effects if called multiple times. */ default void abort(Throwable accumulator) {} + + /** + * Update table metrics with the latest information. + * @param tableMetrics the metrics to update + */ + default void updateTableMetrics(TableMetrics tableMetrics) {} } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 519b48ca1b2f..7a52f83ff157 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -322,6 +322,7 @@ public SSTableReader finish(boolean openResult) openResult(); txnProxy().commit(); observers.forEach(obs -> obs.complete(this)); + return finished(); } diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index e3d9e7d652f3..941ada312c0b 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -84,6 +84,9 @@ public class KeyspaceMetrics /** Time spent flushing memtables */ public final Histogram flushTime; + /** Time spent writing SAI */ + public final Histogram storageAttachedIndexWritingTime; + /** Time spent writing memtables during compaction */ public final Histogram compactionTime; @@ -224,6 +227,7 @@ public KeyspaceMetrics(final Keyspace ks) sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true); tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false); flushTime = createKeyspaceHistogram("FlushTime", false); + storageAttachedIndexWritingTime = createKeyspaceHistogram("StorageAttachedIndexWritingTime", false); compactionTime = createKeyspaceHistogram("CompactionTime", false); shadowedKeysScannedHistogram = createKeyspaceHistogram("ShadowedKeysScannedHistogram", false); shadowedKeysLoopsHistogram = createKeyspaceHistogram("ShadowedKeysLoopsHistogram", false); diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 35dcf0f4775a..bde276159d74 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -210,6 +210,7 @@ public String asCQLString() public final MovingAverage flushTimePerKb; /** Time spent in flushing memtables */ public final TableHistogram flushTime; + public final TableHistogram storageAttachedIndexWritingTime; /** Total number of bytes inserted into memtables since server [re]start. */ public final Counter bytesInserted; /** Total number of bytes written by compaction since server [re]start */ @@ -730,6 +731,7 @@ public Long getValue() flushSegmentCount = ExpMovingAverage.decayBy1000(); flushTimePerKb = ExpMovingAverage.decayBy100(); flushTime = createTableHistogram("FlushTime", cfs.getKeyspaceMetrics().flushTime, false); + storageAttachedIndexWritingTime = createTableHistogram("StorageAttachedIndexWritingTime", cfs.getKeyspaceMetrics().storageAttachedIndexWritingTime, false); bytesInserted = createTableCounter("BytesInserted"); compactionBytesWritten = createTableCounter("CompactionBytesWritten"); @@ -1107,6 +1109,11 @@ public void incBytesFlushed(long inputSize, long outputSize, long elapsedNanos) flushTime.update(elapsedNanos); } + public void updateStorageAttachedIndexWritingTime(long totalTimeSpentNanos) + { + storageAttachedIndexWritingTime.update(totalTimeSpentNanos); + } + public void incBytesCompacted(long inputDiskSize, long outputDiskSize, long elapsedNanos) { compactionBytesRead.inc(inputDiskSize); From 34564f55a38b595bad945476c5a801b7064b1bfa Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 13:19:54 +0200 Subject: [PATCH 4/9] Revert useless stuff --- .../org/apache/cassandra/db/lifecycle/Tracker.java | 1 - .../index/sai/StorageAttachedIndexBuilder.java | 11 +++-------- .../io/sstable/format/SSTableFlushObserver.java | 7 ------- .../cassandra/io/sstable/format/SSTableWriter.java | 1 - 4 files changed, 3 insertions(+), 17 deletions(-) diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index e7293b996c6f..7cee374bbcd1 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -49,7 +49,6 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; import org.apache.cassandra.notifications.InitialSSTableAddedNotification; diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index 273cedade4e6..c7eed3e6ee67 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -57,7 +57,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.utils.ApproximateTime; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; @@ -134,7 +133,7 @@ private String logMessage(String message) { */ private boolean indexSSTable(SSTableReader sstable, Set indexes) { - logger.debug(logMessage("Starting index build on {} with {} indexes"), sstable.descriptor, indexes.size()); + logger.debug(logMessage("Starting index build on {}"), sstable.descriptor); CountDownLatch perSSTableFileLock = null; StorageAttachedIndexWriter indexWriter = null; @@ -150,8 +149,6 @@ private boolean indexSSTable(SSTableReader sstable, Set in IndexDescriptor indexDescriptor = group.descriptorFor(sstable); Set replacedComponents = new HashSet<>(); - Throwable errorOccurred = null; - long indexBuildStartTimeNanos = ApproximateTime.nanoTime(); try (RandomAccessReader dataFile = sstable.openDataReader(); LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.INDEX_BUILD, tracker.metadata)) @@ -165,7 +162,7 @@ private boolean indexSSTable(SSTableReader sstable, Set in prepareForRebuild(indexDescriptor.perIndexComponents(index.getIndexContext()), replacedComponents); long keyCount = SSTableReader.getApproximateKeyCount(Set.of(sstable)); - indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly, group.table().metric); + indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly); long previousKeyPosition = 0; indexWriter.begin(); @@ -223,14 +220,12 @@ private boolean indexSSTable(SSTableReader sstable, Set in completeSSTable(txn, indexWriter, sstable, indexes, perSSTableFileLock, replacedComponents); } - logger.debug("Completed indexing sstable {} with {} indexes", sstable.descriptor, indexes.size()); + logger.debug("Completed indexing sstable {}", sstable.descriptor); return false; } catch (Throwable t) { - errorOccurred = t; - if (indexWriter != null) { indexWriter.abort(t, true); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java index 3d7296a01da8..e292736af5b1 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java @@ -23,7 +23,6 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.metrics.TableMetrics; /** * Observer for events in the lifecycle of writing out an sstable. @@ -111,10 +110,4 @@ default void nextUnfilteredCluster(Unfiltered unfiltered, long position) * Clean up resources on error. There should be no side effects if called multiple times. */ default void abort(Throwable accumulator) {} - - /** - * Update table metrics with the latest information. - * @param tableMetrics the metrics to update - */ - default void updateTableMetrics(TableMetrics tableMetrics) {} } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 7a52f83ff157..519b48ca1b2f 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -322,7 +322,6 @@ public SSTableReader finish(boolean openResult) openResult(); txnProxy().commit(); observers.forEach(obs -> obs.complete(this)); - return finished(); } From 6c4c708521619c0e76063f5835cebd7e5ce13994 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 13:20:21 +0200 Subject: [PATCH 5/9] Revert useless stuff --- src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java index 244379caf03b..2cbcad441fb3 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWatcher.java @@ -20,7 +20,6 @@ import java.util.Set; -import org.apache.cassandra.index.Index; import org.apache.cassandra.index.sai.disk.format.IndexComponents; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.FBUtilities; From 5f2526d1615af369e429000b7caf38f72cc5b9ae Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 13:40:25 +0200 Subject: [PATCH 6/9] More metrics --- .../sai/StorageAttachedIndexBuilder.java | 10 +++++-- .../sai/disk/StorageAttachedIndexWriter.java | 2 +- .../cassandra/metrics/KeyspaceMetrics.java | 12 ++++++-- .../cassandra/metrics/TableMetrics.java | 30 ++++++++++++++++--- 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index c7eed3e6ee67..f81fb0cccf10 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -29,6 +29,7 @@ import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.google.common.collect.Maps; @@ -57,6 +58,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.ApproximateTime; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; @@ -89,6 +91,7 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder private long bytesProcessed = 0; private final long totalSizeInBytes; + private final long startTimeNanos; StorageAttachedIndexBuilder(StorageAttachedIndexGroup group, SortedMap> sstables, boolean isFullRebuild, boolean isInitialBuild) { @@ -99,6 +102,7 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder this.isFullRebuild = isFullRebuild; this.isInitialBuild = isInitialBuild; this.totalSizeInBytes = sstables.keySet().stream().mapToLong(SSTableReader::uncompressedLength).sum(); + this.startTimeNanos = ApproximateTime.nanoTime(); } @Override @@ -162,7 +166,7 @@ private boolean indexSSTable(SSTableReader sstable, Set in prepareForRebuild(indexDescriptor.perIndexComponents(index.getIndexContext()), replacedComponents); long keyCount = SSTableReader.getApproximateKeyCount(Set.of(sstable)); - indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly); + indexWriter = new StorageAttachedIndexWriter(indexDescriptor, metadata, indexes, txn, keyCount, perIndexComponentsOnly, group.table().metric); long previousKeyPosition = 0; indexWriter.begin(); @@ -220,7 +224,9 @@ private boolean indexSSTable(SSTableReader sstable, Set in completeSSTable(txn, indexWriter, sstable, indexes, perSSTableFileLock, replacedComponents); } - logger.debug("Completed indexing sstable {}", sstable.descriptor); + long timeTaken = ApproximateTime.nanoTime() - startTimeNanos; + group.table().metric.storageAttachedIndexRebuildTime.update(timeTaken); + logger.debug("Completed indexing sstable {} in {} seconds", sstable.descriptor, TimeUnit.NANOSECONDS.toSeconds(timeTaken)); return false; } diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index 6ef725c94575..ebcc60c10685 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -195,7 +195,7 @@ public void staticRow(Row staticRow, long position) public void complete(SSTable sstable) { - tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent); + tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent, opType); if (aborted) return; diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 941ada312c0b..66252976e63a 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -83,9 +83,13 @@ public class KeyspaceMetrics public final Histogram tombstoneScannedHistogram; /** Time spent flushing memtables */ public final Histogram flushTime; + public final Histogram storageAttachedIndexRebuildTime; /** Time spent writing SAI */ - public final Histogram storageAttachedIndexWritingTime; + public final Histogram storageAttachedIndexWritingTimeForIndexBuild; + public final Histogram storageAttachedIndexWritingTimeForCompaction; + public final Histogram storageAttachedIndexWritingTimeForFlush; + public final Histogram storageAttachedIndexWritingTimeForOther; /** Time spent writing memtables during compaction */ public final Histogram compactionTime; @@ -227,7 +231,11 @@ public KeyspaceMetrics(final Keyspace ks) sstablesPerReadHistogram = createKeyspaceHistogram("SSTablesPerReadHistogram", true); tombstoneScannedHistogram = createKeyspaceHistogram("TombstoneScannedHistogram", false); flushTime = createKeyspaceHistogram("FlushTime", false); - storageAttachedIndexWritingTime = createKeyspaceHistogram("StorageAttachedIndexWritingTime", false); + storageAttachedIndexRebuildTime = createKeyspaceHistogram("StorageAttachedIndexRebuildTime", false); + storageAttachedIndexWritingTimeForIndexBuild = createKeyspaceHistogram("StorageAttachedIndexWritingTimeIndexBuild", false); + storageAttachedIndexWritingTimeForCompaction = createKeyspaceHistogram("StorageAttachedIndexWritingTimeCompaction", false); + storageAttachedIndexWritingTimeForFlush = createKeyspaceHistogram("StorageAttachedIndexWritingTimeFlush", false); + storageAttachedIndexWritingTimeForOther = createKeyspaceHistogram("StorageAttachedIndexWritingTimeOther", false); compactionTime = createKeyspaceHistogram("CompactionTime", false); shadowedKeysScannedHistogram = createKeyspaceHistogram("ShadowedKeysScannedHistogram", false); shadowedKeysLoopsHistogram = createKeyspaceHistogram("ShadowedKeysLoopsHistogram", false); diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index bde276159d74..e65910579d54 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -55,6 +55,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; @@ -210,7 +211,11 @@ public String asCQLString() public final MovingAverage flushTimePerKb; /** Time spent in flushing memtables */ public final TableHistogram flushTime; - public final TableHistogram storageAttachedIndexWritingTime; + public final TableHistogram storageAttachedIndexRebuildTime; + public final TableHistogram storageAttachedIndexWritingTimeForIndexBuild; + public final TableHistogram storageAttachedIndexWritingTimeForCompaction; + public final TableHistogram storageAttachedIndexWritingTimeForFlush; + public final TableHistogram storageAttachedIndexWritingTimeForOther; /** Total number of bytes inserted into memtables since server [re]start. */ public final Counter bytesInserted; /** Total number of bytes written by compaction since server [re]start */ @@ -731,7 +736,11 @@ public Long getValue() flushSegmentCount = ExpMovingAverage.decayBy1000(); flushTimePerKb = ExpMovingAverage.decayBy100(); flushTime = createTableHistogram("FlushTime", cfs.getKeyspaceMetrics().flushTime, false); - storageAttachedIndexWritingTime = createTableHistogram("StorageAttachedIndexWritingTime", cfs.getKeyspaceMetrics().storageAttachedIndexWritingTime, false); + storageAttachedIndexRebuildTime = createTableHistogram("StorageAttachedIndexRebuildTime", cfs.getKeyspaceMetrics().storageAttachedIndexRebuildTime, false); + storageAttachedIndexWritingTimeForIndexBuild = createTableHistogram("StorageAttachedIndexWritingTime", cfs.getKeyspaceMetrics().storageAttachedIndexWritingTimeForIndexBuild, false); + storageAttachedIndexWritingTimeForCompaction = createTableHistogram("StorageAttachedIndexWritingTimeForCompaction", cfs.getKeyspaceMetrics().storageAttachedIndexWritingTimeForCompaction, false); + storageAttachedIndexWritingTimeForFlush = createTableHistogram("StorageAttachedIndexWritingTimeForFlush", cfs.getKeyspaceMetrics().storageAttachedIndexWritingTimeForFlush, false); + storageAttachedIndexWritingTimeForOther= createTableHistogram("StorageAttachedIndexWritingTimeForOther", cfs.getKeyspaceMetrics().storageAttachedIndexWritingTimeForOther, false); bytesInserted = createTableCounter("BytesInserted"); compactionBytesWritten = createTableCounter("CompactionBytesWritten"); @@ -1109,9 +1118,22 @@ public void incBytesFlushed(long inputSize, long outputSize, long elapsedNanos) flushTime.update(elapsedNanos); } - public void updateStorageAttachedIndexWritingTime(long totalTimeSpentNanos) + public void updateStorageAttachedIndexWritingTime(long totalTimeSpentNanos, OperationType opType) { - storageAttachedIndexWritingTime.update(totalTimeSpentNanos); + switch (opType) + { + case INDEX_BUILD: + storageAttachedIndexWritingTimeForIndexBuild.update(totalTimeSpentNanos); + break; + case COMPACTION: + storageAttachedIndexWritingTimeForCompaction.update(totalTimeSpentNanos); + break; + case FLUSH: + storageAttachedIndexWritingTimeForFlush.update(totalTimeSpentNanos); + break; + default: + storageAttachedIndexWritingTimeForOther.update(totalTimeSpentNanos); + } } public void incBytesCompacted(long inputDiskSize, long outputDiskSize, long elapsedNanos) From d543b7cb4a984ec99fc1594b383bc838711e9098 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 16:11:56 +0200 Subject: [PATCH 7/9] change index build tracking --- .../cassandra/index/sai/StorageAttachedIndexBuilder.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index f81fb0cccf10..93f1b86a08b0 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -91,7 +91,6 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder private long bytesProcessed = 0; private final long totalSizeInBytes; - private final long startTimeNanos; StorageAttachedIndexBuilder(StorageAttachedIndexGroup group, SortedMap> sstables, boolean isFullRebuild, boolean isInitialBuild) { @@ -102,7 +101,6 @@ public class StorageAttachedIndexBuilder extends SecondaryIndexBuilder this.isFullRebuild = isFullRebuild; this.isInitialBuild = isInitialBuild; this.totalSizeInBytes = sstables.keySet().stream().mapToLong(SSTableReader::uncompressedLength).sum(); - this.startTimeNanos = ApproximateTime.nanoTime(); } @Override @@ -138,6 +136,7 @@ private String logMessage(String message) { private boolean indexSSTable(SSTableReader sstable, Set indexes) { logger.debug(logMessage("Starting index build on {}"), sstable.descriptor); + long startTimeNanos = ApproximateTime.nanoTime(); CountDownLatch perSSTableFileLock = null; StorageAttachedIndexWriter indexWriter = null; From 989e0664be4293a51ad448fc24928769fb8434d8 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 16:18:48 +0200 Subject: [PATCH 8/9] move time tracking --- .../sai/disk/StorageAttachedIndexWriter.java | 123 +++++++++--------- 1 file changed, 63 insertions(+), 60 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index ebcc60c10685..ef209a3d069d 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -194,81 +194,84 @@ public void staticRow(Row staticRow, long position) @Override public void complete(SSTable sstable) { - - tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent, opType); - - if (aborted) return; - - long start = stopwatch.elapsed(TimeUnit.MILLISECONDS); - - logger.trace(indexDescriptor.logMessage("Completed partition iteration for index flush for SSTable {}. Elapsed time: {} ms"), - indexDescriptor.descriptor, - start); - try { - perSSTableWriter.complete(stopwatch); - tokenOffsetWriterCompleted = true; - long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - logger.trace(indexDescriptor.logMessage("Completed per-SSTable write for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."), - indexDescriptor.descriptor, - elapsed - start, - elapsed); + if (aborted) return; - start = elapsed; + long start = stopwatch.elapsed(TimeUnit.MILLISECONDS); - rowMapping.complete(); + logger.trace(indexDescriptor.logMessage("Completed partition iteration for index flush for SSTable {}. Elapsed time: {} ms"), + indexDescriptor.descriptor, + start); - for (PerIndexWriter perIndexWriter : perIndexWriters) + try { - perIndexWriter.complete(stopwatch); - - // The handling of components when we flush/compact is a tad backward: instead of registering the - // components as we write them, all the components are collected beforehand in `SSTableWriter#create`, - // which means this is a superset of possible components, but if any components are not written for - // those reason, this needs to be fixed afterward. One case for SAI component for instance is empty - // indexes: if a particular sstable has nothing indexed for a particular index, then only the completion - // marker for that index is kept on disk but no other components, so we need to remove the components - // that were "optimistically" added (and more generally, future index implementation may have some - // components that are only optionally present based on specific conditions). - // Note 1: for index build/rebuild on existing sstable, `SSTableWriter#create` is not used, and instead - // we do only register components written (see `StorageAttachedIndexBuilder#completeSSTable`). - // Note 2: as hinted above, an alternative here would be to change the whole handling of components, - // registering components only as they are effectively written. This is a larger refactor, with some - // subtleties involved, so it is left as potential future work. - if (opType == OperationType.FLUSH || opType == OperationType.COMPACTION) + perSSTableWriter.complete(stopwatch); + tokenOffsetWriterCompleted = true; + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + logger.trace(indexDescriptor.logMessage("Completed per-SSTable write for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."), + indexDescriptor.descriptor, + elapsed - start, + elapsed); + + start = elapsed; + + rowMapping.complete(); + + for (PerIndexWriter perIndexWriter : perIndexWriters) { - var writtenComponents = perIndexWriter.writtenComponents().allAsCustomComponents(); - var registeredComponents = IndexDescriptor.perIndexComponentsForNewlyFlushedSSTable(perIndexWriter.indexContext()); - var toRemove = Sets.difference(registeredComponents, writtenComponents); - if (!toRemove.isEmpty()) + perIndexWriter.complete(stopwatch); + + // The handling of components when we flush/compact is a tad backward: instead of registering the + // components as we write them, all the components are collected beforehand in `SSTableWriter#create`, + // which means this is a superset of possible components, but if any components are not written for + // those reason, this needs to be fixed afterward. One case for SAI component for instance is empty + // indexes: if a particular sstable has nothing indexed for a particular index, then only the completion + // marker for that index is kept on disk but no other components, so we need to remove the components + // that were "optimistically" added (and more generally, future index implementation may have some + // components that are only optionally present based on specific conditions). + // Note 1: for index build/rebuild on existing sstable, `SSTableWriter#create` is not used, and instead + // we do only register components written (see `StorageAttachedIndexBuilder#completeSSTable`). + // Note 2: as hinted above, an alternative here would be to change the whole handling of components, + // registering components only as they are effectively written. This is a larger refactor, with some + // subtleties involved, so it is left as potential future work. + if (opType == OperationType.FLUSH || opType == OperationType.COMPACTION) { - if (logger.isTraceEnabled()) + var writtenComponents = perIndexWriter.writtenComponents().allAsCustomComponents(); + var registeredComponents = IndexDescriptor.perIndexComponentsForNewlyFlushedSSTable(perIndexWriter.indexContext()); + var toRemove = Sets.difference(registeredComponents, writtenComponents); + if (!toRemove.isEmpty()) { - logger.trace(indexDescriptor.logMessage("Removing optimistically added but not writen components from TOC of SSTable {} for index {}"), - indexDescriptor.descriptor, - perIndexWriter.indexContext().getIndexName()); + if (logger.isTraceEnabled()) + { + logger.trace(indexDescriptor.logMessage("Removing optimistically added but not writen components from TOC of SSTable {} for index {}"), + indexDescriptor.descriptor, + perIndexWriter.indexContext().getIndexName()); + } + + // During flush, this happens as we finalize the sstable and before its size is tracked, so not + // passing a tracker is correct and intended (there is nothing to update in the tracker). + sstable.unregisterComponents(toRemove, null); } - - // During flush, this happens as we finalize the sstable and before its size is tracked, so not - // passing a tracker is correct and intended (there is nothing to update in the tracker). - sstable.unregisterComponents(toRemove, null); } } - + elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + logger.trace(indexDescriptor.logMessage("Completed per-index writes for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."), + indexDescriptor.descriptor, + elapsed - start, + elapsed); + } + catch (Throwable t) + { + logger.error(indexDescriptor.logMessage("Failed to complete an index build"), t); + abort(t, true); + // fail compaction task or index build task if SAI failed + throw Throwables.unchecked(t); } - elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - logger.trace(indexDescriptor.logMessage("Completed per-index writes for SSTable {}. Duration: {} ms. Total elapsed time: {} ms."), - indexDescriptor.descriptor, - elapsed - start, - elapsed); } - catch (Throwable t) + finally { - logger.error(indexDescriptor.logMessage("Failed to complete an index build"), t); - abort(t, true); - // fail compaction task or index build task if SAI failed - throw Throwables.unchecked(t); + tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent, opType); } } From cb4c4dc8d903b4dc07e5baa06f1a99a95407bf68 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 4 Oct 2024 16:19:51 +0200 Subject: [PATCH 9/9] move time tracking --- .../cassandra/index/sai/disk/StorageAttachedIndexWriter.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index ef209a3d069d..a6352706f5ae 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -194,6 +194,7 @@ public void staticRow(Row staticRow, long position) @Override public void complete(SSTable sstable) { + long startComplete = ApproximateTime.nanoTime(); try { if (aborted) return; @@ -271,6 +272,7 @@ public void complete(SSTable sstable) } finally { + totalTimeSpent += (ApproximateTime.nanoTime() - startComplete); tableMetrics.updateStorageAttachedIndexWritingTime(totalTimeSpent, opType); } }