Skip to content

Commit a2efacd

Browse files
authored
KAFKA-20307: Add Interactive Queries (IQv1) support for headers-aware stores (apache#21744)
This PR adds backward compatibility support for the new headers-aware state stores (TimestampedKeyValueStoreWithHeaders and TimestampedWindowStoreWithHeaders) when queried through the Interactive Queries v1 (IQv1) API. Users can now query headers-aware stores using the existing QueryableStoreTypes API without any code changes. Reviewers: Matthias J. Sax <matthias@confluent.io>
1 parent 4ebf018 commit a2efacd

16 files changed

Lines changed: 764 additions & 47 deletions

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.kafka.streams.state.AggregationWithHeaders;
3838
import org.apache.kafka.streams.state.KeyValueIterator;
3939
import org.apache.kafka.streams.state.KeyValueStore;
40+
import org.apache.kafka.streams.state.QueryableStoreType;
4041
import org.apache.kafka.streams.state.QueryableStoreTypes;
4142
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
4243
import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -51,6 +52,9 @@
5152
import org.apache.kafka.streams.state.ValueAndTimestamp;
5253
import org.apache.kafka.streams.state.ValueTimestampHeaders;
5354
import org.apache.kafka.streams.state.WindowStore;
55+
import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
56+
import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
57+
import org.apache.kafka.streams.state.internals.StateStoreProvider;
5458
import org.apache.kafka.test.TestUtils;
5559

5660
import org.junit.jupiter.api.AfterAll;
@@ -479,7 +483,7 @@ private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final K key,
479483
() -> {
480484
try {
481485
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
482-
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
486+
.getStore(STORE_NAME, kafkaStreams, new TimestampedKeyValueStoreWithHeadersType<>());
483487

484488
if (store == null)
485489
return false;
@@ -520,7 +524,7 @@ private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final K key,
520524
() -> {
521525
try {
522526
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
523-
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
527+
.getStore(STORE_NAME, kafkaStreams, new TimestampedKeyValueStoreWithHeadersType<>());
524528

525529
if (store == null)
526530
return false;
@@ -546,7 +550,7 @@ private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
546550
() -> {
547551
try {
548552
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
549-
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
553+
.getStore(STORE_NAME, kafkaStreams, new TimestampedKeyValueStoreWithHeadersType<>());
550554

551555
if (store == null)
552556
return false;
@@ -565,30 +569,6 @@ private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
565569
"Could not get expected result in time.");
566570
}
567571

568-
private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
569-
final V value) throws Exception {
570-
TestUtils.waitForCondition(
571-
() -> {
572-
try {
573-
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
574-
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
575-
576-
if (store == null)
577-
return false;
578-
579-
final ValueTimestampHeaders<V> result = store.get(key);
580-
return result != null
581-
&& result.value().equals(value)
582-
&& result.headers().toArray().length == 0;
583-
} catch (final Exception swallow) {
584-
LOG.error("Error while verifying legacy value with empty headers", swallow);
585-
return false;
586-
}
587-
},
588-
60_000L,
589-
"Could not get expected result in time.");
590-
}
591-
592572
private static class KeyValueProcessor implements Processor<String, String, Void, Void> {
593573
private KeyValueStore<String, String> store;
594574

@@ -906,7 +886,7 @@ private void verifyPlainWindowValueWithEmptyHeadersAndTimestamp(final String key
906886
TestUtils.waitForCondition(() -> {
907887
try {
908888
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
909-
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
889+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
910890

911891
if (store == null) {
912892
return false;
@@ -963,7 +943,7 @@ private void processPlainWindowedKeyValueWithHeadersAndVerify(final String key,
963943
TestUtils.waitForCondition(() -> {
964944
try {
965945
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
966-
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
946+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
967947

968948
if (store == null) {
969949
return false;
@@ -1049,7 +1029,7 @@ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
10491029
TestUtils.waitForCondition(() -> {
10501030
try {
10511031
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
1052-
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
1032+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
10531033

10541034
if (store == null) {
10551035
return false;
@@ -1089,7 +1069,7 @@ private void verifyWindowValueWithEmptyHeaders(final String key,
10891069
TestUtils.waitForCondition(() -> {
10901070
try {
10911071
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
1092-
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
1072+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
10931073

10941074
if (store == null) {
10951075
return false;
@@ -1493,7 +1473,7 @@ public void shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfte
14931473
private boolean windowStoreContainsKey(final String key, final long timestamp) {
14941474
try {
14951475
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
1496-
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
1476+
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
14971477

14981478
if (store == null) {
14991479
return false;
@@ -2037,4 +2017,52 @@ public void process(final Record<String, String> record) {
20372017
store.put(sessionKey, AggregationWithHeaders.make(record.value(), record.headers()));
20382018
}
20392019
}
2020+
2021+
// ==================== Custom QueryableStoreTypes ====================
2022+
2023+
/**
2024+
* Custom QueryableStoreType for querying TimestampedKeyValueStoreWithHeaders directly
2025+
* without facade wrapping. This returns the full ValueTimestampHeaders wrapper.
2026+
*/
2027+
private static class TimestampedKeyValueStoreWithHeadersType<K, V>
2028+
implements QueryableStoreType<ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>> {
2029+
2030+
@Override
2031+
public boolean accepts(final org.apache.kafka.streams.processor.StateStore stateStore) {
2032+
// Accept stores that implement both TimestampedKeyValueStoreWithHeaders and ReadOnlyKeyValueStore
2033+
return stateStore instanceof TimestampedKeyValueStoreWithHeaders
2034+
&& stateStore instanceof ReadOnlyKeyValueStore;
2035+
}
2036+
2037+
@Override
2038+
public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
2039+
final StateStoreProvider storeProvider,
2040+
final String storeName) {
2041+
return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
2042+
}
2043+
}
2044+
2045+
/**
2046+
* Custom queryable store type for accessing TimestampedWindowStoreWithHeaders directly
2047+
* without facade wrapping. This returns the full ValueTimestampHeaders wrapper.
2048+
*/
2049+
private static class TimestampedWindowStoreWithHeadersType<K, V>
2050+
implements QueryableStoreType<ReadOnlyWindowStore<K, ValueTimestampHeaders<V>>> {
2051+
2052+
@Override
2053+
public boolean accepts(final org.apache.kafka.streams.processor.StateStore stateStore) {
2054+
// Accept stores that implement both TimestampedWindowStoreWithHeaders and ReadOnlyWindowStore
2055+
return stateStore instanceof TimestampedWindowStoreWithHeaders
2056+
&& stateStore instanceof ReadOnlyWindowStore;
2057+
}
2058+
2059+
@Override
2060+
public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
2061+
final StateStoreProvider storeProvider,
2062+
final String storeName) {
2063+
return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
2064+
}
2065+
}
2066+
2067+
20402068
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.apache.kafka.streams.state.KeyValueStore;
2727
import org.apache.kafka.streams.state.SessionStore;
2828
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
29+
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
2930
import org.apache.kafka.streams.state.TimestampedWindowStore;
3031
import org.apache.kafka.streams.state.ValueAndTimestamp;
32+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3133
import org.apache.kafka.streams.state.VersionedKeyValueStore;
3234
import org.apache.kafka.streams.state.VersionedRecord;
3335
import org.apache.kafka.streams.state.WindowStore;
@@ -163,6 +165,15 @@ private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore
163165
}
164166
}
165167

168+
static class TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders<K, V>
169+
extends KeyValueStoreReadOnlyDecorator<K, ValueTimestampHeaders<V>>
170+
implements TimestampedKeyValueStoreWithHeaders<K, V> {
171+
172+
private TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders(final TimestampedKeyValueStoreWithHeaders<K, V> inner) {
173+
super(inner);
174+
}
175+
}
176+
166177
static class VersionedKeyValueStoreReadOnlyDecorator<K, V>
167178
extends AbstractReadOnlyDecorator<VersionedKeyValueStore<K, V>, K, V>
168179
implements VersionedKeyValueStore<K, V> {

streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@
2727
import org.apache.kafka.streams.processor.TaskId;
2828
import org.apache.kafka.streams.processor.internals.Task.TaskType;
2929
import org.apache.kafka.streams.state.SessionStore;
30+
import org.apache.kafka.streams.state.internals.PlainToHeadersStoreAdapter;
31+
import org.apache.kafka.streams.state.internals.PlainToHeadersWindowStoreAdapter;
3032
import org.apache.kafka.streams.state.internals.RecordConverter;
33+
import org.apache.kafka.streams.state.internals.TimestampedToHeadersStoreAdapter;
34+
import org.apache.kafka.streams.state.internals.TimestampedToHeadersWindowStoreAdapter;
35+
import org.apache.kafka.streams.state.internals.WrappedStateStore;
3136

3237
import org.slf4j.Logger;
3338

@@ -54,6 +59,7 @@ final class StateManagerUtil {
5459
private StateManagerUtil() {}
5560

5661
static RecordConverter converterForStore(final StateStore store) {
62+
// First check if the top-level store implements HeadersBytesStore or TimestampedBytesStore
5763
if (isHeadersAware(store)) {
5864
if (store instanceof SessionStore) {
5965
return rawValueToSessionHeadersValue();
@@ -64,6 +70,29 @@ static RecordConverter converterForStore(final StateStore store) {
6470
// timestamp is used separately during put() process for restore of versioned stores
6571
return rawValueToTimestampedValue();
6672
}
73+
74+
// If top-level check didn't find the type, unwrap to find adapters
75+
// This handles persistent stores that use adapters
76+
StateStore current = store;
77+
while (current != null) {
78+
if (current instanceof TimestampedToHeadersStoreAdapter || current instanceof TimestampedToHeadersWindowStoreAdapter) {
79+
// Adapter wraps a timestamped store, so restore in timestamped format
80+
return rawValueToTimestampedValue();
81+
} else if (current instanceof PlainToHeadersStoreAdapter || current instanceof PlainToHeadersWindowStoreAdapter) {
82+
// Adapter wraps a plain store, so restore in plain format
83+
return identity();
84+
}
85+
86+
// If not a WrappedStateStore, we've reached the innermost store
87+
if (!(current instanceof WrappedStateStore)) {
88+
break;
89+
}
90+
91+
// Unwrap one more level
92+
current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
93+
}
94+
95+
// Default to identity if no special handling needed
6796
return identity();
6897
}
6998

streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,20 @@ public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider
123123

124124
}
125125

126-
private static class TimestampedKeyValueStoreType<K, V>
126+
public static class TimestampedKeyValueStoreType<K, V>
127127
extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> {
128128

129129
TimestampedKeyValueStoreType() {
130130
super(Set.of(
131-
TimestampedKeyValueStore.class,
132131
ReadOnlyKeyValueStore.class));
133132
}
134133

134+
@Override
135+
public boolean accepts(final StateStore stateStore) {
136+
return super.accepts(stateStore) &&
137+
(stateStore instanceof TimestampedKeyValueStore || stateStore instanceof TimestampedKeyValueStoreWithHeaders);
138+
}
139+
135140
@Override
136141
public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
137142
final String storeName) {
@@ -152,15 +157,20 @@ public ReadOnlyWindowStore<K, V> create(final StateStoreProvider storeProvider,
152157
}
153158
}
154159

155-
private static class TimestampedWindowStoreType<K, V>
160+
public static class TimestampedWindowStoreType<K, V>
156161
extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> {
157162

158163
TimestampedWindowStoreType() {
159164
super(Set.of(
160-
TimestampedWindowStore.class,
161165
ReadOnlyWindowStore.class));
162166
}
163167

168+
@Override
169+
public boolean accepts(final StateStore stateStore) {
170+
return super.accepts(stateStore) &&
171+
(stateStore instanceof TimestampedWindowStore || stateStore instanceof TimestampedWindowStoreWithHeaders);
172+
}
173+
164174
@Override
165175
public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
166176
final String storeName) {

streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import org.apache.kafka.streams.state.QueryableStoreTypes;
2323
import org.apache.kafka.streams.state.SessionStoreWithHeaders;
2424
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
25+
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
2526
import org.apache.kafka.streams.state.TimestampedWindowStore;
27+
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
2628

2729
import java.util.Collections;
2830
import java.util.List;
@@ -45,8 +47,26 @@ public <T> List<T> stores(final String storeName, final QueryableStoreType<T> qu
4547
if (!store.isOpen()) {
4648
throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
4749
}
48-
if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
50+
if (store instanceof TimestampedKeyValueStoreWithHeaders) {
51+
if (queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
52+
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueFromHeaders()));
53+
} else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedKeyValueStoreType) {
54+
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
55+
} else {
56+
// For custom query types, return the raw store so they can access headers directly
57+
return (List<T>) Collections.singletonList(store);
58+
}
59+
} else if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
4960
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store, ValueConverters.extractValue()));
61+
} else if (store instanceof TimestampedWindowStoreWithHeaders) {
62+
if (queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
63+
return (List<T>) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueFromHeaders()));
64+
} else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedWindowStoreType) {
65+
return (List<T>) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
66+
} else {
67+
// For custom query types, return the raw store so they can access headers directly
68+
return (List<T>) Collections.singletonList(store);
69+
}
5070
} else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
5171
return (List<T>) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store, ValueConverters.extractValue()));
5272
} else if (store instanceof SessionStoreWithHeaders && queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {

0 commit comments

Comments
 (0)