Skip to content

Commit a8d0c66

Browse files
author
Pawel Czajka
committed
Merge remote-tracking branch 'origin/staging' into 2065-time-machine
2 parents 769bc2e + 85c2287 commit a8d0c66

File tree

783 files changed

+30148
-10959
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

783 files changed

+30148
-10959
lines changed

.github/workflows/pr.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,8 @@ jobs:
481481
name: CypressTests-results
482482
path: designer/client/cypress-test-results/*.xml
483483
- name: Store test results
484-
if: failure()
484+
# We have to store test results also for success, because pipeline can fail on "Force fail if update PR created" stage
485+
if: success() || failure()
485486
uses: actions/upload-artifact@v4
486487
with:
487488
name: e2e-test-results

.run/RunEnvForLocalDesigner.run.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
<option name="Make" enabled="true" />
1313
</method>
1414
</configuration>
15-
</component>
15+
</component>

benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/interpreter/InterpreterSetup.scala

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@ package pl.touk.nussknacker.engine.benchmarks.interpreter
33
import cats.Monad
44
import cats.data.Validated.{Invalid, Valid}
55
import cats.data.ValidatedNel
6-
import pl.touk.nussknacker.engine.{api, CustomProcessValidatorLoader, InterpretationResult, RuntimeMode}
6+
import pl.touk.nussknacker.engine.{
7+
api,
8+
CustomProcessValidatorLoader,
9+
InterpretationResult,
10+
RuntimeMode,
11+
ScenarioCompilationDependencies
12+
}
713
import pl.touk.nussknacker.engine.Interpreter.InterpreterShape
814
import pl.touk.nussknacker.engine.api._
915
import pl.touk.nussknacker.engine.api.component.{
@@ -13,6 +19,7 @@ import pl.touk.nussknacker.engine.api.component.{
1319
UnboundedStreamComponent
1420
}
1521
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError
22+
import pl.touk.nussknacker.engine.api.definition.EngineScenarioCompilationDependencies
1623
import pl.touk.nussknacker.engine.api.exception.NuExceptionInfo
1724
import pl.touk.nussknacker.engine.api.process._
1825
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
@@ -38,10 +45,20 @@ class InterpreterSetup[T: ClassTag] {
3845
val jobData = JobData(process.metaData, ProcessVersion.empty.copy(processName = process.metaData.name))
3946
val compilerData = prepareCompilerData(jobData, additionalComponents)
4047
val interpreter = compilerData.interpreter
41-
val parts = failOnErrors(compilerData.compile(process))
48+
49+
implicit val engineScenarioCompilationDependencies: EngineScenarioCompilationDependencies =
50+
EngineScenarioCompilationDependencies.empty
51+
implicit val scenarioCompilationDependencies: ScenarioCompilationDependencies =
52+
new ScenarioCompilationDependencies(jobData, engineScenarioCompilationDependencies)
53+
54+
val parts = failOnErrors(compilerData.compile(process))
4255

4356
def compileNode(part: ProcessPart) =
44-
failOnErrors(compilerData.subPartCompiler.compile(part.node, part.validationContext)(jobData).result)
57+
failOnErrors(
58+
compilerData.subPartCompiler
59+
.compile(part.node, part.validationContext)
60+
.result
61+
)
4562

4663
val compiled = compileNode(parts.sources.head)
4764
(initialCtx: Context, ec: ServiceExecutionContext) => interpreter.interpret[F](compiled, jobData, initialCtx, ec)
@@ -66,7 +83,8 @@ class InterpreterSetup[T: ClassTag] {
6683
ComponentDefinitionExtractionMode.FinalDefinition
6784
),
6885
ModelDefinitionBuilder.emptyExpressionConfig,
69-
ClassExtractionSettings.Default
86+
ClassExtractionSettings.Default,
87+
allowEndingScenarioWithoutSink = false,
7088
)
7189
val definitionsWithTypes = ModelDefinitionWithClasses(definitions)
7290

build.sbt

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ import scala.util.Try
1414
import scala.xml.Elem
1515
import scala.xml.transform.{RewriteRule, RuleTransformer}
1616

17-
// Warning: Flink doesn't work correctly with 2.12.11
17+
// Warning: Flink doesn't work correctly with Scala 2.12.11 and higher.
18+
// Upgrading to a newer version of Scala 2.12 causes the JavaCollectionsSerializationTest to fail
19+
// because these versions switched to the same Java collection wrappers used in Scala 2.13.
20+
// These wrappers lack dedicated Kryo serializers, which we added in flink-scala-2.13 library https://github.com/TouK/flink-scala-2.13.
21+
// To bump Scala 2.12 we would need to do flink-scala-2.12 similar to flink-scala-2.13.
1822
val scala212 = "2.12.10"
1923
val scala213 = "2.13.15"
2024

@@ -105,6 +109,8 @@ def defaultMergeStrategy: String => MergeStrategy = {
105109
case PathList(ps @ _*) if ps.last == "module-info.class" => MergeStrategy.discard
106110
// we override Spring's class and we want to keep only our implementation
107111
case PathList(ps @ _*) if ps.last == "NumberUtils.class" => MergeStrategy.first
112+
case PathList(ps @ _*) if ps.last == "Projection.class" => MergeStrategy.first
113+
case PathList(ps @ _*) if ps.last == "Selection.class" => MergeStrategy.first
108114
// merge Netty version information files
109115
case PathList(ps @ _*) if ps.last == "io.netty.versions.properties" => MergeStrategy.concat
110116
// due to swagger-parser dependencies having different schema definitions (json-schema-validator and json-schema-core)
@@ -295,6 +301,7 @@ lazy val commonSettings =
295301
// You can find versions provided by Flink in its lib/flink-dist-*.jar/META-INF/DEPENDENCIES file.
296302
val flinkV = "1.19.2"
297303
val flinkConnectorKafkaV = "3.2.0-1.19"
304+
val jdbcFlinkConnectorV = "3.2.0-1.19"
298305
val flinkCommonsCompressV = "1.26.0"
299306
val flinkCommonsLang3V = "3.12.0"
300307
val flinkCommonsTextV = "1.10.0"
@@ -306,6 +313,8 @@ val avroV = "1.11.4"
306313
//we should use max(version used by confluent, version acceptable by flink), https://docs.confluent.io/platform/current/installation/versions-interoperability.html - confluent version reference
307314
val kafkaV = "3.8.1"
308315
// to update we need configurable SpEL length limit from 6.0.9, but 6.x requires JDK 17
316+
// when updating note that we have copied and modified class org.springframework.expression.spel.ast.Projection
317+
// and org.springframework.util.NumberUtils and org.springframework.expression.spel.ast.Selection
309318
val springV = "5.2.23.RELEASE"
310319
val scalaTestV = "3.2.18"
311320
val scalaCheckV = "1.17.1"
@@ -323,6 +332,7 @@ val jacksonV = "2.17.2"
323332
val catsV = "2.12.0"
324333
val catsEffectV = "3.5.4"
325334
val everitSchemaV = "1.14.4"
335+
val fastParseV = "3.1.1"
326336
val slf4jV = "1.7.36"
327337
val scalaLoggingV = "3.9.5"
328338
val scalaCompatV = "1.0.2"
@@ -358,6 +368,7 @@ val cronParserV = "9.1.6" // 9.1.7+ requires JDK 16+
358368
val javaxValidationApiV = "2.0.1.Final"
359369
val caffeineCacheV = "3.1.8"
360370
val sttpV = "3.9.8"
371+
val sttpSharedV = "1.3.22"
361372
val tapirV = "1.11.7"
362373
val openapiCirceYamlV = "0.11.3"
363374
//we use legacy version because this one supports Scala 2.12
@@ -636,6 +647,7 @@ lazy val flinkDeploymentManager = (project in flink("management"))
636647
)
637648
.dependsOn(
638649
deploymentManagerApi % Provided,
650+
scenarioCompilerFlinkDeps,
639651
flinkMiniCluster,
640652
commonUtils % Provided,
641653
utilsInternal % Provided,
@@ -680,6 +692,7 @@ lazy val flinkDevModel = (project in flink("management/dev-model"))
680692
// It has to be in the default, Compile scope because all components are eagerly loaded so it will be loaded also
681693
// on the Flink side where this library is missing
682694
liteComponentsApi,
695+
defaultHelpers,
683696
componentsUtils % Provided,
684697
// TODO: NodeAdditionalInfoProvider & ComponentExtractor should probably be moved to API?
685698
scenarioCompiler % Provided,
@@ -807,6 +820,7 @@ lazy val flinkExecutor = (project in flink("executor"))
807820
}.toList,
808821
)
809822
.dependsOn(
823+
scenarioCompilerFlinkDeps,
810824
flinkComponentsUtils,
811825
flinkExtensionsApi,
812826
scenarioCompiler,
@@ -818,6 +832,19 @@ lazy val flinkExecutor = (project in flink("executor"))
818832
flinkTestUtils % Test,
819833
)
820834

835+
lazy val scenarioCompilerFlinkDeps = (project in flink("scenario-compiler-deps"))
836+
.settings(commonSettings)
837+
.settings(
838+
name := "nussknacker-flink-scenario-compiler-deps",
839+
libraryDependencies ++= {
840+
Seq(
841+
// Dependencies below are provided by flink-dist jar in production flink or by flink DM for scenario testing/state verification purposes
842+
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
843+
)
844+
}
845+
)
846+
.dependsOn(componentsApi % Provided)
847+
821848
lazy val scenarioCompiler = (project in file("scenario-compiler"))
822849
.settings(commonSettings)
823850
.settings(
@@ -826,10 +853,12 @@ lazy val scenarioCompiler = (project in file("scenario-compiler"))
826853
Seq(
827854
"org.typelevel" %% "cats-effect" % catsEffectV,
828855
"org.scala-lang.modules" %% "scala-java8-compat" % scalaCompatV,
856+
"com.lihaoyi" %% "fastparse" % fastParseV,
829857
"org.apache.avro" % "avro" % avroV % Test,
830858
"org.scalacheck" %% "scalacheck" % scalaCheckV % Test,
831859
"com.cronutils" % "cron-utils" % cronParserV % Test,
832-
"org.scalatestplus" %% s"scalacheck-$scalaCheckVshort" % scalaTestPlusV % Test
860+
"org.scalatestplus" %% s"scalacheck-$scalaCheckVshort" % scalaTestPlusV % Test,
861+
"org.apache.flink" % "flink-core" % flinkV % Test,
833862
)
834863
}
835864
)
@@ -1011,6 +1040,7 @@ lazy val flinkKafkaComponentsUtils = (project in flink("kafka-components-utils")
10111040
.dependsOn(
10121041
componentsApi % Provided,
10131042
kafkaComponentsUtils,
1043+
schemedKafkaComponentsUtils,
10141044
flinkComponentsUtils % Provided,
10151045
flinkExtensionsApi % Provided,
10161046
componentsUtils % Provided,
@@ -1131,7 +1161,12 @@ lazy val mathUtils = (project in utils("math-utils"))
11311161
lazy val defaultHelpers = (project in utils("default-helpers"))
11321162
.settings(commonSettings)
11331163
.settings(
1134-
name := "nussknacker-default-helpers"
1164+
name := "nussknacker-default-helpers",
1165+
libraryDependencies ++= {
1166+
Seq(
1167+
"org.apache.flink" % "flink-core" % flinkV % Test,
1168+
)
1169+
}
11351170
)
11361171
.dependsOn(mathUtils, commonUtils, testUtils % Test, scenarioCompiler % "test->test;test->compile")
11371172

@@ -1229,16 +1264,34 @@ lazy val flinkMiniCluster = (project in flink("minicluster"))
12291264
name := "nussknacker-flink-minicluster",
12301265
libraryDependencies ++= {
12311266
Seq(
1232-
("org.apache.flink" % "flink-streaming-java" % flinkV)
1267+
("org.apache.flink" % "flink-streaming-java" % flinkV)
12331268
.excludeAll(
12341269
ExclusionRule("log4j", "log4j"),
12351270
ExclusionRule("org.slf4j", "slf4j-log4j12"),
12361271
ExclusionRule("com.esotericsoftware", "kryo-shaded"),
12371272
),
1238-
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV,
1239-
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV % Provided,
1240-
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV % Provided,
1241-
"com.softwaremill.retry" %% "retry" % retryV,
1273+
"org.apache.flink" % "flink-statebackend-rocksdb" % flinkV,
1274+
// Below is a list of libs that are available in flink distribution
1275+
// We want to make flink minicluster as featured as standard flink distribution
1276+
"org.apache.flink" % "flink-connector-files" % flinkV,
1277+
"org.apache.flink" % "flink-csv" % flinkV,
1278+
"org.apache.flink" % "flink-json" % flinkV,
1279+
("org.apache.flink" % "flink-table-api-java-bridge" % flinkV)
1280+
.excludeAll(
1281+
ExclusionRule("com.esotericsoftware", "kryo-shaded")
1282+
),
1283+
("org.apache.flink" % "flink-table-runtime" % flinkV)
1284+
.excludeAll(
1285+
ExclusionRule("com.esotericsoftware", "kryo-shaded")
1286+
),
1287+
("org.apache.flink" % "flink-table-planner-loader" % flinkV)
1288+
.excludeAll(
1289+
ExclusionRule("com.esotericsoftware", "kryo-shaded")
1290+
),
1291+
// end of list
1292+
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV % Provided,
1293+
"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV % Provided,
1294+
"com.softwaremill.retry" %% "retry" % retryV,
12421295
) ++ flinkLibScalaDeps(scalaVersion.value)
12431296
}
12441297
)
@@ -1329,7 +1382,12 @@ lazy val liteKafkaComponentsTests: Project = (project in lite("components/kafka-
13291382
)
13301383
},
13311384
)
1332-
.dependsOn(liteEngineKafkaComponentsApi % Test, componentsUtils % Test, liteComponentsTestkit % Test)
1385+
.dependsOn(
1386+
liteEngineKafkaComponentsApi % Test,
1387+
componentsUtils % Test,
1388+
liteComponentsTestkit % Test,
1389+
kafkaTestUtils % Test
1390+
)
13331391

13341392
lazy val liteRequestResponseComponents = (project in lite("components/request-response"))
13351393
.settings(commonSettings)
@@ -1717,6 +1775,7 @@ lazy val httpUtils = (project in utils("http-utils"))
17171775
"com.softwaremill.sttp.client3" %% "circe" % sttpV,
17181776
"com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % sttpV,
17191777
"io.netty" % "netty-transport-native-epoll" % nettyV,
1778+
"io.netty" % "netty-handler" % nettyV,
17201779
)
17211780
}
17221781
)
@@ -1739,7 +1798,8 @@ lazy val openapiComponents = (project in component("openapi"))
17391798
ExclusionRule(organization = "jakarta.validation")
17401799
),
17411800
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
1742-
"org.scalatest" %% "scalatest" % scalaTestV % "it,test"
1801+
"org.scalatest" %% "scalatest" % scalaTestV % "it,test",
1802+
"org.wiremock" % "wiremock" % wireMockV % Test,
17431803
),
17441804
)
17451805
.dependsOn(
@@ -1836,9 +1896,7 @@ lazy val flinkBaseComponentsTests = (project in flink("components/base-tests"))
18361896
.settings(
18371897
name := "nussknacker-flink-base-components-tests",
18381898
libraryDependencies ++= Seq(
1839-
"org.apache.flink" % "flink-connector-files" % flinkV % Test,
1840-
"org.apache.flink" % "flink-csv" % flinkV % Test,
1841-
"org.apache.flink" % "flink-json" % flinkV % Test
1899+
"org.apache.flink" % "flink-connector-jdbc" % jdbcFlinkConnectorV % Test,
18421900
)
18431901
)
18441902
.dependsOn(
@@ -1869,16 +1927,12 @@ lazy val flinkTableApiComponents = (project in flink("components/table"))
18691927
name := "nussknacker-flink-table-components",
18701928
libraryDependencies ++= {
18711929
Seq(
1872-
"org.apache.calcite" % "calcite-linq4j" % calciteV, // required by fliink-sql-parser
1873-
"org.apache.flink" % "flink-table-api-java" % flinkV,
1874-
"org.apache.flink" % "flink-table-api-java-bridge" % flinkV,
1875-
"org.apache.flink" % "flink-table-planner-loader" % flinkV,
1876-
"org.apache.flink" % "flink-table-runtime" % flinkV,
1877-
"org.apache.flink" % "flink-clients" % flinkV,
18781930
"org.apache.flink" % "flink-sql-parser" % flinkV,
1879-
"org.apache.flink" % "flink-connector-files" % flinkV, // needed for testing data generation
1880-
"org.apache.flink" % "flink-json" % flinkV, // needed for testing data generation
1881-
"org.apache.flink" % "flink-csv" % flinkV % Test,
1931+
"org.apache.calcite" % "calcite-linq4j" % calciteV, // required by fliink-sql-parser
1932+
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
1933+
"org.apache.flink" % "flink-table-api-java" % flinkV % Provided,
1934+
"org.apache.flink" % "flink-table-api-java-bridge" % flinkV % Provided,
1935+
"org.apache.flink" % "flink-connector-jdbc" % jdbcFlinkConnectorV % Test,
18821936
)
18831937
}
18841938
)
@@ -1889,7 +1943,8 @@ lazy val flinkTableApiComponents = (project in flink("components/table"))
18891943
componentsUtils % Provided,
18901944
flinkComponentsUtils % Provided,
18911945
jsonUtils % Provided,
1892-
flinkMiniCluster % Provided,
1946+
extensionsApi % Provided,
1947+
flinkMiniCluster % Test,
18931948
testUtils % Test,
18941949
flinkComponentsTestkit % Test,
18951950
)
@@ -1931,7 +1986,8 @@ lazy val customHttpServiceApi = (project in file("designer/custom-http-service-a
19311986
name := "nussknacker-custom-http-service-api",
19321987
libraryDependencies ++= {
19331988
Seq(
1934-
"org.apache.pekko" %% "pekko-http" % pekkoHttpV,
1989+
"org.apache.pekko" %% "pekko-http" % pekkoHttpV,
1990+
"com.softwaremill.sttp.shared" %% "pekko" % sttpSharedV,
19351991
)
19361992
}
19371993
)
@@ -2237,6 +2293,7 @@ lazy val modules = List[ProjectReference](
22372293
utilsInternal,
22382294
testUtils,
22392295
flinkExecutor,
2296+
scenarioCompilerFlinkDeps,
22402297
flinkSchemedKafkaComponentsUtils,
22412298
flinkKafkaComponentsUtils,
22422299
flinkComponentsUtils,

common-api/src/main/scala/pl/touk/nussknacker/engine/api/parameter/ParameterName.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrapped
55

66
final case class ParameterName(value: String) {
77
def withBranchId(branchId: String): ParameterName = ParameterName(s"$value for branch $branchId")
8+
9+
override def toString: String = value
810
}
911

1012
object ParameterName {

common-api/src/main/scala/pl/touk/nussknacker/engine/graph/expression/Expression.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import pl.touk.nussknacker.engine.graph.expression.Expression.Language
77
import pl.touk.nussknacker.engine.graph.expression.Expression.Language.{
88
DictKeyWithLabel,
99
Json,
10+
JsonTemplate,
1011
Spel,
1112
SpelTemplate,
1213
TabularDataDefinition
@@ -25,6 +26,7 @@ object Expression {
2526
case DictKeyWithLabel => "dictKeyWithLabel"
2627
case TabularDataDefinition => "tabularDataDefinition"
2728
case Json => "json"
29+
case JsonTemplate => "jsonTemplate"
2830
}
2931

3032
}
@@ -35,6 +37,7 @@ object Expression {
3537
object DictKeyWithLabel extends Language
3638
object TabularDataDefinition extends Language
3739
object Json extends Language
40+
object JsonTemplate extends Language
3841

3942
implicit val encoder: Encoder[Language] = Encoder.encodeString.contramap(_.toString)
4043

@@ -44,6 +47,7 @@ object Expression {
4447
case "dictKeyWithLabel" => Right(DictKeyWithLabel)
4548
case "tabularDataDefinition" => Right(TabularDataDefinition)
4649
case "json" => Right(Json)
50+
case "jsonTemplate" => Right(JsonTemplate)
4751
case unknown => Left(s"Unknown language [$unknown]")
4852
}
4953

@@ -61,4 +65,6 @@ object Expression {
6165
def tabularDataDefinition(definition: String): Expression = Expression(Language.TabularDataDefinition, definition)
6266

6367
def json(jsonString: String): Expression = Expression(Language.Json, jsonString)
68+
69+
def jsonTemplate(jsonTemplateString: String): Expression = Expression(Language.JsonTemplate, jsonTemplateString)
6470
}

components-api/src/main/java/pl/touk/nussknacker/engine/api/DefaultValue.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@
99
@Retention(RetentionPolicy.RUNTIME)
1010
public @interface DefaultValue {
1111
String value();
12-
}
12+
ExpressionLanguage language() default ExpressionLanguage.SPEL;
13+
}

0 commit comments

Comments
 (0)