From 1fc798ee8d814f57281728a58daa7d96d9ebfe69 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 1 Apr 2026 10:12:07 -0500 Subject: [PATCH 1/2] CNDB-17328: backport CASSANDRA-19703: prepared statements get evicted too early --- .../apache/cassandra/cql3/QueryHandler.java | 7 + .../apache/cassandra/cql3/QueryProcessor.java | 76 ++++-- .../apache/cassandra/db/SystemKeyspace.java | 51 +++- .../distributed/test/MixedModeFuzzTest.java | 2 +- .../distributed/test/ReprepareFuzzTest.java | 2 +- .../cassandra/cql3/PstmtPersistenceTest.java | 238 +++++++++++++++++- 6 files changed, 336 insertions(+), 40 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java index 41aaa9941491..da55ce6292ec 100644 --- a/src/java/org/apache/cassandra/cql3/QueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java @@ -62,6 +62,12 @@ public static class Prepared public final MD5Digest resultMetadataId; + /** + * Timestamp of when this prepared statement was created. Used in QueryProcessor.preparedStatements cache + * to ensure that the deletion timestamp always succeeds the insert timestamp. + */ + public final long timestamp; + public final String keyspace; public final boolean fullyQualified; @@ -71,6 +77,7 @@ public Prepared(CQLStatement statement, boolean fullyQualified, String keyspace) this.resultMetadataId = ResultSet.ResultMetadata.fromPrepared(statement).getResultMetadataId(); this.fullyQualified = fullyQualified; this.keyspace = keyspace; + this.timestamp = ClientState.getTimestamp(); } } } diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 12d7c97cd365..0773abdffb97 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -41,6 +41,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; import org.antlr.runtime.RecognitionException; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; @@ -119,6 +120,12 @@ public class QueryProcessor implements QueryHandler // counters. Callers of processStatement are responsible for correctly notifying metrics public static final CQLMetrics metrics = new CQLMetrics(); + // Paging size to use when preloading prepared statements. + public static final int PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE = 5000; + + // Size of the prepared statement cache in bytes. + public static long PREPARED_STATEMENT_CACHE_SIZE_BYTES = capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB()); + private static final AtomicInteger lastMinuteEvictionsCount = new AtomicInteger(0); private final List interceptors = new ArrayList<>(); @@ -127,17 +134,10 @@ public class QueryProcessor implements QueryHandler { preparedStatements = Caffeine.newBuilder() .executor(MoreExecutors.directExecutor()) - .maximumWeight(capacityToBytes(DatabaseDescriptor.getPreparedStatementsCacheSizeMB())) + .maximumWeight(PREPARED_STATEMENT_CACHE_SIZE_BYTES) .weigher(QueryProcessor::getPrecomputedSize) - .removalListener((key, prepared, cause) -> { - MD5Digest md5Digest = (MD5Digest) key; - if (cause.wasEvicted()) - { - metrics.preparedStatementsEvicted.inc(); - lastMinuteEvictionsCount.incrementAndGet(); - SystemKeyspace.removePreparedStatement(md5Digest); - } - }).build(); + .removalListener((key, prepared, cause) -> evictPreparedStatement((MD5Digest) key, cause)) + .build(); ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> { long count = lastMinuteEvictionsCount.getAndSet(0); @@ -151,6 +151,16 @@ public class QueryProcessor implements QueryHandler DatabaseDescriptor.getPreparedStatementsCacheSizeMB()); } + private static void evictPreparedStatement(MD5Digest key, RemovalCause cause) + { + if (cause.wasEvicted()) + { + metrics.preparedStatementsEvicted.inc(); + lastMinuteEvictionsCount.incrementAndGet(); + SystemKeyspace.removePreparedStatement(key); + } + } + private static long capacityToBytes(long cacheSizeMB) { return cacheSizeMB * 1024 * 1024; @@ -175,6 +185,12 @@ private enum InternalStateInstance } public void preloadPreparedStatements() + { + preloadPreparedStatements(PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE); + } + + @VisibleForTesting + public int preloadPreparedStatements(int pageSize) { int count = SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> { try @@ -191,17 +207,18 @@ public void preloadPreparedStatements() // Preload `null` statement for non-fully qualified statements, since it can't be parsed if loaded from cache and will be dropped if (!prepared.fullyQualified) preparedStatements.get(computeId(query, null), (ignored_) -> cacheValue); - return true; + return precomputedCacheEntrySize; } catch (RequestValidationException e) { JVMStabilityInspector.inspectThrowable(e); logger.warn(String.format("Prepared statement recreation error, removing statement: %s %s %s", id, query, keyspace)); SystemKeyspace.removePreparedStatement(id); - return false; + return null; } - }); - logger.debug("Preloaded {} prepared statements", count); + }, pageSize); + logger.info("Preloaded {} prepared statements", count); + return count; } @@ -544,11 +561,31 @@ static UntypedResultSet executeOnce(String query, ConsistencyLevel cl, Object... public static UntypedResultSet executeInternalWithPaging(String query, PageSize pageSize, Object... values) { Prepared prepared = prepareInternal(query); - if (!(prepared.statement instanceof SelectStatement)) + return executeInternalWithPaging(prepared.statement, pageSize, values); + } + + /** + * Executes with a non-prepared statement using paging. Generally {@link #executeInternalWithPaging(String, PageSize, Object...)} + * should be used instead of this, but this may be used in niche cases like + * {@link SystemKeyspace#loadPreparedStatements(SystemKeyspace.TriFunction)} where prepared statements are + * being loaded into {@link #preparedStatements} so it doesn't make sense to prepare a statement in this context. + */ + public static UntypedResultSet executeOnceInternalWithPaging(String query, int pageSize, Object... values) + { + QueryState queryState = internalQueryState(); + CQLStatement statement = parseStatement(query, queryState.getClientState()); + statement.validate(queryState); + return executeInternalWithPaging(statement, PageSize.inRows(pageSize), values); + } + + private static UntypedResultSet executeInternalWithPaging(CQLStatement statement, PageSize pageSize, Object... values) + { + if (!(statement instanceof SelectStatement)) throw new IllegalArgumentException("Only SELECTs can be paged"); - SelectStatement select = (SelectStatement)prepared.statement; - QueryPager pager = select.getQuery(QueryState.forInternalCalls(), makeInternalOptions(prepared.statement, values), FBUtilities.nowInSeconds()).getPager(null, ProtocolVersion.CURRENT); + SelectStatement select = (SelectStatement) statement; + int nowInSec = FBUtilities.nowInSeconds(); + QueryPager pager = select.getQuery(QueryState.forInternalCalls(), makeInternalOptions(select, values), nowInSec).getPager(null, ProtocolVersion.CURRENT); return UntypedResultSet.create(select, pager, pageSize); } @@ -775,7 +812,7 @@ public static ResultMessage.Prepared storePreparedStatement(String queryString, Pair cacheValue = Pair.create(prepared, precomputedCacheEntrySize); Pair previous = preparedStatements.get(statementId, (ignored_) -> cacheValue); if (previous != null && previous.left() == prepared) - SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString); + SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString, prepared.timestamp); ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared.statement); ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared.statement); @@ -898,7 +935,8 @@ public static CQLStatement.Raw parseStatement(String queryStr) throws SyntaxExce } } - private static int measurePStatementCacheEntrySize(Object key, Prepared value) + @VisibleForTesting + static int measurePStatementCacheEntrySize(Object key, Prepared value) { Pair valuePair = Pair.create(value, 0); return Ints.checkedCast(ObjectSizes.measureDeep(key) + ObjectSizes.measureDeep(valuePair)); diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 299247890f3d..7bba18a7b55b 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -50,6 +50,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; @@ -99,8 +100,10 @@ import static java.util.Collections.singletonMap; import static org.apache.cassandra.config.CassandraRelevantProperties.PERSIST_PREPARED_STATEMENTS; import static org.apache.cassandra.config.CassandraRelevantProperties.UNSAFE_SYSTEM; +import static org.apache.cassandra.cql3.QueryProcessor.PREPARED_STATEMENT_CACHE_SIZE_BYTES; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; +import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternalWithPaging; public final class SystemKeyspace { @@ -973,13 +976,13 @@ private static Range byteBufferToRange(ByteBuffer rawRange, IPartitioner } } - public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql) + public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql, long timestamp) { if (PERSIST_PREPARED_STATEMENTS.getBoolean()) { - executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)", + executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?) USING TIMESTAMP ?", PreparedStatements.toString()), - loggedKeyspace, key.byteBuffer(), cql); + loggedKeyspace, key.byteBuffer(), cql, timestamp); logger.debug("stored prepared statement for logged keyspace '{}': '{}'", loggedKeyspace, cql); } else @@ -998,22 +1001,50 @@ public static void resetPreparedStatements() preparedStatements.truncateBlockingWithoutSnapshot(); } - public static int loadPreparedStatements(TriFunction onLoaded) + public static int loadPreparedStatements(TriFunction onLoaded) + { + return loadPreparedStatements(onLoaded, QueryProcessor.PRELOAD_PREPARED_STATEMENTS_FETCH_SIZE); + } + + public static int loadPreparedStatements(TriFunction onLoaded, int pageSize) { String query = String.format("SELECT prepared_id, logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS); - UntypedResultSet resultSet = executeOnceInternal(query); + UntypedResultSet resultSet = executeOnceInternalWithPaging(query, pageSize); int counter = 0; + + // As the cache size may be briefly exceeded before statements are evicted, we allow loading 110% the cache size + // to avoid logging early. + long preparedBytesLoadThreshold = (long) (PREPARED_STATEMENT_CACHE_SIZE_BYTES * 1.1); + long preparedBytesLoaded = 0L; for (UntypedResultSet.Row row : resultSet) { - if (onLoaded.accept(MD5Digest.wrap(row.getByteArray("prepared_id")), - row.getString("query_string"), - row.has("logged_keyspace") ? row.getString("logged_keyspace") : null)) + Integer cacheEntrySize = onLoaded.accept(MD5Digest.wrap(row.getByteArray("prepared_id")), + row.getString("query_string"), + row.has("logged_keyspace") ? row.getString("logged_keyspace") : null); + if (cacheEntrySize != null) + { counter++; + preparedBytesLoaded += Math.max(0, cacheEntrySize); + + if (preparedBytesLoaded > preparedBytesLoadThreshold) + { + logger.warn("Detected prepared statement cache filling up during preload after preparing {} " + + "statements (loaded {} with prepared_statements_cache_size being {}). " + + "This could be an indication that prepared statements leaked prior to CASSANDRA-19703 " + + "being fixed. Returning early to prevent indefinite startup. " + + "Consider truncating {}.{} to clear out leaked prepared statements.", + counter, + FileUtils.stringifyFileSize(preparedBytesLoaded), + FileUtils.stringifyFileSize(PREPARED_STATEMENT_CACHE_SIZE_BYTES), + SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS); + break; + } + } } return counter; } - public static int loadPreparedStatement(MD5Digest digest, TriFunction onLoaded) + public static int loadPreparedStatement(MD5Digest digest, TriFunction onLoaded) { String query = String.format("SELECT prepared_id, logged_keyspace, query_string FROM %s.%s WHERE prepared_id = ?", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS); UntypedResultSet resultSet = executeOnceInternal(query, digest.byteBuffer()); @@ -1022,7 +1053,7 @@ public static int loadPreparedStatement(MD5Digest digest, TriFunction { if (rng.nextBoolean()) QueryProcessor.instance.evictPrepared(id); - return true; + return 0; }); }); break; diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReprepareFuzzTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReprepareFuzzTest.java index f56847f68c9a..97e85cc4c0b6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ReprepareFuzzTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ReprepareFuzzTest.java @@ -228,7 +228,7 @@ public void fuzzTest() throws Throwable SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> { if (rng.nextBoolean()) QueryProcessor.instance.evictPrepared(id); - return true; + return 0; }); }); break; diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java index db2347534290..e2fde3ca3573 100644 --- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java +++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java @@ -21,32 +21,61 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.apache.cassandra.db.ReadQuery; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SchemaKeyspaceTables; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.MD5Digest; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import static org.apache.cassandra.config.CassandraRelevantProperties.PERSIST_PREPARED_STATEMENTS; import static org.junit.Assert.*; +@RunWith(BMUnitRunner.class) public class PstmtPersistenceTest extends CQLTester { + private static final CompletableFuture[] futureArray = new CompletableFuture[0]; + + private static final ConcurrentMap preparedStatementLoadTimestamps = new ConcurrentHashMap<>(); + private static final ConcurrentMap preparedStatementRemoveTimestamps = new ConcurrentHashMap<>(); + + // page size passed to preloadPreparedStatements + private static final int PRELOAD_PAGE_SIZE = 100; + + // recorded page invocations in preloadPreparedStatements + private static final AtomicInteger pageInvocations = new AtomicInteger(); + @Before public void setUp() { + preparedStatementLoadTimestamps.clear(); + preparedStatementRemoveTimestamps.clear(); + QueryProcessor.clearPreparedStatements(false); } - + @Test public void testCachedPreparedStatements() throws Throwable { @@ -103,7 +132,7 @@ public void testCachedPreparedStatements() throws Throwable Assert.assertNotNull(prepared); } - // add anther prepared statement and sync it to table + // add another prepared statement and sync it to table prepareStatement(statement2, "foo", "bar", clientState); // statement1 will have two statements prepared because of `setKeyspace` usage @@ -176,21 +205,23 @@ public void testPstmtInvalidation() throws Throwable createTable("CREATE TABLE %s (key int primary key, val int)"); + long initialEvicted = numberOfEvictedStatements(); + for (int cnt = 1; cnt < 10000; cnt++) { prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState); - if (numberOfEvictedStatements() > 0) + if (numberOfEvictedStatements() - initialEvicted > 0) { assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk()); - // prepare a more statements to trigger more evictions - for (int cnt2 = 1; cnt2 < 10; cnt2++) + // prepare more statements to trigger more evictions + for (int cnt2 = cnt + 1; cnt2 < cnt + 10; cnt2++) prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState); // each new prepared statement should have caused an eviction - assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10); - assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk()); + assertEquals("eviction count didn't increase by the expected number", 10, numberOfEvictedStatements() - initialEvicted); + assertEquals("Number of statements in memory (expected) and table (actual) don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk()); return; } @@ -199,6 +230,196 @@ public void testPstmtInvalidation() throws Throwable fail("Prepared statement eviction does not work"); } + @Test + @BMRules(rules= { + @BMRule(name = "CaptureWriteTimestamps", + targetClass = "SystemKeyspace", + targetMethod = "writePreparedStatement(String, MD5Digest, String, long)", + targetLocation = "AT INVOKE executeInternal", + action = "org.apache.cassandra.cql3.PstmtPersistenceTest.preparedStatementLoadTimestamps.put($key, $timestamp);" + ), + @BMRule(name = "CaptureEvictTimestamps", + targetClass = "QueryProcessor", + targetMethod = "evictPreparedStatement(MD5Digest, RemovalCause)", + action = "org.apache.cassandra.cql3.PstmtPersistenceTest.preparedStatementRemoveTimestamps.put($key, org.apache.cassandra.service.ClientState.getTimestamp());" + ) + }) + public void testAsyncPstmtInvalidation() throws Throwable + { + ClientState clientState = ClientState.forInternalCalls(); + createTable("CREATE TABLE %s (key int primary key, val int)"); + + // prepare statements concurrently in a thread pool to exercise bug encountered in CASSANDRA-19703 where + // delete from table occurs before the insert due to early eviction. + final ExecutorService executor = Executors.newFixedThreadPool(10); + + long initialEvicted = numberOfEvictedStatements(); + try + { + int initialMaxStatementsToPrepare = 10000; + int maxStatementsToPrepare = initialMaxStatementsToPrepare; + boolean hasEvicted = false; + int concurrency = 100; + List> prepareFutures = new ArrayList<>(concurrency); + + for (int cnt = 1; cnt <= maxStatementsToPrepare; cnt++) + { + final int localCnt = cnt; + prepareFutures.add(CompletableFuture.supplyAsync(() -> prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + localCnt, clientState), executor)); + + if (prepareFutures.size() == concurrency) + { + // Await completion of current inflight futures + CompletableFuture.allOf(prepareFutures.toArray(futureArray)).get(10, TimeUnit.SECONDS); + prepareFutures.clear(); + } + + // Once we've detected evictions, prepare as many statements as we've prepared so far to initialMaxStatementsToPrepare and then stop. + if (!hasEvicted && numberOfEvictedStatements() - initialEvicted > 0) + { + maxStatementsToPrepare = Math.min(cnt * 2, initialMaxStatementsToPrepare); + hasEvicted = true; + } + } + + long evictedStatements = numberOfEvictedStatements() - initialEvicted; + assertNotEquals("Should have evicted some prepared statements", 0, evictedStatements); + + // Recorded prepared statement removals should match metrics + assertEquals("Actual evicted statements does not match metrics", evictedStatements, preparedStatementRemoveTimestamps.size()); + + // For each prepared statement evicted, assert the time it was deleted is greater than the timestamp + // used for when it was loaded. + for (Map.Entry evictedStatementEntry : preparedStatementRemoveTimestamps.entrySet()) + { + MD5Digest key = evictedStatementEntry.getKey(); + long deletionTimestamp = evictedStatementEntry.getValue(); + long insertionTimestamp = preparedStatementLoadTimestamps.get(key); + + assertTrue(String.format("Expected deletion timestamp for prepared statement (%d) to be greater than insertion timestamp (%d)", + deletionTimestamp, insertionTimestamp), + deletionTimestamp > insertionTimestamp); + } + + // ensure the number of statements on disk match the number in memory, if number of statements on disk eclipses in memory, there was a leak. + assertEquals("Number of statements in memory (expected) and table (actual) don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk()); + } + finally + { + executor.shutdown(); + } + } + + /** + * Invoked whenever paging happens in testPreloadPreparedStatements, increments PAGE_INVOCATIONS when we detect + * paging happening in the path of QueryProcessor.preloadPreparedStatements with the expected page size. + */ + @SuppressWarnings("unused") + private static void nextPageReadQuery(ReadQuery query, PageSize pageSize) + { + TableMetadata metadata = query.metadata(); + if (metadata.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) && + metadata.name.equals(SystemKeyspace.PREPARED_STATEMENTS) && + pageSize.rows() == PRELOAD_PAGE_SIZE) + { + for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) + { + if (stackTraceElement.getClassName().equals(QueryProcessor.class.getName()) && stackTraceElement.getMethodName().equals("preloadPreparedStatements")) + { + pageInvocations.incrementAndGet(); + return; + } + } + } + } + + @Test + @BMRule(name = "CapturePageInvocations", + targetClass = "PartitionRangeQueryPager", + targetMethod = "nextPageReadQuery(PageSize, DataLimits)", + action = "org.apache.cassandra.cql3.PstmtPersistenceTest.nextPageReadQuery($this.query, $pageSize)") + public void testPreloadPreparedStatements() throws Throwable + { + ClientState clientState = ClientState.forInternalCalls(); + createTable("CREATE TABLE %s (key int primary key, val int)"); + + // Prepare more statements than the paging size to ensure paging works properly. + int statementsToPrepare = 750; + + for (int cnt = 1; cnt <= statementsToPrepare; cnt++) + { + prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState); + } + + // Capture how many statements are in memory before clearing cache. + long statementsInMemory = numberOfStatementsInMemory(); + long statementsOnDisk = numberOfStatementsOnDisk(); + assertEquals(statementsOnDisk, statementsInMemory); + + // Drop prepared statements from cache only and ensure the cache empties out. + QueryProcessor.clearPreparedStatements(true); + assertEquals(0, numberOfStatementsInMemory()); + + // Load prepared statements and ensure the cache size matches max + QueryProcessor.instance.preloadPreparedStatements(PRELOAD_PAGE_SIZE); + + long statementsInMemoryAfterLoading = numberOfStatementsInMemory(); + // Ensure size of cache matches statements that were on disk before preload + assertEquals("Statements prepared - evicted (expected) does not match statements in memory (actual)", + statementsOnDisk, statementsInMemoryAfterLoading); + + // Number of statements on disk should match memory + assertEquals(statementsInMemoryAfterLoading, numberOfStatementsOnDisk()); + + // Ensure only executed the expected amount of pages. + int expectedPageInvocations = (int) Math.ceil(statementsInMemoryAfterLoading / (double) PRELOAD_PAGE_SIZE); + assertEquals(expectedPageInvocations, pageInvocations.get()); + } + + @Test + public void testPreloadPreparedStatementsUntilCacheFull() + { + QueryHandler handler = ClientState.getCQLQueryHandler(); + ClientState clientState = ClientState.forInternalCalls(); + createTable("CREATE TABLE %s (key int primary key, val int)"); + + // Fill up and clear the prepared statement cache several times to load up the system.prepared_statements table. + // This simulates a 'leak' of prepared statements akin to CASSANDRA-19703 as the system.prepared_statements + // table is able to grow to a larger size than the in memory prepared statement cache. In such a case we + // should detect a possible leak and defer paging indefinitely by returning early in preloadPreparedStatements. + int statementsLoadedWhenFull = -1; + long accumulatedSize = 0; + // load enough prepared statements to fill the cache 5 times. + for (int cnt = 0; accumulatedSize < QueryProcessor.PREPARED_STATEMENT_CACHE_SIZE_BYTES * 5; cnt++) + { + MD5Digest id = prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState); + QueryHandler.Prepared prepared = handler.getPrepared(id); + int cacheEntrySize = QueryProcessor.measurePStatementCacheEntrySize(id, prepared); + assertTrue(cacheEntrySize > 0); + accumulatedSize += cacheEntrySize; + if (statementsLoadedWhenFull == -1 && accumulatedSize > QueryProcessor.PREPARED_STATEMENT_CACHE_SIZE_BYTES) + { + statementsLoadedWhenFull = cnt; + } + // clear cache repeatedly to avoid eviction. + QueryProcessor.clearPreparedStatements(true); + } + + int preloadedStatements = QueryProcessor.instance.preloadPreparedStatements(PRELOAD_PAGE_SIZE); + + // Should have loaded as many statements as we detected were loaded before cache would be full. + assertTrue(String.format("Preloaded %d statements, expected at least %d", + preloadedStatements, statementsLoadedWhenFull), + preloadedStatements > statementsLoadedWhenFull); + + // We should only expect to load how many statements we were able to load before filling the cache + // + a buffer of 110%, set to 1.5x just to deal with sensitivity of detecting cache filling up. + int atMostPreloadedExpected = (int) (statementsLoadedWhenFull * 1.5); + assertTrue(String.format("Preloaded %d statements, but only expected that we'd load at most %d", + preloadedStatements, atMostPreloadedExpected), + preloadedStatements <= atMostPreloadedExpected); + } + private long numberOfStatementsOnDisk() throws Throwable { UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one(); @@ -222,7 +443,6 @@ private MD5Digest prepareStatement(String stmt, ClientState clientState) private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState) { - System.out.println(stmt + String.format(stmt, keyspace + "." + table)); - return QueryProcessor.instance.prepare(String.format(stmt, keyspace + "." + table), clientState).statementId; + return QueryProcessor.instance.prepare(String.format(stmt, keyspace + '.' + table), clientState).statementId; } } From da862097ed5fdd04241a1afdc860ed9c20420894 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Thu, 2 Apr 2026 11:10:21 -0500 Subject: [PATCH 2/2] Test sonar fix from 17263 --- ds/Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ds/Jenkinsfile b/ds/Jenkinsfile index b840c9cd84d6..1cb8a8652e09 100644 --- a/ds/Jenkinsfile +++ b/ds/Jenkinsfile @@ -15,6 +15,6 @@ // This Jenkinsfile uses a shared library from https://github.com/riptano/jenkins-pipeline-lib // The pipeline logic is defined in the library rather than in this file. -@Library('ds-pipeline-lib') _ +@Library('ds-pipeline-lib@CNDB-17263-remove-noverify') _ dsCassandraPRGate()