Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
Expand All @@ -51,6 +52,9 @@
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -479,7 +483,7 @@ private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final K key,
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
.getStore(STORE_NAME, kafkaStreams, new TimestampedKeyValueStoreWithHeadersType<>());

if (store == null)
return false;
Expand Down Expand Up @@ -520,7 +524,7 @@ private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final K key,
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
.getStore(STORE_NAME, kafkaStreams, new TimestampedKeyValueStoreWithHeadersType<>());

if (store == null)
return false;
Expand All @@ -546,7 +550,7 @@ private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
.getStore(STORE_NAME, kafkaStreams, new TimestampedKeyValueStoreWithHeadersType<>());

if (store == null)
return false;
Expand All @@ -565,30 +569,6 @@ private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
"Could not get expected result in time.");
}

private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
final V value) throws Exception {
TestUtils.waitForCondition(
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> store = IntegrationTestUtils
.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());

if (store == null)
return false;

final ValueTimestampHeaders<V> result = store.get(key);
return result != null
&& result.value().equals(value)
&& result.headers().toArray().length == 0;
} catch (final Exception swallow) {
LOG.error("Error while verifying legacy value with empty headers", swallow);
return false;
}
},
60_000L,
"Could not get expected result in time.");
}

private static class KeyValueProcessor implements Processor<String, String, Void, Void> {
private KeyValueStore<String, String> store;

Expand Down Expand Up @@ -906,7 +886,7 @@ private void verifyPlainWindowValueWithEmptyHeadersAndTimestamp(final String key
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());

if (store == null) {
return false;
Expand Down Expand Up @@ -963,7 +943,7 @@ private void processPlainWindowedKeyValueWithHeadersAndVerify(final String key,
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());

if (store == null) {
return false;
Expand Down Expand Up @@ -1049,7 +1029,7 @@ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());

if (store == null) {
return false;
Expand Down Expand Up @@ -1089,7 +1069,7 @@ private void verifyWindowValueWithEmptyHeaders(final String key,
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());

if (store == null) {
return false;
Expand Down Expand Up @@ -1493,7 +1473,7 @@ public void shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfte
private boolean windowStoreContainsKey(final String key, final long timestamp) {
try {
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> store =
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore());
IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());

if (store == null) {
return false;
Expand Down Expand Up @@ -2037,4 +2017,52 @@ public void process(final Record<String, String> record) {
store.put(sessionKey, AggregationWithHeaders.make(record.value(), record.headers()));
}
}

// ==================== Custom QueryableStoreTypes ====================

/**
* Custom QueryableStoreType for querying TimestampedKeyValueStoreWithHeaders directly
* without facade wrapping. This returns the full ValueTimestampHeaders wrapper.
*/
private static class TimestampedKeyValueStoreWithHeadersType<K, V>
implements QueryableStoreType<ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>> {

@Override
public boolean accepts(final org.apache.kafka.streams.processor.StateStore stateStore) {
// Accept stores that implement both TimestampedKeyValueStoreWithHeaders and ReadOnlyKeyValueStore
return stateStore instanceof TimestampedKeyValueStoreWithHeaders
&& stateStore instanceof ReadOnlyKeyValueStore;
}

@Override
public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
}
}

/**
* Custom queryable store type for accessing TimestampedWindowStoreWithHeaders directly
* without facade wrapping. This returns the full ValueTimestampHeaders wrapper.
*/
private static class TimestampedWindowStoreWithHeadersType<K, V>
implements QueryableStoreType<ReadOnlyWindowStore<K, ValueTimestampHeaders<V>>> {

@Override
public boolean accepts(final org.apache.kafka.streams.processor.StateStore stateStore) {
// Accept stores that implement both TimestampedWindowStoreWithHeaders and ReadOnlyWindowStore
return stateStore instanceof TimestampedWindowStoreWithHeaders
&& stateStore instanceof ReadOnlyWindowStore;
}

@Override
public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.WindowStore;
Expand Down Expand Up @@ -163,6 +165,15 @@ private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore
}
}

static class TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders<K, V>
extends KeyValueStoreReadOnlyDecorator<K, ValueTimestampHeaders<V>>
implements TimestampedKeyValueStoreWithHeaders<K, V> {

private TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders(final TimestampedKeyValueStoreWithHeaders<K, V> inner) {
super(inner);
}
}

static class VersionedKeyValueStoreReadOnlyDecorator<K, V>
extends AbstractReadOnlyDecorator<VersionedKeyValueStore<K, V>, K, V>
implements VersionedKeyValueStore<K, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.PlainToHeadersStoreAdapter;
import org.apache.kafka.streams.state.internals.PlainToHeadersWindowStoreAdapter;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimestampedToHeadersStoreAdapter;
import org.apache.kafka.streams.state.internals.TimestampedToHeadersWindowStoreAdapter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import org.slf4j.Logger;

Expand All @@ -54,6 +59,7 @@ final class StateManagerUtil {
private StateManagerUtil() {}

static RecordConverter converterForStore(final StateStore store) {
// First check if the top-level store implements HeadersBytesStore or TimestampedBytesStore
if (isHeadersAware(store)) {
if (store instanceof SessionStore) {
return rawValueToSessionHeadersValue();
Expand All @@ -64,6 +70,29 @@ static RecordConverter converterForStore(final StateStore store) {
// timestamp is used separately during put() process for restore of versioned stores
return rawValueToTimestampedValue();
}

// If top-level check didn't find the type, unwrap to find adapters
// This handles persistent stores that use adapters
StateStore current = store;
while (current != null) {
if (current instanceof TimestampedToHeadersStoreAdapter || current instanceof TimestampedToHeadersWindowStoreAdapter) {
// Adapter wraps a timestamped store, so restore in timestamped format
return rawValueToTimestampedValue();
} else if (current instanceof PlainToHeadersStoreAdapter || current instanceof PlainToHeadersWindowStoreAdapter) {
// Adapter wraps a plain store, so restore in plain format
return identity();
}

// If not a WrappedStateStore, we've reached the innermost store
if (!(current instanceof WrappedStateStore)) {
break;
}

// Unwrap one more level
current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
}

// Default to identity if no special handling needed
return identity();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,20 @@ public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider

}

private static class TimestampedKeyValueStoreType<K, V>
public static class TimestampedKeyValueStoreType<K, V>
extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> {

TimestampedKeyValueStoreType() {
super(Set.of(
TimestampedKeyValueStore.class,
ReadOnlyKeyValueStore.class));
}

@Override
public boolean accepts(final StateStore stateStore) {
return super.accepts(stateStore) &&
(stateStore instanceof TimestampedKeyValueStore || stateStore instanceof TimestampedKeyValueStoreWithHeaders);
}

@Override
public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
final String storeName) {
Expand All @@ -152,15 +157,20 @@ public ReadOnlyWindowStore<K, V> create(final StateStoreProvider storeProvider,
}
}

private static class TimestampedWindowStoreType<K, V>
public static class TimestampedWindowStoreType<K, V>
extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> {

TimestampedWindowStoreType() {
super(Set.of(
TimestampedWindowStore.class,
ReadOnlyWindowStore.class));
}

@Override
public boolean accepts(final StateStore stateStore) {
return super.accepts(stateStore) &&
(stateStore instanceof TimestampedWindowStore || stateStore instanceof TimestampedWindowStoreWithHeaders);
}

@Override
public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
final String storeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;

import java.util.Collections;
import java.util.List;
Expand All @@ -45,8 +47,26 @@ public <T> List<T> stores(final String storeName, final QueryableStoreType<T> qu
if (!store.isOpen()) {
throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
}
if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
if (store instanceof TimestampedKeyValueStoreWithHeaders) {
if (queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueFromHeaders()));
} else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedKeyValueStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
} else {
// For custom query types, return the raw store so they can access headers directly
return (List<T>) Collections.singletonList(store);
}
} else if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store, ValueConverters.extractValue()));
} else if (store instanceof TimestampedWindowStoreWithHeaders) {
if (queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueFromHeaders()));
} else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedWindowStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
} else {
// For custom query types, return the raw store so they can access headers directly
return (List<T>) Collections.singletonList(store);
}
} else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store, ValueConverters.extractValue()));
} else if (store instanceof SessionStoreWithHeaders && queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
Expand Down
Loading
Loading