# Apache Iceberg

> Apache Iceberg sink connector

## Supported Iceberg Version

- 1.4.2

## Supported Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Description

The Apache Iceberg sink connector supports CDC mode, automatic table creation, and table schema evolution.

## Key Features

- [x] [support multiple table write](../../concept/connector-v2-features.md)
## Supported DataSource Info

| Datasource | Dependent | Maven                                                                     |
|------------|-----------|---------------------------------------------------------------------------|
| Iceberg    | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec)  |
| Iceberg    | libfb303  | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303) |
## Database Dependency

> To stay compatible with different versions of Hadoop and Hive, the scope of the hive-exec dependency in the project pom file is set to provided. If you use the Flink engine, you may therefore need to add the following JARs to the <FLINK_HOME>/lib directory; if you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.

```
hive-exec-xxx.jar
libfb303-xxx.jar
```

> Some versions of the hive-exec package do not include libfb303-xxx.jar, in which case you also need to import that JAR manually.
## Data Type Mapping

| SeaTunnel Data Type | Iceberg Data Type |
|---------------------|-------------------|
| BOOLEAN             | BOOLEAN           |
| INT                 | INTEGER           |
| BIGINT              | LONG              |
| FLOAT               | FLOAT             |
| DOUBLE              | DOUBLE            |
| DATE                | DATE              |
| TIME                | TIME              |
| TIMESTAMP           | TIMESTAMP         |
| STRING              | STRING            |
| BYTES               | FIXED<br/>BINARY  |
| DECIMAL             | DECIMAL           |
| ROW                 | STRUCT            |
| ARRAY               | LIST              |
| MAP                 | MAP               |
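To see this mapping in practice, here is a minimal sketch of a batch job that writes a few generated rows into an Iceberg table. It uses SeaTunnel's FakeSource with an explicit schema; the field names, table name, and warehouse path are illustrative assumptions, not part of this connector's contract.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # FakeSource generates rows matching the declared schema; all field
  # names below are hypothetical and chosen to exercise the type mapping.
  FakeSource {
    schema = {
      fields {
        id = "bigint"             # -> Iceberg LONG
        name = "string"           # -> Iceberg STRING
        price = "decimal(10, 2)"  # -> Iceberg DECIMAL
        enabled = "boolean"       # -> Iceberg BOOLEAN
        create_time = "timestamp" # -> Iceberg TIMESTAMP
      }
    }
  }
}

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/type-mapping-demo/" # illustrative path
    }
    namespace = "seatunnel_namespace"
    table = "type_mapping_demo" # hypothetical table name
  }
}
```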
## Sink Options

| Name                                   | Type    | Required | Default                      | Description                                                                                                                                                                                                                  |
|----------------------------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| catalog_name                           | string  | yes      | default                      | User-specified catalog name; defaults to `default`                                                                                                                                                                           |
| namespace                              | string  | yes      | default                      | Name of the Iceberg database in the backend catalog; defaults to `default`                                                                                                                                                   |
| table                                  | string  | yes      | -                            | Name of the Iceberg table in the backend catalog                                                                                                                                                                              |
| iceberg.catalog.config                 | map     | yes      | -                            | Properties used to initialize the Iceberg catalog; see: "https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java"                                                            |
| hadoop.config                          | map     | no       | -                            | Properties passed through to the Hadoop configuration                                                                                                                                                                         |
| iceberg.hadoop-conf-path               | string  | no       | -                            | Path from which to load the `core-site.xml`, `hdfs-site.xml`, and `hive-site.xml` files                                                                                                                                       |
| case_sensitive                         | boolean | no       | false                        | Whether column-name matching is case sensitive                                                                                                                                                                                |
| iceberg.table.write-props              | map     | no       | -                            | Properties passed to the Iceberg writer on initialization, e.g. `write.format.default`, `write.target-file-size-bytes`. These take the highest priority; see: 'https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java' |
| iceberg.table.auto-create-props        | map     | no       | -                            | Properties applied when Iceberg auto-creates the table                                                                                                                                                                        |
| iceberg.table.schema-evolution-enabled | boolean | no       | false                        | When set to true, the Iceberg table supports schema changes during synchronization                                                                                                                                            |
| iceberg.table.primary-keys             | string  | no       | -                            | Comma-separated list of primary-key columns that identify a row in the table                                                                                                                                                  |
| iceberg.table.partition-keys           | string  | no       | -                            | Comma-separated list of partition fields used when creating the table                                                                                                                                                         |
| iceberg.table.upsert-mode-enabled      | boolean | no       | false                        | Set to `true` to enable upsert mode; defaults to `false`                                                                                                                                                                      |
| schema_save_mode                       | Enum    | no       | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode; see `schema_save_mode` below                                                                                                                                                                                |
| data_save_mode                         | Enum    | no       | APPEND_DATA                  | Data save mode; see `data_save_mode` below                                                                                                                                                                                    |
| custom_sql                             | string  | no       | -                            | Custom `delete` SQL used by the data save mode, e.g. `delete from ... where ...`                                                                                                                                              |
| iceberg.table.commit-branch            | string  | no       | -                            | Default branch for commits                                                                                                                                                                                                    |
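As a hedged illustration of how the save-mode options combine, the sketch below creates the table on first run and clears old rows with a custom delete before writing. The enum values shown (`CREATE_SCHEMA_WHEN_NOT_EXIST`, `CUSTOM_PROCESSING`) follow the common SeaTunnel save-mode conventions rather than anything stated in this document, and the table name and SQL are hypothetical.

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/save-mode-demo/" # illustrative path
    }
    namespace = "seatunnel_namespace"
    table = "save_mode_demo" # hypothetical table name

    # Create the Iceberg table on first run if it does not exist yet.
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # Run the custom_sql statement before writing instead of plain append.
    data_save_mode = "CUSTOM_PROCESSING"
    custom_sql = "delete from save_mode_demo where id > 100" # hypothetical predicate
  }
}
```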
## Task Example

### Simple example:

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    plugin_output = "customers_mysql_cdc_iceberg"
    server-id = 5652
    username = "st_user"
    password = "seatunnel"
    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  }
}

transform {
}

sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      "type"="hadoop"
      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}
```
### Hive Catalog:

```hocon
sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}
```
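When the Hive metastore and HDFS settings already live in site XML files, the `iceberg.hadoop-conf-path` option from the table above lets you point the connector at them instead of inlining every property. A minimal sketch based on the Hive catalog example; the `/etc/hadoop/conf` directory is an assumption for illustration:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    # Directory containing core-site.xml, hdfs-site.xml, and hive-site.xml;
    # the path below is illustrative.
    iceberg.hadoop-conf-path = "/etc/hadoop/conf"
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
  }
}
```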
### Hadoop Catalog:

```hocon
sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}
```
### Multiple table write

#### Example 1

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  Iceberg {
    ...
    namespace = "${database_name}_test"
    table = "${table_name}_test"
  }
}
```
#### Example 2

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  Iceberg {
    ...
    namespace = "${schema_name}_test"
    table = "${table_name}_test"
  }
}
```
## Changelog

### 2.3.4-SNAPSHOT 2024-01-18

- Add Iceberg Sink Connector

### next version