Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/cql3/QueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public static class Prepared

public final MD5Digest resultMetadataId;

/**
* Timestamp of when this prepared statement was created. Used in QueryProcessor.preparedStatements cache
* to ensure that the deletion timestamp always succeeds the insert timestamp.
*/
public final long timestamp;

public final String keyspace;
public final boolean fullyQualified;

Expand All @@ -71,6 +77,7 @@ public Prepared(CQLStatement statement, boolean fullyQualified, String keyspace)
this.resultMetadataId = ResultSet.ResultMetadata.fromPrepared(statement).getResultMetadataId();
this.fullyQualified = fullyQualified;
this.keyspace = keyspace;
this.timestamp = ClientState.getTimestamp();
}
}
}
76 changes: 57 additions & 19 deletions src/java/org/apache/cassandra/cql3/QueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.antlr.runtime.RecognitionException;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
Expand Down Expand Up @@ -119,6 +120,12 @@
// counters. Callers of processStatement are responsible for correctly notifying metrics
public static final CQLMetrics metrics = new CQLMetrics();

// Paging size to use when preloading prepared statements.
public static final int PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE = 5000;

// Size of the prepared statement cache in bytes.
public static long PREPARED_STATEMENT_CACHE_SIZE_BYTES = capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB());

Check warning on line 127 in src/java/org/apache/cassandra/cql3/QueryProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this field "PREPARED_STATEMENT_CACHE_SIZE_BYTES" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=cassandra-stargazer&issues=AZ1J0_qpbDscp0XQGqqA&open=AZ1J0_qpbDscp0XQGqqA&pullRequest=2300

Check warning on line 127 in src/java/org/apache/cassandra/cql3/QueryProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Make PREPARED_STATEMENT_CACHE_SIZE_BYTES a static final constant or non-public and provide accessors if needed.

See more on https://sonarcloud.io/project/issues?id=cassandra-stargazer&issues=AZ1J0_qpbDscp0XQGqp-&open=AZ1J0_qpbDscp0XQGqp-&pullRequest=2300

Check warning on line 127 in src/java/org/apache/cassandra/cql3/QueryProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Make this "public static PREPARED_STATEMENT_CACHE_SIZE_BYTES" field final

See more on https://sonarcloud.io/project/issues?id=cassandra-stargazer&issues=AZ1J0_qpbDscp0XQGqp_&open=AZ1J0_qpbDscp0XQGqp_&pullRequest=2300

private static final AtomicInteger lastMinuteEvictionsCount = new AtomicInteger(0);

private final List<QueryInterceptor> interceptors = new ArrayList<>();
Expand All @@ -127,17 +134,10 @@
{
preparedStatements = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()))
.maximumWeight(PREPARED_STATEMENT_CACHE_SIZE_BYTES)
.weigher(QueryProcessor::getPrecomputedSize)
.removalListener((key, prepared, cause) -> {
MD5Digest md5Digest = (MD5Digest) key;
if (cause.wasEvicted())
{
metrics.preparedStatementsEvicted.inc();
lastMinuteEvictionsCount.incrementAndGet();
SystemKeyspace.removePreparedStatement(md5Digest);
}
}).build();
.removalListener((key, prepared, cause) -> evictPreparedStatement((MD5Digest) key, cause))
.build();

ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
long count = lastMinuteEvictionsCount.getAndSet(0);
Expand All @@ -151,6 +151,16 @@
DatabaseDescriptor.getPreparedStatementsCacheSizeMB());
}

private static void evictPreparedStatement(MD5Digest key, RemovalCause cause)
{
if (cause.wasEvicted())
{
metrics.preparedStatementsEvicted.inc();
lastMinuteEvictionsCount.incrementAndGet();
SystemKeyspace.removePreparedStatement(key);
}
}

private static long capacityToBytes(long cacheSizeMB)
{
return cacheSizeMB * 1024 * 1024;
Expand All @@ -175,6 +185,12 @@
}

public void preloadPreparedStatements()
{
preloadPreparedStatements(PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE);
}

