Skip to content

Commit e11ee48

Browse files
committed
kip-1128-poc
1 parent 89f3888 commit e11ee48

10 files changed

Lines changed: 1425 additions & 5 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,6 +1577,7 @@ <KOut, VOut> KStream<KOut, VOut> process(
15771577
* <p>However, because the key cannot be modified, some restrictions apply to a {@link FixedKeyProcessor} compared
15781578
* to a {@link Processor}: for example, forwarding result records from a {@link Punctuator} is not possible.
15791579
*/
1580+
@Deprecated
15801581
<VOut> KStream<K, VOut> processValues(
15811582
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
15821583
final String... stateStoreNames
@@ -1587,9 +1588,22 @@ <VOut> KStream<K, VOut> processValues(
15871588
*
15881589
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
15891590
*/
1591+
@Deprecated
15901592
<VOut> KStream<K, VOut> processValues(
15911593
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
15921594
final Named named,
15931595
final String... stateStoreNames
15941596
);
1597+
1598+
// newly added
1599+
<VOut> KStream<K, VOut> processFixedKey(
1600+
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
1601+
final String... stateStoreNames
1602+
);
1603+
1604+
<VOut> KStream<K, VOut> processFixedKey(
1605+
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
1606+
final Named named,
1607+
final String... stateStoreNames
1608+
);
15951609
}

streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.kafka.streams.processor.ProcessorContext;
2828
import org.apache.kafka.streams.processor.StateStore;
2929
import org.apache.kafka.streams.processor.StreamPartitioner;
30+
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
3031
import org.apache.kafka.streams.processor.api.Record;
3132
import org.apache.kafka.streams.query.StateQueryRequest;
3233
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -820,6 +821,7 @@ <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extend
820821
* @see #mapValues(ValueMapper)
821822
* @see #mapValues(ValueMapperWithKey)
822823
*/
824+
@Deprecated
823825
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
824826
final String... stateStoreNames);
825827

@@ -894,6 +896,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
894896
* @see #mapValues(ValueMapper)
895897
* @see #mapValues(ValueMapperWithKey)
896898
*/
899+
@Deprecated
897900
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
898901
final Named named,
899902
final String... stateStoreNames);
@@ -973,6 +976,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
973976
* @see #mapValues(ValueMapper)
974977
* @see #mapValues(ValueMapperWithKey)
975978
*/
979+
@Deprecated
976980
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
977981
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
978982
final String... stateStoreNames);
@@ -1053,11 +1057,29 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
10531057
* @see #mapValues(ValueMapper)
10541058
* @see #mapValues(ValueMapperWithKey)
10551059
*/
1060+
@Deprecated
10561061
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
10571062
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
10581063
final Named named,
10591064
final String... stateStoreNames);
10601065

1066+
// newly added
1067+
<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
1068+
final String... stateStoreNames);
1069+
1070+
<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
1071+
final Named named,
1072+
final String... stateStoreNames);
1073+
1074+
<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
1075+
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
1076+
final String... stateStoreNames);
1077+
1078+
<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
1079+
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
1080+
final Named named,
1081+
final String... stateStoreNames);
1082+
10611083
/**
10621084
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper} and default serializers
10631085
* and deserializers.

streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
* @see Transformer
5252
*/
5353

