Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,7 @@ <KOut, VOut> KStream<KOut, VOut> process(
* <p>However, because the key cannot be modified, some restrictions apply to a {@link FixedKeyProcessor} compared
* to a {@link Processor}: for example, forwarding result records from a {@link Punctuator} is not possible.
*/
@Deprecated
<VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final String... stateStoreNames
Expand All @@ -1587,9 +1588,22 @@ <VOut> KStream<K, VOut> processValues(
*
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
@Deprecated
<VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final Named named,
final String... stateStoreNames
);

// newly added
<VOut> KStream<K, VOut> processFixedKey(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final String... stateStoreNames
);

<VOut> KStream<K, VOut> processFixedKey(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
final Named named,
final String... stateStoreNames
);
}
22 changes: 22 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
Expand Down Expand Up @@ -820,6 +821,7 @@ <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extend
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
@Deprecated
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final String... stateStoreNames);

Expand Down Expand Up @@ -894,6 +896,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
@Deprecated
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Named named,
final String... stateStoreNames);
Expand Down Expand Up @@ -973,6 +976,7 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
@Deprecated
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames);
Expand Down Expand Up @@ -1053,11 +1057,29 @@ <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
@Deprecated
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final Named named,
final String... stateStoreNames);

// newly added
<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final String... stateStoreNames);

<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final Named named,
final String... stateStoreNames);

<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames);

<VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
final Named named,
final String... stateStoreNames);

/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper} and default serializers
* and deserializers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* @see Transformer
*/

@Deprecated
public interface ValueTransformerWithKey<K, V, VR> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* @see Transformer
* @see TransformerSupplier
*/
@Deprecated
@FunctionalInterface
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider, Supplier<ValueTransformerWithKey<K, V, VR>> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,7 @@ public <KOut, VOut> KStream<KOut, VOut> process(
builder);
}

@SuppressWarnings("deprecation")
@Override
public <VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
Expand All @@ -1338,6 +1339,7 @@ public <VOut> KStream<K, VOut> processValues(
);
}

@SuppressWarnings("deprecation")
@Override
public <VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
Expand Down Expand Up @@ -1372,4 +1374,43 @@ public <VOut> KStream<K, VOut> processValues(
processNode,
builder);
}

@Override
public <VOut> KStream<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, final String... stateStoreNames) {
return processFixedKey(
processorSupplier,
Named.as(builder.newProcessorName(PROCESSVALUES_NAME)),
stateStoreNames
);
}

@Override
public <VOut> KStream<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, final Named named, final String... stateStoreNames) {
ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named cannot be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "state store name cannot be null");
}

final String name = new NamedInternal(named).name();
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames
);

processNode.setValueChangingOperation(true);

builder.addGraphNode(graphNode, processNode);
// cannot inherit value serde
return new KStreamImpl<>(
name,
keySerde,
null,
subTopologySourceNodes,
repartitionRequired,
processNode,
builder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
Expand Down Expand Up @@ -126,6 +127,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<

private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";

private static final String PROCESSFIXEDKEY_NAME = "KTABLE-PROCESSFIXEDKEY-";

private static final String FK_JOIN = "KTABLE-FK-JOIN-";
private static final String FK_JOIN_STATE_STORE_NAME = FK_JOIN + "SUBSCRIPTION-STATE-STORE-";
private static final String SUBSCRIPTION_REGISTRATION = FK_JOIN + "SUBSCRIPTION-REGISTRATION-";
Expand Down Expand Up @@ -396,12 +399,14 @@ public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super
return doMapValues(mapper, named, materializedInternal);
}

@SuppressWarnings("deprecation")
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final String... stateStoreNames) {
return doTransformValues(transformerSupplier, null, NamedInternal.empty(), stateStoreNames);
}

@SuppressWarnings("deprecation")
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Named named,
Expand All @@ -410,13 +415,15 @@ public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<
return doTransformValues(transformerSupplier, null, new NamedInternal(named), stateStoreNames);
}

@SuppressWarnings("deprecation")
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames) {
return transformValues(transformerSupplier, materialized, NamedInternal.empty(), stateStoreNames);
}

@SuppressWarnings("deprecation")
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
Expand All @@ -429,7 +436,39 @@ public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<
return doTransformValues(transformerSupplier, materializedInternal, new NamedInternal(named), stateStoreNames);
}

@SuppressWarnings("resource")
@Override
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final String... stateStoreNames) {
return doProcessFixedKey(processorSupplier, null, NamedInternal.empty(), stateStoreNames);
}

@Override
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(named, "named can't be null");
return doProcessFixedKey(processorSupplier, null, new NamedInternal(named), stateStoreNames);
}

@Override
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames) {
return processFixedKey(processorSupplier, materialized, NamedInternal.empty(), stateStoreNames);
}

@Override
public <VOut> KTable<K, VOut> processFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "named can't be null");
final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return doProcessFixedKey(processorSupplier, materializedInternal, new NamedInternal(named), stateStoreNames);
}

@SuppressWarnings({"resource", "deprecation"})
private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final NamedInternal namedInternal,
Expand Down Expand Up @@ -497,6 +536,74 @@ private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySuppli
builder);
}

@SuppressWarnings("resource")
private <VOut> KTable<K, VOut> doProcessFixedKey(final FixedKeyProcessorSupplier<? super K, ? super V, VOut> supplier,
final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal,
final NamedInternal namedInternal,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames");
final Serde<K> keySerde;
final Serde<VOut> valueSerde;
final String queryableStoreName;
final Set<StoreBuilder<?>> storeBuilder;

if (materializedInternal != null) {
// don't inherit parent value serde, since this operation may change the value type, more specifically:
// we preserve the key following the order of 1) materialized, 2) parent, 3) null
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
// we preserve the value following the order of 1) materialized, 2) null
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
if (queryableStoreName != null) {
final StoreFactory storeFactory = new KeyValueStoreMaterializer<>(materializedInternal);
storeBuilder = Set.of(new FactoryWrappingStoreBuilder<>(storeFactory));
} else {
storeBuilder = null;
}
} else {
keySerde = this.keySerde;
valueSerde = null;
queryableStoreName = null;
storeBuilder = null;
}

final String name = namedInternal.orElseGenerateWithPrefix(builder, PROCESSFIXEDKEY_NAME);

final KTableProcessorSupplier<K, V, K, VOut> tableProcessorSupplier = new KTableProcessFixedKey<>(
this,
supplier,
queryableStoreName);

final ProcessorParameters<K, VOut, ?, ?> processorParameters =
unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(
new StoreDelegatingProcessorSupplier<>(
tableProcessorSupplier,
storeBuilder),
name
));

final GraphNode tableNode = new ProcessorToStateConnectorNode<>(
name,
processorParameters,
stateStoreNames
);
maybeSetOutputVersioned(tableNode, materializedInternal);

builder.addGraphNode(this.graphNode, tableNode);

return new KTableImpl<>(
name,
keySerde,
valueSerde,
subTopologySourceNodes,
queryableStoreName,
tableProcessorSupplier,
tableNode,
builder);
}

@Override
public KStream<K, V> toStream() {
return toStream(NamedInternal.empty());
Expand Down
Loading
Loading