Skip to content

Commit 353bbd2

Browse files
authored
[Feature][Jdbc] Support sink ddl for postgresql (#8276)
1 parent ecd8269 commit 353bbd2

File tree

24 files changed

+1640
-37
lines changed

24 files changed

+1640
-37
lines changed

Diff for: docs/en/concept/schema-evolution.md

+41
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Schema Evolution means that the schema of a data table can be changed and the da
2121
### Sink
2222
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
2323
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
24+
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
2425
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md)
2526
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Doris.md)
2627
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Paimon.md#Schema-Evolution)
@@ -245,4 +246,44 @@ sink {
245246
}
246247
}
247248
}
249+
```
250+
251+
### Mysql-CDC -> Jdbc-Postgres
252+
```hocon
253+
env {
254+
# You can set engine configuration here
255+
parallelism = 5
256+
job.mode = "STREAMING"
257+
checkpoint.interval = 5000
258+
read_limit.bytes_per_second=7000000
259+
read_limit.rows_per_second=400
260+
}
261+
262+
source {
263+
MySQL-CDC {
264+
server-id = 5652-5657
265+
username = "st_user_source"
266+
password = "mysqlpw"
267+
table-names = ["shop.products"]
268+
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
269+
270+
schema-changes.enabled = true
271+
}
272+
}
273+
274+
sink {
275+
jdbc {
276+
url = "jdbc:postgresql://postgresql:5432/shop"
277+
driver = "org.postgresql.Driver"
278+
user = "postgres"
279+
password = "postgres"
280+
generate_sink_sql = true
281+
database = shop
282+
table = "public.sink_table_with_schema_change"
283+
primary_keys = ["id"]
284+
285+
# Validate ddl update for sink writer multi replica
286+
multi_table_sink_replica = 2
287+
}
288+
}
248289
```

Diff for: docs/zh/concept/schema-evolution.md

+41
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
### 目标
2222
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
2323
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
24+
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
2425
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/StarRocks.md)
2526
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Doris.md)
2627
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Paimon.md#模式演变)
@@ -247,3 +248,43 @@ sink {
247248
}
248249
}
249250
```
251+
252+
### Mysql-CDC -> Jdbc-Postgres
253+
```hocon
254+
env {
255+
# You can set engine configuration here
256+
parallelism = 5
257+
job.mode = "STREAMING"
258+
checkpoint.interval = 5000
259+
read_limit.bytes_per_second=7000000
260+
read_limit.rows_per_second=400
261+
}
262+
263+
source {
264+
MySQL-CDC {
265+
server-id = 5652-5657
266+
username = "st_user_source"
267+
password = "mysqlpw"
268+
table-names = ["shop.products"]
269+
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
270+
271+
schema-changes.enabled = true
272+
}
273+
}
274+
275+
sink {
276+
jdbc {
277+
url = "jdbc:postgresql://postgresql:5432/shop"
278+
driver = "org.postgresql.Driver"
279+
user = "postgres"
280+
password = "postgres"
281+
generate_sink_sql = true
282+
database = shop
283+
table = "public.sink_table_with_schema_change"
284+
primary_keys = ["id"]
285+
286+
# Validate ddl update for sink writer multi replica
287+
multi_table_sink_replica = 2
288+
}
289+
}
290+
```

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2222
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
2323
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
24-
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils;
24+
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DefaultValueUtils;
2525

