Skip to content

Commit 7b8549f

Browse files
authored
KAFKA-20194: Ensure backward compatibility for Sliding Windows (apache#21949)
By default, the DSL should expose it's state-stores as ts-stores, as long as header format is not enabled; otherwise, it would be a backward incompatible change. This PR ensures that the builders are creating the correct state stores, depending on the format, and we insert an adaptor to allow the DSL Processors to work only against headers-store interface. Reviewers: Bill Bejeck <bbejeck@apache.org>
1 parent 7352188 commit 7b8549f

5 files changed

Lines changed: 206 additions & 38 deletions

File tree

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.streams.kstream.Materialized;
3535
import org.apache.kafka.streams.kstream.Produced;
3636
import org.apache.kafka.streams.kstream.SessionWindows;
37+
import org.apache.kafka.streams.kstream.SlidingWindows;
3738
import org.apache.kafka.streams.kstream.TimeWindows;
3839
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
3940
import org.apache.kafka.streams.kstream.Windowed;
@@ -432,6 +433,64 @@ public void processorShouldAccessKStreamSessionAggregateOnWindowCloseKTableStore
432433
);
433434
}
434435

436+
@Test
437+
public void processorShouldAccessKStreamSlidingReducedKTableStoreAsTimestampedStore() {
438+
verifyWindow(builder
439+
.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
440+
.groupByKey()
441+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
442+
.reduce(
443+
(value, aggregate) -> value,
444+
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
445+
)
446+
);
447+
}
448+
449+
@Test
450+
public void processorShouldAccessKStreamSlidingReducedOnWindowCloseKTableStoreAsTimestampedStore() {
451+
verifyWindow(builder
452+
.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
453+
.groupByKey()
454+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
455+
.emitStrategy(EmitStrategy.onWindowClose())
456+
.reduce(
457+
(value, aggregate) -> value,
458+
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
459+
),
460+
true
461+
);
462+
}
463+
464+
@Test
465+
public void processorShouldAccessKStreamSlidingAggregatedKTableStoreAsTimestampedStore() {
466+
verifyWindow(builder
467+
.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
468+
.groupByKey()
469+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
470+
.aggregate(
471+
() -> "",
472+
(key, value, aggregate) -> value,
473+
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
474+
)
475+
);
476+
}
477+
478+
@Test
479+
public void processorShouldAccessKStreamSlidingAggregatedOnWindowCloseKTableStoreAsTimestampedStore() {
480+
verifyWindow(builder
481+
.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
482+
.groupByKey()
483+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1L)))
484+
.emitStrategy(EmitStrategy.onWindowClose())
485+
.aggregate(
486+
() -> "",
487+
(key, value, aggregate) -> value,
488+
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
489+
),
490+
true
491+
);
492+
}
493+
435494
@Test
436495
public void processorShouldAccessKTableStoreAsHeadersStoreViaConfig() {
437496
final StreamsBuilder builder = new StreamsBuilder();

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import org.apache.kafka.clients.consumer.ConsumerRecord;
2020
import org.apache.kafka.common.header.internals.RecordHeaders;
2121
import org.apache.kafka.streams.KeyValue;
22+
import org.apache.kafka.streams.errors.InvalidStateStoreException;
2223
import org.apache.kafka.streams.kstream.Aggregator;
2324
import org.apache.kafka.streams.kstream.EmitStrategy;
2425
import org.apache.kafka.streams.kstream.Initializer;
2526
import org.apache.kafka.streams.kstream.SlidingWindows;
2627
import org.apache.kafka.streams.kstream.Window;
2728
import org.apache.kafka.streams.kstream.Windowed;
29+
import org.apache.kafka.streams.processor.StateStore;
2830
import org.apache.kafka.streams.processor.api.Processor;
2931
import org.apache.kafka.streams.processor.api.ProcessorContext;
3032
import org.apache.kafka.streams.processor.api.Record;
@@ -36,6 +38,7 @@
3638
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
3739
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3840
import org.apache.kafka.streams.state.WindowStoreIterator;
41+
import org.apache.kafka.streams.state.internals.WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter;
3942

4043
import org.slf4j.Logger;
4144
import org.slf4j.LoggerFactory;
@@ -484,7 +487,7 @@ private void updateWindowAndForward(final Window window,
484487

485488
@Override
486489
public KTableValueGetterSupplier<Windowed<KIn>, VAgg> view() {
487-
return new KTableValueGetterSupplier<Windowed<KIn>, VAgg>() {
490+
return new KTableValueGetterSupplier<>() {
488491

489492
public KTableValueGetter<Windowed<KIn>, VAgg> get() {
490493
return new KStreamWindowAggregateValueGetter();
@@ -502,7 +505,21 @@ private class KStreamWindowAggregateValueGetter implements KTableValueGetter<Win
502505

503506
@Override
504507
public void init(final ProcessorContext<?, ?> context) {
505-
windowStore = context.getStateStore(storeName);
508+
try {
509+
windowStore = new WindowedTimestampedHeadersStoreToWindowedTimestampedStoreAdapter<>(context.getStateStore(storeName));
510+
} catch (final ClassCastException swallow) {
511+
// not timestamped store
512+
513+
// Try headers-aware timestamped store
514+
try {
515+
windowStore = context.getStateStore(storeName);
516+
} catch (final ClassCastException fatal) {
517+
final StateStore store = context.getStateStore(storeName);
518+
final String storeType = store == null ? "null" : store.getClass().getName();
519+
throw new InvalidStateStoreException("Sliding-KTable state store must implement either "
520+
+ "TimestampedWindowStore, or TimestampedWindowStoreWithHeaders. Got: " + storeType);
521+
}
522+
}
506523
}
507524

508525
@Override
@@ -511,9 +528,6 @@ public ValueTimestampHeaders<VAgg> get(final Windowed<KIn> windowedKey) {
511528
return windowStore.fetch(key, windowedKey.window().start());
512529
}
513530

514-
@Override
515-
public void close() {}
516-
517531
@Override
518532
public boolean isVersioned() {
519533
return false;

streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.apache.kafka.streams.kstream.EmitStrategy;
2222
import org.apache.kafka.streams.kstream.SlidingWindows;
2323
import org.apache.kafka.streams.state.DslWindowParams;
24+
import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
2425
import org.apache.kafka.streams.state.StoreBuilder;
2526
import org.apache.kafka.streams.state.Stores;
26-
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
2727
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
2828
import org.apache.kafka.streams.state.WindowStore;
2929

@@ -71,12 +71,20 @@ public StoreBuilder<?> builder() {
7171
))
7272
: (WindowBytesStoreSupplier) materialized.storeSupplier();
7373

74-
final StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> builder = Stores
75-
.timestampedWindowStoreWithHeadersBuilder(
74+
final StoreBuilder<?> builder;
75+
if (supplier instanceof HeadersBytesStoreSupplier) {
76+
builder = Stores.timestampedWindowStoreWithHeadersBuilder(
7677
supplier,
7778
materialized.keySerde(),
7879
materialized.valueSerde()
7980
);
81+
} else {
82+
builder = Stores.timestampedWindowStoreBuilder(
83+
supplier,
84+
materialized.keySerde(),
85+
materialized.valueSerde()
86+
);
87+
}
8088

8189
if (materialized.loggingEnabled()) {
8290
builder.withLoggingEnabled(materialized.logConfig());

0 commit comments

Comments
 (0)