Skip to content

Commit 9f22628

Browse files
authored
KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (1/N) (apache#21572)
This PR introduces the DslStoreFormat enum and extends DslKeyValueParams to enable headers-aware key-value stores in the Kafka Streams DSL, implementing the foundational infrastructure for [ KIP-1285](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1285%3A+DSL+Opt-in+Support+for+Headers-Aware+State+Stores). Reviewers: Matthias J. Sax <matthias@confluent.io>, TengYao Chi <frankvicky@apache.org>
1 parent fdeabe5 commit 9f22628

10 files changed

Lines changed: 271 additions & 16 deletions

File tree

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams;
18+
19+
import java.util.Locale;
20+
21+
public enum DslStoreFormat {
22+
23+
/** The non-timestamped state stores */
24+
PLAIN("PLAIN"),
25+
26+
/** The timestamped state stores */
27+
TIMESTAMPED("TIMESTAMPED"),
28+
29+
/** The headers-aware state stores */
30+
HEADERS("HEADERS");
31+
32+
/**
33+
* String representation of the DSL store format.
34+
*/
35+
public final String name;
36+
37+
DslStoreFormat(final String name) {
38+
this.name = name;
39+
}
40+
41+
/**
42+
* Case-insensitive DSL store format lookup by string name.
43+
*/
44+
public static DslStoreFormat of(final String name) {
45+
return DslStoreFormat.valueOf(name.toUpperCase(Locale.ROOT));
46+
}
47+
}

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,14 @@ public class StreamsConfig extends AbstractConfig {
537537
static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
538538
static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
539539

540+
/** {@code dsl.store.suppliers.class } */
541+
public static final String DSL_STORE_FORMAT_CONFIG = "dsl.store.format";
542+
public static final String DSL_STORE_FORMAT_DEFAULT = "DEFAULT";
543+
public static final String DSL_STORE_FORMAT_HEADERS = "HEADERS";
544+
private static final String DSL_STORE_FORMAT_DOC = "Specifies the state store format for DSL operators. " +
545+
"'DEFAULT' creates either timestamped or plain state stores, depending on context. " +
546+
"'HEADERS' creates headers-aware stores that preserve record headers.";
547+
540548
/** {@code default key.serde} */
541549
@SuppressWarnings("WeakerAccess")
542550
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
@@ -1112,6 +1120,12 @@ public class StreamsConfig extends AbstractConfig {
11121120
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
11131121
Importance.LOW,
11141122
DSL_STORE_SUPPLIERS_CLASS_DOC)
1123+
.define(DSL_STORE_FORMAT_CONFIG,
1124+
Type.STRING,
1125+
DSL_STORE_FORMAT_DEFAULT,
1126+
ConfigDef.CaseInsensitiveValidString.in(DSL_STORE_FORMAT_DEFAULT, DSL_STORE_FORMAT_HEADERS),
1127+
Importance.LOW,
1128+
DSL_STORE_FORMAT_DOC)
11151129
.define(DEFAULT_CLIENT_SUPPLIER_CONFIG,
11161130
Type.CLASS,
11171131
DefaultKafkaClientSupplier.class.getName(),

streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.streams.kstream.internals;
1818

19+
import org.apache.kafka.streams.DslStoreFormat;
1920
import org.apache.kafka.streams.StreamsConfig;
2021
import org.apache.kafka.streams.processor.internals.StoreFactory;
2122
import org.apache.kafka.streams.state.DslStoreSuppliers;
@@ -26,6 +27,7 @@
2627
public abstract class AbstractConfigurableStoreFactory implements StoreFactory {
2728
private final Set<String> connectedProcessorNames = new HashSet<>();
2829
private DslStoreSuppliers dslStoreSuppliers;
30+
private DslStoreFormat dslStoreFormat;
2931

3032
public AbstractConfigurableStoreFactory(final DslStoreSuppliers initialStoreSuppliers) {
3133
this.dslStoreSuppliers = initialStoreSuppliers;
@@ -40,13 +42,22 @@ public void configure(final StreamsConfig config) {
4042
config.originals()
4143
);
4244
}
45+
final String dslStoreFormatValue = config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
46+
if (dslStoreFormatValue.equalsIgnoreCase(StreamsConfig.DSL_STORE_FORMAT_HEADERS)) {
47+
dslStoreFormat = DslStoreFormat.HEADERS;
48+
}
49+
// else dslStoreFormat remains null and the lower layers decide between PLAIN and TIMESTAMPED
4350
}
4451

4552
@Override
4653
public Set<String> connectedProcessorNames() {
4754
return connectedProcessorNames;
4855
}
4956

57+
public DslStoreFormat dslStoreFormat() {
58+
return dslStoreFormat;
59+
}
60+
5061
protected DslStoreSuppliers dslStoreSuppliers() {
5162
if (dslStoreSuppliers == null) {
5263
throw new IllegalStateException("Expected configure() to be called before using dslStoreSuppliers");

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.streams.kstream.internals;
1818

1919
import org.apache.kafka.common.utils.Bytes;
20+
import org.apache.kafka.streams.DslStoreFormat;
2021
import org.apache.kafka.streams.state.DslKeyValueParams;
2122
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
2223
import org.apache.kafka.streams.state.KeyValueStore;
@@ -44,8 +45,9 @@ public KeyValueStoreMaterializer(
4445

4546
@Override
4647
public StoreBuilder<?> builder() {
48+
final DslStoreFormat storeFormat = dslStoreFormat() == null ? DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
4749
final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
48-
? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true))
50+
? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), storeFormat))
4951
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
5052

5153
final StoreBuilder<?> builder;
@@ -55,10 +57,17 @@ public StoreBuilder<?> builder() {
5557
materialized.keySerde(),
5658
materialized.valueSerde());
5759
} else {
58-
builder = Stores.timestampedKeyValueStoreBuilder(
60+
if (storeFormat == DslStoreFormat.HEADERS) {
61+
builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
5962
supplier,
6063
materialized.keySerde(),
6164
materialized.valueSerde());
65+
} else {
66+
builder = Stores.timestampedKeyValueStoreBuilder(
67+
supplier,
68+
materialized.keySerde(),
69+
materialized.valueSerde());
70+
}
6271
}
6372

6473
if (materialized.loggingEnabled()) {
@@ -68,10 +77,10 @@ public StoreBuilder<?> builder() {
6877
}
6978

7079
if (materialized.cachingEnabled()) {
71-
if (!(builder instanceof VersionedKeyValueStoreBuilder)) {
72-
builder.withCachingEnabled();
73-
} else {
80+
if (builder instanceof VersionedKeyValueStoreBuilder) {
7481
LOG.info("Not enabling caching for store '{}' as versioned stores do not support caching.", supplier.name());
82+
} else {
83+
builder.withCachingEnabled();
7584
}
7685
}
7786

streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.streams.kstream.internals;
1818

1919
import org.apache.kafka.common.utils.Time;
20+
import org.apache.kafka.streams.DslStoreFormat;
2021
import org.apache.kafka.streams.kstream.JoinWindows;
2122
import org.apache.kafka.streams.processor.internals.StoreFactory;
2223
import org.apache.kafka.streams.state.DslKeyValueParams;
@@ -95,7 +96,8 @@ public StoreBuilder<?> builder() {
9596
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
9697
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
9798

98-
final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false);
99+
final DslStoreFormat storeFormat = dslStoreFormat() == null ? DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
100+
final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, storeFormat);
99101
final KeyValueBytesStoreSupplier supplier;
100102

101103
if (passedInDslStoreSuppliers != null) {

streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.kafka.common.serialization.Serde;
2121
import org.apache.kafka.common.serialization.Serdes;
22+
import org.apache.kafka.streams.DslStoreFormat;
2223
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
2324
import org.apache.kafka.streams.processor.internals.StoreFactory;
2425
import org.apache.kafka.streams.state.DslKeyValueParams;
@@ -46,8 +47,9 @@ public SubscriptionStoreFactory(
4647
@Override
4748
public StoreBuilder<?> builder() {
4849
StoreBuilder<?> builder;
50+
final DslStoreFormat storeFormat = dslStoreFormat() == null ? DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
4951
builder = Stores.timestampedKeyValueStoreBuilder(
50-
dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, true)),
52+
dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, storeFormat)),
5153
new Serdes.BytesSerde(),
5254
subscriptionWrapperSerde
5355
);

streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.streams.state;
1818

19+
import org.apache.kafka.streams.DslStoreFormat;
1920
import org.apache.kafka.streams.kstream.EmitStrategy;
2021
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
2122
import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
@@ -36,9 +37,18 @@ public static class RocksDBDslStoreSuppliers implements DslStoreSuppliers {
3637

3738
@Override
3839
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
39-
return params.isTimestamped()
40-
? Stores.persistentTimestampedKeyValueStore(params.name())
41-
: Stores.persistentKeyValueStore(params.name());
40+
final DslStoreFormat storeFormat = params.dslStoreFormat();
41+
switch (storeFormat) {
42+
case HEADERS:
43+
return Stores.persistentTimestampedKeyValueStoreWithHeaders(params.name());
44+
case TIMESTAMPED:
45+
return Stores.persistentTimestampedKeyValueStore(params.name());
46+
case PLAIN:
47+
return Stores.persistentKeyValueStore(params.name());
48+
default:
49+
throw new IllegalArgumentException("Unsupported DslStoreFormat: " + storeFormat +
50+
". Expected one of: HEADERS, TIMESTAMPED, or PLAIN");
51+
}
4252
}
4353

4454
@Override

streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.kafka.streams.state;
1818

19+
import org.apache.kafka.streams.DslStoreFormat;
20+
1921
import java.util.Objects;
2022

2123
/**
@@ -26,23 +28,52 @@ public class DslKeyValueParams {
2628

2729
private final String name;
2830
private final boolean isTimestamped;
31+
private final DslStoreFormat dslStoreFormat;
2932

3033
/**
34+
* @deprecated Since 4.3. Use {@link #DslKeyValueParams(String, DslStoreFormat)} instead.
3135
* @param name the name of the store (cannot be {@code null})
3236
* @param isTimestamped whether the returned stores should be timestamped, see ({@link TimestampedKeyValueStore}
3337
*/
38+
@Deprecated
3439
public DslKeyValueParams(final String name, final boolean isTimestamped) {
3540
Objects.requireNonNull(name);
3641
this.name = name;
3742
this.isTimestamped = isTimestamped;
43+
// If isTimestamped is false and the user is still calling the old deprecated constructor, we should assume they mean plain.
44+
this.dslStoreFormat = isTimestamped ? DslStoreFormat.TIMESTAMPED : DslStoreFormat.PLAIN;
45+
}
46+
47+
/**
48+
* @param name the name of the store (cannot be {@code null})
49+
* @param dslStoreFormat the format of the state store, see ({@link DslStoreFormat}
50+
*/
51+
public DslKeyValueParams(final String name, final DslStoreFormat dslStoreFormat) {
52+
this.name = Objects.requireNonNull(name);
53+
this.dslStoreFormat = Objects.requireNonNull(dslStoreFormat);
54+
this.isTimestamped = dslStoreFormat == DslStoreFormat.TIMESTAMPED;
3855
}
3956

4057
public String name() {
4158
return name;
4259
}
4360

61+
/**
62+
* @deprecated Since 4.3. Use {@link #dslStoreFormat()} instead to check the store format.
63+
* @return {@code true} if the store format is {@link DslStoreFormat#TIMESTAMPED}, {@code false} otherwise
64+
*/
65+
@Deprecated
4466
public boolean isTimestamped() {
45-
return isTimestamped;
67+
return dslStoreFormat == DslStoreFormat.TIMESTAMPED;
68+
}
69+
70+
/**
71+
* Returns the store format for this key-value store.
72+
*
73+
* @return the {@link DslStoreFormat} specifying whether to use plain, timestamped, or headers-aware stores
74+
*/
75+
public DslStoreFormat dslStoreFormat() {
76+
return dslStoreFormat;
4677
}
4778

4879
@Override
@@ -55,19 +86,21 @@ public boolean equals(final Object o) {
5586
}
5687
final DslKeyValueParams that = (DslKeyValueParams) o;
5788
return isTimestamped == that.isTimestamped
89+
&& dslStoreFormat == that.dslStoreFormat
5890
&& Objects.equals(name, that.name);
5991
}
6092

6193
@Override
6294
public int hashCode() {
63-
return Objects.hash(name, isTimestamped);
95+
return Objects.hash(name, isTimestamped, dslStoreFormat);
6496
}
6597

6698
@Override
6799
public String toString() {
68100
return "DslKeyValueParams{" +
69101
"name='" + name + '\'' +
70102
"isTimestamped=" + isTimestamped +
103+
"dslStoreFormat=" + dslStoreFormat +
71104
'}';
72105
}
73106
}

0 commit comments

Comments
 (0)