@VisibleForTesting
public int preloadPreparedStatements(int pageSize)
{
int count = SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
try
Expand All @@ -191,17 +207,18 @@
// Preload `null` statement for non-fully qualified statements, since it can't be parsed if loaded from cache and will be dropped
if (!prepared.fullyQualified)
preparedStatements.get(computeId(query, null), (ignored_) -> cacheValue);
return true;
return precomputedCacheEntrySize;
}
catch (RequestValidationException e)
{
JVMStabilityInspector.inspectThrowable(e);
logger.warn(String.format("Prepared statement recreation error, removing statement: %s %s %s", id, query, keyspace));
SystemKeyspace.removePreparedStatement(id);
return false;
return null;
}
});
logger.debug("Preloaded {} prepared statements", count);
}, pageSize);
logger.info("Preloaded {} prepared statements", count);
return count;
}


Expand Down Expand Up @@ -544,11 +561,31 @@
public static UntypedResultSet executeInternalWithPaging(String query, PageSize pageSize, Object... values)
{
Prepared prepared = prepareInternal(query);
if (!(prepared.statement instanceof SelectStatement))
return executeInternalWithPaging(prepared.statement, pageSize, values);
}

/**
* Executes with a non-prepared statement using paging. Generally {@link #executeInternalWithPaging(String, PageSize, Object...)}
* should be used instead of this, but this may be used in niche cases like
* {@link SystemKeyspace#loadPreparedStatements(SystemKeyspace.TriFunction)} where prepared statements are
* being loaded into {@link #preparedStatements} so it doesn't make sense to prepare a statement in this context.
*/
public static UntypedResultSet executeOnceInternalWithPaging(String query, int pageSize, Object... values)
{
QueryState queryState = internalQueryState();
CQLStatement statement = parseStatement(query, queryState.getClientState());
statement.validate(queryState);
return executeInternalWithPaging(statement, PageSize.inRows(pageSize), values);
}

private static UntypedResultSet executeInternalWithPaging(CQLStatement statement, PageSize pageSize, Object... values)
{
if (!(statement instanceof SelectStatement))
throw new IllegalArgumentException("Only SELECTs can be paged");

SelectStatement select = (SelectStatement)prepared.statement;
QueryPager pager = select.getQuery(QueryState.forInternalCalls(), makeInternalOptions(prepared.statement, values), FBUtilities.nowInSeconds()).getPager(null, ProtocolVersion.CURRENT);
SelectStatement select = (SelectStatement) statement;
int nowInSec = FBUtilities.nowInSeconds();
QueryPager pager = select.getQuery(QueryState.forInternalCalls(), makeInternalOptions(select, values), nowInSec).getPager(null, ProtocolVersion.CURRENT);
return UntypedResultSet.create(select, pager, pageSize);
}

Expand Down Expand Up @@ -775,7 +812,7 @@
Pair<Prepared, Integer> cacheValue = Pair.create(prepared, precomputedCacheEntrySize);
Pair<Prepared, Integer> previous = preparedStatements.get(statementId, (ignored_) -> cacheValue);
if (previous != null && previous.left() == prepared)
SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString);
SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString, prepared.timestamp);

ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared.statement);
ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared.statement);
Expand Down Expand Up @@ -898,7 +935,8 @@
}
}

private static int measurePStatementCacheEntrySize(Object key, Prepared value)
@VisibleForTesting
static int measurePStatementCacheEntrySize(Object key, Prepared value)
{
Pair<Prepared, Integer> valuePair = Pair.create(value, 0);
return Ints.checkedCast(ObjectSizes.measureDeep(key) + ObjectSizes.measureDeep(valuePair));
Expand Down
51 changes: 41 additions & 10 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
Expand Down Expand Up @@ -99,8 +100,10 @@
import static java.util.Collections.singletonMap;
import static org.apache.cassandra.config.CassandraRelevantProperties.PERSIST_PREPARED_STATEMENTS;
import static org.apache.cassandra.config.CassandraRelevantProperties.UNSAFE_SYSTEM;
import static org.apache.cassandra.cql3.QueryProcessor.PREPARED_STATEMENT_CACHE_SIZE_BYTES;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternalWithPaging;

public final class SystemKeyspace
{
Expand Down Expand Up @@ -973,13 +976,13 @@ private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner
}
}

