Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,16 @@ public enum CassandraRelevantProperties
*/
SAI_TABLE_STATE_METRICS_ENABLED("cassandra.sai.metrics.table_state.enabled", "true"),

/**
* Whether to enable the SAI histogram metrics added after the May 2025 release, including
* {@link org.apache.cassandra.index.sai.metrics.IndexMetrics#memtableIndexWriteLatency},
* {@link org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics.TrieIndexMetrics#termsTraversalTotalTime},
* {@link org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics.BKDIndexMetrics#intersectionLatency}, and
* {@link org.apache.cassandra.index.sai.metrics.TableQueryMetrics.PerQuery#queryLatency}.
* Enabled by default.
*/
SAI_HISTOGRAMS_ENABLED("cassandra.sai.metrics.histograms.enabled", "true"),

/**
* If true, while creating or altering schema, NetworkTopologyStrategy won't check if the DC exists.
* This is to remain compatible with older workflows that first change the replication before adding the nodes.
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ public void index(DecoratedKey key, Row row, Memtable memtable, OpOrder.Group op
ByteBuffer value = getValueOf(key, row, FBUtilities.nowInSeconds());
target.index(key, row.clustering(), value, memtable, opGroup);
}
indexMetrics.ifPresent(metrics -> metrics.memtableIndexWriteLatency.update(System.nanoTime() - start, TimeUnit.NANOSECONDS));
indexMetrics.flatMap(metrics -> metrics.memtableIndexWriteLatency).ifPresent(timer ->
timer.update(System.nanoTime() - start, TimeUnit.NANOSECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
*/
package org.apache.cassandra.index.sai.metrics;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.github.jbellis.jvector.graph.SearchResult;
import org.apache.cassandra.config.CassandraRelevantProperties;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

Expand All @@ -41,15 +43,18 @@ public static class TrieIndexMetrics extends ColumnQueryMetrics implements Query
/**
* Trie index metrics.
*/
public final Timer termsTraversalTotalTime;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> termsTraversalTotalTime;

public final QueryEventListener.PostingListEventListener postingsListener;

public TrieIndexMetrics(String keyspace, String table, String indexName)
{
super(keyspace, table, indexName);

termsTraversalTotalTime = Metrics.timer(createMetricName("TermsLookupLatency"));
termsTraversalTotalTime = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("TermsLookupLatency")))
: Optional.empty();

Meter postingDecodes = Metrics.meter(createMetricName("PostingDecodes", TRIE_POSTINGS_TYPE));

Expand All @@ -62,7 +67,7 @@ public void onSegmentHit() { }
@Override
public void onTraversalComplete(long traversalTotalTime, TimeUnit unit)
{
termsTraversalTotalTime.update(traversalTotalTime, unit);
termsTraversalTotalTime.ifPresent(timer -> timer.update(traversalTotalTime, unit));
}

@Override
Expand All @@ -79,7 +84,8 @@ public static class BKDIndexMetrics extends ColumnQueryMetrics implements QueryE
/**
* BKD index metrics.
*/
public final Timer intersectionLatency;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> intersectionLatency;
public final Meter postingsNumPostings;
public final Meter intersectionEarlyExits;

Expand All @@ -89,7 +95,9 @@ public BKDIndexMetrics(String keyspace, String table, String indexName)
{
super(keyspace, table, indexName);

intersectionLatency = Metrics.timer(createMetricName("KDTreeIntersectionLatency"));
intersectionLatency = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("KDTreeIntersectionLatency")))
: Optional.empty();
intersectionEarlyExits = Metrics.meter(createMetricName("KDTreeIntersectionEarlyExits"));

