|
| 1 | +# MySQL |
| 2 | + |
| 3 | +> JDBC Mysql Sink 连接器 |
| 4 | + |
| 5 | +## 支持的Mysql版本 |
| 6 | + |
| 7 | +- 5.5/5.6/5.7/8.0/8.1/8.2/8.3/8.4 |
| 8 | + |
| 9 | +## 引擎支持 |
| 10 | + |
| 11 | +> Spark<br/> |
| 12 | +> Flink<br/> |
| 13 | +> SeaTunnel Zeta<br/> |
| 14 | +
|
| 15 | +## 描述 |
| 16 | + |
| 17 | +通过jdbc写入数据。支持批处理模式和流模式,支持并发写入,支持exactly-once精确一次 |
| 18 | +语义(使用XA事务保证)。 |
| 19 | + |
| 20 | +## 需要的依赖项 |
| 21 | + |
| 22 | +### 对于 Spark/Flink 引擎 |
| 23 | + |
| 24 | +> 1. 您需要确保 [jdbc 驱动程序 jar 包](https://mvnrepository.com/artifact/mysql/mysql-connector-java) 已放置在目录 `${SEATUNNEL_HOME}/plugins/` 中。 |
| 25 | +
|
| 26 | +### 对于 SeaTunnel Zeta 引擎 |
| 27 | + |
| 28 | +> 1. 您需要确保 [jdbc 驱动程序 jar 包](https://mvnrepository.com/artifact/mysql/mysql-connector-java) 已放置在目录 `${SEATUNNEL_HOME}/lib/` 中。 |
| 29 | +
|
| 30 | +## 主要功能 |
| 31 | + |
| 32 | +- [x] [精确一次](../../concept/connector-v2-features.md) |
| 33 | +- [x] [cdc](../../concept/connector-v2-features.md) |
| 34 | + |
| 35 | +>使用“Xa事务”来确保“精确一次”。因此,数据库只支持“精确一次”,即 |
| 36 | +>支持“Xa事务”。您可以设置`is_exactly_once=true `来启用它。 |
| 37 | +
|
| 38 | +## 支持的数据源信息 |
| 39 | + |
| 40 | +| 数据源 | 支持的版本 | 驱动器 | 网址 | Maven下载链接 | |
| 41 | +|-----|---------------------------------------------------------|--------------------------|---------------------------------------|---------------------------------------------------------------------| |
| 42 | +| Mysql | 不同的依赖版本具有不同的驱动程序类。 | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306:3306/test | [下载](https://mvnrepository.com/artifact/mysql/mysql-connector-java) | |
| 43 | + |
| 44 | + |
| 45 | +## 数据类型映射 |
| 46 | + |
| 47 | +| Mysql 数据类型 | SeaTunnel 数据类型 | |
| 48 | +|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| |
| 49 | +| BIT(1)<br/>INT UNSIGNED | BOOLEAN | |
| 50 | +| TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR | INT | |
| 51 | +| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT | BIGINT | |
| 52 | +| BIGINT UNSIGNED | DECIMAL(20,0) | |
| 53 | +| DECIMAL(x,y)(获取指定列的列大小<38) | DECIMAL(x,y) | |
| 54 | +| DECIMAL(x,y)(获取指定列的列大小>38) | DECIMAL(38,18) | |
| 55 | +| DECIMAL UNSIGNED | DECIMAL((DECIMAL((获取指定列的列大小)+1,<br/>(获取指定列的小数点右侧的位数))) | |
| 56 | +| FLOAT<br/>FLOAT UNSIGNED | FLOAT | |
| 57 | +| DOUBLE<br/>DOUBLE UNSIGNED | DOUBLE | |
| 58 | +| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSON | STRING | |
| 59 | +| DATE | DATE | |
| 60 | +| TIME | TIME | |
| 61 | +| DATETIME<br/>TIMESTAMP | TIMESTAMP | |
| 62 | +| TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINAR<br/>BIT(n) | BYTES | |
| 63 | +| GEOMETRY<br/>UNKNOWN | Not supported yet | |
| 64 | + |
| 65 | +## Sink 参数 |
| 66 | + |
| 67 | +| 名称 | 类型 | 是否必填 | 默认值 | 描述 | |
| 68 | +|-------------------------------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| 69 | +| url | String | 是 | - | JDBC 连接的 URL。参见示例: <br/>`jdbc:mysql://localhost:3306:3306/test`。 | |
| 70 | +| driver | String | 是 | - | 用于连接远程数据源的 JDBC 类名,<br/>如果使用 MySQL,值为 `com.mysql.cj.jdbc.Driver`。 | |
| 71 | +| user | String | 否 | - | 连接实例用户名。 | |
| 72 | +| password | String | 否 | - | 连接实例密码。 | |
| 73 | +| query | String | 否 | - | 使用此sql将上游输入数据写入数据库。例如: `INSERT ...`,`query` 具有更高的优先级 | |
| 74 | +| database | String | 否 | - | 使用此 `database` 和 `table-name` 自动生成sql并接收上游输入数据写入数据库。<br/>此选项与`query` 互斥,具有更高的优先级 | |
| 75 | +| table | String | 否 | - | 使用数据库和此表名自动生成sql并接收上游输入数据写入数据库。<br/>此选项与`query` 互斥,具有更高的优先级 | |
| 76 | +| primary_keys | Array | 否 | - | 此选项用于支持以下操作,例如 `insert`, `delete`, 和 `update` 当自动生成sql. | |
| 77 | +| support_upsert_by_query_primary_key_exist | Boolean | 否 | false | 选择使用INSERT sql、UPDATE sql根据查询主键是否存在来处理更新事件(INSERT、UPDATE_AFTER)。此配置仅在数据库不支持升级语法时使用**注**:此方法性能低 | |
| 78 | +| connection_check_timeout_sec | Int | 否 | 30 | 等待用于验证连接的数据库操作完成的时间(秒)。 | |
| 79 | +| max_retries | Int | 否 | 0 | 提交失败的重试次数(executeBatch) | |
| 80 | +| batch_size | Int | 否 | 1000 | 对于批量写入,当缓冲记录的数量达到“batch_size”的数量或时间达到“checkpoint.interval”<br/>时,数据将被刷新到数据库中 | |
| 81 | +| is_exactly_once | Boolean | 否 | false | 是否启用精确一次语义,这将使用Xa事务。如果启用,则需要<br/>设置`xa_data_source_class_name`。 | |
| 82 | +| generate_sink_sql | Boolean | 否 | false | 根据要写入的数据库表生成sql语句 | |
| 83 | +| xa_data_source_class_name | String | 否 | - | 数据库Driver的xa数据源类名,例如mysql是`com.mysql.cj.jdbc。MysqlXADataSource,和<br/>请参阅附录了解其他数据源 | |
| 84 | +| max_commit_attempts | Int | 否 | 3 | 事务提交失败的重试次数 | |
| 85 | +| transaction_timeout_sec | Int | 否 | -1 | 事务打开后的超时,默认值为-1(永不超时)。请注意,设置超时可能会影响<br/>精确一次语义 | |
| 86 | +| auto_commit | Boolean | 否 | true | 默认情况下启用自动事务提交 | |
| 87 | +| field_ide | String | 否 | - | 确定从源同步到汇时是否需要转换字段`ORIGINAL表示不需要转换`大写`表示转换为大写`LOWERCASE表示转换为小写。 | |
| 88 | +| properties | Map | 否 | - | 其他连接配置参数,当属性和URL具有相同的参数时,优先级由驱动程序的特定实现决定。例如,在MySQL中,属性优先于URL。 | |
| 89 | +| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink Common Options](../sink-common-options.md) 详见 | |
| 90 | +| schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 在启动同步任务之前,对目标侧的现有表面结构选择不同的处理方案。 | |
| 91 | +| data_save_mode | Enum | 否 | APPEND_DATA | 在启动同步任务之前,对目标端的现有数据选择不同的处理方案。 | |
| 92 | +| custom_sql | String | 否 | - | 当data_save_mode选择CUSTOM_PROCESSING时,您应该填写CUSTOM_SQL参数。此参数通常填充可以执行的SQL。SQL将在同步任务之前执行。 | |
| 93 | +| enable_upsert | Boolean | 否 | true | 通过primary_keys存在启用upstart,如果任务只有“插入”,将此参数设置为“false”可以加快数据导入 | |
| 94 | + |
| 95 | +### 提示 |
| 96 | + |
| 97 | +>如果未设置partition_column,它将以单并发运行,如果设置了partition_coolumn,它将根据任务的并发性并行执行。 |
| 98 | +
|
| 99 | +## 任务示例 |
| 100 | + |
| 101 | +### 简单的例子: |
| 102 | + |
| 103 | +>此示例定义了一个SeaTunnel同步任务,该任务通过FakeSource自动生成数据并将其发送到JDBC Sink。FakeSource总共生成16行数据(row.num=16),每行有两个字段,name(字符串类型)和age(int类型)。最终的目标表是test_table,表中也将有16行数据。在运行此作业之前,您需要在mysql中创建数据库测试表test_table。如果您尚未安装和部署SeaTunnel,则需要按照[安装SeaTunnel](../../start-v2/local/deployment.md)中的说明安装和部署SeaTunnel。然后按照[快速启动SeaTunnel引擎](../../Start-v2/locale/Quick-Start SeaTunnel Engine.md)中的说明运行此作业。 |
| 104 | +
|
| 105 | +``` |
| 106 | +# 定义运行时环境 |
| 107 | +env { |
| 108 | + parallelism = 1 |
| 109 | + job.mode = "BATCH" |
| 110 | +} |
| 111 | +
|
| 112 | +source { |
| 113 | + # This is a example source plugin **only for test and demonstrate the feature source plugin** |
| 114 | + FakeSource { |
| 115 | + parallelism = 1 |
| 116 | + plugin_output = "fake" |
| 117 | + row.num = 16 |
| 118 | + schema = { |
| 119 | + fields { |
| 120 | + name = "string" |
| 121 | + age = "int" |
| 122 | + } |
| 123 | + } |
| 124 | + } |
| 125 | + #如果你想了解更多关于如何配置seatunnel的信息,并查看完整的源插件列表, |
| 126 | + #请前往https://seatunnel.apache.org/docs/connector-v2/source |
| 127 | +} |
| 128 | +
|
| 129 | +transform { |
| 130 | + #如果你想了解更多关于如何配置seatunnel的信息,并查看转换插件的完整列表, |
| 131 | + #请前往https://seatunnel.apache.org/docs/transform-v2 |
| 132 | +} |
| 133 | +
|
| 134 | +sink { |
| 135 | + jdbc { |
| 136 | + url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" |
| 137 | + driver = "com.mysql.cj.jdbc.Driver" |
| 138 | + user = "root" |
| 139 | + password = "123456" |
| 140 | + query = "insert into test_table(name,age) values(?,?)" |
| 141 | + } |
| 142 | + #如果你想了解更多关于如何配置seatunnel的信息,并查看完整的sink插件列表, |
| 143 | + #请前往https://seatunnel.apache.org/docs/connector-v2/sink |
| 144 | +} |
| 145 | +``` |
| 146 | + |
| 147 | +### 生成Sink SQL |
| 148 | + |
| 149 | +>此示例不需要编写复杂的sql语句,您可以配置数据库名称表名以自动为您生成add语句 |
| 150 | +
|
| 151 | +``` |
| 152 | +sink { |
| 153 | + jdbc { |
| 154 | + url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" |
| 155 | + driver = "com.mysql.cj.jdbc.Driver" |
| 156 | + user = "root" |
| 157 | + password = "123456" |
| 158 | + # Automatically generate sql statements based on database table names |
| 159 | + generate_sink_sql = true |
| 160 | + database = test |
| 161 | + table = test_table |
| 162 | + } |
| 163 | +} |
| 164 | +``` |
| 165 | + |
| 166 | +### 精确一次: |
| 167 | + |
| 168 | +为了准确的书写场景,我们保证精确一次 |
| 169 | + |
| 170 | +``` |
| 171 | +sink { |
| 172 | + jdbc { |
| 173 | + url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" |
| 174 | + driver = "com.mysql.cj.jdbc.Driver" |
| 175 | + |
| 176 | + max_retries = 0 |
| 177 | + user = "root" |
| 178 | + password = "123456" |
| 179 | + query = "insert into test_table(name,age) values(?,?)" |
| 180 | + |
| 181 | + is_exactly_once = "true" |
| 182 | + |
| 183 | + xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" |
| 184 | + } |
| 185 | +} |
| 186 | +``` |
| 187 | + |
| 188 | +### CDC(变更数据捕获)事件 |
| 189 | + |
| 190 | +>我们也支持CDC变更数据。在这种情况下,您需要配置数据库、表和主键。 |
| 191 | +
|
| 192 | +``` |
| 193 | +sink { |
| 194 | + jdbc { |
| 195 | + url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" |
| 196 | + driver = "com.mysql.cj.jdbc.Driver" |
| 197 | + user = "root" |
| 198 | + password = "123456" |
| 199 | + |
| 200 | + generate_sink_sql = true |
| 201 | + # You need to configure both database and table |
| 202 | + database = test |
| 203 | + table = sink_table |
| 204 | + primary_keys = ["id","name"] |
| 205 | + field_ide = UPPERCASE |
| 206 | + schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" |
| 207 | + data_save_mode="APPEND_DATA" |
| 208 | + } |
| 209 | +} |
| 210 | +``` |
| 211 | + |
0 commit comments