public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql)
public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql, long timestamp)
{
if (PERSIST_PREPARED_STATEMENTS.getBoolean())
{
executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)",
executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?) USING TIMESTAMP ?",
PreparedStatements.toString()),
loggedKeyspace, key.byteBuffer(), cql);
loggedKeyspace, key.byteBuffer(), cql, timestamp);
logger.debug("stored prepared statement for logged keyspace '{}': '{}'", loggedKeyspace, cql);
}
else
Expand All @@ -998,22 +1001,50 @@ public static void resetPreparedStatements()
preparedStatements.truncateBlockingWithoutSnapshot();
}

public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Boolean> onLoaded)
public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Integer> onLoaded)
{
return loadPreparedStatements(onLoaded, QueryProcessor.PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE);
}

public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Integer> onLoaded, int pageSize)
{
String query = String.format("SELECT prepared_id, logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
UntypedResultSet resultSet = executeOnceInternal(query);
UntypedResultSet resultSet = executeOnceInternalWithPaging(query, pageSize);
int counter = 0;

// As the cache size may be briefly exceeded before statements are evicted, we allow loading 110% the cache size
// to avoid logging early.
long preparedBytesLoadThreshold = (long) (PREPARED_STATEMENT_CACHE_SIZE_BYTES * 1.1);
long preparedBytesLoaded = 0L;
for (UntypedResultSet.Row row : resultSet)
{
if (onLoaded.accept(MD5Digest.wrap(row.getByteArray("prepared_id")),
row.getString("query_string"),
row.has("logged_keyspace") ? row.getString("logged_keyspace") : null))
Integer cacheEntrySize = onLoaded.accept(MD5Digest.wrap(row.getByteArray("prepared_id")),
row.getString("query_string"),
row.has("logged_keyspace") ? row.getString("logged_keyspace") : null);
if (cacheEntrySize != null)
{
counter++;
preparedBytesLoaded += Math.max(0, cacheEntrySize);

if (preparedBytesLoaded > preparedBytesLoadThreshold)
{
logger.warn("Detected prepared statement cache filling up during preload after preparing {} " +
"statements (loaded {} with prepared_statements_cache_size being {}). " +
"This could be an indication that prepared statements leaked prior to CASSANDRA-19703 " +
"being fixed. Returning early to prevent indefinite startup. " +
"Consider truncating {}.{} to clear out leaked prepared statements.",
counter,
FileUtils.stringifyFileSize(preparedBytesLoaded),
FileUtils.stringifyFileSize(PREPARED_STATEMENT_CACHE_SIZE_BYTES),
SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
break;
}
}
}
return counter;
}

public static int loadPreparedStatement(MD5Digest digest, TriFunction<MD5Digest, String, String, Boolean> onLoaded)
public static int loadPreparedStatement(MD5Digest digest, TriFunction<MD5Digest, String, String, Integer> onLoaded)
{
String query = String.format("SELECT prepared_id, logged_keyspace, query_string FROM %s.%s WHERE prepared_id = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
UntypedResultSet resultSet = executeOnceInternal(query, digest.byteBuffer());
Expand All @@ -1022,7 +1053,7 @@ public static int loadPreparedStatement(MD5Digest digest, TriFunction<MD5Digest,
{
if (onLoaded.accept(MD5Digest.wrap(row.getByteArray("prepared_id")),
row.getString("query_string"),
row.has("logged_keyspace") ? row.getString("logged_keyspace") : null))
row.has("logged_keyspace") ? row.getString("logged_keyspace") : null) != null)
counter++;
}
return counter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public void mixedModeFuzzTest() throws Throwable
SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
if (rng.nextBoolean())
QueryProcessor.instance.evictPrepared(id);
return true;
return 0;
});
});
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void fuzzTest() throws Throwable
SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
if (rng.nextBoolean())
QueryProcessor.instance.evictPrepared(id);
return true;
return 0;
});
});
break;
Expand Down
Loading
Loading