Flink SQL validations using env #7722


Draft · wants to merge 10 commits into base: staging
build.sbt (5 changes: 4 additions & 1 deletion)
@@ -294,6 +294,7 @@ lazy val commonSettings =
// You can find versions provided by Flink in its lib/flink-dist-*.jar/META-INF/DEPENDENCIES file.
val flinkV = "1.19.2"
val flinkConnectorKafkaV = "3.2.0-1.19"
val jdbcFlinkConnectorV = "3.2.0-1.19"
val flinkCommonsCompressV = "1.26.0"
val flinkCommonsLang3V = "3.12.0"
val flinkCommonsTextV = "1.10.0"
@@ -1861,7 +1862,8 @@ lazy val flinkTableApiComponents = (project in flink("components/table"))
"org.apache.flink" % "flink-sql-parser" % flinkV,
"org.apache.flink" % "flink-connector-files" % flinkV, // needed for testing data generation
"org.apache.flink" % "flink-json" % flinkV, // needed for testing data generation
"org.apache.flink" % "flink-csv" % flinkV % Test,
"org.apache.flink" % "flink-csv" % flinkV % Test,
"org.apache.flink" % "flink-connector-jdbc" % jdbcFlinkConnectorV % Test,
)
}
)
@@ -1873,6 +1875,7 @@ lazy val flinkTableApiComponents = (project in flink("components/table"))
flinkComponentsUtils % Provided,
jsonUtils % Provided,
flinkMiniCluster % Provided,
extensionsApi % Provided,
testUtils % Test,
flinkComponentsTestkit % Test,
)
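A note on the new Test-scoped flink-connector-jdbc dependency: it is presumably what allows these tests to register a JDBC-backed catalog in a Flink TableEnvironment. A minimal sketch of that kind of usage, assuming a reachable Postgres instance (the catalog name and the 'default-database' value are illustrative, not taken from this PR):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object JdbcCatalogSketch extends App {
  // Requires flink-connector-jdbc and a Postgres JDBC driver on the classpath.
  val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

  // Registers a JDBC catalog via DDL; Flink opens the catalog eagerly,
  // so this will typically fail fast if the database is unreachable.
  tableEnv.executeSql(
    """CREATE CATALOG somePostgresCatalog WITH (
      |  'type' = 'jdbc',
      |  'default-database' = 'postgres',
      |  'username' = 'username',
      |  'password' = 'password',
      |  'base-url' = 'jdbc:postgresql://localhost:5432'
      |)""".stripMargin
  )
}
```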
FlinkSqlTableTestCases.scala
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.flink.table

import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition.FlinkSqlDdlStatement.{
CatalogName,
CatalogType,
CreateCatalog,
SqlOption
}
@@ -17,7 +18,9 @@ object FlinkSqlTableTestCases {
| someIntComputed AS someInt * 2,
| `file.name` STRING NOT NULL METADATA
|) WITH (
| 'connector' = 'filesystem'
| 'connector' = 'filesystem',
| 'path' = '.',
| 'format' = 'csv'
|);""".stripMargin

val unboundedDatagenTable: String =
@@ -83,7 +86,8 @@
SqlOption("username", "username"),
SqlOption("password", "password"),
SqlOption("base-url", "jdbc:postgresql://localhost:5432"),
)
),
CatalogType("jdbc")
)

}
FlinkDdlParserTest.scala
@@ -6,35 +6,18 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.flink.table.FlinkSqlTableTestCases
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition.FlinkSqlDdlStatement.{
Connector,
CreateTable,
SqlString
}
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDdlParseError.{ParseError, UnallowedStatement}
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinitionCreationError.FlinkDdlParseError._
import pl.touk.nussknacker.engine.flink.table.definition.FlinkDdlParserTest.syntacticallyInvalidSqlStatements

class FlinkDdlParserTest extends AnyFunSuite with Matchers {

test("return error for syntactically invalid statements") {
syntacticallyInvalidSqlStatements.foreach { s =>
FlinkDdlParser.parse(s) should matchPattern { case Invalid(NonEmptyList(ParseError(_), _)) => }
}
}

test("return multiple errors for multiple unallowed statements") {
val sqlStatements = FlinkDdlParserTest.unallowedSqlStatements.mkString(";\n")
FlinkDdlParser.parse(sqlStatements) should matchPattern {
case Invalid(NonEmptyList(UnallowedStatement(_), List(UnallowedStatement(_)))) =>
}
}

test("parses semantically invalid but parseable statements") {
val sqlStatements = FlinkDdlParserTest.semanticallyIllegalButParseableStatements.mkString(";\n")
FlinkDdlParser.parse(sqlStatements) should matchPattern { case Valid(_) => }
}

test("parse valid create table statements") {
FlinkDdlParser.parse(FlinkSqlTableTestCases.unboundedKafkaTable) shouldBe Valid(
List(CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted)))
List(CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted), Connector("kafka")))
)
}

@@ -51,13 +34,45 @@ class FlinkDdlParserTest extends AnyFunSuite with Matchers {
s"${FlinkSqlTableTestCases.PostgresCatalog.postgresCatalog}"
) shouldBe Valid(
List(
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted)),
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedDatagenTableFormatted)),
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedKafkaTableFormatted), Connector("kafka")),
CreateTable(SqlString(FlinkSqlTableTestCases.unboundedDatagenTableFormatted), Connector("datagen")),
FlinkSqlTableTestCases.PostgresCatalog.postgresCatalogParsed
)
)
}

test("parses semantically invalid but parseable statements") {
val sqlStatements = FlinkDdlParserTest.semanticallyIllegalButParseableStatements.mkString(";\n")
FlinkDdlParser.parse(sqlStatements) should matchPattern { case Valid(_) => }
}

test("return error for syntactically invalid statements") {
syntacticallyInvalidSqlStatements.foreach { s =>
FlinkDdlParser.parse(s) should matchPattern { case Invalid(NonEmptyList(ParseError(_), Nil)) => }
}
}

test("return multiple errors for multiple unallowed statements") {
val sqlStatements = FlinkDdlParserTest.unallowedSqlStatements.mkString(";\n")
FlinkDdlParser.parse(sqlStatements) should matchPattern {
case Invalid(NonEmptyList(UnallowedStatement(_), List(UnallowedStatement(_)))) =>
}
}

test("return error for missing connector option") {
val sqlStatement = "CREATE TABLE testTable (str STRING) WITH ()"
FlinkDdlParser.parse(sqlStatement) should matchPattern {
case Invalid(NonEmptyList(MissingConnectorOption(_), Nil)) =>
}
}

test("return error for missing catalog type") {
val sqlStatement = "CREATE CATALOG testCatalog WITH ()"
FlinkDdlParser.parse(sqlStatement) should matchPattern {
case Invalid(NonEmptyList(MissingCatalogTypeOption(_, _), Nil)) =>
}
}

}
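For readers skimming the diff: the two new error cases suggest that, after a successful syntactic parse, the parser also validates the mandatory WITH options. A minimal sketch of that validation step, assuming MissingConnectorOption carries the table name and MissingCatalogTypeOption carries the catalog name plus the expected option key (the actual signatures in this PR may differ):

```scala
import cats.data.ValidatedNel
import cats.syntax.validated._

// Sketch only: checks the options extracted from a statement's WITH clause.
// FlinkDdlParseError, Connector, and CatalogType are the types this PR adds.
def validateConnector(
    tableName: String,
    options: Map[String, String]
): ValidatedNel[FlinkDdlParseError, Connector] =
  options.get("connector") match {
    case Some(value) => Connector(value).validNel
    case None        => MissingConnectorOption(tableName).invalidNel
  }

def validateCatalogType(
    catalogName: String,
    options: Map[String, String]
): ValidatedNel[FlinkDdlParseError, CatalogType] =
  options.get("type") match {
    case Some(value) => CatalogType(value).validNel
    case None        => MissingCatalogTypeOption(catalogName, "type").invalidNel
  }
```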

object FlinkDdlParserTest {
@@ -86,9 +101,6 @@ object FlinkDdlParserTest {
|) WITH (
| 'connector' = 'datagen'
|)""".stripMargin,
s"""|CREATE TABLE testTableWithoutOptions (
| col STRING
|)""".stripMargin,
s"""|CREATE TABLE testTableWithEmptyConnector (
| col STRING
|) WITH (