|
| 1 | +# S3Redshift |
| 2 | + |
| 3 | +>S3Redshift的作用是将数据写入S3,然后使用Redshift的COPY命令将数据从S3导入Redshift。 |
| 4 | +
|
| 5 | +## 描述 |
| 6 | + |
| 7 | +将数据输出到AWS Redshift。 |
| 8 | + |
| 9 | +>提示: |
| 10 | +
|
| 11 | +>我们基于[S3File](S3File.md)来实现这个连接器。因此,您可以使用与S3File相同的配置。 |
| 12 | +>为了支持更多的文件类型,我们进行了一些权衡,因此我们使用HDFS协议对S3进行内部访问,而这个连接器需要一些hadoop依赖。 |
| 13 | +>它只支持hadoop版本**2.6.5+**。 |
| 14 | +
|
| 15 | +## 主要特性 |
| 16 | + |
| 17 | +- [x] [精确一次](../../concept/connector-v2-features.md) |
| 18 | + |
| 19 | +默认情况下,我们使用2PC commit来确保“精确一次”` |
| 20 | + |
| 21 | +- [x] 文件格式类型 |
| 22 | + - [x] text |
| 23 | + - [x] csv |
| 24 | + - [x] parquet |
| 25 | + - [x] orc |
| 26 | + - [x] json |
| 27 | + |
| 28 | +## 参数 |
| 29 | + |
| 30 | +| 名称 | 类型 | 是否必填 | 默认值 | |
| 31 | +|----------------------------------|---------|----------|-----------------------------------------------------------| |
| 32 | +| jdbc_url | string | 是 | - | |
| 33 | +| jdbc_user | string | 是 | - | |
| 34 | +| jdbc_password | string | 是 | - | |
| 35 | +| execute_sql | string | 是 | - | |
| 36 | +| path | string | 是 | - | |
| 37 | +| bucket | string | 是 | - | |
| 38 | +| access_key | string | 否 | - | |
| 39 | +| access_secret | string | 否 | - | |
| 40 | +| hadoop_s3_properties | map | 否 | - | |
| 41 | +| file_name_expression | string | 否 | "${transactionId}" | |
| 42 | +| file_format_type | string | 否 | "text" | |
| 43 | +| filename_time_format | string | 否 | "yyyy.MM.dd" | |
| 44 | +| field_delimiter | string | 否 | '\001' | |
| 45 | +| row_delimiter | string | 否 | "\n" | |
| 46 | +| partition_by | array | 否 | - | |
| 47 | +| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | |
| 48 | +| is_partition_field_write_in_file | boolean | 否 | false | |
| 49 | +| sink_columns | array | 否 | 当此参数为空时,所有字段都是sink列 | |
| 50 | +| is_enable_transaction | boolean | 否 | true | |
| 51 | +| batch_size | int | 否 | 1000000 | |
| 52 | +| common-options | | 否 | - | |
| 53 | + |
| 54 | +### jdbc_url |
| 55 | + |
| 56 | +连接到Redshift数据库的JDBC URL。 |
| 57 | + |
| 58 | +### jdbc_user |
| 59 | + |
| 60 | +连接到Redshift数据库的用户名。 |
| 61 | + |
| 62 | +### jdbc_password |
| 63 | + |
| 64 | +连接到Redshift数据库的密码。 |
| 65 | + |
| 66 | +### execute_sql |
| 67 | + |
| 68 | +数据写入S3后要执行的SQL。 |
| 69 | + |
| 70 | +示例: |
| 71 | + |
| 72 | +```sql |
| 73 | + |
| 74 | +COPY target_table FROM 's3://yourbucket${path}' IAM_ROLE 'arn:XXX' REGION 'your region' format as json 'auto'; |
| 75 | +``` |
| 76 | + |
| 77 | +`target_table`是Redshift中的表名。 |
| 78 | + |
| 79 | +`${path}`是写入S3的文件的路径。请确认您的sql包含此变量。并且不需要替换它。我们将在执行sql时替换它。 |
| 80 | +IAM_ROLE是有权访问S3的角色。 |
| 81 | +format是写入S3的文件的格式。请确认此格式与您在配置中设置的文件格式相同。 |
| 82 | + |
| 83 | +请参阅[Redshift COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html)了解更多详情。 |
| 84 | + |
| 85 | +请确认该角色有权访问S3。 |
| 86 | +### path [string] |
| 87 | + |
| 88 | +目标目录路径是必填项。 |
| 89 | + |
| 90 | +### bucket [string] |
| 91 | + |
| 92 | +s3文件系统的bucket地址,例如:`s3n://seatunnel-test`,如果使用`s3a`协议,则此参数应为`s3a://seatunnel-test`。 |
| 93 | + |
| 94 | +### access_key [string] |
| 95 | + |
| 96 | +s3文件系统的access_key。如果未设置此参数,请确认凭据提供程序链可以正确进行身份验证,您可以检查这个[hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) |
| 97 | + |
| 98 | +### access_secret [string] |
| 99 | + |
| 100 | +s3文件系统的access_secret。如果未设置此参数,请确认凭据提供程序链可以正确进行身份验证,您可以检查这个[hadoop-aws](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) |
| 101 | + |
| 102 | +### hadoop_s3_properties [map] |
| 103 | + |
| 104 | +如果您需要添加其他选项,可以在此处添加并参考[Hadoop-AWS](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) |
| 105 | + |
| 106 | +``` |
| 107 | +hadoop_s3_properties { |
| 108 | + "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" |
| 109 | + } |
| 110 | +``` |
| 111 | + |
| 112 | +### file_name_expression [string] |
| 113 | + |
| 114 | +`file_name_expression`描述了将在`path`中创建的文件表达式。我们可以在`file_name_expression`中添加变量`${now}`或`${uuid}`,类似于`test_${uuid}_${now}`, |
| 115 | +`${now}`表示当前时间,其格式可以通过指定选项`filename_time_format`来定义。 |
| 116 | +请注意,如果`is_enable_transaction`为`true`,我们将自动添加`${transactionId}_`在文件的开头。 |
| 117 | + |
| 118 | +### file_format_type [string] |
| 119 | + |
| 120 | +我们支持以下文件类型: |
| 121 | + |
| 122 | +`text` `csv` `parquet` `orc` `json` |
| 123 | + |
| 124 | +请注意,最终文件名将以file_format_type的后缀结尾,文本文件的后缀为“txt”。 |
| 125 | + |
| 126 | +### filename_time_format [string] |
| 127 | + |
| 128 | +当`file_name_expression`参数中的格式为`xxxx-${now}`时,`filename_time_format`可以指定路径的时间格式,默认值为`yyyy.MM.dd`。常用的时间格式如下: |
| 129 | + |
| 130 | +| Symbol | Description | |
| 131 | +|--------|--------------------| |
| 132 | +| y | Year | |
| 133 | +| M | Month | |
| 134 | +| d | Day of month | |
| 135 | +| H | Hour in day (0-23) | |
| 136 | +| m | Minute in hour | |
| 137 | +| s | Second in minute | |
| 138 | + |
| 139 | +请参阅[Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html)了解详细的时间格式语法。 |
| 140 | + |
| 141 | +### field_delimiter [string] |
| 142 | + |
| 143 | +数据行中列之间的分隔符。仅被“text”和“csv”文件格式需要。 |
| 144 | + |
| 145 | +### row_delimiter [string] |
| 146 | + |
| 147 | +文件中行之间的分隔符。仅被“text”和“csv”文件格式需要。 |
| 148 | + |
| 149 | +### partition_by [array] |
| 150 | + |
| 151 | +基于选定字段对数据进行分区 |
| 152 | + |
| 153 | +### partition_dir_expression [string] |
| 154 | + |
| 155 | +如果指定了`partition_by`,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。 |
| 156 | + |
| 157 | +默认的`partition_dir_expression`是 `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`。`k0`是第一个分区字段,`v0`是第一个划分字段的值。 |
| 158 | + |
| 159 | +### is_partition_field_write_in_file [boolean] |
| 160 | + |
| 161 | +如果`is_partition_field_write_in_file`为`true`,则分区字段及其值将写入数据文件。 |
| 162 | + |
| 163 | +例如,如果你想写一个Hive数据文件,它的值应该是“false”。 |
| 164 | + |
| 165 | +### sink_columns [array] |
| 166 | + |
| 167 | +哪些列需要写入文件,默认值是从“Transform”或“Source”获取的所有列。 |
| 168 | +字段的顺序决定了文件实际写入的顺序。 |
| 169 | + |
| 170 | +### is_enable_transaction [boolean] |
| 171 | + |
| 172 | +如果`is_enable_transaction`为true,我们将确保数据在写入目标目录时不会丢失或重复。 |
| 173 | +请注意,如果`is_enable_transaction`为`true`,我们将自动添加`${transactionId}_`在文件的开头。 |
| 174 | +现在只支持“true”。 |
| 175 | + |
| 176 | +### batch_size [int] |
| 177 | + |
| 178 | +文件中的最大行数。对于SeaTunnel引擎,文件中的行数由“batch_size”和“checkpoint.interval”共同决定。如果“checkpoint.interval”的值足够大,sink writer将在文件中写入行,直到文件中的行大于“batch_size”。如果“checkpoint.interval”较小,则接收器写入程序将在新的检查点触发时创建一个新文件。 |
| 179 | + |
| 180 | +### common options |
| 181 | + |
| 182 | +Sink插件常用参数,请参考[Sink Common Options](../sink-common-options.md)了解详细信息。 |
| 183 | + |
| 184 | +## 示例 |
| 185 | + |
| 186 | +用于 text 文件格式 |
| 187 | + |
| 188 | +```hocon |
| 189 | +
|
| 190 | + S3Redshift { |
| 191 | + jdbc_url = "jdbc:redshift://xxx.amazonaws.com.cn:5439/xxx" |
| 192 | + jdbc_user = "xxx" |
| 193 | + jdbc_password = "xxxx" |
| 194 | + execute_sql="COPY table_name FROM 's3://test${path}' IAM_ROLE 'arn:aws-cn:iam::xxx' REGION 'cn-north-1' removequotes emptyasnull blanksasnull maxerror 100 delimiter '|' ;" |
| 195 | + access_key = "xxxxxxxxxxxxxxxxx" |
| 196 | + secret_key = "xxxxxxxxxxxxxxxxx" |
| 197 | + bucket = "s3a://seatunnel-test" |
| 198 | + tmp_path = "/tmp/seatunnel" |
| 199 | + path="/seatunnel/text" |
| 200 | + row_delimiter="\n" |
| 201 | + partition_dir_expression="${k0}=${v0}" |
| 202 | + is_partition_field_write_in_file=true |
| 203 | + file_name_expression="${transactionId}_${now}" |
| 204 | + file_format_type = "text" |
| 205 | + filename_time_format="yyyy.MM.dd" |
| 206 | + is_enable_transaction=true |
| 207 | + hadoop_s3_properties { |
| 208 | + "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" |
| 209 | + } |
| 210 | + } |
| 211 | +
|
| 212 | +``` |
| 213 | + |
| 214 | +用于 parquet 文件格式 |
| 215 | + |
| 216 | +```hocon |
| 217 | +
|
| 218 | + S3Redshift { |
| 219 | + jdbc_url = "jdbc:redshift://xxx.amazonaws.com.cn:5439/xxx" |
| 220 | + jdbc_user = "xxx" |
| 221 | + jdbc_password = "xxxx" |
| 222 | + execute_sql="COPY table_name FROM 's3://test${path}' IAM_ROLE 'arn:aws-cn:iam::xxx' REGION 'cn-north-1' format as PARQUET;" |
| 223 | + access_key = "xxxxxxxxxxxxxxxxx" |
| 224 | + secret_key = "xxxxxxxxxxxxxxxxx" |
| 225 | + bucket = "s3a://seatunnel-test" |
| 226 | + tmp_path = "/tmp/seatunnel" |
| 227 | + path="/seatunnel/parquet" |
| 228 | + row_delimiter="\n" |
| 229 | + partition_dir_expression="${k0}=${v0}" |
| 230 | + is_partition_field_write_in_file=true |
| 231 | + file_name_expression="${transactionId}_${now}" |
| 232 | + file_format_type = "parquet" |
| 233 | + filename_time_format="yyyy.MM.dd" |
| 234 | + is_enable_transaction=true |
| 235 | + hadoop_s3_properties { |
| 236 | + "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" |
| 237 | + } |
| 238 | + } |
| 239 | +
|
| 240 | +``` |
| 241 | + |
| 242 | +用于 orc 文件格式 |
| 243 | + |
| 244 | +```hocon |
| 245 | +
|
| 246 | + S3Redshift { |
| 247 | + jdbc_url = "jdbc:redshift://xxx.amazonaws.com.cn:5439/xxx" |
| 248 | + jdbc_user = "xxx" |
| 249 | + jdbc_password = "xxxx" |
| 250 | + execute_sql="COPY table_name FROM 's3://test${path}' IAM_ROLE 'arn:aws-cn:iam::xxx' REGION 'cn-north-1' format as ORC;" |
| 251 | + access_key = "xxxxxxxxxxxxxxxxx" |
| 252 | + secret_key = "xxxxxxxxxxxxxxxxx" |
| 253 | + bucket = "s3a://seatunnel-test" |
| 254 | + tmp_path = "/tmp/seatunnel" |
| 255 | + path="/seatunnel/orc" |
| 256 | + row_delimiter="\n" |
| 257 | + partition_dir_expression="${k0}=${v0}" |
| 258 | + is_partition_field_write_in_file=true |
| 259 | + file_name_expression="${transactionId}_${now}" |
| 260 | + file_format_type = "orc" |
| 261 | + filename_time_format="yyyy.MM.dd" |
| 262 | + is_enable_transaction=true |
| 263 | + hadoop_s3_properties { |
| 264 | + "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" |
| 265 | + } |
| 266 | + } |
| 267 | +
|
| 268 | +``` |
| 269 | + |
| 270 | +## 变更日志 |
| 271 | + |
| 272 | +### 2.3.0-beta 2022-10-20 |
| 273 | + |
0 commit comments