Skip to content

Commit 81ddef7

Browse files
[Fix][Connector-v2][SQLServer] Handle database names with special characters in SQL queries and add related tests (#10327)
1 parent 6a3fdf3 commit 81ddef7

File tree

8 files changed

+177
-17
lines changed

8 files changed

+177
-17
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ private static String buildSelectWithRowLimits(
441441
sql.append(" TOP( ").append(limit).append(") ");
442442
}
443443
sql.append(projection).append(" FROM ");
444-
sql.append(quoteSchemaAndTable(tableId));
444+
sql.append(quote(tableId));
445445
if (condition.isPresent()) {
446446
sql.append(" WHERE ").append(condition.get());
447447
}
@@ -467,7 +467,16 @@ public static String quote(String dbOrTableName) {
467467
}
468468

469469
public static String quote(TableId tableId) {
470-
return "[" + tableId.schema() + "].[" + tableId.table() + "]";
470+
StringBuilder quoted = new StringBuilder();
471+
if (tableId.catalog() != null && !tableId.catalog().isEmpty()) {
472+
quoted.append("[").append(tableId.catalog()).append("].");
473+
}
474+
quoted.append("[")
475+
.append(tableId.schema())
476+
.append("].[")
477+
.append(tableId.table())
478+
.append("]");
479+
return quoted.toString();
471480
}
472481

473482
private static void addPrimaryKeyColumnsToCondition(
@@ -495,7 +504,7 @@ private static String buildSelectWithBoundaryRowLimits(
495504
sql.append(" TOP( ").append(limit).append(") ");
496505
sql.append(projection);
497506
sql.append(" FROM ");
498-
sql.append(quoteSchemaAndTable(tableId));
507+
sql.append(quote(tableId));
499508
if (condition.isPresent()) {
500509
sql.append(" WHERE ").append(condition.get());
501510
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/TableDiscoveryUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilte
6363
for (String dbName : databaseNames) {
6464
try {
6565
jdbc.query(
66-
"SELECT * FROM "
66+
"SELECT * FROM ["
6767
+ dbName
68-
+ ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE';",
68+
+ "].INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE';",
6969
rs -> {
7070
while (rs.next()) {
7171
TableId tableId =

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public void testSplitScanQuery() {
3737
false,
3838
false);
3939
Assertions.assertEquals(
40-
"SELECT * FROM [schema1].[table1] WHERE [id] >= ? AND NOT ([id] = ?) AND [id] <= ?",
40+
"SELECT * FROM [db1].[schema1].[table1] WHERE [id] >= ? AND NOT ([id] = ?) AND [id] <= ?",
4141
splitScanSQL);
4242

4343
splitScanSQL =
@@ -47,7 +47,7 @@ public void testSplitScanQuery() {
4747
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
4848
true,
4949
true);
50-
Assertions.assertEquals("SELECT * FROM [schema1].[table1]", splitScanSQL);
50+
Assertions.assertEquals("SELECT * FROM [db1].[schema1].[table1]", splitScanSQL);
5151

5252
splitScanSQL =
5353
SqlServerUtils.buildSplitScanQuery(
@@ -57,7 +57,7 @@ public void testSplitScanQuery() {
5757
true,
5858
false);
5959
Assertions.assertEquals(
60-
"SELECT * FROM [schema1].[table1] WHERE [id] <= ? AND NOT ([id] = ?)",
60+
"SELECT * FROM [db1].[schema1].[table1] WHERE [id] <= ? AND NOT ([id] = ?)",
6161
splitScanSQL);
6262

6363
splitScanSQL =
@@ -67,6 +67,7 @@ public void testSplitScanQuery() {
6767
new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
6868
false,
6969
true);
70-
Assertions.assertEquals("SELECT * FROM [schema1].[table1] WHERE [id] >= ?", splitScanSQL);
70+
Assertions.assertEquals(
71+
"SELECT * FROM [db1].[schema1].[table1] WHERE [id] >= ?", splitScanSQL);
7172
}
7273
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ protected String getListDatabaseSql() {
9090

9191
@Override
9292
protected String getListTableSql(String databaseName) {
93-
return "SELECT TABLE_SCHEMA, TABLE_NAME FROM "
93+
return "SELECT TABLE_SCHEMA, TABLE_NAME FROM ["
9494
+ databaseName
95-
+ ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
95+
+ "].INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'";
9696
}
9797

9898
@Override
@@ -144,12 +144,12 @@ protected String getDropTableSql(TablePath tablePath) {
144144

145145
@Override
146146
protected String getCreateDatabaseSql(String databaseName) {
147-
return String.format("CREATE DATABASE %s", databaseName);
147+
return String.format("CREATE DATABASE [%s]", databaseName);
148148
}
149149

150150
@Override
151151
protected String getDropDatabaseSql(String databaseName) {
152-
return String.format("DROP DATABASE %s;", databaseName);
152+
return String.format("DROP DATABASE [%s];", databaseName);
153153
}
154154

155155
@Override

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,12 +348,12 @@ public void testSqlServerPreviewAction() {
348348
assertPreviewResult(
349349
catalog,
350350
Catalog.ActionType.CREATE_DATABASE,
351-
"CREATE DATABASE testddatabase",
351+
"CREATE DATABASE [testddatabase]",
352352
Optional.empty());
353353
assertPreviewResult(
354354
catalog,
355355
Catalog.ActionType.DROP_DATABASE,
356-
"DROP DATABASE testddatabase;",
356+
"DROP DATABASE [testddatabase];",
357357
Optional.empty());
358358
assertPreviewResult(
359359
catalog,

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,10 +432,13 @@ private void initializeSqlServerTable(String sqlFile) {
432432
Assertions.assertNotNull(ddlTestFile, "Cannot locate " + ddlFile);
433433
try (Connection connection = getJdbcConnection();
434434
Statement statement = connection.createStatement()) {
435-
dropTestDatabase(connection, sqlFile);
435+
List<String> ddlLines = Files.readAllLines(Paths.get(ddlTestFile.toURI()));
436+
String ddlContent = String.join("\n", ddlLines);
437+
String actualDatabaseName = extractDatabaseName(ddlContent);
438+
dropTestDatabase(connection, actualDatabaseName);
436439
final List<String> statements =
437440
Arrays.stream(
438-
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
441+
ddlLines.stream()
439442
.map(String::trim)
440443
.filter(x -> !x.startsWith("--") && !x.isEmpty())
441444
.map(
@@ -455,6 +458,17 @@ private void initializeSqlServerTable(String sqlFile) {
455458
}
456459
}
457460

461+
private String extractDatabaseName(String ddlContent) {
462+
Pattern createDbPattern =
463+
Pattern.compile(
464+
"CREATE\\s+DATABASE\\s+\\[?([^\\s\\];]+)\\]?", Pattern.CASE_INSENSITIVE);
465+
Matcher matcher = createDbPattern.matcher(ddlContent);
466+
if (matcher.find()) {
467+
return matcher.group(1);
468+
}
469+
return null;
470+
}
471+
458472
private void updateSourceTable(String table) {
459473
executeSql(
460474
"INSERT INTO "
@@ -587,4 +601,42 @@ protected static void disableDbCdc(Connection connection, String name) throws SQ
587601
Objects.requireNonNull(name);
588602
connection.createStatement().execute(DISABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name));
589603
}
604+
605+
@TestTemplate
606+
public void testDatabaseNameWithSpecialCharacters(TestContainer container) {
607+
initializeSqlServerTable("test_db_name");
608+
609+
CompletableFuture<Void> executeJobFuture =
610+
CompletableFuture.supplyAsync(
611+
() -> {
612+
try {
613+
container.executeJob("/sqlservercdc_special_db_name.conf");
614+
} catch (Exception e) {
615+
throw new RuntimeException(e);
616+
}
617+
return null;
618+
});
619+
620+
String sourceTable = "[test-db-name].dbo.simple_table";
621+
String sinkTable = "[test-db-name].dbo.simple_table_sink";
622+
String selectSql = "select id, name, value from %s order by id asc";
623+
624+
await().atMost(60000, TimeUnit.MILLISECONDS)
625+
.untilAsserted(
626+
() -> {
627+
Assertions.assertIterableEquals(
628+
querySql(selectSql, sourceTable),
629+
querySql(selectSql, sinkTable));
630+
});
631+
632+
executeSql("INSERT INTO [test-db-name].dbo.simple_table VALUES (4, 'test4', 400)");
633+
634+
await().atMost(60000, TimeUnit.MILLISECONDS)
635+
.untilAsserted(
636+
() -> {
637+
Assertions.assertIterableEquals(
638+
querySql(selectSql, sourceTable),
639+
querySql(selectSql, sinkTable));
640+
});
641+
}
590642
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
17+
-- ----------------------------------------------------------------------------------------------------------------
18+
-- DATABASE: test-db-name (database name with hyphen to test special character handling)
19+
-- ----------------------------------------------------------------------------------------------------------------
20+
CREATE DATABASE [test-db-name];
21+
22+
USE [test-db-name];
23+
EXEC sys.sp_cdc_enable_db;
24+
25+
CREATE TABLE simple_table (
26+
id int NOT NULL,
27+
name varchar(100),
28+
value int,
29+
PRIMARY KEY (id)
30+
);
31+
32+
INSERT INTO simple_table VALUES (1, 'test1', 100);
33+
INSERT INTO simple_table VALUES (2, 'test2', 200);
34+
INSERT INTO simple_table VALUES (3, 'test3', 300);
35+
36+
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'simple_table', @role_name = NULL, @supports_net_changes = 0;
37+
38+
CREATE TABLE simple_table_sink (
39+
id int NOT NULL,
40+
name varchar(100),
41+
value int,
42+
PRIMARY KEY (id)
43+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
# You can set engine configuration here
20+
parallelism = 1
21+
job.mode = "STREAMING"
22+
checkpoint.interval = 5000
23+
}
24+
25+
source {
26+
# This is a example source plugin **only for test and demonstrate the feature source plugin**
27+
SqlServer-CDC {
28+
plugin_output = "customers"
29+
username = "sa"
30+
password = "Password!"
31+
database-names = ["test-db-name"]
32+
table-names = ["test-db-name.dbo.simple_table"]
33+
url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=test-db-name"
34+
35+
exactly_once = false
36+
}
37+
}
38+
39+
transform {
40+
}
41+
42+
sink {
43+
Jdbc {
44+
plugin_input = "customers"
45+
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
46+
url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false"
47+
user = "sa"
48+
password = "Password!"
49+
generate_sink_sql = true
50+
database = "test-db-name"
51+
table = "dbo.simple_table_sink"
52+
batch_size = 1
53+
primary_keys = ["id"]
54+
}
55+
}

0 commit comments

Comments
 (0)