|
| 1 | +# ObsFile |
| 2 | + |
| 3 | +> Obs file sink 连接器 |
| 4 | +
|
| 5 | +## 支持这些引擎 |
| 6 | + |
| 7 | +> Spark |
| 8 | +> |
| 9 | +> Flink |
| 10 | +> |
| 11 | +> Seatunnel Zeta |
| 12 | +
|
| 13 | +## 主要特性 |
| 14 | + |
| 15 | +- [x] [精确一次](../../concept/connector-v2-features.md) |
| 16 | + |
| 17 | +默认情况下,我们使用2PC commit来确保“精确一次”` |
| 18 | + |
| 19 | +- [x] 文件格式类型 |
| 20 | + - [x] text |
| 21 | + - [x] csv |
| 22 | + - [x] parquet |
| 23 | + - [x] orc |
| 24 | + - [x] json |
| 25 | + - [x] excel |
| 26 | + |
| 27 | +## 描述 |
| 28 | + |
| 29 | +将数据输出到华为云obs文件系统。 |
| 30 | + |
| 31 | +如果你使用spark/flink,为了使用这个连接器,你必须确保你的spark/flink集群已经集成了hadoop。测试的hadoop版本是2.x。 |
| 32 | + |
| 33 | +如果你使用SeaTunnel Engine,当你下载并安装SeaTunnel引擎时,它会自动集成hadoop jar。您可以在${SEATUNNEL_HOME}/lib下检查jar包以确认这一点。 |
| 34 | + |
| 35 | +为了支持更多的文件类型,我们进行了一些权衡,因此我们使用HDFS协议对OBS进行内部访问,而这个连接器需要一些hadoop依赖。 |
| 36 | +它只支持hadoop版本**2.9.X+**。 |
| 37 | + |
| 38 | +## 所需Jar包列表 |
| 39 | + |
| 40 | +| jar | 支持的版本 | Maven下载链接 | |
| 41 | +|--------------------|-----------------------------|---------------------------------------------------------------------------------------------------| |
| 42 | +| hadoop-huaweicloud | support version >= 3.1.1.29 | [下载](https://repo.huaweicloud.com/artifactory/sdk_public/org/apache/hadoop/hadoop-huaweicloud/) | |
| 43 | +| esdk-obs-java | support version >= 3.19.7.3 | [下载](https://repo.huaweicloud.com/artifactory/sdk_public/com/huawei/storage/esdk-obs-java/) | |
| 44 | +| okhttp | support version >= 3.11.0 | [下载](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/) | |
| 45 | +| okio | support version >= 1.14.0 | [下载](https://repo1.maven.org/maven2/com/squareup/okio/okio/) | |
| 46 | + |
| 47 | +>请下载“Maven”对应的支持列表,并将其复制到“$SEATUNNEL_HOME/plugins/jdbc/lib/”工作目录。 |
| 48 | +> |
| 49 | +>并将所有jar复制到$SEATUNNEL_HOME/lib/ |
| 50 | +
|
| 51 | +## 参数 |
| 52 | + |
| 53 | +| 名称 | 类型 | 是否必填 | 默认值 | 描述 | |
| 54 | +|----------------------------------|---------|---------|--------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------| |
| 55 | +| path | string | 是 | - | 目标目录路径。 | |
| 56 | +| bucket | string | 是 | - | obs文件系统的bucket地址,例如:`obs://obs-bucket-name`. | |
| 57 | +| access_key | string | 是 | - | obs文件系统的访问密钥。 | |
| 58 | +| access_secret | string | 是 | - | obs文件系统的访问私钥。 | |
| 59 | +| endpoint | string | 是 | - | obs文件系统的终端。 | |
| 60 | +| custom_filename | boolean | 否 | false | 是否需要自定义文件名。 | |
| 61 | +| file_name_expression | string | 否 | "${transactionId}" | 描述将在“路径”中创建的文件表达式。仅在custom_filename为true时使用。[提示](#file_name_expression) | |
| 62 | +| filename_time_format | string | 否 | "yyyy.MM.dd" | 指定“path”的时间格式。仅在custom_filename为true时使用。[提示](#filename_time_format) | |
| 63 | +| file_format_type | string | 否 | "csv" | 支持的文件类型。[提示](#file_format_type) | |
| 64 | +| field_delimiter | string | 否 | '\001' | 数据行中列之间的分隔符。仅在file_format为文本时使用。 | |
| 65 | +| row_delimiter | string | 否 | "\n" | 文件中行之间的分隔符。仅被“text”文件格式需要。 | |
| 66 | +| have_partition | boolean | 否 | false | 是否需要处理分区。 | |
| 67 | +| partition_by | array | 否 | - | 根据所选字段对数据进行分区。只有在have_partition为true时才使用。 | |
| 68 | +| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 只有在have_partition为真true时才使用。[提示](#partition_dir_expression) | |
| 69 | +| is_partition_field_write_in_file | boolean | 否 | false | 只有在have_partition为true时才使用。[提示](#is_partition_field_write_in_file) | |
| 70 | +| sink_columns | array | 否 | | 当此参数为空时,所有字段都是接收列。[提示](#sink_columns) | |
| 71 | +| is_enable_transaction | boolean | 否 | true | [提示](#is_enable_transaction) | |
| 72 | +| batch_size | int | 否 | 1000000 | [提示](#batch_size) | |
| 73 | +| single_file_mode | boolean | 否 | false | 每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀。 | |
| 74 | +| create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,仍然会生成相应的数据文件。 | |
| 75 | +| compress_codec | string | 否 | none | [提示](#compress_codec) | |
| 76 | +| common-options | object | 否 | - | [提示](#common_options) | |
| 77 | +| max_rows_in_memory | int | 否 | - | 当文件格式为Excel时,内存中可以缓存的最大数据项数。仅在file_format为excel时使用。 | |
| 78 | +| sheet_name | string | 否 | Sheet${Random number} | 标签页。仅在file_format为excel时使用。 | |
| 79 | + |
| 80 | +### 提示 |
| 81 | + |
| 82 | +#### <span id="file_name_expression"> file_name_expression </span> |
| 83 | + |
| 84 | +>仅在“custom_filename”为“true”时使用。 |
| 85 | +> |
| 86 | +>`file_name_expression`描述了将在`path`中创建的文件表达式。 |
| 87 | +> |
| 88 | +>我们可以在“file_name_expression”中添加变量“${now}”或“${uuid}”,类似于“test_${uuid}_${now}”, |
| 89 | +> |
| 90 | +>“${now}”表示当前时间,其格式可以通过指定选项“filename_time_format”来定义。 |
| 91 | +请注意,如果`is_enable_transaction`为`true`,我们将自动添加`${transactionId}_`在文件的开头。 |
| 92 | + |
| 93 | +#### <span id="filename_time_format"> filename_time_format </span> |
| 94 | + |
| 95 | +>仅在“custom_filename”为“true”时使用。 |
| 96 | +> |
| 97 | +>当`file_name_expression`参数中的格式为`xxxx-${now}`时,`filename_time_format`可以指定路径的时间格式,默认值为`yyyy.MM.dd`。常用的时间格式如下: |
| 98 | +
|
| 99 | +| Symbol | Description | |
| 100 | +|--------|--------------------| |
| 101 | +| y | Year | |
| 102 | +| M | Month | |
| 103 | +| d | Day of month | |
| 104 | +| H | Hour in day (0-23) | |
| 105 | +| m | Minute in hour | |
| 106 | +| s | Second in minute | |
| 107 | + |
| 108 | +#### <span id="file_format_type"> file_format_type </span> |
| 109 | + |
| 110 | +>我们支持以下文件类型: |
| 111 | +> |
| 112 | +> `text` `json` `csv` `orc` `parquet` `excel` |
| 113 | +
|
| 114 | +请注意,最终文件名将以file_format的后缀结尾,文本文件的后缀为“txt”。 |
| 115 | + |
| 116 | +#### <span id="partition_dir_expression"> partition_dir_expression </span> |
| 117 | + |
| 118 | +>仅在“have_partition”为“true”时使用。 |
| 119 | +> |
| 120 | +>如果指定了`partition_by`,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。 |
| 121 | +> |
| 122 | +>默认的`partition_dir_expression`是`${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`.`k0`是第一个分区字段,`v0`是第一个划分字段的值。 |
| 123 | +
|
| 124 | +#### <span id="is_partition_field_write_in_file"> is_partition_field_write_in_file </span> |
| 125 | + |
| 126 | +>仅在“have_partition”为“true”时使用。 |
| 127 | +> |
| 128 | +>如果`is_partition_field_write_in_file`为`true`,则分区字段及其值将写入数据文件。 |
| 129 | +> |
| 130 | +>例如,如果你想写一个Hive数据文件,它的值应该是“false”。 |
| 131 | +
|
| 132 | +#### <span id="sink_columns"> sink_columns </span> |
| 133 | + |
| 134 | +>哪些列需要写入文件,默认值是从“Transform”或“Source”获取的所有列。 |
| 135 | +>字段的顺序决定了文件实际写入的顺序。 |
| 136 | +
|
| 137 | +#### <span id="is_enable_transaction"> is_enable_transaction </span> |
| 138 | + |
| 139 | +>如果`is_enable_transaction`为`true`,我们将确保数据在写入目标目录时不会丢失或重复。 |
| 140 | +> |
| 141 | +>请注意,如果`is_enable_transaction`为`true`,我们将自动添加`${transactionId}_`在文件的开头。现在只支持“true”。 |
| 142 | +
|
| 143 | +#### <span id="batch_size"> batch_size </span> |
| 144 | + |
| 145 | +>文件中的最大行数。对于SeaTunnel引擎,文件中的行数由“batch_size”和“checkpoint.interval”共同决定。如果“checkpoint.interval”的值足够大,sink writer将在文件中写入行,直到文件中的行大于“batch_size”。如果“checkpoint.interval”较小,则接收器写入程序将在新的检查点触发时创建一个新文件。 |
| 146 | +
|
| 147 | +#### <span id="compress_codec"> compress_codec </span> |
| 148 | + |
| 149 | +>文件的压缩编解码器和支持的详细信息如下所示: |
| 150 | +> |
| 151 | +> - txt: `lzo` `none` |
| 152 | +> - json: `lzo` `none` |
| 153 | +> - csv: `lzo` `none` |
| 154 | +> - orc: `lzo` `snappy` `lz4` `zlib` `none` |
| 155 | +> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` |
| 156 | +
|
| 157 | +请注意,excel类型不支持任何压缩格式 |
| 158 | + |
| 159 | +#### <span id="common_options"> common options </span> |
| 160 | + |
| 161 | +>Sink插件常用参数,请参考[Sink common Options](../Sink-common-Options.md)了解详细信息。 |
| 162 | +
|
| 163 | +## 任务示例 |
| 164 | + |
| 165 | +### text 文件 |
| 166 | + |
| 167 | +>对于具有“have_partition”、“custom_filename”和“sink_columns”的文本文件格式。 |
| 168 | +
|
| 169 | +```hocon |
| 170 | +
|
| 171 | + ObsFile { |
| 172 | + path="/seatunnel/text" |
| 173 | + bucket = "obs://obs-bucket-name" |
| 174 | + access_key = "xxxxxxxxxxx" |
| 175 | + access_secret = "xxxxxxxxxxx" |
| 176 | + endpoint = "obs.xxxxxx.myhuaweicloud.com" |
| 177 | + file_format_type = "text" |
| 178 | + field_delimiter = "\t" |
| 179 | + row_delimiter = "\n" |
| 180 | + have_partition = true |
| 181 | + partition_by = ["age"] |
| 182 | + partition_dir_expression = "${k0}=${v0}" |
| 183 | + is_partition_field_write_in_file = true |
| 184 | + custom_filename = true |
| 185 | + file_name_expression = "${transactionId}_${now}" |
| 186 | + filename_time_format = "yyyy.MM.dd" |
| 187 | + sink_columns = ["name","age"] |
| 188 | + is_enable_transaction = true |
| 189 | + } |
| 190 | +
|
| 191 | +``` |
| 192 | + |
| 193 | +### parquet 文件 |
| 194 | + |
| 195 | +>适用于带有“have_partition”和“sink_columns”的拼花地板文件格式。 |
| 196 | +
|
| 197 | +```hocon |
| 198 | +
|
| 199 | + ObsFile { |
| 200 | + path = "/seatunnel/parquet" |
| 201 | + bucket = "obs://obs-bucket-name" |
| 202 | + access_key = "xxxxxxxxxxx" |
| 203 | + access_secret = "xxxxxxxxxxxxxxxxx" |
| 204 | + endpoint = "obs.xxxxxx.myhuaweicloud.com" |
| 205 | + have_partition = true |
| 206 | + partition_by = ["age"] |
| 207 | + partition_dir_expression = "${k0}=${v0}" |
| 208 | + is_partition_field_write_in_file = true |
| 209 | + file_format_type = "parquet" |
| 210 | + sink_columns = ["name","age"] |
| 211 | + } |
| 212 | +
|
| 213 | +``` |
| 214 | + |
| 215 | +### orc 文件 |
| 216 | + |
| 217 | +>对于orc文件格式的简单配置。 |
| 218 | +
|
| 219 | +```hocon |
| 220 | +
|
| 221 | + ObsFile { |
| 222 | + path="/seatunnel/orc" |
| 223 | + bucket = "obs://obs-bucket-name" |
| 224 | + access_key = "xxxxxxxxxxx" |
| 225 | + access_secret = "xxxxxxxxxxx" |
| 226 | + endpoint = "obs.xxxxx.myhuaweicloud.com" |
| 227 | + file_format_type = "orc" |
| 228 | + } |
| 229 | +
|
| 230 | +``` |
| 231 | + |
| 232 | +### json 文件 |
| 233 | + |
| 234 | +>对于json文件格式简单配置。 |
| 235 | +
|
| 236 | +```hcocn |
| 237 | +
|
| 238 | + ObsFile { |
| 239 | + path = "/seatunnel/json" |
| 240 | + bucket = "obs://obs-bucket-name" |
| 241 | + access_key = "xxxxxxxxxxx" |
| 242 | + access_secret = "xxxxxxxxxxx" |
| 243 | + endpoint = "obs.xxxxx.myhuaweicloud.com" |
| 244 | + file_format_type = "json" |
| 245 | + } |
| 246 | +
|
| 247 | +``` |
| 248 | + |
| 249 | +### excel 文件 |
| 250 | + |
| 251 | +>对于excel文件格式简单配置。 |
| 252 | +
|
| 253 | +```hcocn |
| 254 | +
|
| 255 | + ObsFile { |
| 256 | + path = "/seatunnel/excel" |
| 257 | + bucket = "obs://obs-bucket-name" |
| 258 | + access_key = "xxxxxxxxxxx" |
| 259 | + access_secret = "xxxxxxxxxxx" |
| 260 | + endpoint = "obs.xxxxx.myhuaweicloud.com" |
| 261 | + file_format_type = "excel" |
| 262 | + } |
| 263 | +
|
| 264 | +``` |
| 265 | + |
| 266 | +### csv 文件 |
| 267 | + |
| 268 | +>对于csv文件格式简单配置。 |
| 269 | +
|
| 270 | +```hcocn |
| 271 | +
|
| 272 | + ObsFile { |
| 273 | + path = "/seatunnel/csv" |
| 274 | + bucket = "obs://obs-bucket-name" |
| 275 | + access_key = "xxxxxxxxxxx" |
| 276 | + access_secret = "xxxxxxxxxxx" |
| 277 | + endpoint = "obs.xxxxx.myhuaweicloud.com" |
| 278 | + file_format_type = "csv" |
| 279 | + } |
| 280 | +
|
| 281 | +``` |
| 282 | + |
| 283 | +## 变更日志 |
| 284 | + |
| 285 | +### 下一版本 |
| 286 | + |
| 287 | +- 添加 Obs Sink 连接器 |
| 288 | + |
0 commit comments