Skip to content

Commit 52a919b

Browse files
authored
JAVA-3058: Clear prepared statement cache on UDT type change event (#1638)
1 parent cfeb55f commit 52a919b

File tree

5 files changed

+237
-9
lines changed

5 files changed

+237
-9
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java

+83-2
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,112 @@
1818
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
1919
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
2020
import com.datastax.oss.driver.api.core.session.Request;
21+
import com.datastax.oss.driver.api.core.type.DataType;
22+
import com.datastax.oss.driver.api.core.type.ListType;
23+
import com.datastax.oss.driver.api.core.type.MapType;
24+
import com.datastax.oss.driver.api.core.type.SetType;
25+
import com.datastax.oss.driver.api.core.type.TupleType;
26+
import com.datastax.oss.driver.api.core.type.UserDefinedType;
2127
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
28+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
2229
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
30+
import com.datastax.oss.driver.internal.core.metadata.schema.events.TypeChangeEvent;
2331
import com.datastax.oss.driver.internal.core.session.DefaultSession;
2432
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
2533
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
34+
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
2635
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
2736
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
37+
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
38+
import com.datastax.oss.protocol.internal.ProtocolConstants;
39+
import edu.umd.cs.findbugs.annotations.NonNull;
40+
import io.netty.util.concurrent.EventExecutor;
41+
import java.util.Map;
42+
import java.util.Optional;
2843
import java.util.concurrent.CompletableFuture;
2944
import java.util.concurrent.CompletionStage;
3045
import java.util.concurrent.ExecutionException;
3146
import net.jcip.annotations.ThreadSafe;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
3249

3350
@ThreadSafe
3451
public class CqlPrepareAsyncProcessor
3552
implements RequestProcessor<PrepareRequest, CompletionStage<PreparedStatement>> {
3653

54+
private static final Logger LOG = LoggerFactory.getLogger(CqlPrepareAsyncProcessor.class);
55+
3756
protected final Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache;
3857

3958
public CqlPrepareAsyncProcessor() {
40-
this(CacheBuilder.newBuilder().weakValues().build());
59+
this(Optional.empty());
60+
}
61+
62+
public CqlPrepareAsyncProcessor(@NonNull Optional<? extends DefaultDriverContext> context) {
63+
this(CacheBuilder.newBuilder().weakValues().build(), context);
4164
}
4265

4366
protected CqlPrepareAsyncProcessor(
44-
Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache) {
67+
Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache,
68+
Optional<? extends DefaultDriverContext> context) {
69+
4570
this.cache = cache;
71+
context.ifPresent(
72+
(ctx) -> {
73+
LOG.info("Adding handler to invalidate cached prepared statements on type changes");
74+
EventExecutor adminExecutor = ctx.getNettyOptions().adminEventExecutorGroup().next();
75+
ctx.getEventBus()
76+
.register(
77+
TypeChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onTypeChanged));
78+
});
79+
}
80+
81+
private static boolean typeMatches(UserDefinedType oldType, DataType typeToCheck) {
82+
83+
switch (typeToCheck.getProtocolCode()) {
84+
case ProtocolConstants.DataType.UDT:
85+
UserDefinedType udtType = (UserDefinedType) typeToCheck;
86+
return udtType.equals(oldType)
87+
? true
88+
: Iterables.any(udtType.getFieldTypes(), (testType) -> typeMatches(oldType, testType));
89+
case ProtocolConstants.DataType.LIST:
90+
ListType listType = (ListType) typeToCheck;
91+
return typeMatches(oldType, listType.getElementType());
92+
case ProtocolConstants.DataType.SET:
93+
SetType setType = (SetType) typeToCheck;
94+
return typeMatches(oldType, setType.getElementType());
95+
case ProtocolConstants.DataType.MAP:
96+
MapType mapType = (MapType) typeToCheck;
97+
return typeMatches(oldType, mapType.getKeyType())
98+
|| typeMatches(oldType, mapType.getValueType());
99+
case ProtocolConstants.DataType.TUPLE:
100+
TupleType tupleType = (TupleType) typeToCheck;
101+
return Iterables.any(
102+
tupleType.getComponentTypes(), (testType) -> typeMatches(oldType, testType));
103+
default:
104+
return false;
105+
}
106+
}
107+
108+
private void onTypeChanged(TypeChangeEvent event) {
109+
for (Map.Entry<PrepareRequest, CompletableFuture<PreparedStatement>> entry :
110+
this.cache.asMap().entrySet()) {
111+
112+
try {
113+
PreparedStatement stmt = entry.getValue().get();
114+
if (Iterables.any(
115+
stmt.getResultSetDefinitions(), (def) -> typeMatches(event.oldType, def.getType()))
116+
|| Iterables.any(
117+
stmt.getVariableDefinitions(),
118+
(def) -> typeMatches(event.oldType, def.getType()))) {
119+
120+
this.cache.invalidate(entry.getKey());
121+
this.cache.cleanUp();
122+
}
123+
} catch (Exception e) {
124+
LOG.info("Exception while invalidating prepared statement cache due to UDT change", e);
125+
}
126+
}
46127
}
47128

48129
@Override

core/src/main/java/com/datastax/oss/driver/internal/core/session/BuiltInRequestProcessors.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.datastax.oss.driver.internal.core.util.DefaultDependencyChecker;
3535
import java.util.ArrayList;
3636
import java.util.List;
37+
import java.util.Optional;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

@@ -43,7 +44,7 @@ public class BuiltInRequestProcessors {
4344

4445
public static List<RequestProcessor<?, ?>> createDefaultProcessors(DefaultDriverContext context) {
4546
List<RequestProcessor<?, ?>> processors = new ArrayList<>();
46-
addBasicProcessors(processors);
47+
addBasicProcessors(processors, context);
4748
if (DefaultDependencyChecker.isPresent(TINKERPOP)) {
4849
addGraphProcessors(context, processors);
4950
} else {
@@ -62,7 +63,8 @@ public class BuiltInRequestProcessors {
6263
return processors;
6364
}
6465

65-
public static void addBasicProcessors(List<RequestProcessor<?, ?>> processors) {
66+
public static void addBasicProcessors(
67+
List<RequestProcessor<?, ?>> processors, DefaultDriverContext context) {
6668
// regular requests (sync and async)
6769
CqlRequestAsyncProcessor cqlRequestAsyncProcessor = new CqlRequestAsyncProcessor();
6870
CqlRequestSyncProcessor cqlRequestSyncProcessor =
@@ -71,7 +73,8 @@ public static void addBasicProcessors(List<RequestProcessor<?, ?>> processors) {
7173
processors.add(cqlRequestSyncProcessor);
7274

7375
// prepare requests (sync and async)
74-
CqlPrepareAsyncProcessor cqlPrepareAsyncProcessor = new CqlPrepareAsyncProcessor();
76+
CqlPrepareAsyncProcessor cqlPrepareAsyncProcessor =
77+
new CqlPrepareAsyncProcessor(Optional.of(context));
7578
CqlPrepareSyncProcessor cqlPrepareSyncProcessor =
7679
new CqlPrepareSyncProcessor(cqlPrepareAsyncProcessor);
7780
processors.add(cqlPrepareAsyncProcessor);

core/src/main/java/com/datastax/oss/driver/internal/core/session/BuiltInRequestProcessorsSubstitutions.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static final class BuiltInRequestProcessorsGraphMissingReactiveMissing {
3636
public static List<RequestProcessor<?, ?>> createDefaultProcessors(
3737
DefaultDriverContext context) {
3838
List<RequestProcessor<?, ?>> processors = new ArrayList<>();
39-
BuiltInRequestProcessors.addBasicProcessors(processors);
39+
BuiltInRequestProcessors.addBasicProcessors(processors, context);
4040
return processors;
4141
}
4242
}
@@ -48,7 +48,7 @@ public static final class BuiltInRequestProcessorsGraphMissingReactivePresent {
4848
public static List<RequestProcessor<?, ?>> createDefaultProcessors(
4949
DefaultDriverContext context) {
5050
List<RequestProcessor<?, ?>> processors = new ArrayList<>();
51-
BuiltInRequestProcessors.addBasicProcessors(processors);
51+
BuiltInRequestProcessors.addBasicProcessors(processors, context);
5252
BuiltInRequestProcessors.addReactiveProcessors(processors);
5353
return processors;
5454
}
@@ -61,7 +61,7 @@ public static final class BuiltInRequestProcessorsGraphPresentReactiveMissing {
6161
public static List<RequestProcessor<?, ?>> createDefaultProcessors(
6262
DefaultDriverContext context) {
6363
List<RequestProcessor<?, ?>> processors = new ArrayList<>();
64-
BuiltInRequestProcessors.addBasicProcessors(processors);
64+
BuiltInRequestProcessors.addBasicProcessors(processors, context);
6565
BuiltInRequestProcessors.addGraphProcessors(context, processors);
6666
return processors;
6767
}

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java

+142
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,20 @@
3939
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
4040
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
4141
import com.datastax.oss.driver.categories.ParallelizableTests;
42+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
43+
import com.datastax.oss.driver.internal.core.metadata.schema.events.TypeChangeEvent;
4244
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
4345
import com.datastax.oss.driver.internal.core.metadata.token.TokenFactory;
4446
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
47+
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
4548
import com.datastax.oss.protocol.internal.util.Bytes;
4649
import com.google.common.collect.ImmutableList;
4750
import java.nio.ByteBuffer;
4851
import java.time.Duration;
4952
import java.util.concurrent.CompletionStage;
53+
import java.util.concurrent.CountDownLatch;
54+
import java.util.concurrent.TimeUnit;
55+
import java.util.function.Consumer;
5056
import junit.framework.TestCase;
5157
import org.junit.Before;
5258
import org.junit.Rule;
@@ -444,6 +450,142 @@ public void should_fail_fast_if_id_changes_on_reprepare() {
444450
}
445451
}
446452

453+
private void invalidationResultSetTest(Consumer<CqlSession> createFn) {
454+
455+
try (CqlSession session = sessionWithCacheSizeMetric()) {
456+
457+
assertThat(getPreparedCacheSize(session)).isEqualTo(0);
458+
createFn.accept(session);
459+
460+
session.prepare("select f from test_table_1 where e = ?");
461+
session.prepare("select h from test_table_2 where g = ?");
462+
assertThat(getPreparedCacheSize(session)).isEqualTo(2);
463+
464+
CountDownLatch latch = new CountDownLatch(1);
465+
DefaultDriverContext ctx = (DefaultDriverContext) session.getContext();
466+
ctx.getEventBus()
467+
.register(
468+
TypeChangeEvent.class,
469+
(e) -> {
470+
assertThat(e.oldType.getName().toString()).isEqualTo("test_type_2");
471+
latch.countDown();
472+
});
473+
474+
session.execute("ALTER TYPE test_type_2 add i blob");
475+
Uninterruptibles.awaitUninterruptibly(latch, 2, TimeUnit.SECONDS);
476+
477+
assertThat(getPreparedCacheSize(session)).isEqualTo(1);
478+
}
479+
}
480+
481+
private void invalidationVariableDefsTest(Consumer<CqlSession> createFn, boolean isCollection) {
482+
483+
try (CqlSession session = sessionWithCacheSizeMetric()) {
484+
485+
assertThat(getPreparedCacheSize(session)).isEqualTo(0);
486+
createFn.accept(session);
487+
488+
String fStr = isCollection ? "f contains ?" : "f = ?";
489+
session.prepare(String.format("select e from test_table_1 where %s allow filtering", fStr));
490+
String hStr = isCollection ? "h contains ?" : "h = ?";
491+
session.prepare(String.format("select g from test_table_2 where %s allow filtering", hStr));
492+
assertThat(getPreparedCacheSize(session)).isEqualTo(2);
493+
494+
CountDownLatch latch = new CountDownLatch(1);
495+
DefaultDriverContext ctx = (DefaultDriverContext) session.getContext();
496+
ctx.getEventBus()
497+
.register(
498+
TypeChangeEvent.class,
499+
(e) -> {
500+
assertThat(e.oldType.getName().toString()).isEqualTo("test_type_2");
501+
latch.countDown();
502+
});
503+
504+
session.execute("ALTER TYPE test_type_2 add i blob");
505+
Uninterruptibles.awaitUninterruptibly(latch, 2, TimeUnit.SECONDS);
506+
507+
assertThat(getPreparedCacheSize(session)).isEqualTo(1);
508+
}
509+
}
510+
511+
Consumer<CqlSession> setupCacheEntryTestBasic =
512+
(session) -> {
513+
session.execute("CREATE TYPE test_type_1 (a text, b int)");
514+
session.execute("CREATE TYPE test_type_2 (c int, d text)");
515+
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_1>)");
516+
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_2>)");
517+
};
518+
519+
@Test
520+
public void should_invalidate_cache_entry_on_basic_udt_change_result_set() {
521+
invalidationResultSetTest(setupCacheEntryTestBasic);
522+
}
523+
524+
@Test
525+
public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() {
526+
invalidationVariableDefsTest(setupCacheEntryTestBasic, false);
527+
}
528+
529+
Consumer<CqlSession> setupCacheEntryTestCollection =
530+
(session) -> {
531+
session.execute("CREATE TYPE test_type_1 (a text, b int)");
532+
session.execute("CREATE TYPE test_type_2 (c int, d text)");
533+
session.execute(
534+
"CREATE TABLE test_table_1 (e int primary key, f list<frozen<test_type_1>>)");
535+
session.execute(
536+
"CREATE TABLE test_table_2 (g int primary key, h list<frozen<test_type_2>>)");
537+
};
538+
539+
@Test
540+
public void should_invalidate_cache_entry_on_collection_udt_change_result_set() {
541+
invalidationResultSetTest(setupCacheEntryTestCollection);
542+
}
543+
544+
@Test
545+
public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs() {
546+
invalidationVariableDefsTest(setupCacheEntryTestCollection, true);
547+
}
548+
549+
Consumer<CqlSession> setupCacheEntryTestTuple =
550+
(session) -> {
551+
session.execute("CREATE TYPE test_type_1 (a text, b int)");
552+
session.execute("CREATE TYPE test_type_2 (c int, d text)");
553+
session.execute(
554+
"CREATE TABLE test_table_1 (e int primary key, f tuple<int, test_type_1, text>)");
555+
session.execute(
556+
"CREATE TABLE test_table_2 (g int primary key, h tuple<text, test_type_2, int>)");
557+
};
558+
559+
@Test
560+
public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() {
561+
invalidationResultSetTest(setupCacheEntryTestTuple);
562+
}
563+
564+
@Test
565+
public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() {
566+
invalidationVariableDefsTest(setupCacheEntryTestTuple, false);
567+
}
568+
569+
Consumer<CqlSession> setupCacheEntryTestNested =
570+
(session) -> {
571+
session.execute("CREATE TYPE test_type_1 (a text, b int)");
572+
session.execute("CREATE TYPE test_type_2 (c int, d text)");
573+
session.execute("CREATE TYPE test_type_3 (e frozen<test_type_1>, f int)");
574+
session.execute("CREATE TYPE test_type_4 (g int, h frozen<test_type_2>)");
575+
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_3>)");
576+
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_4>)");
577+
};
578+
579+
@Test
580+
public void should_invalidate_cache_entry_on_nested_udt_change_result_set() {
581+
invalidationResultSetTest(setupCacheEntryTestNested);
582+
}
583+
584+
@Test
585+
public void should_invalidate_cache_entry_on_nested_udt_change_variable_defs() {
586+
invalidationVariableDefsTest(setupCacheEntryTestNested, false);
587+
}
588+
447589
@Test
448590
public void should_infer_routing_information_when_partition_key_is_bound() {
449591
should_infer_routing_information_when_partition_key_is_bound(

integration-tests/src/test/java/com/datastax/oss/driver/example/guava/internal/GuavaDriverContext.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.datastax.oss.driver.internal.core.cql.CqlRequestAsyncProcessor;
2727
import com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor;
2828
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
29+
import java.util.Optional;
2930

3031
/**
3132
* A Custom {@link DefaultDriverContext} that overrides {@link #getRequestProcessorRegistry()} to
@@ -44,7 +45,8 @@ public RequestProcessorRegistry buildRequestProcessorRegistry() {
4445
// use GuavaRequestAsyncProcessor to return ListenableFutures in async methods.
4546

4647
CqlRequestAsyncProcessor cqlRequestAsyncProcessor = new CqlRequestAsyncProcessor();
47-
CqlPrepareAsyncProcessor cqlPrepareAsyncProcessor = new CqlPrepareAsyncProcessor();
48+
CqlPrepareAsyncProcessor cqlPrepareAsyncProcessor =
49+
new CqlPrepareAsyncProcessor(Optional.of(this));
4850
CqlRequestSyncProcessor cqlRequestSyncProcessor =
4951
new CqlRequestSyncProcessor(cqlRequestAsyncProcessor);
5052

0 commit comments

Comments
 (0)