54+
@Deprecated
5455
public interface ValueTransformerWithKey<K, V, VR> {
5556

5657
/**

streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* @see Transformer
3636
* @see TransformerSupplier
3737
*/
38+
@Deprecated
3839
@FunctionalInterface
3940
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider, Supplier<ValueTransformerWithKey<K, V, VR>> {
4041

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,6 +1326,7 @@ public <KOut, VOut> KStream<KOut, VOut> process(
13261326
builder);
13271327
}
13281328

1329+
@SuppressWarnings("deprecation")
13291330
@Override
13301331
public <VOut> KStream<K, VOut> processValues(
13311332
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
@@ -1338,6 +1339,7 @@ public <VOut> KStream<K, VOut> processValues(
13381339
);
13391340
}
13401341

1342+
@SuppressWarnings("deprecation")
13411343
@Override
13421344
public <VOut> KStream<K, VOut> processValues(
13431345
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
@@ -1372,4 +1374,43 @@ public <VOut> KStream<K, VOut> processValues(
13721374
processNode,
13731375
builder);
13741376
}
1377+
1378+
@Override
1379+
public <VOut> KStream<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, final String... stateStoreNames) {
1380+
return processFixedKey(
1381+
processorSupplier,
1382+
Named.as(builder.newProcessorName(PROCESSVALUES_NAME)),
1383+
stateStoreNames
1384+
);
1385+
}
1386+
1387+
@Override
1388+
public <VOut> KStream<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, final Named named, final String... stateStoreNames) {
1389+
ApiUtils.checkSupplier(processorSupplier);
1390+
Objects.requireNonNull(named, "named cannot be null");
1391+
Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
1392+
for (final String stateStoreName : stateStoreNames) {
1393+
Objects.requireNonNull(stateStoreName, "state store name cannot be null");
1394+
}
1395+
1396+
final String name = new NamedInternal(named).name();
1397+
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
1398+
name,
1399+
new ProcessorParameters<>(processorSupplier, name),
1400+
stateStoreNames
1401+
);
1402+
1403+
processNode.setValueChangingOperation(true);
1404+
1405+
builder.addGraphNode(graphNode, processNode);
1406+
// cannot inherit value serde
1407+
return new KStreamImpl<>(
1408+
name,
1409+
keySerde,
1410+
null,
1411+
subTopologySourceNodes,
1412+
repartitionRequired,
1413+
processNode,
1414+
builder);
1415+
}
13751416
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
6666
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
6767
import org.apache.kafka.streams.processor.StreamPartitioner;
68+
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
6869
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
6970
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
7071
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
@@ -126,6 +127,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
126127

127128
private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
128129

130+
private static final String PROCESSFIXEDKEY_NAME = "KTABLE-PROCESSFIXEDKEY-";
131+
129132
private static final String FK_JOIN = "KTABLE-FK-JOIN-";
130133
private static final String FK_JOIN_STATE_STORE_NAME = FK_JOIN + "SUBSCRIPTION-STATE-STORE-";
131134
private static final String SUBSCRIPTION_REGISTRATION = FK_JOIN + "SUBSCRIPTION-REGISTRATION-";
@@ -396,12 +399,14 @@ public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super
396399
return doMapValues(mapper, named, materializedInternal);
397400
}
398401

402+
@SuppressWarnings("deprecation")
399403
@Override
400404
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
401405
final String... stateStoreNames) {
402406
return doTransformValues(transformerSupplier, null, NamedInternal.empty(), stateStoreNames);
403407
}
404408

409+
@SuppressWarnings("deprecation")
405410
@Override
406411
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
407412
final Named named,
@@ -410,13 +415,15 @@ public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<
410415
return doTransformValues(transformerSupplier, null, new NamedInternal(named), stateStoreNames);
411416
}
412417

418+
@SuppressWarnings("deprecation")
413419
@Override
414420
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
415421
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
416422
final String... stateStoreNames) {
417423
return transformValues(transformerSupplier, materialized, NamedInternal.empty(), stateStoreNames);
418424
}
419425

426+
@SuppressWarnings("deprecation")
420427
@Override
421428
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
422429
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
@@ -429,7 +436,39 @@ public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<
429436
return doTransformValues(transformerSupplier, materializedInternal, new NamedInternal(named), stateStoreNames);
430437
}
431438

