Skip to content

Commit e7c17a0

Browse files
authored
KAFKA-20222: Enable SessionStore with headers in DSL (apache#21600)
Adding support for DSL integration with SessionStores with header in support of KIP-1285. Also adds a ReadOnlySessionStoreFacade that wraps SessionStoreWithHeaders and strips the AggregationWithHeaders wrapper when queried through Interactive Queries, mirroring the existing pattern for ReadOnlyKeyValueStoreFacade and ReadOnlyWindowStoreFacade. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
1 parent 9f15f26 commit e7c17a0

19 files changed

Lines changed: 654 additions & 157 deletions

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import static org.apache.kafka.common.utils.Utils.mkMap;
109109
import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
110110
import static org.hamcrest.MatcherAssert.assertThat;
111+
import static org.hamcrest.Matchers.containsString;
111112
import static org.hamcrest.Matchers.empty;
112113
import static org.hamcrest.Matchers.equalTo;
113114
import static org.hamcrest.Matchers.is;
@@ -1498,14 +1499,10 @@ private <T> void shouldHandleSessionKeyDSLQueries() {
14981499
throw new AssertionError(queryResult.toString());
14991500
}
15001501
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
1501-
assertThat(partitionResult.getFailureMessage(), is(
1502-
"This store"
1503-
+ " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)"
1504-
+ " doesn't know how to execute the given query"
1505-
+ " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})"
1506-
+ " because SessionStores only support WindowRangeQuery.withKey."
1507-
+ " Contact the store maintainer if you need support for a new query type."
1508-
));
1502+
assertThat(partitionResult.getFailureMessage(),
1503+
containsString("doesn't know how to execute the given query"));
1504+
assertThat(partitionResult.getFailureMessage(),
1505+
containsString("because SessionStores only support WindowRangeQuery.withKey."));
15091506
}
15101507
}
15111508
}
@@ -1571,14 +1568,10 @@ private <T> void shouldHandleSessionKeyPAPIQueries() {
15711568
throw new AssertionError(queryResult.toString());
15721569
}
15731570
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
1574-
assertThat(partitionResult.getFailureMessage(), is(
1575-
"This store"
1576-
+ " (class org.apache.kafka.streams.state.internals.MeteredSessionStore)"
1577-
+ " doesn't know how to execute the given query"
1578-
+ " (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]})"
1579-
+ " because SessionStores only support WindowRangeQuery.withKey."
1580-
+ " Contact the store maintainer if you need support for a new query type."
1581-
));
1571+
assertThat(partitionResult.getFailureMessage(),
1572+
containsString("doesn't know how to execute the given query"));
1573+
assertThat(partitionResult.getFailureMessage(),
1574+
containsString("because SessionStores only support WindowRangeQuery.withKey."));
15821575
}
15831576
}
15841577
}

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

Lines changed: 87 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package org.apache.kafka.streams.integration;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerConfig;
20+
import org.apache.kafka.common.header.Headers;
21+
import org.apache.kafka.common.header.internals.RecordHeader;
22+
import org.apache.kafka.common.header.internals.RecordHeaders;
2023
import org.apache.kafka.common.serialization.Deserializer;
2124
import org.apache.kafka.common.serialization.IntegerDeserializer;
2225
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -71,11 +74,15 @@
7174
import org.junit.jupiter.api.Test;
7275
import org.junit.jupiter.api.TestInfo;
7376
import org.junit.jupiter.api.Timeout;
77+
import org.junit.jupiter.params.ParameterizedTest;
78+
import org.junit.jupiter.params.provider.ValueSource;
7479

7580
import java.io.ByteArrayOutputStream;
7681
import java.io.PrintStream;
82+
import java.nio.charset.StandardCharsets;
7783
import java.time.Duration;
7884
import java.util.Arrays;
85+
import java.util.Collection;
7986
import java.util.Collections;
8087
import java.util.Comparator;
8188
import java.util.HashMap;
@@ -718,8 +725,9 @@ public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws Except
718725
}
719726
}
720727

