Skip to content

Commit f15e2ac

Browse files
Plamen PilevPlamen Pilev
Plamen Pilev
authored and
Plamen Pilev
committed
extend Transform and TransformValues with state stores
1 parent ef8a8fe commit f15e2ac

File tree

2 files changed

+15
-13
lines changed

2 files changed

+15
-13
lines changed

core/Stream/IKStream.cs

+9-7
Original file line numberDiff line numberDiff line change
@@ -1964,12 +1964,12 @@ void ForeachAsync(
19641964
/// <param name="processorSupplier">an instance of <see cref="ProcessorSupplier{K,V}"/> which contains the processor and a potential state store. Use <see cref="ProcessorBuilder"/> to build this supplier.</param>
19651965
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
19661966
/// <param name="storeNames">The names of the state stores used by the processor.</param>
1967-
void Process(ProcessorSupplier<K, V> processorSupplier, string named = null, params string[] storeNames);
1968-
1967+
void Process(ProcessorSupplier<K, V> processorSupplier, string named = null, params string[] storeNames);
1968+
19691969
#endregion
1970-
1970+
19711971
#region Transform
1972-
1972+
19731973
/// <summary>
19741974
/// Transform each record of the input stream into zero or one record in the output stream (both key and value type
19751975
/// can be altered arbitrarily).
@@ -2010,11 +2010,12 @@ void ForeachAsync(
20102010
/// </summary>
20112011
/// <param name="transformerSupplier">an instance of <see cref="TransformerSupplier{K,V,K1,V1}"/> which contains the transformer</param>
20122012
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
2013+
/// <param name="storeNames">The names of the state stores used by the processor.</param>
20132014
/// <typeparam name="K1">the key type of the new stream</typeparam>
20142015
/// <typeparam name="V1">the value type of the new stream</typeparam>
20152016
/// <returns>a <see cref="IKStream{K1,V1}"/> that contains more or less records with new key and value (possibly of different type)</returns>
2016-
IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null);
2017-
2017+
IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null, params string[] storeNames);
2018+
20182019
/// <summary>
20192020
/// Transform each record of the input stream into zero or one record in the output stream (only value type, the original key will be through to the upstream processors ).
20202021
/// A <see cref="Transform{K,V1}"/> (provided by the given <see cref="TransformerSupplier{K,V,K,V1}"/>) is applied to each input record and
@@ -2054,9 +2055,10 @@ void ForeachAsync(
20542055
/// </summary>
20552056
/// <param name="transformerSupplier">an instance of <see cref="TransformerSupplier{K,V,K,V1}"/> which contains the transformer</param>
20562057
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
2058+
/// <param name="storeNames">The names of the state stores used by the processor.</param>
20572059
/// <typeparam name="V1">the value type of the new stream</typeparam>
20582060
/// <returns>a <see cref="IKStream{K,V1}"/> that contains more or less records with new key and value (possibly of different type)</returns>
2059-
IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null);
2061+
IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null, params string[] storeNames);
20602062

20612063
#endregion
20622064

core/Stream/Internal/KStream.cs

+6-6
Original file line numberDiff line numberDiff line change
@@ -118,19 +118,19 @@ public void Process(ProcessorSupplier<K, V> processorSupplier, string named = nu
118118

119119
#region Transform
120120

121-
public IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null)
122-
=> DoTransform(transformerSupplier, true, named);
121+
public IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null, params string[] storeNames)
122+
=> DoTransform(transformerSupplier, true, named, storeNames);
123123

124-
public IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null)
125-
=> DoTransform(transformerSupplier, false, named);
124+
public IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null, params string[] storeNames)
125+
=> DoTransform(transformerSupplier, false, named, storeNames);
126126

127127
private IKStream<K1, V1> DoTransform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, bool changeKey, string named =
128-
null)
128+
null, params string[] storeNames)
129129
{
130130
string name = new Named(named).OrElseGenerateWithPrefix(builder, changeKey ? KStream.TRANSFORM_NAME : KStream.TRANSFORMVALUES_NAME );
131131
ProcessorParameters<K, V> processorParameters = new ProcessorParameters<K, V>(
132132
new KStreamTransformerSupplier<K, V, K1, V1>(transformerSupplier, changeKey), name);
133-
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, transformerSupplier.StoreBuilder);
133+
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, transformerSupplier.StoreBuilder, storeNames);
134134

135135
builder.AddGraphNode(Node, processorNode);
136136

0 commit comments

Comments
 (0)