@@ -14,47 +14,56 @@ import pl.touk.nussknacker.engine.flink.test.FlinkSpec
14
14
import pl .touk .nussknacker .engine .flink .test .ScalatestMiniClusterJobStatusCheckingOps .miniClusterWithServicesToOps
15
15
import pl .touk .nussknacker .engine .flink .util .source .CollectionSource
16
16
import pl .touk .nussknacker .engine .flink .util .transformer .FlinkBaseComponentProvider
17
+ import pl .touk .nussknacker .engine .graph .expression .Expression
17
18
import pl .touk .nussknacker .engine .process .helpers .ConfigCreatorWithCollectingListener
18
19
import pl .touk .nussknacker .engine .process .runner .FlinkScenarioUnitTestJob
19
20
import pl .touk .nussknacker .engine .spel .SpelExtension ._
20
21
import pl .touk .nussknacker .engine .testing .LocalModelData
21
22
import pl .touk .nussknacker .engine .testmode .{ResultsCollectingListener , ResultsCollectingListenerHolder }
22
23
24
+ import java .util
23
25
import scala .collection .mutable
24
26
import scala .jdk .CollectionConverters ._
27
+ import scala .util .Random
25
28
26
29
class JavaCollectionsSerializationTest extends AnyFunSuite with FlinkSpec with Matchers with Inside {
27
30
28
31
// Scenario id used for every test scenario built in this suite.
private val processId = "aggregateFilterProcess"
// Builds the test scenario: source -> (optional "mapVariable" node) -> delay -> dead-end sink.
//
// @param expressionOption when defined, a simple-variable node named "mapVariable" is inserted
//                         right after the source, so tests can observe how the given SpEL
//                         expression (e.g. a map literal) survives Flink (de)serialization.
private def process(expressionOption: Option[Expression] = None): CanonicalProcess = {
  val sourcePart = ScenarioBuilder
    .streaming(processId)
    .parallelism(1)
    .source("start", "start")

  // fold = "use the builder as-is when no expression, otherwise add the variable node"
  expressionOption
    .fold(sourcePart)(expression => sourcePart.buildSimpleVariable("mapVariable", "mapVariable", expression))
    .customNodeNoOutput(
      "delay",
      "delay",
      "key"   -> "#input.id".spel,
      "delay" -> "T(java.time.Duration).parse('PT30M')".spel
    )
    .emptySink("end", "dead-end")
}
51
// Sample record whose fields are Java collections obtained through the Scala mutable
// wrappers (.asJava) — exactly the wrapper classes whose kryo serialization this suite
// verifies (see the JavaWrapperScala2_13Registrar note below).
private val record = Record(
  id = "2",
  map = mutable.Map(1 -> "a").asJava,
  list = mutable.ListBuffer("abc").asJava,
  set = mutable.Set("def").asJava
)
43
58
// In Scala 2.13 all java collections class wrappers were rewritten from case class to regular class. Now kryo does not
44
59
// serialize them properly, so JavaWrapperScala2_13Registrar class was added to fix this issue. This test verifies
45
60
// if we can serialize and deserialize records properly.
46
61
test(" should serialize record with java map, list and set" ) {
47
- val record = Record (
48
- id = " 2" ,
49
- map = mutable.Map (1 -> " a" ).asJava,
50
- list = mutable.ListBuffer (" abc" ).asJava,
51
- set = mutable.Set (" def" ).asJava
52
- )
53
62
54
63
ResultsCollectingListenerHolder .withListener { collectingListener =>
55
64
val model = modelData(collectingListener, List (record))
56
65
57
- runScenario(model, process)
66
+ runScenario(model, process() )
58
67
59
68
val result = collectingListener.results
60
69
.nodeResults(" end" )
@@ -64,6 +73,40 @@ class JavaCollectionsSerializationTest extends AnyFunSuite with FlinkSpec with M
64
73
}
65
74
}
66
75
76
// Regression test: a SpEL map literal must keep its insertion order after running through
// the scenario (java.util.LinkedHashMap semantics) instead of being reordered by key.
test("should serialize java map without changing fields order") {
  ResultsCollectingListenerHolder.withListener { collectingListener =>
    val model = modelData(collectingListener, List(record))

    // 26 entries deliberately shuffled out of alphabetical order (sorted by random values),
    // so an order-forgetting serializer would almost surely fail the assertion below.
    val shuffledEntries =
      ('a' to 'z')
        .map(letter => letter -> Random.nextInt())
        .sortBy { case (_, value) => value }

    // Render the entries as a SpEL map literal, e.g. {"q": 3,"a": 17,...}.
    val mapLiteral = shuffledEntries
      .map { case (letter, value) => s""""$letter": $value""" }
      .mkString("{", ",", "}")

    runScenario(model, process(Some(mapLiteral.spel)))

    // Expected value: a LinkedHashMap with the same insertion order as the literal.
    val expected = new util.LinkedHashMap[String, AnyRef]()
    shuffledEntries.foreach { case (letter, value) =>
      expected.put(letter.toString, Integer.valueOf(value))
    }

    val result = collectingListener.results
      .nodeResults("end")
      .map(_.variableTyped[Map[_, _]]("mapVariable"))

    result shouldBe List(Some(expected))

  }
}
67
110
def modelData (collectingListener : ResultsCollectingListener [Any ], list : List [Record ] = List ()): LocalModelData = {
68
111
val sourceComponent = SourceFactory .noParamUnboundedStreamFactory[Record ](
69
112
CollectionSource [Record ](list, None , Typed .fromDetailedType[List [Record ]])
0 commit comments