Skip to content

Commit e09445c

Browse files
authored
[Fix][Connector-V2] Fix SqlServer create table when database with dot (#9007)
1 parent 97191c5 commit e09445c

File tree

5 files changed

+77
-24
lines changed

5 files changed

+77
-24
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public String build(TablePath tablePath, CatalogTable catalogTable) {
152152
tableAndColumnComment.append(
153153
String.format(
154154
"EXEC %s.sys.sp_addextendedproperty 'MS_Description', N'%s', 'schema', N'%s', 'table', N'%s';\n",
155-
tablePath.getDatabaseName(),
155+
"[" + tablePath.getDatabaseName() + "]",
156156
comment,
157157
tablePath.getSchemaName(),
158158
tablePath.getTableName()));
@@ -164,7 +164,7 @@ public String build(TablePath tablePath, CatalogTable catalogTable) {
164164
tableAndColumnComment.append(
165165
String.format(
166166
columnComment,
167-
tablePath.getDatabaseName(),
167+
"[" + tablePath.getDatabaseName() + "]",
168168
com,
169169
tablePath.getSchemaName(),
170170
tablePath.getTableName(),

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
1919

2020
import org.apache.seatunnel.api.table.catalog.Column;
21-
import org.apache.seatunnel.api.table.catalog.TablePath;
2221
import org.apache.seatunnel.api.table.catalog.TableSchema;
2322
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2423
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -61,11 +60,7 @@ public JdbcOutputFormat build() {
6160
JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
6261

6362
final String database = jdbcSinkConfig.getDatabase();
64-
final String table =
65-
dialect.extractTableName(
66-
TablePath.of(
67-
jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable()));
68-
63+
final String table = jdbcSinkConfig.getTable();
6964
final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
7065
if (jdbcSinkConfig.isUseCopyStatement()) {
7166
statementExecutorFactory =

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,8 @@ public void testSqlServerPreviewAction() {
373373
+ "CREATE TABLE [testddatabase].[testtable] ( \n"
374374
+ "\t[test] NVARCHAR(MAX) NULL\n"
375375
+ ");\n"
376-
+ "EXEC testddatabase.sys.sp_addextendedproperty 'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n"
377-
+ "EXEC testddatabase.sys.sp_addextendedproperty 'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column', N'test';\n"
376+
+ "EXEC [testddatabase].sys.sp_addextendedproperty 'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n"
377+
+ "EXEC [testddatabase].sys.sp_addextendedproperty 'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column', N'test';\n"
378378
+ "\n"
379379
+ "END",
380380
Optional.of(CATALOG_TABLE));

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,13 @@ public void testBuild() {
120120
+ "\t[lastUpdateTime] DATETIME2 NULL, \n"
121121
+ "\tPRIMARY KEY ([id])\n"
122122
+ ");\n"
123-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
124-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 'column', N'blob_v';\n"
125-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n"
126-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n"
127-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n"
128-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n"
129-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n"
123+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
124+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 'column', N'blob_v';\n"
125+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n"
126+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n"
127+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n"
128+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n"
129+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n"
130130
+ "\n"
131131
+ "END";
132132

@@ -149,13 +149,13 @@ public void testBuild() {
149149
+ "\t[createTime] DATETIME2 NULL, \n"
150150
+ "\t[lastUpdateTime] DATETIME2 NULL\n"
151151
+ ");\n"
152-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
153-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 'column', N'blob_v';\n"
154-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n"
155-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n"
156-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n"
157-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n"
158-
+ "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n"
152+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
153+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 'column', N'blob_v';\n"
154+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n"
155+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n"
156+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n"
157+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n"
158+
+ "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n"
159159
+ "\n"
160160
+ "END";
161161
CONSOLE.println(expectSkipIndex);

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java

+58
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,30 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
1919

20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
22+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2023
import org.apache.seatunnel.api.table.type.BasicType;
2124
import org.apache.seatunnel.api.table.type.RowKind;
2225
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2326
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2427
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
28+
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection;
29+
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
30+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
31+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialect;
32+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlserverJdbcRowConverter;
2533

2634
import org.junit.jupiter.api.Assertions;
2735
import org.junit.jupiter.api.Test;
36+
import org.mockito.ArgumentCaptor;
37+
import org.mockito.Mockito;
2838

39+
import java.io.IOException;
40+
import java.sql.SQLException;
2941
import java.util.Arrays;
42+
import java.util.HashMap;
43+
import java.util.Map;
3044
import java.util.function.Function;
3145

3246
public class JdbcOutputFormatBuilderTest {
@@ -71,4 +85,48 @@ public void testKeyExtractor() {
7185
updateAfter.setField(0, "2");
7286
Assertions.assertNotEquals(keyExtractor.apply(insertRow), keyExtractor.apply(updateAfter));
7387
}
88+
89+
@Test
90+
public void testBuildFormatWithDatabaseWithDot()
91+
throws SQLException, ClassNotFoundException, IOException {
92+
93+
TableSchema schema =
94+
TableSchema.builder()
95+
.column(PhysicalColumn.of("id", BasicType.INT_TYPE, 22L, false, null, "id"))
96+
.build();
97+
98+
Map<String, Object> config = new HashMap<>();
99+
config.put("database", "databasewith.dot");
100+
config.put("table", "dbo.tableName");
101+
102+
SqlServerDialect dialect = Mockito.mock(SqlServerDialect.class);
103+
Mockito.when(dialect.getRowConverter()).thenReturn(new SqlserverJdbcRowConverter());
104+
Mockito.when(
105+
dialect.getInsertIntoStatement(
106+
Mockito.anyString(), Mockito.anyString(), Mockito.any()))
107+
.thenReturn("");
108+
109+
SimpleJdbcConnectionProvider provider = Mockito.mock(SimpleJdbcConnectionProvider.class);
110+
Mockito.when(provider.getOrEstablishConnection()).thenReturn(new TestConnection());
111+
Mockito.when(provider.getConnection()).thenReturn(new TestConnection());
112+
113+
JdbcOutputFormat outputFormat =
114+
new JdbcOutputFormatBuilder(
115+
dialect,
116+
provider,
117+
JdbcSinkConfig.of(ReadonlyConfig.fromMap(config)),
118+
schema,
119+
schema)
120+
.build();
121+
outputFormat.open();
122+
123+
ArgumentCaptor<String> database = ArgumentCaptor.forClass(String.class);
124+
ArgumentCaptor<String> table = ArgumentCaptor.forClass(String.class);
125+
126+
Mockito.verify(dialect)
127+
.getInsertIntoStatement(database.capture(), table.capture(), Mockito.any());
128+
129+
Assertions.assertEquals("databasewith.dot", database.getValue());
130+
Assertions.assertEquals("dbo.tableName", table.getValue());
131+
}
74132
}

0 commit comments

Comments
 (0)