Skip to content

Commit 0c47dc8

Browse files
Plamen PilevPlamen Pilev
Plamen Pilev
authored and
Plamen Pilev
committed
Merge branch 'streambuilder-state-store' into develop-publish-1.5.0.100
2 parents 81e55ae + f15e2ac commit 0c47dc8

File tree

6 files changed

+82
-32
lines changed

6 files changed

+82
-32
lines changed

core/Stream/IKStream.cs

+14-11
Original file line numberDiff line numberDiff line change
@@ -1920,12 +1920,12 @@ void ForeachAsync(
19201920
Func<ExternalRecord<K, V>, ExternalContext, Task> asyncAction,
19211921
RetryPolicy retryPolicy = null,
19221922
RequestSerDes<K, V> requestSerDes = null,
1923-
string named = null);
1924-
1923+
string named = null);
1924+
19251925
#endregion
1926-
1926+
19271927
#region Process
1928-
1928+
19291929
/// <summary>
19301930
/// Process all records in this stream, one record at a time, by applying a processor (provided by the given
19311931
/// <see cref="ProcessorSupplier{K,V}"/>.
@@ -1963,12 +1963,13 @@ void ForeachAsync(
19631963
/// </summary>
19641964
/// <param name="processorSupplier">an instance of <see cref="ProcessorSupplier{K,V}"/> which contains the processor and a potential state store. Use <see cref="ProcessorBuilder"/> to build this supplier.</param>
19651965
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
1966-
void Process(ProcessorSupplier<K, V> processorSupplier, string named = null);
1967-
1966+
/// <param name="storeNames">The names of the state stores used by the processor.</param>
1967+
void Process(ProcessorSupplier<K, V> processorSupplier, string named = null, params string[] storeNames);
1968+
19681969
#endregion
1969-
1970+
19701971
#region Transform
1971-
1972+
19721973
/// <summary>
19731974
/// Transform each record of the input stream into zero or one record in the output stream (both key and value type
19741975
/// can be altered arbitrarily).
@@ -2009,11 +2010,12 @@ void ForeachAsync(
20092010
/// </summary>
20102011
/// <param name="transformerSupplier">an instance of <see cref="TransformerSupplier{K,V,K1,V1}"/> which contains the transformer</param>
20112012
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
2013+
/// <param name="storeNames">The names of the state stores used by the processor.</param>
20122014
/// <typeparam name="K1">the key type of the new stream</typeparam>
20132015
/// <typeparam name="V1">the value type of the new stream</typeparam>
20142016
/// <returns>a <see cref="IKStream{K1,V1}"/> that contains more or less records with new key and value (possibly of different type)</returns>
2015-
IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null);
2016-
2017+
IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null, params string[] storeNames);
2018+
20172019
/// <summary>
20182020
/// Transform each record of the input stream into zero or one record in the output stream (only value type, the original key will be through to the upstream processors ).
20192021
/// A <see cref="Transform{K,V1}"/> (provided by the given <see cref="TransformerSupplier{K,V,K,V1}"/>) is applied to each input record and
@@ -2053,9 +2055,10 @@ void ForeachAsync(
20532055
/// </summary>
20542056
/// <param name="transformerSupplier">an instance of <see cref="TransformerSupplier{K,V,K,V1}"/> which contains the transformer</param>
20552057
/// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
2058+
/// <param name="storeNames">The names of the state stores used by the processor.</param>
20562059
/// <typeparam name="V1">the value type of the new stream</typeparam>
20572060
/// <returns>a <see cref="IKStream{K,V1}"/> that contains more or less records with new key and value (possibly of different type)</returns>
2058-
IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null);
2061+
IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null, params string[] storeNames);
20592062

20602063
#endregion
20612064

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using Streamiz.Kafka.Net.Processors.Internal;
2+
using Streamiz.Kafka.Net.State;
3+
4+
namespace Streamiz.Kafka.Net.Stream.Internal.Graph.Nodes
5+
{
6+
internal class StateStoreNode : StreamGraphNode
7+
{
8+
private readonly IStoreBuilder storeBuilder;
9+
10+
public StateStoreNode(IStoreBuilder storeBuilder, string streamGraphNode) : base(streamGraphNode)
11+
{
12+
this.storeBuilder = storeBuilder;
13+
}
14+
15+
public override void WriteToTopology(InternalTopologyBuilder builder)
16+
{
17+
builder.AddStateStore(storeBuilder);
18+
}
19+
}
20+
}

core/Stream/Internal/Graph/Nodes/StatefulProcessorNode.cs

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
1-
using Streamiz.Kafka.Net.Processors;
2-
using Streamiz.Kafka.Net.Processors.Internal;
1+
using Streamiz.Kafka.Net.Processors.Internal;
32
using Streamiz.Kafka.Net.State;
43

