Skip to content

KAFKA-20307: Add Interactive Queries (IQv1) support for headers-aware stores#21744

Open
aliehsaeedii wants to merge 3 commits intoapache:trunkfrom
aliehsaeedii:iqv1_updates
Open

KAFKA-20307: Add Interactive Queries (IQv1) support for headers-aware stores#21744
aliehsaeedii wants to merge 3 commits intoapache:trunkfrom
aliehsaeedii:iqv1_updates

Conversation

@aliehsaeedii
Copy link
Contributor

@aliehsaeedii aliehsaeedii commented Mar 13, 2026

This PR adds backward compatibility support for the new headers-aware
state stores (TimestampedKeyValueStoreWithHeaders and
TimestampedWindowStoreWithHeaders) when queried through the
Interactive Queries v1 (IQv1) API. Users can now query headers-aware
stores using the existing QueryableStoreTypes API without any code
changes.

@github-actions github-actions bot added triage PRs from the community streams labels Mar 13, 2026
@mjsax mjsax added ci-approved kip Requires or implements a KIP and removed triage PRs from the community labels Mar 15, 2026
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM -- basically only comments about tests


static RecordConverter converterForStore(final StateStore store) {
// First check if the top-level store implements HeadersBytesStore or TimestampedBytesStore
// This handles in-memory stores with marker wrappers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also handler persistent stores, if there are not adaptors?

if (queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object, Object>) store, ValueConverters.extractValueFromHeaders()));
} else if (queryableStoreType instanceof QueryableStoreTypes.TimestampedKeyValueStoreType) {
return (List<T>) Collections.singletonList(new GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object, Object>) store, ValueConverters.headersToValueAndTimestamp()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: might be good to rename from headersToValueAndTimestamp to extractValueAndTimestampFromHeaders to align naming to extractValueFromHeaders ?

@Test
@SuppressWarnings("unchecked")
public void shouldReturnTimestampedConverterForTimestampedToHeadersStoreAdapter() {
final WrappedStateStore<?, ?, ?> mockWrapper = mock(WrappedStateStore.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using mocks here, but not above? Could we simplify the tests above using mocks, too?


@Test
public void shouldReturnIdentityConverterForPlainKeyValueStore() {
final StateStore store = Stores.keyValueStoreBuilder(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same. Should we just use a mock instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this test seems to be duplicated vi a shouldReturnIdentityConverterForMockKeyValueStore below? Both test the KeyValueStore case?

}

@Test
public void shouldReturnHeadersConverterForHeadersAwareWindowStore() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test for session-header store?

assertEquals(1, stores.size());
for (final ReadOnlyWindowStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store, instanceOf(GenericReadOnlyWindowStoreFacade.class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar as above -- same below.

}

@Test
public void shouldFindTimestampedKeyValueStoresWithHeaders() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comments as for global-store-provide-test

.build();

final QueryableStoreType<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> storeType =
QueryableStoreTypes.timestampedKeyValueStore();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need similar test for keyValuStore()?

.build();

final QueryableStoreType<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> storeType =
QueryableStoreTypes.timestampedWindowStore();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need similar test for windowStore()?

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class QueryableStoreTypesWithHeadersTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add tests for session-store cases, too?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants