Skip to content

Commit 607a57c

Browse files
committed
add flink sql validations using env
1 parent bcad9aa commit 607a57c

21 files changed

+1064
-294
lines changed

build.sbt

+3-1
Original file line number · Diff line number · Diff line change
@@ -294,6 +294,7 @@ lazy val commonSettings =
294294
// You can find versions provided by Flink in its lib/flink-dist-*.jar/META-INF/DEPENDENCIES file.
295295
val flinkV = "1.19.2"
296296
val flinkConnectorKafkaV = "3.2.0-1.19"
297+
val jdbcFlinkConnectorV = "3.2.0-1.19"
297298
val flinkCommonsCompressV = "1.26.0"
298299
val flinkCommonsLang3V = "3.12.0"
299300
val flinkCommonsTextV = "1.10.0"
@@ -1861,7 +1862,8 @@ lazy val flinkTableApiComponents = (project in flink("components/table"))
18611862
"org.apache.flink" % "flink-sql-parser" % flinkV,
18621863
"org.apache.flink" % "flink-connector-files" % flinkV, // needed for testing data generation
18631864
"org.apache.flink" % "flink-json" % flinkV, // needed for testing data generation
1864-
"org.apache.flink" % "flink-csv" % flinkV % Test,
1865+
"org.apache.flink" % "flink-csv" % flinkV % Test,
1866+
"org.apache.flink" % "flink-connector-jdbc" % jdbcFlinkConnectorV % Test,
18651867
)
18661868
}
18671869
)

engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/FlinkSqlTableTestCases.scala

+6-2
Original file line number · Diff line number · Diff line change
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.table
22

33
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition.FlinkSqlDdlStatement.{
44
CatalogName,
5+
CatalogType,
56
CreateCatalog,
67
SqlOption
78
}
@@ -17,7 +18,9 @@ object FlinkSqlTableTestCases {
1718
| someIntComputed AS someInt * 2,
1819
| `file.name` STRING NOT NULL METADATA
1920
|) WITH (
20-
| 'connector' = 'filesystem'
21+
| 'connector' = 'filesystem',
22+
| 'path' = '.',
23+
| 'format' = 'csv'
2124
|);""".stripMargin
2225

2326
val unboundedDatagenTable: String =
@@ -83,7 +86,8 @@ object FlinkSqlTableTestCases {
8386
SqlOption("username", "username"),
8487
SqlOption("password", "password"),
8588
SqlOption("base-url", "jdbc:postgresql://localhost:5432"),
86-
)
89+
),
90+
CatalogType("jdbc")
8791
)
8892

8993
}

engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDdlParserTest.scala

+37-25
Original file line number · Diff line number · Diff line change
@@ -6,35 +6,18 @@ import org.scalatest.funsuite.AnyFunSuite
66
import org.scalatest.matchers.should.Matchers
77
import pl.touk.nussknacker.engine.flink.table.FlinkSqlTableTestCases
88
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition.FlinkSqlDdlStatement.{
9+
Connector,
910
CreateTable,
1011
SqlString
1112
}
12-
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDdlParseError.{ParseError, UnallowedStatement}
13+
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinitionCreationError.FlinkDdlParseError._
1314
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDdlParserTest.syntacticallyInvalidSqlStatements
1415

1516
class FlinkDdlParserTest extends AnyFunSuite with Matchers {
1617

17-
test("return error for syntactically invalid statements") {
18-
syntacticallyInvalidSqlStatements.foreach { s =>
19-
FlinkDdlParser.parse(s) should matchPattern { case Invalid(NonEmptyList(ParseError(_), _)) => }
20-
}
21-
}
22-
23-
test("return multiple errors for multiple unallowed statements") {
24-
val sqlStatements = FlinkDdlParserTest.unallowedSqlStatements.mkString(";\n")
25-
FlinkDdlParser.parse(sqlStatements) should matchPattern {
26-
case Invalid(NonEmptyList(UnallowedStatement(_), List(UnallowedStatement(_)))) =>
27-
}
28-
}
29-
30-
test("parses semantically invalid but parseable statements") {
31-
val sqlStatements = FlinkDdlParserTest.semanticallyIllegalButParseableStatements.mkString(";\n")
32-
FlinkDdlParser.parse(sqlStatements) should matchPattern { case Valid(_) => }
33-
}
34-
3518
test("parse valid create table statements") {
3619
FlinkDdlParser.parse(FlinkSqlTableTestCases.unboundedKafkaTable) shouldBe Valid(
37-
List(CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted)))
20+
List(CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted), Connector("kafka")))
3821
)
3922
}
4023

@@ -51,13 +34,45 @@ class FlinkDdlParserTest extends AnyFunSuite with Matchers {
5134
s"${FlinkSqlTableTestCases.PostgresCatalog.postgresCatalog}"
5235
) shouldBe Valid(
5336
List(
54-
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted)),
55-
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedDatagenTableFormatted)),
37+
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted), Connector("kafka")),
38+
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedDatagenTableFormatted), Connector("datagen")),
5639
FlinkSqlTableTestCases.PostgresCatalog.postgresCatalogParsed
5740
)
5841
)
5942
}
6043

44+
test("parses semantically invalid but parseable statements") {
45+
val sqlStatements = FlinkDdlParserTest.semanticallyIllegalButParseableStatements.mkString(";\n")
46+
FlinkDdlParser.parse(sqlStatements) should matchPattern { case Valid(_) => }
47+
}
48+
49+
test("return error for syntactically invalid statements") {
50+
syntacticallyInvalidSqlStatements.foreach { s =>
51+
FlinkDdlParser.parse(s) should matchPattern { case Invalid(NonEmptyList(ParseError(_), Nil)) => }
52+
}
53+
}
54+
55+
test("return multiple errors for multiple unallowed statements") {
56+
val sqlStatements = FlinkDdlParserTest.unallowedSqlStatements.mkString(";\n")
57+
FlinkDdlParser.parse(sqlStatements) should matchPattern {
58+
case Invalid(NonEmptyList(UnallowedStatement(_), List(UnallowedStatement(_)))) =>
59+
}
60+
}
61+
62+
test("return error for missing connector option") {
63+
val sqlStatement = "CREATE TABLE testTable (str STRING) WITH ()"
64+
FlinkDdlParser.parse(sqlStatement) should matchPattern {
65+
case Invalid(NonEmptyList(MissingConnectorOption(_), Nil)) =>
66+
}
67+
}
68+
69+
test("return error for missing catalog type") {
70+
val sqlStatement = "CREATE CATALOG testCatalog WITH ()"
71+
FlinkDdlParser.parse(sqlStatement) should matchPattern {
72+
case Invalid(NonEmptyList(MissingCatalogTypeOption(_, _), Nil)) =>
73+
}
74+
}
75+
6176
}
6277

6378
object FlinkDdlParserTest {
@@ -86,9 +101,6 @@ object FlinkDdlParserTest {
86101
|) WITH (
87102
| 'connector' = 'datagen'
88103
|)""".stripMargin,
89-
s"""|CREATE TABLE testTableWithoutOptions (
90-
| col STRING
91-
|)""".stripMargin,
92104
s"""|CREATE TABLE testTableWithEmptyConnector (
93105
| col STRING
94106
|) WITH (

0 commit comments

Comments (0)