Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,16 @@ public enum CassandraRelevantProperties
*/
SAI_TABLE_STATE_METRICS_ENABLED("cassandra.sai.metrics.table_state.enabled", "true"),

/**
* Whether to enable SAI histogram metrics added after May 2025 release, including
* {@link org.apache.cassandra.index.sai.metrics.IndexMetrics#memtableIndexWriteLatency},
* {@link org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics.TrieIndexMetrics#termsTraversalTotalTime},
* {@link org.apache.cassandra.index.sai.metrics.ColumnQueryMetrics.BKDIndexMetrics#intersectionLatency}, and
* {@link org.apache.cassandra.index.sai.metrics.TableQueryMetrics.PerQuery#queryLatency}.
* Enabled by default.
*/
SAI_HISTOGRAMS_ENABLED("cassandra.sai.metrics.histograms.enabled", "true"),

/**
* If true, while creating or altering schema, NetworkTopologyStrategy won't check if the DC exists.
* This is to remain compatible with older workflows that first change the replication before adding the nodes.
Expand Down
19 changes: 18 additions & 1 deletion src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -396,6 +398,8 @@ public boolean isInvalidAndShouldDropData()

private final ReentrantLock longRunningSerializedOperationsLock = new ReentrantLock();

private List<Future<?>> initialBuilds = new LinkedList<>();

public static void shutdownPostFlushExecutor() throws InterruptedException
{
postFlushExecutor.shutdown();
Expand Down Expand Up @@ -617,7 +621,7 @@ public ColumnFamilyStore(Keyspace keyspace,
indexManager = new SecondaryIndexManager(this);
for (IndexMetadata info : metadata.get().indexes)
{
indexManager.addIndex(info, true);
initialBuilds.add(indexManager.addIndex(info, true));
}

metric = new TableMetrics(this, memtableFactory.createMemtableMetrics(metadata));
Expand Down Expand Up @@ -648,6 +652,19 @@ public ColumnFamilyStore(Keyspace keyspace,
sstableImporter = new SSTableImporter(this);
}

/**
* Waits for the completion of index builds created during CFS initialization.
* <p>
* This method blocks until all initial index builds have been completed or the timeout expires.
*
* @param timeout the maximum time to wait before timing out
* @param unit the time unit of the timeout argument
*/
public void awaitInitialIndexBuilds(long timeout, TimeUnit unit)
{
FBUtilities.waitOnFutures(initialBuilds, timeout, unit);
}

