Skip to content

Commit 388f795

Browse files
committed
optional next node - changes in compiler
1 parent 9a619c3 commit 388f795

File tree

16 files changed

+214
-166
lines changed

16 files changed

+214
-166
lines changed

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import pl.touk.nussknacker.engine.api.component.NodeComponentInfo
1414
import pl.touk.nussknacker.engine.api.context.{JoinContextTransformation, ValidationContext}
1515
import pl.touk.nussknacker.engine.api.process.ProcessName
1616
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
17-
import pl.touk.nussknacker.engine.compile.ArtificialDeadEndSink
1817
import pl.touk.nussknacker.engine.compiledgraph.part._
1918
import pl.touk.nussknacker.engine.deployment.DeploymentData
2019
import pl.touk.nussknacker.engine.flink.FlinkScenarioCompilationDependencies
@@ -258,8 +257,6 @@ class FlinkProcessRegistrar(
258257
processPart match {
259258
case part @ SinkPart(sink: FlinkSink, _, contextBefore, _) =>
260259
registerSinkPark(start, part, sink, contextBefore)
261-
case part @ SinkPart(ArtificialDeadEndSink, _, contextBefore, _) =>
262-
registerSinkPark(start, part, EmptySink, contextBefore)
263260
case part: SinkPart =>
264261
// TODO: fixme "part.obj" is not stringified well
265262
// (eg. Scenario can only use flink sinks, instead given: pl.touk.nussknacker.engine.management.sample.sink.LiteDeadEndSink$@21220fd7)

scenario-api/src/main/scala/pl/touk/nussknacker/engine/build/GraphBuilder.scala

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,36 @@ trait GraphBuilder[R] {
2020
def build(inner: GraphBuilder.Creator[R]): GraphBuilder[R]
2121

2222
def source(id: String, typ: String, params: (String, Expression)*): GraphBuilder[SourceNode] =
23-
new SimpleGraphBuilder(
24-
SourceNode(Source(id, SourceRef(typ, toNodeParameters(params))), _)
25-
)
23+
new SimpleGraphBuilder(node => SourceNode(Source(id, SourceRef(typ, toNodeParameters(params))), Some(node)))
2624

2725
def buildVariable(id: String, varName: String, fields: (String, Expression)*): GraphBuilder[R] =
2826
build(node =>
29-
creator(OneOutputSubsequentNode(VariableBuilder(id, varName, fields.map(f => Field(f._1, f._2)).toList), node))
27+
creator(
28+
OneOutputSubsequentNode(VariableBuilder(id, varName, fields.map(f => Field(f._1, f._2)).toList), Some(node))
29+
)
3030
)
3131

3232
def buildSimpleVariable(id: String, varName: String, value: Expression): GraphBuilder[R] =
33-
build(node => creator(OneOutputSubsequentNode(Variable(id, varName, value), node)))
33+
build(node => creator(OneOutputSubsequentNode(Variable(id, varName, value), Some(node))))
3434

3535
def enricher(id: String, output: String, svcId: String, params: (String, Expression)*): GraphBuilder[R] =
3636
build(node =>
3737
creator(
38-
OneOutputSubsequentNode(Enricher(id, ServiceRef(svcId, toNodeParameters(params)), output), node)
38+
OneOutputSubsequentNode(Enricher(id, ServiceRef(svcId, toNodeParameters(params)), output), Some(node))
3939
)
4040
)
4141

4242
def processor(id: String, svcId: String, params: (String, Expression)*): GraphBuilder[R] =
43-
build(node => creator(OneOutputSubsequentNode(Processor(id, ServiceRef(svcId, toNodeParameters(params))), node)))
43+
build(node =>
44+
creator(OneOutputSubsequentNode(Processor(id, ServiceRef(svcId, toNodeParameters(params))), Some(node)))
45+
)
4446

4547
def disabledProcessor(id: String, svcId: String, params: (String, Expression)*): GraphBuilder[R] =
4648
build(node =>
4749
creator(
4850
OneOutputSubsequentNode(
4951
Processor(id, ServiceRef(svcId, toNodeParameters(params)), isDisabled = Some(true)),
50-
node
52+
Some(node)
5153
)
5254
)
5355
)
@@ -98,24 +100,24 @@ trait GraphBuilder[R] {
98100
)
99101

100102
def fragmentInput(id: String, params: (String, Class[_])*): GraphBuilder[SourceNode] =
101-
new SimpleGraphBuilder(
103+
new SimpleGraphBuilder(node =>
102104
SourceNode(
103105
FragmentInputDefinition(
104106
id = id,
105107
parameters = params.map(kv => FragmentParameter(ParameterName(kv._1), FragmentClazzRef(kv._2.getName))).toList
106108
),
107-
_
109+
Some(node)
108110
)
109111
)
110112

111113
def fragmentInputWithRawParameters(id: String, params: FragmentParameter*): GraphBuilder[SourceNode] =
112-
new SimpleGraphBuilder(
114+
new SimpleGraphBuilder(node =>
113115
SourceNode(
114116
FragmentInputDefinition(
115117
id = id,
116118
parameters = params.toList
117119
),
118-
_
120+
Some(node)
119121
)
120122
)
121123

@@ -172,7 +174,7 @@ trait GraphBuilder[R] {
172174
creator(
173175
OneOutputSubsequentNode(
174176
CustomNode(id, Some(outputVar), customNodeRef, toNodeParameters(params)),
175-
node
177+
Some(node)
176178
)
177179
)
178180
)
@@ -184,7 +186,7 @@ trait GraphBuilder[R] {
184186
def customNodeNoOutput(id: String, customNodeRef: String, params: (String, Expression)*): GraphBuilder[R] =
185187
build(node =>
186188
creator(
187-
OneOutputSubsequentNode(CustomNode(id, None, customNodeRef, toNodeParameters(params)), node)
189+
OneOutputSubsequentNode(CustomNode(id, None, customNodeRef, toNodeParameters(params)), Some(node))
188190
)
189191
)
190192

@@ -203,7 +205,9 @@ trait GraphBuilder[R] {
203205
val branchParameters = branchParams.map { case (branchId, bParams) =>
204206
BranchParameters(branchId, toNodeParameters(bParams))
205207
}
206-
new SimpleGraphBuilder(SourceNode(node.Join(id, output, typ, toNodeParameters(params), branchParameters), _))
208+
new SimpleGraphBuilder(n =>
209+
SourceNode(node.Join(id, output, typ, toNodeParameters(params), branchParameters), Some(n))
210+
)
207211
}
208212

209213
def decisionTable(

scenario-api/src/main/scala/pl/touk/nussknacker/engine/canonize/MaybeArtificial.scala

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ private[engine] object MaybeArtificial {
4545
// we need to make sure it's unique to prevent weird errors
4646
private def generateArtificialName() = s"$DummyObjectNamePrefix-${UUID.randomUUID()}"
4747

48-
def missingSinkError(errors: ProcessUncanonizationError*): MaybeArtificial[node.SubsequentNode] =
48+
def missingSinkError(errors: ProcessUncanonizationError*): MaybeArtificial[Option[node.SubsequentNode]] =
4949
new MaybeArtificial(
50-
node.EndingNode(node.Sink(generateArtificialName(), SinkRef(artificalSourceSinkRef, Nil), None)),
50+
Some(node.EndingNode(node.Sink(generateArtificialName(), SinkRef(artificalSourceSinkRef, Nil), None))),
5151
errors.toList
5252
)
5353

@@ -56,13 +56,4 @@ private[engine] object MaybeArtificial {
5656
node.SourceNode(node.Source(generateArtificialName(), SourceRef(artificalSourceSinkRef, Nil)), _)
5757
)
5858

59-
def addedArtificialDeadEndSink(previousNodeId: String): MaybeArtificial[node.SubsequentNode] =
60-
new MaybeArtificial(
61-
node.EndingNode(new ArtificialDeadEndSink(previousNodeId)),
62-
List.empty,
63-
)
64-
65-
class ArtificialDeadEndSink(val previousNodeId: String)
66-
extends node.Sink(s"artificialDeadEndSink-after-$previousNodeId", SinkRef(artificalSourceSinkRef, Nil), None)
67-
6859
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package pl.touk.nussknacker.engine.canonize
2+
3+
import pl.touk.nussknacker.engine.graph.node
4+
5+
// This trait and its 2 implementation encapsulate the logic of handling scenarios that do not end with sink
6+
// - older behavior is that error is returned for missing sinks
7+
// - new behavior, which we intend to become the default and only one in the future, is allowing missing sinks
8+
sealed trait MissingSinkHandler {
9+
def handleMissingSink(previousNodeId: String): MaybeArtificial[Option[node.SubsequentNode]]
10+
}
11+
12+
object MissingSinkHandler {
13+
14+
object AllowMissingSinkHandler extends MissingSinkHandler {
15+
16+
override def handleMissingSink(
17+
previousNodeId: String
18+
): MaybeArtificial[Option[node.SubsequentNode]] =
19+
new MaybeArtificial(None, Nil)
20+
21+
}
22+
23+
object DoNotAllowMissingSinkHandler extends MissingSinkHandler {
24+
25+
override def handleMissingSink(
26+
previousNodeId: String
27+
): MaybeArtificial[Option[node.SubsequentNode]] =
28+
new MaybeArtificial(None, InvalidTailOfBranch(previousNodeId) :: Nil)
29+
30+
}
31+
32+
}

scenario-api/src/main/scala/pl/touk/nussknacker/engine/canonize/ProcessCanonizer.scala

Lines changed: 31 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,27 @@ object ProcessCanonizer {
2424

2525
def uncanonize(
2626
canonicalProcess: CanonicalProcess,
27-
allowEndingScenarioWithoutSink: Boolean,
27+
missingSinkHandler: MissingSinkHandler,
2828
): ValidatedNel[ProcessUncanonizationError, EspProcess] =
29-
uncanonizeArtificial(canonicalProcess, allowEndingScenarioWithoutSink).toValidNel
29+
uncanonizeArtificial(canonicalProcess, missingSinkHandler).toValidNel
3030

3131
def uncanonizeArtificial(
3232
canonicalProcess: CanonicalProcess,
33-
allowEndingScenarioWithoutSink: Boolean,
33+
missingSinkHandler: MissingSinkHandler,
3434
): MaybeArtificial[EspProcess] = {
3535

3636
val branches: MaybeArtificial[NonEmptyList[pl.touk.nussknacker.engine.graph.node.SourceNode]] =
37-
canonicalProcess.allStartNodes.map(uncanonizeSource(_, allowEndingScenarioWithoutSink)).sequence
37+
canonicalProcess.allStartNodes.map(uncanonizeSource(_)(missingSinkHandler)).sequence
3838

3939
branches.map(bList => EspProcess(canonicalProcess.metaData, bList))
4040
}
4141

4242
private def uncanonizeSource(
4343
canonicalNode: List[canonicalnode.CanonicalNode],
44-
allowEndingScenarioWithoutSink: Boolean,
45-
): MaybeArtificial[node.SourceNode] =
44+
)(implicit missingSinkHandler: MissingSinkHandler): MaybeArtificial[node.SourceNode] =
4645
canonicalNode match {
4746
case (a @ canonicalnode.FlatNode(data: node.StartingNodeData)) :: tail =>
48-
uncanonize(a, tail, allowEndingScenarioWithoutSink).map(node.SourceNode(data, _))
49-
47+
uncanonize(a, tail).map(node.SourceNode(data, _))
5048
case other :: _ =>
5149
MaybeArtificial.artificialSource(InvalidRootNode(other.id))
5250

@@ -57,98 +55,79 @@ object ProcessCanonizer {
5755
private def uncanonize(
5856
previous: canonicalnode.CanonicalNode,
5957
canonicalNode: List[canonicalnode.CanonicalNode],
60-
allowEndingScenarioWithoutSink: Boolean,
61-
): MaybeArtificial[node.SubsequentNode] =
58+
)(implicit missingSinkHandler: MissingSinkHandler): MaybeArtificial[Option[node.SubsequentNode]] =
6259
canonicalNode match {
6360
case canonicalnode.FlatNode(data: node.BranchEndData) :: Nil =>
64-
new MaybeArtificial(node.BranchEnd(data), Nil)
61+
new MaybeArtificial(Some(node.BranchEnd(data)), Nil)
6562

6663
case canonicalnode.FlatNode(data: node.EndingNodeData) :: Nil =>
67-
new MaybeArtificial(node.EndingNode(data), Nil)
64+
new MaybeArtificial(Some(node.EndingNode(data)), Nil)
6865

6966
case (a @ canonicalnode.FlatNode(data: node.OneOutputSubsequentNodeData)) :: tail =>
70-
uncanonize(a, tail, allowEndingScenarioWithoutSink).map(node.OneOutputSubsequentNode(data, _))
71-
67+
uncanonize(a, tail).map(node.OneOutputSubsequentNode(data, _)).map(Some(_): Option[node.SubsequentNode])
7268
case (a @ canonicalnode.FilterNode(data, nextFalse)) :: tail if nextFalse.isEmpty =>
73-
uncanonize(a, tail, allowEndingScenarioWithoutSink).map(nextTrue => node.FilterNode(data, Some(nextTrue), None))
69+
uncanonize(a, tail).map(nextTrue => Some(node.FilterNode(data, nextTrue, None)))
7470

7571
case (a @ canonicalnode.FilterNode(data, nextFalse)) :: tail if tail.isEmpty =>
76-
uncanonize(a, nextFalse, allowEndingScenarioWithoutSink).map { nextFalseV =>
77-
node.FilterNode(data, None, Some(nextFalseV))
78-
}
72+
uncanonize(a, nextFalse).map { nextFalseV => Some(node.FilterNode(data, None, nextFalseV)) }
7973

8074
case (a @ canonicalnode.FilterNode(data, nextFalse)) :: tail =>
81-
(uncanonize(a, tail, allowEndingScenarioWithoutSink), uncanonize(a, nextFalse, allowEndingScenarioWithoutSink))
82-
.mapN { (nextTrue, nextFalseV) =>
83-
node.FilterNode(data, Some(nextTrue), Some(nextFalseV))
84-
}
75+
(uncanonize(a, tail), uncanonize(a, nextFalse)).mapN { (nextTrue, nextFalseV) =>
76+
Some(node.FilterNode(data, nextTrue, nextFalseV))
77+
}
8578

8679
case (a @ canonicalnode.SwitchNode(data, Nil, defaultNext)) :: Nil =>
87-
uncanonize(a, defaultNext, allowEndingScenarioWithoutSink).map { defaultNextV =>
88-
node.SwitchNode(data, Nil, Some(defaultNextV))
89-
}
80+
uncanonize(a, defaultNext).map(defaultNextV => Some(node.SwitchNode(data, Nil, defaultNextV)))
9081

9182
case (a @ canonicalnode.SwitchNode(data, nexts, defaultNext)) :: Nil if defaultNext.isEmpty =>
9283
nexts
93-
.map { casee =>
94-
uncanonize(a, casee.nodes, allowEndingScenarioWithoutSink).map(node.Case(casee.expression, _))
95-
}
84+
.map(casee => uncanonize(a, casee.nodes).map(node.Case(casee.expression, _)))
9685
.sequence[MaybeArtificial, node.Case]
9786
.map(node.SwitchNode(data, _, None))
87+
.map(Some(_))
9888

9989
case (a @ canonicalnode.SwitchNode(data, nexts, defaultNext)) :: Nil =>
10090
val unFlattenNexts = nexts
101-
.map { casee =>
102-
uncanonize(a, casee.nodes, allowEndingScenarioWithoutSink).map(node.Case(casee.expression, _))
103-
}
91+
.map(casee => uncanonize(a, casee.nodes).map(node.Case(casee.expression, _)))
10492
.sequence[MaybeArtificial, node.Case]
10593

106-
(unFlattenNexts, uncanonize(a, defaultNext, allowEndingScenarioWithoutSink)).mapN { (nextsV, defaultNextV) =>
107-
node.SwitchNode(data, nextsV, Some(defaultNextV))
94+
(unFlattenNexts, uncanonize(a, defaultNext)).mapN { (nextsV, defaultNextV) =>
95+
Some(node.SwitchNode(data, nextsV, defaultNextV))
10896
}
10997

11098
case canonicalnode.SplitNode(bare, Nil) :: Nil =>
111-
handleMissingSink(bare.id, allowEndingScenarioWithoutSink)
99+
missingSinkHandler.handleMissingSink(bare.id)
112100

113101
case (a @ canonicalnode.SplitNode(bare, nexts)) :: Nil =>
114-
nexts.map(uncanonize(a, _, allowEndingScenarioWithoutSink)).sequence[MaybeArtificial, node.SubsequentNode].map {
115-
uncanonized =>
116-
node.SplitNode(bare, uncanonized)
117-
}
102+
nexts
103+
.map(uncanonize(a, _))
104+
.sequence[MaybeArtificial, Option[node.SubsequentNode]]
105+
.map { uncanonized =>
106+
Some(node.SplitNode(bare, uncanonized.flatten))
107+
}
118108

119109
case invalidHead :: _ =>
120110
MaybeArtificial.missingSinkError(InvalidTailOfBranch(invalidHead.id))
121111

122112
case Nil =>
123-
handleMissingSink(previous.id, allowEndingScenarioWithoutSink)
113+
missingSinkHandler.handleMissingSink(previous.id)
124114
}
125115

126-
private def handleMissingSink(
127-
previousNodeId: String,
128-
allowEndingScenarioWithoutSink: Boolean
129-
): MaybeArtificial[node.SubsequentNode] = {
130-
if (allowEndingScenarioWithoutSink) {
131-
MaybeArtificial.addedArtificialDeadEndSink(previousNodeId)
132-
} else {
133-
MaybeArtificial.missingSinkError(InvalidTailOfBranch(previousNodeId))
134-
}
135-
}
136-
137116
}
138117

139118
object NodeCanonizer {
140119

141120
def canonize(n: node.Node): List[canonicalnode.CanonicalNode] =
142121
n match {
143122
case oneOut: node.OneOutputNode =>
144-
canonicalnode.FlatNode(oneOut.data) :: canonize(oneOut.next)
123+
canonicalnode.FlatNode(oneOut.data) :: oneOut.next.map(canonize).getOrElse(Nil)
145124
case node.FilterNode(data, nextTrue, nextFalse) =>
146125
canonicalnode.FilterNode(data, nextFalse.toList.flatMap(canonize)) :: nextTrue.toList.flatMap(canonize)
147126
case node.SwitchNode(data, nexts, defaultNext) =>
148127
canonicalnode.SwitchNode(
149128
data = data,
150129
nexts = nexts.map { next =>
151-
canonicalnode.Case(next.expression, canonize(next.node))
130+
canonicalnode.Case(next.expression, next.node.map(canonize).getOrElse(Nil))
152131
},
153132
defaultNext = defaultNext.toList.flatMap(canonize)
154133
) :: Nil

scenario-api/src/main/scala/pl/touk/nussknacker/engine/graph/node.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ object node {
3838
}
3939

4040
sealed trait OneOutputNode extends NodeWithData {
41-
def next: SubsequentNode
41+
def next: Option[SubsequentNode]
4242
}
4343

44-
case class SourceNode(data: StartingNodeData, next: SubsequentNode) extends OneOutputNode
44+
case class SourceNode(data: StartingNodeData, next: Option[SubsequentNode]) extends OneOutputNode
4545

4646
sealed trait SubsequentNode extends Node
4747

48-
case class OneOutputSubsequentNode(data: OneOutputSubsequentNodeData, next: SubsequentNode)
48+
case class OneOutputSubsequentNode(data: OneOutputSubsequentNodeData, next: Option[SubsequentNode])
4949
extends OneOutputNode
5050
with SubsequentNode
5151

@@ -61,7 +61,11 @@ object node {
6161

6262
case class SplitNode(data: Split, nextParts: List[SubsequentNode]) extends SubsequentNode
6363

64-
case class Case(expression: Expression, node: SubsequentNode)
64+
case class Case(expression: Expression, node: Option[SubsequentNode])
65+
66+
object Case {
67+
def apply(expression: Expression, node: SubsequentNode): Case = Case(expression, Some(node))
68+
}
6569

6670
case class EndingNode(data: EndingNodeData) extends SubsequentNode
6771

scenario-api/src/test/scala/pl/touk/nussknacker/engine/marshall/ProcessMarshallerSpec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
1616
import pl.touk.nussknacker.engine.canonicalgraph.{canonicalnode, CanonicalProcess}
1717
import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.{CanonicalNode, FlatNode}
1818
import pl.touk.nussknacker.engine.canonize.{MaybeArtificial, ProcessCanonizer}
19+
import pl.touk.nussknacker.engine.canonize.MissingSinkHandler.DoNotAllowMissingSinkHandler
1920
import pl.touk.nussknacker.engine.graph.expression.Expression
2021
import pl.touk.nussknacker.engine.graph.expression.Expression.Language
2122
import pl.touk.nussknacker.engine.graph.node
@@ -48,7 +49,7 @@ class ProcessMarshallerSpec
4849
.customNode("b", "alamakota == 'true'", "someRef")
4950
.buildVariable("c", "fooVar", "f1" -> "expr1".spel, "f2" -> "expr2".spel)
5051
.enricher("d", "barVar", "dService", "p1" -> "expr3".spel)
51-
.switch("f", "expr4".spel, "eVar", nestedGraph("e"), Case("e1".spel, GraphBuilder.emptySink("endE1", "")))
52+
.switch("f", "expr4".spel, "eVar", nestedGraph("e"), Case("e1".spel, Some(GraphBuilder.emptySink("endE1", ""))))
5253

5354
val result = marshallAndUnmarshall(process)
5455

@@ -227,7 +228,7 @@ class ProcessMarshallerSpec
227228
inside(
228229
ProcessCanonizer.uncanonize(
229230
CanonicalProcess(MetaData("1", StreamMetaData()), nodes.toList, List.empty),
230-
allowEndingScenarioWithoutSink = false
231+
DoNotAllowMissingSinkHandler,
231232
)
232233
) { case Invalid(NonEmptyList(canonize.InvalidTailOfBranch(id), Nil)) =>
233234
id shouldBe expectedBadNodeId

0 commit comments

Comments
 (0)