postingsNumPostings = Metrics.meter(createMetricName("NumPostings", BKD_POSTINGS_TYPE));
Expand All @@ -102,7 +110,7 @@ public BKDIndexMetrics(String keyspace, String table, String indexName)
@Override
public void onIntersectionComplete(long intersectionTotalTime, TimeUnit unit)
{
intersectionLatency.update(intersectionTotalTime, unit);
intersectionLatency.ifPresent(timer -> timer.update(intersectionTotalTime, unit));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
*/
package org.apache.cassandra.index.sai.metrics;

import java.util.Optional;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.index.sai.IndexContext;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

public class IndexMetrics extends AbstractMetrics
{
public final Timer memtableIndexWriteLatency;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> memtableIndexWriteLatency;

public final Gauge ssTableCellCount;
public final Gauge liveMemtableIndexWriteCount;
Expand All @@ -52,7 +56,9 @@ public IndexMetrics(IndexContext context)
{
super(context.getKeyspace(), context.getTable(), context.getIndexName(), "IndexMetrics");

memtableIndexWriteLatency = Metrics.timer(createMetricName("MemtableIndexWriteLatency"));
memtableIndexWriteLatency = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("MemtableIndexWriteLatency")))
: Optional.empty();
compactionSegmentCellsPerSecond = Metrics.histogram(createMetricName("CompactionSegmentCellsPerSecond"), false);
compactionSegmentBytesPerSecond = Metrics.histogram(createMetricName("CompactionSegmentBytesPerSecond"), false);
memtableFlushCellsPerSecond = Metrics.histogram(createMetricName("MemtableIndexFlushCellsPerSecond"), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.cassandra.index.sai.metrics;

import java.util.EnumMap;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -291,7 +292,8 @@ public static class PerQuery extends AbstractQueryMetrics
{
public static final String METRIC_TYPE = "PerQuery";

public final Timer queryLatency;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> queryLatency;

/**
* Global metrics for all indices hit during the query.
Expand Down Expand Up @@ -339,7 +341,9 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate<ReadCommand>
{
super(table.keyspace, table.name, METRIC_TYPE, queryKind, filter);

queryLatency = Metrics.timer(createMetricName("QueryLatency"));
queryLatency = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("QueryLatency")))
: Optional.empty();

sstablesHit = Metrics.histogram(createMetricName("SSTableIndexesHit"), false);
segmentsHit = Metrics.histogram(createMetricName("IndexSegmentsHit"), false);
Expand Down Expand Up @@ -369,7 +373,7 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate<ReadCommand>
@Override
public void record(QueryContext.Snapshot snapshot)
{
queryLatency.update(snapshot.totalQueryTimeNs, TimeUnit.NANOSECONDS);
queryLatency.ifPresent(timer -> timer.update(snapshot.totalQueryTimeNs, TimeUnit.NANOSECONDS));
sstablesHit.update(snapshot.sstablesHit);
segmentsHit.update(snapshot.segmentsHit);
keysFetched.update(snapshot.keysFetched);
Expand Down
142 changes: 122 additions & 20 deletions test/unit/org/apache/cassandra/index/sai/metrics/IndexMetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,37 +171,37 @@ private void testIndexMetrics(boolean metricsEnabled)
String index = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'");

// Test all Gauge metrics
assertMetricExistsIfEnabled(metricsEnabled, "SSTableCellCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "LiveMemtableIndexWriteCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "DiskUsedBytes", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "MemtableOnHeapIndexBytes", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "MemtableOffHeapIndexBytes", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "IndexFileCacheBytes", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "SSTableCellCount", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "LiveMemtableIndexWriteCount", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "DiskUsedBytes", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "MemtableOnHeapIndexBytes", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "MemtableOffHeapIndexBytes", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "IndexFileCacheBytes", table, index);

// Test all Counter metrics
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionTermsProcessedCount", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushErrors", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentFlushErrors", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "QueriesCount", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCount", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "CompactionCount", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "CompactionTermsProcessedCount", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "MemtableIndexFlushErrors", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "CompactionSegmentFlushErrors", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "QueriesCount", table, index);

// Test all Histogram metrics
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCellsPerSecond", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "SegmentsPerCompaction", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentCellsPerSecond", table, index);
assertMetricExistsIfEnabled(metricsEnabled, "CompactionSegmentBytesPerSecond", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "MemtableIndexFlushCellsPerSecond", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "SegmentsPerCompaction", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "CompactionSegmentCellsPerSecond", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "CompactionSegmentBytesPerSecond", table, index);

// Test Timer metrics
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);

// Test indexing operations to ensure an absent (empty Optional) indexMetrics is handled gracefully
execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0')");
execute("INSERT INTO %s (id1, v1, v2) VALUES ('1', 1, '1')");
execute("INSERT INTO %s (id1, v1, v2) VALUES ('2', 2, '2')");

// Verify MemtableIndexWriteLatency metric behavior after indexing operations
assertMetricExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);
assertIndexMetricsExistsIfEnabled(metricsEnabled, "MemtableIndexWriteLatency", table, index);
}
finally
{
Expand All @@ -210,9 +210,29 @@ private void testIndexMetrics(boolean metricsEnabled)
}
}

private void assertMetricExistsIfEnabled(boolean shouldExist, String metricName, String table, String index)
private void assertMetricExistsIfEnabled(boolean shouldExist, String metricName, String table, String index, String metricType)
{
ObjectName name = objectName(metricName, KEYSPACE, table, index, "IndexMetrics");
ObjectName name = objectName(metricName, KEYSPACE, table, index, metricType);

if (shouldExist)
assertMetricExists(name);
else
assertMetricDoesNotExist(name);
}

/**
 * Asserts the presence or absence of a per-index metric registered under the "IndexMetrics" type.
 *
 * @param shouldExist whether the metric is expected to be registered
 * @param metricName  name of the metric to look up
 * @param table       table the index belongs to
 * @param index       name of the index
 */
private void assertIndexMetricsExistsIfEnabled(boolean shouldExist, String metricName, String table, String index)
{
    final String metricType = "IndexMetrics";
    assertMetricExistsIfEnabled(shouldExist, metricName, table, index, metricType);
}

/**
 * Asserts the presence or absence of a per-index metric registered under the "ColumnQueryMetrics" type.
 *
 * @param shouldExist whether the metric is expected to be registered
 * @param metricName  name of the metric to look up
 * @param table       table the index belongs to
 * @param index       name of the index
 */
private void assertColumnQueryMetricsExistsIfEnabled(boolean shouldExist, String metricName, String table, String index)
{
    final String metricType = "ColumnQueryMetrics";
    assertMetricExistsIfEnabled(shouldExist, metricName, table, index, metricType);
}

private void assertTableQueryMetricsExistsIfEnabled(boolean shouldExist, String metricName, String table)
{
ObjectName name = objectNameNoIndex(metricName, KEYSPACE, table, "PerQuery");

if (shouldExist)
assertMetricExists(name);
Expand Down Expand Up @@ -442,4 +462,86 @@ public void testCompactionTermsProcessedCountWithAnalyzer()
// Verify compaction count was also incremented
assertEquals(1L, getMetricValue(objectName("CompactionCount", KEYSPACE, table, index, "IndexMetrics")));
}

/**
 * Runs the histogram-flag scenario twice: first with the flag off, then with it on.
 */
@Test
public void testHistogramMetricsEnabledAndDisabled()
{
    // Order matters: the disabled case runs before the enabled case
    for (boolean histogramsEnabled : new boolean[]{ false, true })
        testHistogramMetrics(histogramsEnabled);
}

/**
 * Creates a table with one INT and one TEXT SAI index, writes and queries it, and checks that the
 * timers gated by {@link CassandraRelevantProperties#SAI_HISTOGRAMS_ENABLED} are registered only when
 * the flag is on, while the remaining histograms are registered regardless of the flag.
 * The flag is restored to the value it had on entry once the scenario finishes.
 *
 * @param histogramsEnabled value assigned to the flag before the indexes are created
 */
private void testHistogramMetrics(boolean histogramsEnabled)
{
    // Remember the setting in effect on entry so it can be put back afterwards, instead of
    // forcing a hard-coded value that may differ from what the surrounding test run configured
    boolean previouslyEnabled = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean();

    // Set the property before creating any indexes
    CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.setBoolean(histogramsEnabled);

    try
    {
        String table = createTable("CREATE TABLE %s (ID1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
                                   "{'class' : 'SizeTieredCompactionStrategy', 'enabled' : false }");
        String indexV1 = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v1) USING 'StorageAttachedIndex'");
        String indexV2 = createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s (v2) USING 'StorageAttachedIndex'");

        // Test indexing operations to ensure Optional histograms are handled gracefully
        for (int i = 0; i < 10; i++)
            execute("INSERT INTO %s (id1, v1, v2) VALUES (?, ?, ?)", String.valueOf(i), i, String.valueOf(i));

        // Verify MemtableIndexWriteLatency metric behavior after indexing operations
        // This metric is created at index creation time in IndexMetrics
        assertIndexMetricsExistsIfEnabled(histogramsEnabled, "MemtableIndexWriteLatency", table, indexV1);
        assertIndexMetricsExistsIfEnabled(histogramsEnabled, "MemtableIndexWriteLatency", table, indexV2);

        // Verify that the index histograms not covered by the flag are always enabled
        for (String index : new String[]{ indexV1, indexV2 })
        {
            assertIndexMetricsExistsIfEnabled(true, "MemtableIndexFlushCellsPerSecond", table, index);
            assertIndexMetricsExistsIfEnabled(true, "SegmentsPerCompaction", table, index);
            assertIndexMetricsExistsIfEnabled(true, "CompactionSegmentCellsPerSecond", table, index);
            assertIndexMetricsExistsIfEnabled(true, "CompactionSegmentBytesPerSecond", table, index);
        }

        // Flush to persist data
        flush(KEYSPACE, table);

        // Execute queries to trigger column query metrics
        // v1 is INT, so it uses BKDIndexMetrics with KDTreeIntersectionLatency
        ResultSet rows = executeNet("SELECT id1 FROM %s WHERE v1 >= 0 AND v1 < 5");
        assertEquals(5, rows.all().size());

        // v2 is TEXT, so it uses TrieIndexMetrics with TermsLookupLatency
        rows = executeNet("SELECT id1 FROM %s WHERE v2 = '0'");
        assertEquals(1, rows.all().size());

        // Test KDTreeIntersectionLatency timer (ColumnQueryMetrics - BKD for INT column)
        assertColumnQueryMetricsExistsIfEnabled(histogramsEnabled, "KDTreeIntersectionLatency", table, indexV1);

        // Test TermsLookupLatency timer (ColumnQueryMetrics - Trie for TEXT column)
        assertColumnQueryMetricsExistsIfEnabled(histogramsEnabled, "TermsLookupLatency", table, indexV2);

        // Test QueryLatency timer (TableQueryMetrics.PerQuery - table-level metric, no index name)
        assertTableQueryMetricsExistsIfEnabled(histogramsEnabled, "QueryLatency", table);

        // Verify that other PerQuery histograms are always enabled (not affected by the flag)
        String[] alwaysOnPerQueryMetrics = {
            "SSTableIndexesHit",
            "IndexSegmentsHit",
            "KeysFetched",
            "PartitionsFetched",
            "PartitionsReturned",
            "PartitionTombstonesFetched",
            "RowsFetched",
            "RowsReturned",
            "RowTombstonesFetched",
            "KDTreePostingsSkips",
            "KDTreePostingsNumPostings",
            "KDTreePostingsDecodes",
            "PostingsSkips",
            "PostingsDecodes"
        };
        for (String metricName : alwaysOnPerQueryMetrics)
            assertTableQueryMetricsExistsIfEnabled(true, metricName, table);
    }
    finally
    {
        // Restore whatever value was in effect before this scenario ran
        CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.setBoolean(previouslyEnabled);
    }
}

}