721-
@Test
722-
public void shouldCountSessionWindows() throws Exception {
728+
@ParameterizedTest
729+
@ValueSource(booleans = {false, true})
730+
public void shouldCountSessionWindows(final boolean withHeaders) throws Exception {
723731
final long sessionGap = 5 * 60 * 1000L;
724732
final List<KeyValue<String, String>> t1Messages = Arrays.asList(
725733
new KeyValue<>("bob", "start"),
@@ -797,6 +805,10 @@ public void shouldCountSessionWindows() throws Exception {
797805
final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
798806
final CountDownLatch latch = new CountDownLatch(13);
799807

808+
if (withHeaders) {
809+
streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS);
810+
}
811+
800812
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
801813
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
802814
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap)))
@@ -819,88 +831,37 @@ public void shouldCountSessionWindows() throws Exception {
819831
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
820832
}
821833

822-
@Test
823-
public void shouldReduceSessionWindows() throws Exception {
834+
@ParameterizedTest
835+
@ValueSource(booleans = {false, true})
836+
public void shouldReduceSessionWindows(final boolean withHeaders) throws Exception {
824837
final long sessionGap = 1000L; // something to do with time
825-
final List<KeyValue<String, String>> t1Messages = Arrays.asList(
826-
new KeyValue<>("bob", "start"),
827-
new KeyValue<>("penny", "start"),
828-
new KeyValue<>("jo", "pause"),
829-
new KeyValue<>("emily", "pause")
830-
);
838+
839+
final Properties producerConfig = TestUtils.producerConfig(
840+
CLUSTER.bootstrapServers(),
841+
StringSerializer.class,
842+
StringSerializer.class,
843+
new Properties());
831844

832845
final long t1 = mockTime.milliseconds();
833-
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
834-
userSessionsStream,
835-
t1Messages,
836-
TestUtils.producerConfig(
837-
CLUSTER.bootstrapServers(),
838-
StringSerializer.class,
839-
StringSerializer.class,
840-
new Properties()),
841-
t1
842-
);
843846
final long t2 = t1 + (sessionGap / 2);
844-
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
845-
userSessionsStream,
846-
Collections.singletonList(
847-
new KeyValue<>("emily", "resume")
848-
),
849-
TestUtils.producerConfig(
850-
CLUSTER.bootstrapServers(),
851-
StringSerializer.class,
852-
StringSerializer.class,
853-
new Properties()),
854-
t2
855-
);
856847
final long t3 = t1 + sessionGap + 1;
857-
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
858-
userSessionsStream,
859-
Arrays.asList(
860-
new KeyValue<>("bob", "pause"),
861-
new KeyValue<>("penny", "stop")
862-
),
863-
TestUtils.producerConfig(
864-
CLUSTER.bootstrapServers(),
865-
StringSerializer.class,
866-
StringSerializer.class,
867-
new Properties()),
868-
t3
869-
);
870848
final long t4 = t3 + (sessionGap / 2);
871-
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
872-
userSessionsStream,
873-
Arrays.asList(
874-
new KeyValue<>("bob", "resume"), // bobs session continues
875-
new KeyValue<>("jo", "resume") // jo's starts new session
876-
),
877-
TestUtils.producerConfig(
878-
CLUSTER.bootstrapServers(),
879-
StringSerializer.class,
880-
StringSerializer.class,
881-
new Properties()),
882-
t4
883-
);
884849
final long t5 = t4 - 1;
885-
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
886-
userSessionsStream,
887-
Collections.singletonList(
888-
new KeyValue<>("jo", "late") // jo has late arrival
889-
),
890-
TestUtils.producerConfig(
891-
CLUSTER.bootstrapServers(),
892-
StringSerializer.class,
893-
StringSerializer.class,
894-
new Properties()),
895-
t5
896-
);
850+
851+
produceSessionWindowData(producerConfig, withHeaders, t1, t2, t3, t4, t5, sessionGap);
897852