432-
@SuppressWarnings("resource")
439+
@Override
440+
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
441+
final String... stateStoreNames) {
442+
return doProcessFixedKey(processorSupplier, null, NamedInternal.empty(), stateStoreNames);
443+
}
444+
445+
@Override
446+
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
447+
final Named named,
448+
final String... stateStoreNames) {
449+
Objects.requireNonNull(named, "named can't be null");
450+
return doProcessFixedKey(processorSupplier, null, new NamedInternal(named), stateStoreNames);
451+
}
452+
453+
@Override
454+
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
455+
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
456+
final String... stateStoreNames) {
457+
return processFixedKey(processorSupplier, materialized, NamedInternal.empty(), stateStoreNames);
458+
}
459+
460+
@Override
461+
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
462+
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
463+
final Named named,
464+
final String... stateStoreNames) {
465+
Objects.requireNonNull(materialized, "materialized can't be null");
466+
Objects.requireNonNull(named, "named can't be null");
467+
final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
468+
return doProcessFixedKey(processorSupplier, materializedInternal, new NamedInternal(named), stateStoreNames);
469+
}
470+
471+
@SuppressWarnings({"resource", "deprecation"})
433472
private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
434473
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
435474
final NamedInternal namedInternal,
@@ -497,6 +536,74 @@ private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySuppli
497536
builder);
498537
}
499538

539+
@SuppressWarnings("resource")
540+
private <VOut> KTable<K, VOut> doProcessFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> supplier,
541+
final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal,
542+
final NamedInternal namedInternal,
543+
final String... stateStoreNames) {
544+
Objects.requireNonNull(stateStoreNames, "stateStoreNames");
545+
final Serde<K> keySerde;
546+
final Serde<VOut> valueSerde;
547+
final String queryableStoreName;
548+
final Set<StoreBuilder<?>> storeBuilder;
549+
550+
if (materializedInternal != null) {
551+
// don't inherit parent value serde, since this operation may change the value type, more specifically:
552+
// we preserve the key following the order of 1) materialized, 2) parent, 3) null
553+
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
554+
// we preserve the value following the order of 1) materialized, 2) null
555+
valueSerde = materializedInternal.valueSerde();
556+
queryableStoreName = materializedInternal.queryableStoreName();
557+
// only materialize if materialized is specified and it has queryable name
558+
if (queryableStoreName != null) {
559+
final StoreFactory storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
560+
storeBuilder = Set.of(new FactoryWrappingStoreBuilder<>(storeFactory));
561+
} else {
562+
storeBuilder = null;
563+
}
564+
} else {
565+
keySerde = this.keySerde;
566+
valueSerde = null;
567+
queryableStoreName = null;
568+
storeBuilder = null;
569+
}
570+
571+
final String name = namedInternal.orElseGenerateWithPrefix(builder, PROCESSFIXEDKEY_NAME);
572+
573+
final KTableProcessorSupplier<K, V, K, VOut> tableProcessorSupplier = new KTableProcessFixedKey<>(
574+
this,
575+
supplier,
576+
queryableStoreName);
577+
578+
final ProcessorParameters<K, VOut, ?, ?> processorParameters =
579+
unsafeCastProcessorParametersToCompletelyDifferentType(
580+
new ProcessorParameters<>(
581+
new StoreDelegatingProcessorSupplier<>(
582+
tableProcessorSupplier,
583+
storeBuilder),
584+
name
585+
));
586+
587+
final GraphNode tableNode = new ProcessorToStateConnectorNode<>(
588+
name,
589+
processorParameters,
590+
stateStoreNames
591+
);
592+
maybeSetOutputVersioned(tableNode, materializedInternal);
593+
594+
builder.addGraphNode(this.graphNode, tableNode);
595+
596+
return new KTableImpl<>(
597+
name,
598+
keySerde,
599+
valueSerde,
600+
subTopologySourceNodes,
601+
queryableStoreName,
602+
tableProcessorSupplier,
603+
tableNode,
604+
builder);
605+
}
606+
500607
@Override
501608
public KStream<K, V> toStream() {
502609
return toStream(NamedInternal.empty());

0 commit comments

Comments
 (0)