54
namespace Streamiz.Kafka.Net.Stream.Internal.Graph.Nodes
65
{
76
internal class StatefulProcessorNode<K, V> : ProcessorGraphNode<K, V>
87
{
98
private readonly string[] storeNames;
10-
private readonly IStoreBuilder storeBuilder;
11-
9+
private readonly IStoreBuilder storeBuilder;
10+
1211
/// <summary>
1312
/// Create a node representing a stateful processor,
1413
/// where the store needs to be built and registered as part of building this node.
1514
/// </summary>
1615
/// <param name="nameNode"></param>
1716
/// <param name="parameters"></param>
1817
/// <param name="storeBuilder"></param>
19-
public StatefulProcessorNode(string nameNode, ProcessorParameters<K, V> parameters, IStoreBuilder storeBuilder)
18+
/// <param name="storeNames">The names of the state stores used by the processor.</param>
19+
20+
public StatefulProcessorNode(string nameNode, ProcessorParameters<K, V> parameters, IStoreBuilder storeBuilder, params string[] storeNames)
2021
: base(nameNode, parameters)
2122
{
22-
storeNames = null;
23+
this.storeNames = storeNames;
2324
this.storeBuilder = storeBuilder;
2425
}
2526

core/Stream/Internal/InternalStreamBuilder.cs

+13-3
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,19 @@ internal void AddGraphNode(StreamGraphNode root, StreamGraphNode node)
137137
logger.LogDebug("Adding node {Node} in root node {Root}", node, root);
138138
root.AppendChild(node);
139139
nodes.Add(node);
140-
}
141-
142-
140+
}
141+
142+
143143
#endregion
144+
145+
#region Build Store
146+
public void AddStateStore(IStoreBuilder storeBuilder)
147+
{
148+
var name = NewStoreName(string.Empty);
149+
150+
var node = new StateStoreNode(storeBuilder, name);
151+
this.AddGraphNode(root, node);
152+
}
153+
#endregion
144154
}
145155
}

core/Stream/Internal/KStream.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ public IKStream<K, V> FilterNot(Func<K, V, bool> predicate, string named = null)
104104

105105
#region Process
106106

107-
public void Process(ProcessorSupplier<K, V> processorSupplier, string named = null)
107+
public void Process(ProcessorSupplier<K, V> processorSupplier, string named = null, params string[] storeNames)
108108
{
109109
string name = new Named(named).OrElseGenerateWithPrefix(builder, KStream.PROCESSOR_NAME);
110110
ProcessorParameters<K, V> processorParameters = new ProcessorParameters<K, V>(
111111
new KStreamProcessorSupplier<K, V>(processorSupplier), name);
112-
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, processorSupplier.StoreBuilder);
112+
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, processorSupplier.StoreBuilder, storeNames);
113113

114114
builder.AddGraphNode(Node, processorNode);
115115
}
@@ -118,19 +118,19 @@ public void Process(ProcessorSupplier<K, V> processorSupplier, string named = nu
118118

119119
#region Transform
120120

121-
public IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null)
122-
=> DoTransform(transformerSupplier, true, named);
121+
public IKStream<K1, V1> Transform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, string named = null, params string[] storeNames)
122+
=> DoTransform(transformerSupplier, true, named, storeNames);
123123

124-
public IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null)
125-
=> DoTransform(transformerSupplier, false, named);
124+
public IKStream<K, V1> TransformValues<V1>(TransformerSupplier<K, V, K, V1> transformerSupplier, string named = null, params string[] storeNames)
125+
=> DoTransform(transformerSupplier, false, named, storeNames);
126126

127127
private IKStream<K1, V1> DoTransform<K1, V1>(TransformerSupplier<K, V, K1, V1> transformerSupplier, bool changeKey, string named =
128-
null)
128+
null, params string[] storeNames)
129129
{
130130
string name = new Named(named).OrElseGenerateWithPrefix(builder, changeKey ? KStream.TRANSFORM_NAME : KStream.TRANSFORMVALUES_NAME );
131131
ProcessorParameters<K, V> processorParameters = new ProcessorParameters<K, V>(
132132
new KStreamTransformerSupplier<K, V, K1, V1>(transformerSupplier, changeKey), name);
133-
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, transformerSupplier.StoreBuilder);
133+
StatefulProcessorNode<K, V> processorNode = new StatefulProcessorNode<K, V>(name, processorParameters, transformerSupplier.StoreBuilder, storeNames);
134134

135135
builder.AddGraphNode(Node, processorNode);
136136

core/StreamBuilder.cs

+20-4
Original file line numberDiff line numberDiff line change
@@ -730,13 +730,29 @@ public IGlobalKTable<K, V> GlobalTable<K, V, KS, VS>(string topic, Materialized<
730730
public IGlobalKTable<K, V> GlobalTable<K, V, KS, VS>(string topic, Materialized<K, V, IKeyValueStore<Bytes, byte[]>> materialized, string named, ITimestampExtractor extractor)
731731
where KS : ISerDes<K>, new()
732732
where VS : ISerDes<V>, new()
733-
=> GlobalTable(topic, new KS(), new VS(), materialized, named, extractor);
734-
735-
733+
=> GlobalTable(topic, new KS(), new VS(), materialized, named, extractor);
734+
735+
736736
#endregion
737-
737+
738738
#endregion
739+
740+
#region State Store
741+
742+
/// <summary>
743+
/// Adds a state store to the underlying <see cref="Topology"/>.
744+
/// It is required to connect state stores to <see cref="Streamiz.Kafka.Net.Processors.Public.IProcessor{K, V}"/>
745+
/// or <see cref="Streamiz.Kafka.Net.Processors.Public.ITransformer{K, V, K1, V1}"/>
746+
/// before they can be used.
747+
/// </summary>
748+
/// <param name="storeBuilder">The builder used to obtain the <see cref="IStateStore"/> instance.</param>
749+
public void AddStateStore(IStoreBuilder storeBuilder)
750+
{
751+
internalStreamBuilder.AddStateStore(storeBuilder);
752+
}
739753

754+
#endregion
755+
740756
/// <summary>
741757
/// Returns the <see cref="Topology"/> that represents the specified processing logic.
742758
/// Note that using this method means no optimizations are performed.

0 commit comments

Comments
 (0)