898853
final Map<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
899854
final CountDownLatch latch = new CountDownLatch(13);
900855
final String userSessionsStore = "UserSessionsStore";
901856

857+
if (withHeaders) {
858+
streamsConfiguration.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, StreamsConfig.DSL_STORE_FORMAT_HEADERS);
859+
}
860+
902861
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
903-
.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), ofMinutes(1))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
862+
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
863+
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), ofMinutes(1)))
864+
.reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
904865
.toStream()
905866
.process(() -> (Processor<Windowed<String>, String, Object, Object>) record -> {
906867
results.put(record.key(), KeyValue.pair(record.value(), record.timestamp()));
@@ -919,17 +880,69 @@ public void shouldReduceSessionWindows() throws Exception {
919880
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair("pause:resume", t4)));
920881
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair("stop", t3)));
921882

922-
// verify can query data via IQ
883+
verifySessionStore(userSessionsStore, t1, t3, t4);
884+
885+
}
886+
887+
private void produceSessionWindowData(final Properties producerConfig,
888+
final boolean withHeaders,
889+
final long t1, final long t2, final long t3,
890+
final long t4, final long t5,
891+
final long sessionGap) throws Exception {
892+
final List<KeyValue<String, String>> t1Messages = Arrays.asList(
893+
new KeyValue<>("bob", "start"),
894+
new KeyValue<>("penny", "start"),
895+
new KeyValue<>("jo", "pause"),
896+
new KeyValue<>("emily", "pause")
897+
);
898+
899+
produceWithOptionalHeaders(t1Messages, producerConfig, withHeaders, "t1", t1);
900+
produceWithOptionalHeaders(
901+
Collections.singletonList(new KeyValue<>("emily", "resume")),
902+
producerConfig, withHeaders, "t2", t2);
903+
produceWithOptionalHeaders(
904+
Arrays.asList(new KeyValue<>("bob", "pause"), new KeyValue<>("penny", "stop")),
905+
producerConfig, withHeaders, "t3", t3);
906+
produceWithOptionalHeaders(
907+
Arrays.asList(
908+
new KeyValue<>("bob", "resume"), // bobs session continues
909+
new KeyValue<>("jo", "resume")), // jo's starts new session
910+
producerConfig, withHeaders, "t4", t4);
911+
produceWithOptionalHeaders(
912+
Collections.singletonList(new KeyValue<>("jo", "late")), // jo has late arrival
913+
producerConfig, withHeaders, "t5", t5);
914+
}
915+
916+
private void produceWithOptionalHeaders(final Collection<KeyValue<String, String>> records,
917+
final Properties producerConfig,
918+
final boolean withHeaders,
919+
final String batchId,
920+
final long timestamp) throws Exception {
921+
if (withHeaders) {
922+
final Headers headers = new RecordHeaders(Arrays.asList(
923+
new RecordHeader("batch", batchId.getBytes(StandardCharsets.UTF_8)),
924+
new RecordHeader("source", "test".getBytes(StandardCharsets.UTF_8))
925+
));
926+
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
927+
userSessionsStream, records, producerConfig, headers, timestamp, false);
928+
} else {
929+
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
930+
userSessionsStream, records, producerConfig, timestamp);
931+
}
932+
}
933+
934+
private void verifySessionStore(final String storeName,
935+
final long t1, final long t3, final long t4) throws Exception {
923936
final ReadOnlySessionStore<String, String> sessionStore =
924-
IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams, QueryableStoreTypes.sessionStore());
937+
IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.sessionStore());
925938

926939
try (final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob")) {
927940
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
928941
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
929942
assertFalse(bob.hasNext());
930943
}
931944
}
932-
945+
933946
@Test
934947
public void shouldCountUnlimitedWindows() throws Exception {
935948
final long startTime = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS) + 1;

0 commit comments

Comments
 (0)