2626
import io.debezium.connector.mysql.MySqlConnectorConfig;
2727
import io.debezium.connector.mysql.MySqlDefaultValueConverter;
@@ -69,7 +69,7 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo
6969
Optional<String> defaultValueExpression = column.defaultValueExpression();
7070
Object defaultValue = defaultValueExpression.orElse(null);
7171
if (defaultValueExpression.isPresent()
72-
&& !MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue)) {
72+
&& !DefaultValueUtils.isMysqlSpecialDefaultValue(defaultValue)) {
7373
defaultValue =
7474
mySqlDefaultValueConverter
7575
.parseDefaultValue(column, defaultValueExpression.get())

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java

+28-14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
3535
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
3636
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
37+
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DefaultValueUtils;
3738

3839
import org.apache.commons.lang3.StringUtils;
3940

@@ -533,7 +534,8 @@ default boolean columnExists(Connection connection, TablePath tablePath, String
533534
default void applySchemaChange(
534535
Connection connection, TablePath tablePath, AlterTableAddColumnEvent event)
535536
throws SQLException {
536-
boolean sameCatalog = StringUtils.equals(dialectName(), event.getSourceDialectName());
537+
String sourceDialectName = event.getSourceDialectName();
538+
boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName);
537539
BasicTypeDefine typeDefine = getTypeConverter().reconvert(event.getColumn());
538540
String columnType =
539541
sameCatalog ? event.getColumn().getSourceType() : typeDefine.getColumnType();
@@ -568,7 +570,9 @@ default void applySchemaChange(
568570
sqlBuilder.append(" NOT NULL");
569571
}
570572
if (sameCatalog) {
571-
sqlBuilder.append(" ").append(sqlClauseWithDefaultValue(typeDefine));
573+
sqlBuilder
574+
.append(" ")
575+
.append(sqlClauseWithDefaultValue(typeDefine, sourceDialectName));
572576
}
573577
}
574578

@@ -612,8 +616,8 @@ default void applySchemaChange(
612616
}
613617
return;
614618
}
615-
616-
boolean sameCatalog = StringUtils.equals(dialectName(), event.getSourceDialectName());
619+
String sourceDialectName = event.getSourceDialectName();
620+
boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName);
617621
BasicTypeDefine typeDefine = getTypeConverter().reconvert(event.getColumn());
618622
String columnType =
619623
sameCatalog ? event.getColumn().getSourceType() : typeDefine.getColumnType();
@@ -649,7 +653,9 @@ default void applySchemaChange(
649653
sqlBuilder.append(" NOT NULL");
650654
}
651655
if (sameCatalog) {
652-
sqlBuilder.append(" ").append(sqlClauseWithDefaultValue(typeDefine));
656+
sqlBuilder
657+
.append(" ")
658+
.append(sqlClauseWithDefaultValue(typeDefine, sourceDialectName));
653659
}
654660
}
655661
if (event.getColumn().getComment() != null) {
@@ -674,8 +680,8 @@ default void applySchemaChange(
674680
default void applySchemaChange(
675681
Connection connection, TablePath tablePath, AlterTableModifyColumnEvent event)
676682
throws SQLException {
677-
678-
boolean sameCatalog = StringUtils.equals(dialectName(), event.getSourceDialectName());
683+
String sourceDialectName = event.getSourceDialectName();
684+
boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName);
679685
BasicTypeDefine typeDefine = getTypeConverter().reconvert(event.getColumn());
680686
String columnType =
681687
sameCatalog ? event.getColumn().getSourceType() : typeDefine.getColumnType();
@@ -710,7 +716,9 @@ default void applySchemaChange(
710716
sqlBuilder.append(" NOT NULL");
711717
}
712718
if (sameCatalog) {
713-
sqlBuilder.append(" ").append(sqlClauseWithDefaultValue(typeDefine));
719+
sqlBuilder
720+
.append(" ")
721+
.append(sqlClauseWithDefaultValue(typeDefine, sourceDialectName));
714722
}
715723
}
716724
if (event.getColumn().getComment() != null) {
@@ -749,13 +757,15 @@ default void applySchemaChange(
749757
* Get the SQL clause for define column default value
750758
*
751759
* @param columnDefine column define
760+
* @param sourceDialectName
752761
* @return SQL clause for define default value
753762
*/
754-
default String sqlClauseWithDefaultValue(BasicTypeDefine columnDefine) {
763+
default String sqlClauseWithDefaultValue(
764+
BasicTypeDefine columnDefine, String sourceDialectName) {
755765
Object defaultValue = columnDefine.getDefaultValue();
756766
if (Objects.nonNull(defaultValue)
757-
&& needsQuotesWithDefaultValue(columnDefine.getColumnType())
758-
&& !isSpecialDefaultValue(defaultValue)) {
767+
&& needsQuotesWithDefaultValue(columnDefine)
768+
&& !isSpecialDefaultValue(defaultValue, sourceDialectName)) {
759769
defaultValue = quotesDefaultValue(defaultValue);
760770
}
761771
return "DEFAULT " + defaultValue;
@@ -774,20 +784,24 @@ default boolean supportDefaultValue(BasicTypeDefine columnDefine) {
774784
/**
775785
* whether quotes with default value
776786
*
777-
* @param sqlType sql type of column
787+
* @param columnDefine column define
778788
* @return whether needs quotes with the type
779789
*/
780-
default boolean needsQuotesWithDefaultValue(String sqlType) {
790+
default boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
781791
return false;
782792
}
783793

784794
/**
785795
* whether is special default value e.g. current_timestamp
786796
*
787797
* @param defaultValue default value of column
798+
* @param sourceDialectName source dialect name
788799
* @return whether is special default value e.g current_timestamp
789800
*/
790-
default boolean isSpecialDefaultValue(Object defaultValue) {
801+
default boolean isSpecialDefaultValue(Object defaultValue, String sourceDialectName) {
802+
if (DatabaseIdentifier.MYSQL.equals(sourceDialectName)) {
803+
return DefaultValueUtils.isMysqlSpecialDefaultValue(defaultValue);
804+
}
791805
return false;
792806
}
793807

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils;
2828
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
2929
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
30-
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils;
3130

3231
import org.apache.commons.lang3.StringUtils;
3332

@@ -241,8 +240,8 @@ public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
241240
}
242241

243242
@Override
244-
public boolean needsQuotesWithDefaultValue(String sqlType) {
245-
MysqlType mysqlType = MysqlType.getByName(sqlType);
243+
public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
244+
MysqlType mysqlType = MysqlType.getByName(columnDefine.getColumnType());
246245
switch (mysqlType) {
247246
case CHAR:
248247
case VARCHAR:
@@ -266,9 +265,4 @@ public boolean needsQuotesWithDefaultValue(String sqlType) {
266265
return false;
267266
}
268267
}
269-
270-
@Override
271-
public boolean isSpecialDefaultValue(Object defaultValue) {
272-
return MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue);
273-
}
274268
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.SQLUtils;
2828
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
2929
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
30-
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.MysqlDefaultValueUtils;
3130

3231
import org.apache.commons.lang3.StringUtils;
3332

@@ -244,8 +243,8 @@ public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
244243
}
245244

246245
@Override
247-
public boolean needsQuotesWithDefaultValue(String sqlType) {
248-
OceanBaseMysqlType mysqlType = OceanBaseMysqlType.getByName(sqlType);
246+
public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
247+
OceanBaseMysqlType mysqlType = OceanBaseMysqlType.getByName(columnDefine.getColumnType());
249248
switch (mysqlType) {
250249
case CHAR:
251250
case VARCHAR:
@@ -269,9 +268,4 @@ public boolean needsQuotesWithDefaultValue(String sqlType) {
269268
return false;
270269
}
271270
}
272-
273-
@Override
274-
public boolean isSpecialDefaultValue(Object defaultValue) {
275-
return MysqlDefaultValueUtils.isSpecialDefaultValue(defaultValue);
276-
}
277271
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,8 @@ private String buildUpdateColumnSQL(
424424
} else {
425425
throw new IllegalArgumentException("Unsupported AlterTableColumnEvent: " + event);
426426
}
427-
428-
boolean sameCatalog = StringUtils.equals(dialectName(), event.getSourceDialectName());
427+
String sourceDialectName = event.getSourceDialectName();
428+
boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName);
429429
BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
430430
String columnType = sameCatalog ? column.getSourceType() : typeDefine.getColumnType();
431431
StringBuilder sqlBuilder =
@@ -441,7 +441,7 @@ private String buildUpdateColumnSQL(
441441
// Only decorate with default value when source dialect is same as sink dialect
442442
// Todo Support for cross-database default values for ddl statements
443443
if (column.getDefaultValue() != null && sameCatalog) {
444-
sqlBuilder.append(" ").append(sqlClauseWithDefaultValue(typeDefine));
444+
sqlBuilder.append(" ").append(sqlClauseWithDefaultValue(typeDefine, sourceDialectName));
445445
}
446446
if (event instanceof AlterTableModifyColumnEvent) {
447447
boolean targetColumnNullable =

0 commit comments

Comments
 (0)