import ChangeLog from '../changelog/connector-jdbc.md';
JDBC 数据接收器
通过jdbc写入数据。支持批处理模式和流处理模式,支持并发写入,支持精确一次语义(使用XA事务保证)
- 需要确保jdbc驱动jar包已经放在目录
${SEATUNNEL_HOME}/plugins/
下。
- 需要确保jdbc驱动jar包已经放到
${SEATUNNEL_HOME}/lib/
目录下。
使用 Xa transactions
来确保 exactly-once
。所以仅对于支持 Xa transactions
的数据库支持 exactly-once
。你可以设置 is_exactly_once=true
来启用它。
名称 | 类型 | 是否必须 | 默认值 |
---|---|---|---|
url | String | 是 | - |
driver | String | 是 | - |
user | String | 否 | - |
password | String | 否 | - |
query | String | 否 | - |
compatible_mode | String | 否 | - |
database | String | 否 | - |
table | String | 否 | - |
primary_keys | Array | 否 | - |
support_upsert_by_query_primary_key_exist | Boolean | 否 | false |
connection_check_timeout_sec | Int | 否 | 30 |
max_retries | Int | 否 | 0 |
batch_size | Int | 否 | 1000 |
is_exactly_once | Boolean | 否 | false |
generate_sink_sql | Boolean | 否 | false |
xa_data_source_class_name | String | 否 | - |
max_commit_attempts | Int | 否 | 3 |
transaction_timeout_sec | Int | 否 | -1 |
auto_commit | Boolean | 否 | true |
field_ide | String | 否 | - |
properties | Map | 否 | - |
common-options | 否 | - | |
schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
data_save_mode | Enum | 否 | APPEND_DATA |
custom_sql | String | 否 | - |
enable_upsert | Boolean | 否 | true |
use_copy_statement | Boolean | 否 | false |
write_mode | Enum | 否 | sql |
temp_table_name | String | 否 | - |
temp_column_batch_code | String | 否 | - |
temp_column_row_kind | String | 否 | - |
用于连接远程数据源的 jdbc 类名,如果使用MySQL,则值为com.mysql.cj.jdbc.Driver
用户名
密码
JDBC 连接的 URL。参考案例:jdbc:postgresql://localhost/test
使用 sql 语句将上游输入数据写入到数据库。如 INSERT ...
数据库的兼容模式,当数据库支持多种兼容模式时需要。
例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 。使用StarRocks时,需要将其设置为starrocks
。
Postgres 9.5及以下版本,请设置为 postgresLow
来支持 CDC
使用此 database
和 table-name
自动生成 SQL,并接收上游输入的数据写入数据库。
此选项与 query
选项是互斥的,此选项具有更高的优先级。
使用 database
和此 table-name
自动生成 SQL,并接收上游输入的数据写入数据库。
此选项与 query
选项是互斥的,此选项具有更高的优先级。
table参数可以填入一个任意的表名,这个名字最终会被用作创建表的表名,并且支持变量(${table_name}
,${schema_name}
)。
替换规则如下:${schema_name}
将替换传递给目标端的 SCHEMA 名称,${table_name}
将替换传递给目标端的表名。
mysql 接收器示例:
- test_${schema_name}_${table_name}_test
- sink_sinktable
- ss_${table_name}
pgsql (Oracle Sqlserver ...) 接收器示例:
- ${schema_name}.${table_name}_test
- dbo.tt_${table_name}_sink
- public.sink_table
Tip: 如果目标数据库有 SCHEMA 的概念,则表参数必须写成 xxx.xxx
该选项用于辅助生成 insert、delete、update 等 sql 语句。设置了该选项,将会根据该选项生成对应的 sql 语句
根据查询主键是否存在来选择使用 INSERT sql、UPDATE sql 来处理变更事件(INSERT、UPDATE_AFTER)。仅当数据库不支持 upsert 语法时才使用此配置 注意:该方法性能较低
用于验证数据库连接的有效性时等待数据库操作完成所需的时间,单位是秒
重试提交失败的最大次数(executeBatch)
对于批量写入,当缓冲的记录数达到 batch_size
数量或者时间达到 checkpoint.interval
时,数据将被刷新到数据库中
是否启用通过XA事务实现的精确一次语义。开启,你还需要设置 xa_data_source_class_name
根据要写入的数据库表结构生成 sql 语句
指数据库驱动的 XA 数据源的类名。以 MySQL 为例,其类名为 com.mysql.cj.jdbc.MysqlXADataSource。了解其他数据库的数据源类名,可以参考文档的附录部分
事务提交失败的最大重试次数
在事务开启后的超时时间,默认值为-1(即永不超时)。请注意,设置超时时间可能会影响到精确一次(exactly-once)的语义
默认启用自动事务提交
字段 field_ide
用于在从 source 同步到 sink 时,确定字段是否需要转换为大写或小写。'ORIGINAL' 表示不需要转换,'UPPERCASE' 表示转换为大写,'LOWERCASE' 表示转换为小写
附加连接配置参数,当属性和URL具有相同参数时,优先级由驱动程序的具体实现确定。例如,在 MySQL 中,属性配置优先于 URL。
Sink插件常用参数,请参考 Sink常用选项 了解详情
在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案
选项介绍:
RECREATE_SCHEMA
:当表不存在时会创建,当表已存在时会删除并重建
CREATE_SCHEMA_WHEN_NOT_EXIST
:当表不存在时会创建,当表已存在时则跳过创建
ERROR_WHEN_SCHEMA_NOT_EXIST
:当表不存在时将抛出错误
IGNORE
:忽略对表的处理
在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案
选项介绍:
DROP_DATA
:保留数据库结构,删除数据
APPEND_DATA
:保留数据库结构,保留数据
CUSTOM_PROCESSING
:允许用户自定义数据处理方式
ERROR_WHEN_DATA_EXISTS
:当有数据时抛出错误
当data_save_mode
选择CUSTOM_PROCESSING
时,需要填写CUSTOM_SQL
参数。该参数通常填写一条可以执行的SQL。SQL将在同步任务之前执行
启用通过主键更新插入,如果任务没有key重复数据,设置该参数为 false 可以加快数据导入速度
写入模式支持五种模式:SQL, COPY, COPY_SQL, MERGE, COPY_MERGE
- SQL(默认): 传统使用JDBC的SQL模式,支持全量和增量的写入
- COPY: 使用COPY命令导入数据(需数据库支持如Postgres),仅支持全量写入
- COPY_SQL: 使用COPY命令导入数据(需数据库支持),如果有增量数据动态切换为SQL模式写入
- MERGE: 使用COPY命令导入临时表(需数据库支持),然后MERGE到目标表(需数据库支持),支持全量和增量的写入
- COPY_MERGE: 使用COPY命令将全量数据导入目标表(需数据库支持);如果有增量数据动态切换为使用COPY命令导入到临时表,然后MERGE到目标表(需数据库支持),支持全量和增量的写入
注意: 当使用 MERGE/COPY_MERGE 写入模式时,它将自动创建一个与目标表结构相同的临时表。
在 MERGE/COPY_MERGE 写入模式中使用的临时表名称。如果未指定,系统将根据原始表名称生成,并添加后缀 _tmp
。
在 MERGE/COPY_MERGE 写入模式中用于批量写入数据的临时列。如果未指定,系统将默认使用 __st_batch_code
作为列名。
在 MERGE/COPY_MERGE 写入模式中用于识别数据类型的临时列。如果未指定,系统将默认使用 __st_row_kind
作为列名。
在 is_exactly_once = "true" 的情况下,使用 XA 事务。这需要数据库支持,有些数据库需要一些设置:
1 postgres 需要设置 max_prepared_transactions > 1
例如 ALTER SYSTEM set max_prepared_transactions to 10
2 mysql 版本需要 >= 8.0.29
并且非 root 用户需要授予 XA_RECOVER_ADMIN
权限。例如:将 test_db.* 上的 XA_RECOVER_ADMIN
授予 'user1'@'%'
3 mysql可以尝试在url中添加 rewriteBatchedStatements=true
参数以获得更好的性能
附录参数仅提供参考
数据源 | driver | url | xa_data_source_class_name | maven |
---|---|---|---|---|
MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql |
DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | / | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc |
GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar |
StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 |
saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | / | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc |
Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc |
Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar |
opengauss | org.opengauss.Driver | jdbc:opengauss://localhost:5432/postgres | / | https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar |
Highgo | com.highgo.jdbc.Driver | jdbc:highgo://localhost:5866/highgo | / | https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar |
简单示例
jdbc {
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
精确一次 (Exactly-once)
通过设置 is_exactly_once
开启精确一次语义
jdbc {
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
is_exactly_once = "true"
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}
变更数据捕获 (Change data capture) 事件
jdbc 接收 CDC 示例
sink {
jdbc {
url = "jdbc:mysql://localhost:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
database = "sink_database"
table = "sink_table"
primary_keys = ["key1", "key2", ...]
}
}
配置表生成策略 (schema_save_mode)
通过设置 schema_save_mode
配置为 CREATE_SCHEMA_WHEN_NOT_EXIST
来支持不存在表时创建表
sink {
jdbc {
url = "jdbc:mysql://localhost:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
database = "sink_database"
table = "sink_table"
primary_keys = ["key1", "key2", ...]
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}
}
支持Postgres 9.5及以下版本的 CDC 示例
Postgres 9.5及以下版本,通过设置 compatible_mode
配置为 postgresLow
来支持 Postgres CDC 操作
sink {
jdbc {
url = "jdbc:postgresql://localhost:5432"
driver = "org.postgresql.Driver"
user = "root"
password = "123456"
compatible_mode="postgresLow"
database = "sink_database"
table = "sink_table"
support_upsert_by_query_primary_key_exist = true
generate_sink_sql = true
primary_keys = ["key1", "key2", ...]
}
}