@@ -16,7 +16,7 @@ import pl.touk.nussknacker.engine.flink.table.source.TableSource.SQL_EXPRESSION_
16
16
import pl .touk .nussknacker .engine .flink .table .utils .ModelClassLoaderSimulationSuite
17
17
import pl .touk .nussknacker .engine .flink .util .test .FlinkTestScenarioRunner
18
18
import pl .touk .nussknacker .engine .process .FlinkJobConfig .ExecutionMode
19
- import pl .touk .nussknacker .engine .util .test .TestScenarioRunner
19
+ import pl .touk .nussknacker .engine .util .test .{ RunListResult , TestScenarioRunner }
20
20
import pl .touk .nussknacker .test .{PatientScalaFutures , ValidatedValuesDetailedMessage }
21
21
22
22
class TableSourceTest
@@ -66,8 +66,6 @@ class TableSourceTest
66
66
val scenario = ScenarioBuilder
67
67
.streaming(" test" )
68
68
.source(" start" , " table" , " Table" -> s " '`default_catalog`.`default_database`.`test_table`' " .spel)
69
- // TODO_PAWEL this is only to test in debug
70
- // .buildSimpleVariable("sth", "someVariable", "#input.![#this]".spel)
71
69
.emptySink(s " end " , TestScenarioRunner .testResultSink, " value" -> " #input" .spel)
72
70
73
71
val result = runner
@@ -80,12 +78,11 @@ class TableSourceTest
80
78
result.successes.loneElement
81
79
}
82
80
83
- test(" be possible to use merge" ) {
84
- import scala .jdk .CollectionConverters ._
81
+ private def evaluateExpression (expression : String ) = {
85
82
val scenario = ScenarioBuilder
86
83
.streaming(" test" )
87
84
.source(" start" , " table" , " Table" -> s " '`default_catalog`.`default_database`.`test_table`' " .spel)
88
- .buildSimpleVariable(" sth" , " someVariable" , " #COLLECTION.merge(#input, #input).get('quantity') " .spel)
85
+ .buildSimpleVariable(" sth" , " someVariable" , expression .spel)
89
86
.emptySink(s " end " , TestScenarioRunner .testResultSink, " value" -> " #someVariable" .spel)
90
87
91
88
val result = runner
@@ -94,46 +91,24 @@ class TableSourceTest
94
91
nodesData = NodesDeploymentData (Map (NodeId (" start" ) -> Map (SQL_EXPRESSION_PARAMETER_NAME -> " true = true" )))
95
92
)
96
93
.validValue
94
+ result
95
+ }
96
+
97
+ test(" be possible to use merge" ) {
98
+ val result = evaluateExpression(" #COLLECTION.merge(#input, #input).get('quantity')" )
97
99
result.errors shouldBe empty
98
100
result.successes(0 ) shouldBe a[Int ]
99
101
}
100
102
101
103
test(" be possible to use selection" ) {
102
- import scala .jdk .CollectionConverters ._
103
- val scenario = ScenarioBuilder
104
- .streaming(" test" )
105
- .source(" start" , " table" , " Table" -> s " '`default_catalog`.`default_database`.`test_table`' " .spel)
106
- .buildSimpleVariable(
107
- " sth" ,
108
- " someVariable" ,
109
- " #input.?[(#this.value / 10 + 42 - #this.value / 10) == 42].quantity" .spel
110
- )
111
- .emptySink(s " end " , TestScenarioRunner .testResultSink, " value" -> " #someVariable" .spel)
112
-
113
- val result = runner
114
- .runWithoutData[Row ](
115
- scenario,
116
- nodesData = NodesDeploymentData (Map (NodeId (" start" ) -> Map (SQL_EXPRESSION_PARAMETER_NAME -> " true = true" )))
117
- )
118
- .validValue
104
+ val result = evaluateExpression(" #input.?[(#this.value / 10 + 42 - #this.value / 10) == 42].quantity" )
119
105
result.errors shouldBe empty
120
106
result.successes(0 ) shouldBe a[Int ]
121
107
}
122
108
123
109
test(" be possible to use projection" ) {
124
110
import scala .jdk .CollectionConverters ._
125
- val scenario = ScenarioBuilder
126
- .streaming(" test" )
127
- .source(" start" , " table" , " Table" -> s " '`default_catalog`.`default_database`.`test_table`' " .spel)
128
- .buildSimpleVariable(" sth" , " someVariable" , " #input.![#this.value / 10 + 42 - #this.value / 10]" .spel)
129
- .emptySink(s " end " , TestScenarioRunner .testResultSink, " value" -> " #someVariable" .spel)
130
-
131
- val result = runner
132
- .runWithoutData[Row ](
133
- scenario,
134
- nodesData = NodesDeploymentData (Map (NodeId (" start" ) -> Map (SQL_EXPRESSION_PARAMETER_NAME -> " true = true" )))
135
- )
136
- .validValue
111
+ val result = evaluateExpression(" #input.![#this.value / 10 + 42 - #this.value / 10]" )
137
112
result.errors shouldBe empty
138
113
result.successes(0 ) shouldBe List (42 ).asJava
139
114
}
0 commit comments