Skip to content

Commit dda9c79

Browse files
committed
optional next node - changes in compiler
1 parent 388f795 commit dda9c79

File tree

12 files changed

+142
-148
lines changed

12 files changed

+142
-148
lines changed

designer/server/src/test/scala/pl/touk/nussknacker/test/utils/domain/ProcessTestData.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ object ProcessTestData {
207207

208208
// TODO: merge with this below
209209
val sampleScenario: CanonicalProcess = {
210-
def endWithMessage(idSuffix: String, message: String): SubsequentNode = {
210+
def endWithMessage(idSuffix: String, message: String): Option[SubsequentNode] = {
211211
GraphBuilder
212212
.buildVariable("message" + idSuffix, "output", "message" -> s"'$message'".spel)
213213
.emptySink(
@@ -493,7 +493,7 @@ object ProcessTestData {
493493
.to(endWithMessage)
494494
}
495495

496-
private def endWithMessage: SubsequentNode = {
496+
private def endWithMessage: Option[SubsequentNode] = {
497497
val idSuffix = "suffix"
498498
val endMessage = "#test #{#input} #test \n#{\"abc\".toString + {1,2,3}.toString + \"abc\"}\n#test\n#{\"ab{}c\"}"
499499

engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/ResultCollectingListenerSpec.scala

+18-49
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class ResultCollectingListenerSpec
113113
}
114114

115115
test("union of two sources with additional variable in only one of the branches - without sinks") {
116-
val scenario = removeSinks(
116+
val scenario =
117117
ScenarioBuilder
118118
.streaming("sample-union")
119119
.sources(
@@ -134,14 +134,13 @@ class ResultCollectingListenerSpec
134134
"bar" -> List("Output expression" -> "'bar source'".spel)
135135
)
136136
)
137-
.emptySink("end", "dead-end") // sink must be created by the DSL, but is then removed
137+
.endWithoutSink
138138
)
139-
)
140139

141140
withCollectingTestResults(
142141
scenario,
143142
testResults => {
144-
assertNumberOfSamplesThatFinishedInNode(testResults, "artificialDeadEndSink-after-union", 8)
143+
assertNumberOfSamplesThatFinishedInNode(testResults, "union", 8)
145144
transitionVariables(testResults, "start-foo", Some("union")) shouldBe Set(
146145
Map("input" -> 10),
147146
Map("input" -> 20),
@@ -160,17 +159,7 @@ class ResultCollectingListenerSpec
160159
Map("input" -> 300, "customVariableInBarBranch" -> 150),
161160
Map("input" -> 400, "customVariableInBarBranch" -> 200),
162161
)
163-
transitionVariables(testResults, "union", Some("artificialDeadEndSink-after-union")) shouldBe Set(
164-
Map("input" -> 10, "dataIsFrom" -> "foo source"),
165-
Map("input" -> 20, "dataIsFrom" -> "foo source"),
166-
Map("input" -> 30, "dataIsFrom" -> "foo source"),
167-
Map("input" -> 40, "dataIsFrom" -> "foo source"),
168-
Map("input" -> 100, "customVariableInBarBranch" -> 50, "dataIsFrom" -> "bar source"),
169-
Map("input" -> 200, "customVariableInBarBranch" -> 100, "dataIsFrom" -> "bar source"),
170-
Map("input" -> 300, "customVariableInBarBranch" -> 150, "dataIsFrom" -> "bar source"),
171-
Map("input" -> 400, "customVariableInBarBranch" -> 200, "dataIsFrom" -> "bar source"),
172-
)
173-
transitionVariables(testResults, "artificialDeadEndSink-after-union", None) shouldBe Set(
162+
transitionVariables(testResults, "union", None) shouldBe Set(
174163
Map("input" -> 10, "dataIsFrom" -> "foo source"),
175164
Map("input" -> 20, "dataIsFrom" -> "foo source"),
176165
Map("input" -> 30, "dataIsFrom" -> "foo source"),
@@ -292,20 +281,19 @@ class ResultCollectingListenerSpec
292281
}
293282

294283
test("there is a split - fail to compile without sinks") {
295-
val scenario = removeSinks(
284+
val scenario =
296285
ScenarioBuilder
297286
.streaming("sample-split")
298287
.source("start-foo", "start1")
299288
.split(
300289
"split",
301290
GraphBuilder
302291
.buildSimpleVariable("bv1", "timesTwo", "#input*2".spel)
303-
.emptySink("end1", "dead-end"),
292+
.endWithoutSink,
304293
GraphBuilder
305294
.buildSimpleVariable("bv2", "timesFour", "#input*4".spel)
306-
.emptySink("end2", "dead-end")
295+
.endWithoutSink
307296
)
308-
)
309297

310298
catchExceptionMessage(
311299
withCollectingTestResults(scenario, _ => ())
@@ -314,26 +302,25 @@ class ResultCollectingListenerSpec
314302
}
315303

316304
test("there is a split - without sinks") {
317-
val scenario = removeSinks(
305+
val scenario =
318306
ScenarioBuilder
319307
.streaming("sample-split")
320308
.source("start-foo", "start1")
321309
.split(
322310
"split",
323311
GraphBuilder
324312
.buildSimpleVariable("bv1", "timesTwo", "#input*2".spel)
325-
.emptySink("end1", "dead-end"),
313+
.endWithoutSink,
326314
GraphBuilder
327315
.buildSimpleVariable("bv2", "timesFour", "#input*4".spel)
328-
.emptySink("end2", "dead-end")
316+
.endWithoutSink
329317
)
330-
)
331318

332319
withCollectingTestResults(
333320
scenario,
334321
testResults => {
335-
assertNumberOfSamplesThatFinishedInNode(testResults, "artificialDeadEndSink-after-bv1", 4)
336-
assertNumberOfSamplesThatFinishedInNode(testResults, "artificialDeadEndSink-after-bv2", 4)
322+
assertNumberOfSamplesThatFinishedInNode(testResults, "bv1", 4)
323+
assertNumberOfSamplesThatFinishedInNode(testResults, "bv2", 4)
337324
transitionVariables(testResults, "start-foo", Some("split")) shouldBe Set(
338325
Map("input" -> 10),
339326
Map("input" -> 20),
@@ -352,25 +339,13 @@ class ResultCollectingListenerSpec
352339
Map("input" -> 30),
353340
Map("input" -> 40),
354341
)
355-
transitionVariables(testResults, "bv1", Some("artificialDeadEndSink-after-bv1")) shouldBe Set(
342+
transitionVariables(testResults, "bv1", None) shouldBe Set(
356343
Map("input" -> 10, "timesTwo" -> 20),
357344
Map("input" -> 20, "timesTwo" -> 40),
358345
Map("input" -> 30, "timesTwo" -> 60),
359346
Map("input" -> 40, "timesTwo" -> 80),
360347
)
361-
transitionVariables(testResults, "bv2", Some("artificialDeadEndSink-after-bv2")) shouldBe Set(
362-
Map("input" -> 10, "timesFour" -> 40),
363-
Map("input" -> 20, "timesFour" -> 80),
364-
Map("input" -> 30, "timesFour" -> 120),
365-
Map("input" -> 40, "timesFour" -> 160),
366-
)
367-
transitionVariables(testResults, "artificialDeadEndSink-after-bv1", None) shouldBe Set(
368-
Map("input" -> 10, "timesTwo" -> 20),
369-
Map("input" -> 20, "timesTwo" -> 40),
370-
Map("input" -> 30, "timesTwo" -> 60),
371-
Map("input" -> 40, "timesTwo" -> 80),
372-
)
373-
transitionVariables(testResults, "artificialDeadEndSink-after-bv2", None) shouldBe Set(
348+
transitionVariables(testResults, "bv2", None) shouldBe Set(
374349
Map("input" -> 10, "timesFour" -> 40),
375350
Map("input" -> 20, "timesFour" -> 80),
376351
Map("input" -> 30, "timesFour" -> 120),
@@ -447,7 +422,7 @@ class ResultCollectingListenerSpec
447422
}
448423

449424
test("there is a fragment - without sinks") {
450-
val scenarioWithFragment = removeSinks(
425+
val scenarioWithFragment =
451426
ScenarioBuilder
452427
.streaming("sample-scenario-with-fragment")
453428
.source("source", "start1")
@@ -456,9 +431,8 @@ class ResultCollectingListenerSpec
456431
"fragment1",
457432
List("fragment1_input" -> "#input".spel),
458433
Map("output" -> "fragmentResult"),
459-
Map("output" -> GraphBuilder.emptySink("end", "dead-end"))
434+
Map("output" -> None)
460435
)
461-
)
462436

463437
val fragment = ScenarioBuilder
464438
.fragment("fragment1", "fragment1_input" -> classOf[Int])
@@ -470,7 +444,7 @@ class ResultCollectingListenerSpec
470444
withCollectingTestResults(
471445
scenario,
472446
testResults => {
473-
assertNumberOfSamplesThatFinishedInNode(testResults, "artificialDeadEndSink-after-sub-fragmentEnd", 3)
447+
assertNumberOfSamplesThatFinishedInNode(testResults, "sub-fragmentEnd", 3)
474448
assertNumberOfSamplesThatFinishedInNode(testResults, "sub-filter", 1)
475449
transitionVariables(testResults, "source", Some("sub")) shouldBe Set(
476450
Map("input" -> 10),
@@ -502,17 +476,12 @@ class ResultCollectingListenerSpec
502476
transitionVariables(
503477
testResults,
504478
"sub-fragmentEnd",
505-
Some("artificialDeadEndSink-after-sub-fragmentEnd")
479+
None
506480
) shouldBe Set(
507481
Map("input" -> 20, "fragmentResult" -> Map("output" -> 20)),
508482
Map("input" -> 30, "fragmentResult" -> Map("output" -> 30)),
509483
Map("input" -> 40, "fragmentResult" -> Map("output" -> 40)),
510484
)
511-
transitionVariables(testResults, "artificialDeadEndSink-after-sub-fragmentEnd", None) shouldBe Set(
512-
Map("input" -> 20, "fragmentResult" -> Map("output" -> 20)),
513-
Map("input" -> 30, "fragmentResult" -> Map("output" -> 30)),
514-
Map("input" -> 40, "fragmentResult" -> Map("output" -> 40)),
515-
)
516485
},
517486
allowEndingScenarioWithoutSink = true,
518487
)

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/registrar/FlinkProcessRegistrar.scala

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import pl.touk.nussknacker.engine.flink.api.NkGlobalParameters
2121
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
2222
import pl.touk.nussknacker.engine.flink.api.process._
2323
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
24-
import pl.touk.nussknacker.engine.flink.util.sink.EmptySink
2524
import pl.touk.nussknacker.engine.graph.node.{BranchEndDefinition, NodeData}
2625
import pl.touk.nussknacker.engine.node.NodeComponentInfoExtractor.fromScenarioNode
2726
import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkCompatibilityProvider, FlinkJobConfig}

engine/flink/management/src/it/scala/pl/touk/nussknacker/engine/management/streaming/SampleProcess.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ object SampleProcess {
2626
.emptySink("end", "kafka-string", "Topic" -> s"'output-$name'".spel, "Value" -> "#input".spel)
2727
}
2828

29-
private def endWithMessage(idSuffix: String, message: String): SubsequentNode = {
29+
private def endWithMessage(idSuffix: String, message: String): Option[SubsequentNode] = {
3030
GraphBuilder
3131
.buildVariable("message" + idSuffix, "output", "message" -> s"'$message'".spel)
3232
.emptySink("end" + idSuffix, "monitor")

0 commit comments

Comments
 (0)