public static String getTableMBeanName(String ks, String name, boolean isIndex)
{
return String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ public void index(DecoratedKey key, Row row, Memtable memtable, OpOrder.Group op
ByteBuffer value = getValueOf(key, row, FBUtilities.nowInSeconds());
target.index(key, row.clustering(), value, memtable, opGroup);
}
indexMetrics.ifPresent(metrics -> metrics.memtableIndexWriteLatency.update(System.nanoTime() - start, TimeUnit.NANOSECONDS));
indexMetrics.flatMap(metrics -> metrics.memtableIndexWriteLatency).ifPresent(timer ->
timer.update(System.nanoTime() - start, TimeUnit.NANOSECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
*/
package org.apache.cassandra.index.sai.metrics;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.github.jbellis.jvector.graph.SearchResult;
import org.apache.cassandra.config.CassandraRelevantProperties;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

Expand All @@ -41,15 +43,18 @@ public static class TrieIndexMetrics extends ColumnQueryMetrics implements Query
/**
* Trie index metrics.
*/
public final Timer termsTraversalTotalTime;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> termsTraversalTotalTime;

public final QueryEventListener.PostingListEventListener postingsListener;

public TrieIndexMetrics(String keyspace, String table, String indexName)
{
super(keyspace, table, indexName);

termsTraversalTotalTime = Metrics.timer(createMetricName("TermsLookupLatency"));
termsTraversalTotalTime = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("TermsLookupLatency")))
: Optional.empty();

Meter postingDecodes = Metrics.meter(createMetricName("PostingDecodes", TRIE_POSTINGS_TYPE));

Expand All @@ -62,7 +67,7 @@ public void onSegmentHit() { }
@Override
public void onTraversalComplete(long traversalTotalTime, TimeUnit unit)
{
termsTraversalTotalTime.update(traversalTotalTime, unit);
termsTraversalTotalTime.ifPresent(timer -> timer.update(traversalTotalTime, unit));
}

@Override
Expand All @@ -79,7 +84,8 @@ public static class BKDIndexMetrics extends ColumnQueryMetrics implements QueryE
/**
* BKD index metrics.
*/
public final Timer intersectionLatency;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> intersectionLatency;
public final Meter postingsNumPostings;
public final Meter intersectionEarlyExits;

Expand All @@ -89,7 +95,9 @@ public BKDIndexMetrics(String keyspace, String table, String indexName)
{
super(keyspace, table, indexName);

intersectionLatency = Metrics.timer(createMetricName("KDTreeIntersectionLatency"));
intersectionLatency = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("KDTreeIntersectionLatency")))
: Optional.empty();
intersectionEarlyExits = Metrics.meter(createMetricName("KDTreeIntersectionEarlyExits"));

postingsNumPostings = Metrics.meter(createMetricName("NumPostings", BKD_POSTINGS_TYPE));
Expand All @@ -102,7 +110,7 @@ public BKDIndexMetrics(String keyspace, String table, String indexName)
@Override
public void onIntersectionComplete(long intersectionTotalTime, TimeUnit unit)
{
intersectionLatency.update(intersectionTotalTime, unit);
intersectionLatency.ifPresent(timer -> timer.update(intersectionTotalTime, unit));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
*/
package org.apache.cassandra.index.sai.metrics;

import java.util.Optional;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.index.sai.IndexContext;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

public class IndexMetrics extends AbstractMetrics
{
public final Timer memtableIndexWriteLatency;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> memtableIndexWriteLatency;

public final Gauge ssTableCellCount;
public final Gauge liveMemtableIndexWriteCount;
Expand All @@ -52,7 +56,9 @@ public IndexMetrics(IndexContext context)
{
super(context.getKeyspace(), context.getTable(), context.getIndexName(), "IndexMetrics");

memtableIndexWriteLatency = Metrics.timer(createMetricName("MemtableIndexWriteLatency"));
memtableIndexWriteLatency = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("MemtableIndexWriteLatency")))
: Optional.empty();
compactionSegmentCellsPerSecond = Metrics.histogram(createMetricName("CompactionSegmentCellsPerSecond"), false);
compactionSegmentBytesPerSecond = Metrics.histogram(createMetricName("CompactionSegmentBytesPerSecond"), false);
memtableFlushCellsPerSecond = Metrics.histogram(createMetricName("MemtableIndexFlushCellsPerSecond"), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.cassandra.index.sai.metrics;

import java.util.EnumMap;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -291,7 +292,8 @@ public static class PerQuery extends AbstractQueryMetrics
{
public static final String METRIC_TYPE = "PerQuery";

public final Timer queryLatency;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public final Optional<Timer> queryLatency;

/**
* Global metrics for all indices hit during the query.
Expand Down Expand Up @@ -339,7 +341,9 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate<ReadCommand>
{
super(table.keyspace, table.name, METRIC_TYPE, queryKind, filter);

queryLatency = Metrics.timer(createMetricName("QueryLatency"));
queryLatency = CassandraRelevantProperties.SAI_HISTOGRAMS_ENABLED.getBoolean()
? Optional.of(Metrics.timer(createMetricName("QueryLatency")))
: Optional.empty();

sstablesHit = Metrics.histogram(createMetricName("SSTableIndexesHit"), false);
segmentsHit = Metrics.histogram(createMetricName("IndexSegmentsHit"), false);
Expand Down Expand Up @@ -369,7 +373,7 @@ public PerQuery(TableMetadata table, QueryKind queryKind, Predicate<ReadCommand>
@Override
public void record(QueryContext.Snapshot snapshot)
{
queryLatency.update(snapshot.totalQueryTimeNs, TimeUnit.NANOSECONDS);
queryLatency.ifPresent(timer -> timer.update(snapshot.totalQueryTimeNs, TimeUnit.NANOSECONDS));
sstablesHit.update(snapshot.sstablesHit);
segmentsHit.update(snapshot.segmentsHit);
keysFetched.update(snapshot.keysFetched);
Expand Down
Loading
Loading