|
| 1 | +# CosFile |
| 2 | + |
| 3 | +> Cos 文件接收器连接器 |
| 4 | +
|
| 5 | +## 描述 |
| 6 | + |
| 7 | +将数据输出到cos文件系统. |
| 8 | + |
| 9 | +:::提示 |
| 10 | + |
| 11 | +如果你使用spark/flink,为了使用这个连接器,你必须确保你的spark/flilk集群已经集成了hadoop。测试的hadoop版本是2.x |
| 12 | + |
| 13 | +如果你使用SeaTunnel Engine,当你下载并安装SeaTunnel引擎时,它会自动集成hadoop jar。您可以在${SEATUNNEL_HOME}/lib下检查jar包以确认这一点. |
| 14 | + |
| 15 | +要使用此连接器,您需要将hadoop cos-{hadoop.version}-{version}.jar和cos_api-bundle-{version}.jar位于${SEATUNNEL_HOME}/lib目录中,下载:[Hoop cos发布](https://github.com/tencentyun/hadoop-cos/releases). 它只支持hadoop 2.6.5+和8.0.2版本+. |
| 16 | + |
| 17 | +::: |
| 18 | + |
| 19 | +## 关键特性 |
| 20 | + |
| 21 | +- [x] [精确一次](../../concept/connector-v2-features.md) |
| 22 | + |
| 23 | +默认情况下,我们使用2PC commit来确保 `精确一次` |
| 24 | + |
| 25 | +- [x] 文件格式类型 |
| 26 | + - [x] text |
| 27 | + - [x] csv |
| 28 | + - [x] parquet |
| 29 | + - [x] orc |
| 30 | + - [x] json |
| 31 | + - [x] excel |
| 32 | + - [x] xml |
| 33 | + - [x] binary |
| 34 | + |
| 35 | +## 选项 |
| 36 | + |
| 37 | +| 名称 | 类型 | 必需 | 默认值 | 描述 | |
| 38 | +|---------------------------------------|---------|----|--------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| 39 | +| path | string | 是 | - | | |
| 40 | +| tmp_path | string | 否 | /tmp/seatunnel | 结果文件将首先写入tmp路径,然后使用“mv”将tmp目录提交到目标目录。需要一个COS目录. | |
| 41 | +| bucket | string | 是 | - | | |
| 42 | +| secret_id | string | 是 | - | | |
| 43 | +| secret_key | string | 是 | - | | |
| 44 | +| region | string | 是 | - | | |
| 45 | +| custom_filename | boolean | 否 | false | 是否需要自定义文件名 | |
| 46 | +| file_name_expression | string | 否 | "${transactionId}" | 仅在custom_filename为true时使用 | |
| 47 | +| filename_time_format | string | 否 | "yyyy.MM.dd" | 仅在custom_filename为true时使用 | |
| 48 | +| file_format_type | string | 否 | "csv" | | |
| 49 | +| field_delimiter | string | 否 | '\001' | 仅在file_format为text时使用 | |
| 50 | +| row_delimiter | string | 否 | "\n" | 仅在file_format为text时使用 | |
| 51 | +| have_partition | boolean | 否 | false | 是否需要处理分区. | |
| 52 | +| partition_by | array | 否 | - | 只有在have_partition为true时才使用 | |
| 53 | +| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 只有在have_partition为true时才使用 | |
| 54 | +| is_partition_field_write_in_file | boolean | 否 | false | 只有在have_partition为true时才使用 | |
| 55 | +| sink_columns | array | 否 | | 当此参数为空时,所有字段都是接收列 | |
| 56 | +| is_enable_transaction | boolean | 否 | true | | |
| 57 | +| batch_size | int | 否 | 1000000 | | |
| 58 | +| compress_codec | string | 否 | none | | |
| 59 | +| common-options | object | 否 | - | | |
| 60 | +| max_rows_in_memory | int | 否 | - | 仅在file_format为excel时使用. | |
| 61 | +| sheet_name | string | 否 | Sheet${Random number} | 仅在file_format为excel时使用. | |
| 62 | +| csv_string_quote_mode | enum | 否 | MINIMAL | 仅在file_format为csv时使用. | |
| 63 | +| xml_root_tag | string | 否 | RECORDS | 仅在file_format为xml时使用. | |
| 64 | +| xml_row_tag | string | 否 | RECORD | 仅在file_format为xml时使用. | |
| 65 | +| xml_use_attr_format | boolean | 否 | - | 仅在file_format为xml时使用. | |
| 66 | +| single_file_mode | boolean | 否 | false | 每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀. | |
| 67 | +| create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,仍然会生成相应的数据文件. | |
| 68 | +| parquet_avro_write_timestamp_as_int96 | boolean | 否 | false | 仅在file_format为parquet时使用. | |
| 69 | +| parquet_avro_write_fixed_as_int96 | array | 否 | - | 仅在file_format为parquet时使用. | |
| 70 | +| encoding | string | 否 | "UTF-8" | 仅当file_format_type为json、text、csv、xml时使用. | |
| 71 | + |
| 72 | +### path [string] |
| 73 | + |
| 74 | +目标目录路径是必需的. |
| 75 | + |
| 76 | +### bucket [string] |
| 77 | + |
| 78 | +cos文件系统的bucket地址,例如:`cosn://seatunnel-test-1259587829` |
| 79 | + |
| 80 | +### secret_id [string] |
| 81 | + |
| 82 | +cos文件系统的密钥id. |
| 83 | + |
| 84 | +### secret_key [string] |
| 85 | + |
| 86 | +cos文件系统的密钥. |
| 87 | + |
| 88 | +### region [string] |
| 89 | + |
| 90 | +cos文件系统的分区. |
| 91 | + |
| 92 | +### custom_filename [boolean] |
| 93 | + |
| 94 | +是否自定义文件名 |
| 95 | + |
| 96 | +### file_name_expression [string] |
| 97 | + |
| 98 | +仅在 `custom_filename` 为 `true`时使用 |
| 99 | + |
| 100 | +`file_name_expression`描述了将在`path`中创建的文件表达式。我们可以在`file_name_expression`中添加变量`${now}`或`${uuid}`,类似于`test_${uuid}_${now}`, |
| 101 | +`${now}`表示当前时间,其格式可以通过指定选项`filename_time_format`来定义. |
| 102 | + |
| 103 | +请注意,如果`is_enable_transaction`为`true`,我们将自动添加`${transactionId}_`在文件的开头 |
| 104 | + |
| 105 | +### filename_time_format [string] |
| 106 | + |
| 107 | +仅在 `custom_filename` 为 `true` 时使用` |
| 108 | + |
| 109 | +当 `file_name_expression` 参数中的格式为 `xxxx-${now}` 时,`filename_time_format` 可以指定路径的时间格式,默认值为 `yyyy.MM.dd`。常用的时间格式如下: |
| 110 | + |
| 111 | +| 符号| 描述 | |
| 112 | +|--------|----------| |
| 113 | +| y | 年 | |
| 114 | +| M | 月 | |
| 115 | +| d | 日 | |
| 116 | +| H | 时 (0-23) | |
| 117 | +| m | 分 | |
| 118 | +| s | 秒 | |
| 119 | + |
| 120 | +### file_format_type [string] |
| 121 | + |
| 122 | +我们支持以下文件类型: |
| 123 | + |
| 124 | +`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` |
| 125 | + |
| 126 | +请注意,最终文件名将以 file_format 的后缀结尾, 文本文件的后缀为 `txt`. |
| 127 | + |
| 128 | +### field_delimiter [string] |
| 129 | + |
| 130 | +数据行中列之间的分隔符. 仅需要 `text` 文件格式. |
| 131 | + |
| 132 | +### row_delimiter [string] |
| 133 | + |
| 134 | +文件中行之间的分隔符. 只需要 `text` 文件格式. |
| 135 | + |
| 136 | +### have_partition [boolean] |
| 137 | + |
| 138 | +是否需要处理分区. |
| 139 | + |
| 140 | +### partition_by [array] |
| 141 | + |
| 142 | +仅在 `have_partition` 为 `true` 时使用. |
| 143 | + |
| 144 | +基于选定字段对数据进行分区. |
| 145 | + |
| 146 | +### partition_dir_expression [string] |
| 147 | + |
| 148 | +仅在 `have_partition` 为 `true` 时使用. |
| 149 | + |
| 150 | +如果指定了 `partition_by` ,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。 |
| 151 | +默认的 `partition_dir_expression` 是 `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` 是第一个分区字段 , `v0` 是第一个划分字段的值. |
| 152 | + |
| 153 | +### is_partition_field_write_in_file [boolean] |
| 154 | + |
| 155 | +仅在 `have_partition` 为 `true` 时使用. |
| 156 | + |
| 157 | +如果 `is_partition_field_write_in_file` 为 `true`, 分区字段及其值将写入数据文件. |
| 158 | + |
| 159 | +例如,如果你想写一个Hive数据文件,它的值应该是 `false`. |
| 160 | + |
| 161 | +### sink_columns [array] |
| 162 | + |
| 163 | +哪些列需要写入文件,默认值是从 `Transform` 或 `Source` 获取的所有列. |
| 164 | +字段的顺序决定了文件实际写入的顺序. |
| 165 | + |
| 166 | +### is_enable_transaction [boolean] |
| 167 | + |
| 168 | +如果 `is_enable_transaction` 为 `true`, 我们将确保数据在写入目标目录时不会丢失或重复. |
| 169 | + |
| 170 | +请注意,如果 `is_enable_transaction` 为 `true`, 我们将自动添加 `${transactionId}_` 在文件的开头. |
| 171 | + |
| 172 | +现在只支持 `true` . |
| 173 | + |
| 174 | +### batch_size [int] |
| 175 | + |
| 176 | +文件中的最大行数。对于SeaTunnel引擎,文件中的行数由 `batch_size` 和 `checkpoint.interval` 共同决定. 如果 `checkpoint.interval` 的值足够大, 接收器写入程序将在文件中写入行,直到文件中的行大于 `batch_size`. 如果 `checkpoint.interval` 较小, 则接收器写入程序将在新的检查点触发时创建一个新文件. |
| 177 | + |
| 178 | +### compress_codec [string] |
| 179 | + |
| 180 | +文件的压缩编解码器和支持的详细信息如下所示: |
| 181 | + |
| 182 | +- txt: `lzo` `none` |
| 183 | +- json: `lzo` `none` |
| 184 | +- csv: `lzo` `none` |
| 185 | +- orc: `lzo` `snappy` `lz4` `zlib` `none` |
| 186 | +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` |
| 187 | + |
| 188 | +Tips: excel 类型不支持任何压缩格式 |
| 189 | + |
| 190 | +### common options |
| 191 | + |
| 192 | +接收器写入插件常用参数,请参考 [Sink Common Options](../sink-common-options.md) 了解详细信息. |
| 193 | + |
| 194 | +### max_rows_in_memory [int] |
| 195 | + |
| 196 | +当文件格式为Excel时,内存中可以缓存的最大数据项数. |
| 197 | + |
| 198 | +### sheet_name [string] |
| 199 | + |
| 200 | +编写工作簿的工作表 |
| 201 | + |
| 202 | +### csv_string_quote_mode [string] |
| 203 | + |
| 204 | +当文件格式为CSV时,CSV的字符串引用模式. |
| 205 | + |
| 206 | +- ALL: 所有字符串字段都将被引用. |
| 207 | +- MINIMAL: 引号字段包含特殊字符,如字段分隔符、引号字符或行分隔符字符串中的任何字符. |
| 208 | +- NONE: 从不引用字段。当分隔符出现在数据中时,打印机会用转义符作为前缀。如果未设置转义符,格式验证将抛出异常. |
| 209 | + |
| 210 | +### xml_root_tag [string] |
| 211 | + |
| 212 | +指定XML文件中根元素的标记名. |
| 213 | + |
| 214 | +### xml_row_tag [string] |
| 215 | + |
| 216 | +指定XML文件中数据行的标记名称. |
| 217 | + |
| 218 | +### xml_use_attr_format [boolean] |
| 219 | + |
| 220 | +指定是否使用标记属性格式处理数据. |
| 221 | + |
| 222 | +### parquet_avro_write_timestamp_as_int96 [boolean] |
| 223 | + |
| 224 | +支持从时间戳写入Parquet INT96,仅适用于拼花地板文件. |
| 225 | + |
| 226 | +### parquet_avro_write_fixed_as_int96 [array] |
| 227 | + |
| 228 | +支持从12字节字段写入Parquet INT96,仅适用于拼花地板文件. |
| 229 | + |
| 230 | +### encoding [string] |
| 231 | + |
| 232 | +仅当file_format_type为json、text、csv、xml时使用. |
| 233 | +要写入的文件的编码。此参数将由`Charset.forName(encoding)` 解析. |
| 234 | + |
| 235 | +## 示例 |
| 236 | + |
| 237 | +对于具有 `have_partition` 、 `custom_filename` 和 `sink_columns` 的文本文件格式 |
| 238 | + |
| 239 | +```hocon |
| 240 | +
|
| 241 | + CosFile { |
| 242 | + path="/sink" |
| 243 | + bucket = "cosn://seatunnel-test-1259587829" |
| 244 | + secret_id = "xxxxxxxxxxxxxxxxxxx" |
| 245 | + secret_key = "xxxxxxxxxxxxxxxxxxx" |
| 246 | + region = "ap-chengdu" |
| 247 | + file_format_type = "text" |
| 248 | + field_delimiter = "\t" |
| 249 | + row_delimiter = "\n" |
| 250 | + have_partition = true |
| 251 | + partition_by = ["age"] |
| 252 | + partition_dir_expression = "${k0}=${v0}" |
| 253 | + is_partition_field_write_in_file = true |
| 254 | + custom_filename = true |
| 255 | + file_name_expression = "${transactionId}_${now}" |
| 256 | + filename_time_format = "yyyy.MM.dd" |
| 257 | + sink_columns = ["name","age"] |
| 258 | + is_enable_transaction = true |
| 259 | + } |
| 260 | +
|
| 261 | +``` |
| 262 | + |
| 263 | +适用于带有`have_partition` 和 `sink_columns`的parquet 文件格式` |
| 264 | + |
| 265 | +```hocon |
| 266 | +
|
| 267 | + CosFile { |
| 268 | + path="/sink" |
| 269 | + bucket = "cosn://seatunnel-test-1259587829" |
| 270 | + secret_id = "xxxxxxxxxxxxxxxxxxx" |
| 271 | + secret_key = "xxxxxxxxxxxxxxxxxxx" |
| 272 | + region = "ap-chengdu" |
| 273 | + have_partition = true |
| 274 | + partition_by = ["age"] |
| 275 | + partition_dir_expression = "${k0}=${v0}" |
| 276 | + is_partition_field_write_in_file = true |
| 277 | + file_format_type = "parquet" |
| 278 | + sink_columns = ["name","age"] |
| 279 | + } |
| 280 | +
|
| 281 | +``` |
| 282 | + |
| 283 | +对于orc文件格式的简单配置 |
| 284 | + |
| 285 | +```bash |
| 286 | + |
| 287 | + CosFile { |
| 288 | + path="/sink" |
| 289 | + bucket = "cosn://seatunnel-test-1259587829" |
| 290 | + secret_id = "xxxxxxxxxxxxxxxxxxx" |
| 291 | + secret_key = "xxxxxxxxxxxxxxxxxxx" |
| 292 | + region = "ap-chengdu" |
| 293 | + file_format_type = "orc" |
| 294 | + } |
| 295 | + |
| 296 | +``` |
| 297 | + |
| 298 | +## 变更日志 |
| 299 | + |
| 300 | +### 下一个版本 |
| 301 | + |
| 302 | +- 添加文件cos接收器连接器 ([4979](https://github.com/apache/seatunnel/pull/4979)) |
| 303 | + |
0 commit comments