|
| 1 | +# Snowflake |
| 2 | + |
| 3 | +> JDBC Snowflake Sink连接器 |
| 4 | +
|
| 5 | +## 支持的引擎 |
| 6 | + |
| 7 | +> Spark<br/> |
| 8 | +> Flink<br/> |
| 9 | +> SeaTunnel Zeta<br/> |
| 10 | +
|
| 11 | +## 主要特性 |
| 12 | + |
| 13 | +- [ ] [精确一次](../../concept/connector-v2-features.md) |
| 14 | +- [x] [(CDC)](../../concept/connector-v2-features.md) |
| 15 | + |
| 16 | +## 描述 |
| 17 | + |
| 18 | +通过JDBC写入数据。支持批处理模式和流处理模式,支持并发写入。 |
| 19 | + |
| 20 | +## 支持的数据源列表 |
| 21 | + |
| 22 | +| 数据源 | 支持的版本 | 驱动类 | URL | Maven | |
| 23 | +|------------|--------------------------------------------------------------|---------------------------------------------|--------------------------------------------------------------|---------------------------------------------------------------------------| |
| 24 | +| Snowflake | 不同依赖版本对应不同的驱动类。 | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | [下载](https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc) | |
| 25 | + |
| 26 | +## 数据库依赖 |
| 27 | + |
| 28 | +> 请下载支持列表中对应的'Maven'依赖,并将其复制到'$SEATUNNEL_HOME/plugins/jdbc/lib/'工作目录下<br/> |
| 29 | +> 例如Snowflake数据源:cp snowflake-connector-java-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/ |
| 30 | +
|
| 31 | +## 数据类型映射 |
| 32 | + |
| 33 | +| Snowflake 数据类型 | SeaTunnel 数据类型 | |
| 34 | +|--------------------------------------------------------------------------|--------------------| |
| 35 | +| BOOLEAN | BOOLEAN | |
| 36 | +| TINYINT<br/>SMALLINT<br/>BYTEINT<br/> | SHORT_TYPE | |
| 37 | +| INT<br/>INTEGER<br/> | INT | |
| 38 | +| BIGINT | LONG | |
| 39 | +| DECIMAL<br/>NUMERIC<br/>NUMBER<br/> | DECIMAL(x,y) | |
| 40 | +| DECIMAL(x,y)(获取指定列的大小>38) | DECIMAL(38,18) | |
| 41 | +| REAL<br/>FLOAT4 | FLOAT | |
| 42 | +| DOUBLE<br/>DOUBLE PRECISION<br/>FLOAT8<br/>FLOAT<br/> | DOUBLE | |
| 43 | +| CHAR<br/>CHARACTER<br/>VARCHAR<br/>STRING<br/>TEXT<br/>VARIANT<br/>OBJECT| STRING | |
| 44 | +| DATE | DATE | |
| 45 | +| TIME | TIME | |
| 46 | +| DATETIME<br/>TIMESTAMP<br/>TIMESTAMP_LTZ<br/>TIMESTAMP_NTZ<br/>TIMESTAMP_TZ | TIMESTAMP | |
| 47 | +| BINARY<br/>VARBINARY<br/>GEOGRAPHY<br/>GEOMETRY | BYTES | |
| 48 | + |
| 49 | +## 配置选项 |
| 50 | + |
| 51 | +| 名称 | 类型 | 必填 | 默认值 | 描述 | |
| 52 | +|-------------------------------------------|---------|------|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| 53 | +| url | String | 是 | - | JDBC连接的URL。参考示例:jdbc:snowflake://<account_name>.snowflakecomputing.com | |
| 54 | +| driver | String | 是 | - | 用于连接远程数据源的JDBC类名,<br/>如果使用Snowflake,值为`net.snowflake.client.jdbc.SnowflakeDriver`。 | |
| 55 | +| user | String | 否 | - | 连接实例的用户名 | |
| 56 | +| password | String | 否 | - | 连接实例的密码 | |
| 57 | +| query | String | 否 | - | 使用此SQL将上游输入数据写入数据库。例如`INSERT ...`,`query`具有更高的优先级 | |
| 58 | +| database | String | 否 | - | 使用此`database`和`table-name`自动生成SQL并接收上游输入数据写入数据库。<br/>此选项与`query`互斥,且具有更高的优先级。 | |
| 59 | +| table | String | 否 | - | 使用`database`和此`table-name`自动生成SQL并接收上游输入数据写入数据库。<br/>此选项与`query`互斥,且具有更高的优先级。 | |
| 60 | +| primary_keys | Array | 否 | - | 此选项用于在自动生成SQL时支持`insert`、`delete`和`update`等操作。 | |
| 61 | +| support_upsert_by_query_primary_key_exist | Boolean | 否 | false | 选择使用INSERT SQL、UPDATE SQL来处理更新事件(INSERT, UPDATE_AFTER),基于查询主键是否存在。此配置仅在数据库不支持upsert语法时使用。**注意**:此方法性能较低。 | |
| 62 | +| connection_check_timeout_sec | Int | 否 | 30 | 用于验证连接的操作的等待时间(秒)。 | |
| 63 | +| max_retries | Int | 否 | 0 | 提交失败(executeBatch)的重试次数 | |
| 64 | +| batch_size | Int | 否 | 1000 | 对于批处理写入,当缓冲的记录数达到`batch_size`或时间达到`checkpoint.interval`时,<br/>数据将被刷新到数据库中 | |
| 65 | +| max_commit_attempts | Int | 否 | 3 | 事务提交失败的重试次数 | |
| 66 | +| transaction_timeout_sec | Int | 否 | -1 | 事务打开后的超时时间,默认为-1(永不超时)。注意,设置超时可能会影响<br/>精确一次语义 | |
| 67 | +| auto_commit | Boolean | 否 | true | 默认启用自动事务提交 | |
| 68 | +| properties | Map | 否 | - | 额外的连接配置参数,当properties和URL中有相同参数时,优先级由驱动程序的<br/>具体实现决定。例如,在MySQL中,properties优先于URL。 | |
| 69 | +| common-options | | 否 | - | 接收器插件通用参数,详情请参考[接收器通用选项](../sink-common-options.md) | |
| 70 | +| enable_upsert | Boolean | 否 | true | 通过主键存在启用upsert,如果任务没有键重复数据,将此参数设置为`false`可以加快数据导入速度 | |
| 71 | + |
| 72 | +## 提示 |
| 73 | + |
| 74 | +> 如果未设置`partition_column`,将以单并发运行,如果设置了`partition_column`,将根据任务的并发度并行执行。 |
| 75 | +
|
| 76 | +## 任务示例 |
| 77 | + |
| 78 | +### 简单示例: |
| 79 | + |
| 80 | +> 此示例定义了一个SeaTunnel同步任务,通过FakeSource自动生成数据并发送到JDBC Sink。FakeSource总共生成16行数据(row.num=16),每行有两个字段,name(字符串类型)和age(int类型)。最终目标表`test_table`中也将有16行数据。在运行此作业之前,您需要在Snowflake数据库中创建数据库`test`和表`test_table`。如果您尚未安装和部署SeaTunnel,请按照[安装SeaTunnel](../../start-v2/locally/deployment.md)中的说明进行安装和部署。然后按照[使用SeaTunnel Engine快速入门](../../start-v2/locally/quick-start-seatunnel-engine.md)中的说明运行此作业。 |
| 81 | +
|
| 82 | +``` |
| 83 | +# 定义运行时环境 |
| 84 | +env { |
| 85 | + parallelism = 1 |
| 86 | + job.mode = "BATCH" |
| 87 | +} |
| 88 | +source { |
| 89 | + # 这是一个示例源插件,**仅用于测试和演示功能源插件** |
| 90 | + FakeSource { |
| 91 | + parallelism = 1 |
| 92 | + plugin_output = "fake" |
| 93 | + row.num = 16 |
| 94 | + schema = { |
| 95 | + fields { |
| 96 | + name = "string" |
| 97 | + age = "int" |
| 98 | + } |
| 99 | + } |
| 100 | + } |
| 101 | + # 如果您想了解更多关于如何配置SeaTunnel的信息,并查看完整的源插件列表, |
| 102 | + # 请访问 https://seatunnel.apache.org/docs/connector-v2/source |
| 103 | +} |
| 104 | +transform { |
| 105 | +
|
| 106 | + # 如果您想了解更多关于如何配置SeaTunnel的信息,并查看完整的转换插件列表, |
| 107 | + # 请访问 https://seatunnel.apache.org/docs/transform-v2 |
| 108 | +} |
| 109 | +sink { |
| 110 | + jdbc { |
| 111 | + url = "jdbc:snowflake://<account_name>.snowflakecomputing.com" |
| 112 | + driver = "net.snowflake.client.jdbc.SnowflakeDriver" |
| 113 | + user = "root" |
| 114 | + password = "123456" |
| 115 | + query = "insert into test_table(name,age) values(?,?)" |
| 116 | + } |
| 117 | + # 如果您想了解更多关于如何配置SeaTunnel的信息,并查看完整的接收器插件列表, |
| 118 | + # 请访问 https://seatunnel.apache.org/docs/connector-v2/sink |
| 119 | +} |
| 120 | +``` |
| 121 | + |
| 122 | +### CDC(变更数据捕获)事件 |
| 123 | + |
| 124 | +> 我们也支持CDC变更数据。在这种情况下,您需要配置`database`、`table`和`primary_keys`。 |
| 125 | +
|
| 126 | +``` |
| 127 | +sink { |
| 128 | + jdbc { |
| 129 | + url = "jdbc:snowflake://<account_name>.snowflakecomputing.com" |
| 130 | + driver = "net.snowflake.client.jdbc.SnowflakeDriver" |
| 131 | + user = "root" |
| 132 | + password = "123456" |
| 133 | + generate_sink_sql = true |
| 134 | + |
| 135 | + |
| 136 | + # 您需要同时配置database和table |
| 137 | + database = test |
| 138 | + table = sink_table |
| 139 | + primary_keys = ["id","name"] |
| 140 | + } |
| 141 | +} |
| 142 | +``` |
0 commit comments