|
| 1 | +# FtpFile |
| 2 | + |
| 3 | +> Ftp文件接收器连接器 |
| 4 | +
|
| 5 | +## 描述 |
| 6 | + |
| 7 | +将数据输出到Ftp。 |
| 8 | + |
| 9 | +:::提示 |
| 10 | + |
| 11 | +如果你使用spark/flink,为了使用这个连接器,你必须确保你的spark/flilk集群已经集成了hadoop。测试的hadoop版本是2.x。 |
| 12 | + |
| 13 | +如果你使用SeaTunnel Engine,当你下载并安装SeaTunnel引擎时,它会自动集成hadoop jar。您可以在${SEATUNNEL_HOME}/lib下检查jar包以确认这一点。 |
| 14 | + |
| 15 | +::: |
| 16 | + |
| 17 | +## 主要特性 |
| 18 | + |
| 19 | +- [x] [exactly-once](../../concept/connector-v2-features.md) |
| 20 | + |
| 21 | +默认情况下,我们使用2PC commit来确保 `exactly-once` |
| 22 | + |
| 23 | +- [x] file format |
| 24 | + - [x] text |
| 25 | + - [x] csv |
| 26 | + - [x] parquet |
| 27 | + - [x] orc |
| 28 | + - [x] json |
| 29 | + - [x] excel |
| 30 | + - [x] xml |
| 31 | + - [x] binary |
| 32 | + |
| 33 | +## 选项 |
| 34 | + |
| 35 | +| 名称 | 类型 | 是否必传 | 默认值 | 描述 | |
| 36 | +|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| 37 | +| host | string | 是 | - | | |
| 38 | +| port | int | 是 | - | | |
| 39 | +| user | string | 是 | - | | |
| 40 | +| password | string | 是 | - | | |
| 41 | +| path | string | 是 | - | | |
| 42 | +| tmp_path | string | 是 | /tmp/seatunnel | 结果文件将首先写入tmp路径,然后使用“mv”将tmp目录提交到目标目录。需要一个FTP目录。 | |
| 43 | +| connection_mode | string | 否 | active_local | 目标ftp连接模式 | |
| 44 | +| custom_filename | boolean | 否 | false | 是否需要自定义文件名 | |
| 45 | +| file_name_expression | string | 否 | "${transactionId}" | 仅在custom_filename为true时使用 | |
| 46 | +| filename_time_format | string | 否 | "yyyy.MM.dd" | 仅在custom_filename为true时使用 | |
| 47 | +| file_format_type | string | 否 | "csv" | | |
| 48 | +| field_delimiter | string | 否 | '\001' | 仅当file_format_type为文本时使用 | |
| 49 | +| row_delimiter | string | 否 | "\n" | 仅当file_format_type为文本时使用 | |
| 50 | +| have_partition | boolean | 否 | false | 是否需要处理分区。 | |
| 51 | +| partition_by | array | 否 | - | 只有在have_partition为真时才使用 | |
| 52 | +| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 只有在have_partition为真时才使用 | |
| 53 | +| is_partition_field_write_in_file | boolean | 否 | false | 只有在have_partition为真时才使用 | |
| 54 | +| sink_columns | array | 否 | | 当此参数为空时,所有字段都是接收列 | |
| 55 | +| is_enable_transaction | boolean | 否 | true | | |
| 56 | +| batch_size | int | 否 | 1000000 | | |
| 57 | +| compress_codec | string | 否 | none | | |
| 58 | +| common-options | object | 否 | - | | |
| 59 | +| max_rows_in_memory | int | 否 | - | 仅当file_format_type为excel时使用。 | |
| 60 | +| sheet_name | string | 否 | Sheet${Random number} | 仅当file_format_type为excel时使用。 | |
| 61 | +| csv_string_quote_mode | enum | 否 | MINIMAL | 仅在file_format为csv时使用。 | |
| 62 | +| xml_root_tag | string | 否 | RECORDS | 仅在file_format为xml时使用 | |
| 63 | +| xml_row_tag | string | 否 | RECORD | 仅在file_format为xml时使用 | |
| 64 | +| xml_use_attr_format | boolean | 否 | - | 仅在file_format为xml时使用 | |
| 65 | +| single_file_mode | boolean | 否 | false | 每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀。 | |
| 66 | +| create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,仍然会生成相应的数据文件。 | |
| 67 | +| parquet_avro_write_timestamp_as_int96 | boolean | 否 | false | 仅在file_format为拼花地板时使用。 | |
| 68 | +| parquet_avro_write_fixed_as_int96 | array | 否 | - | 仅在file_format为拼花地板时使用。 | |
| 69 | +| enable_header_write | boolean | 否 | false | 仅当file_format_type为文本、csv时使用<br/>false:不写标头,true:写标头。 | |
| 70 | +| encoding | string | 否 | "UTF-8" | 仅当file_format_type为json、text、csv、xml时使用。 | |
| 71 | +| schema_save_mode | string | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 现有目录处理方法 | |
| 72 | +| data_save_mode | string | 否 | APPEND_DATA | 现有数据处理方法 | |
| 73 | + |
| 74 | +### host [string] |
| 75 | + |
| 76 | +需要目标ftp主机 |
| 77 | + |
| 78 | +### port [int] |
| 79 | + |
| 80 | +目标ftp端口是必需的 |
| 81 | + |
| 82 | +### user [string] |
| 83 | + |
| 84 | +目标ftp用户名是必需的 |
| 85 | + |
| 86 | +### password [string] |
| 87 | + |
| 88 | +需要目标ftp密码 |
| 89 | + |
| 90 | + |
| 91 | +### path [string] |
| 92 | + |
| 93 | +目标目录路径是必需的。 |
| 94 | + |
| 95 | + |
| 96 | +### connection_mode [string] |
| 97 | + |
| 98 | +目标ftp连接模式,默认为活动模式,支持以下模式: |
| 99 | + |
| 100 | +`active_local` `passive_local` |
| 101 | + |
| 102 | +### custom_filename [boolean] |
| 103 | + |
| 104 | +是否自定义文件名 |
| 105 | + |
| 106 | +### file_name_expression [string] |
| 107 | + |
| 108 | +仅在以下情况下使用 `custom_filename` 是 `true` |
| 109 | + |
| 110 | +`file_name_expression描述了将在`path`中创建的文件表达式。我们可以在"file_name_expression"中添加变量"${now}"或"${uuid}",类似于"test"_${uuid}_${现在}`, |
| 111 | +`${now}`表示当前时间,其格式可以通过指定选项`filename_time_format`来定义。 |
| 112 | + |
| 113 | +请注意,如果`is_enable_transaction`为`true`,我们将自动添加`${transactionId}_`在文件的开头。 |
| 114 | + |
| 115 | +### filename_time_format [string] |
| 116 | + |
| 117 | +仅在以下情况下使用 `custom_filename` is `true` |
| 118 | + |
| 119 | +当`file_name_expression`参数中的格式为`xxxx-${now}时,`filename_time_format`可以指定路径的时间格式,默认值为`yyyy。MM.dd。常用的时间格式如下: |
| 120 | + |
| 121 | +| 符号 | 描述 | |
| 122 | +|--------|--------------------| |
| 123 | +| y | Year | |
| 124 | +| M | Month | |
| 125 | +| d | Day of month | |
| 126 | +| H | Hour in day (0-23) | |
| 127 | +| m | Minute in hour | |
| 128 | +| s | Second in minute | |
| 129 | + |
| 130 | +### file_format_type [string] |
| 131 | + |
| 132 | +我们支持以下文件类型: |
| 133 | + |
| 134 | +`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` |
| 135 | + |
| 136 | +请注意,最终文件名将以file_format_type的后缀结尾,文本文件的后缀为"txt"。 |
| 137 | + |
| 138 | +### field_delimiter [string] |
| 139 | + |
| 140 | +数据行中列之间的分隔符。只需要"text"文件格式。 |
| 141 | + |
| 142 | +### row_delimiter [string] |
| 143 | + |
| 144 | +文件中行之间的分隔符。只需要"text"文件格式。 |
| 145 | + |
| 146 | +### have_partition [boolean] |
| 147 | + |
| 148 | +是否需要处理分区。 |
| 149 | + |
| 150 | +### partition_by [array] |
| 151 | + |
| 152 | +仅在以下情况下使用 `have_partition` is `true`. |
| 153 | + |
| 154 | +根据所选字段对数据进行分区。 |
| 155 | + |
| 156 | +### partition_dir_expression [string] |
| 157 | + |
| 158 | +仅在以下情况下使用 `have_partition` is `true`. |
| 159 | + |
| 160 | +如果指定了`partition_by`,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。 |
| 161 | + |
| 162 | +默认的`partition_dir_expression`是`${k0}=${v0}/${k1}=${1v1}//${kn}=${vn}/``k0是第一个分区字段,v0是第一个划分字段的值。 |
| 163 | + |
| 164 | +### is_partition_field_write_in_file [boolean] |
| 165 | + |
| 166 | +仅在以下情况下使用 `have_partition` is `true`. |
| 167 | + |
| 168 | +如果`is_partition_field_write_in_file`为`true`,则分区字段及其值将写入数据文件。 |
| 169 | + |
| 170 | +例如,如果你想写一个Hive数据文件,它的值应该是"false"。 |
| 171 | + |
| 172 | +### sink_columns [array] |
| 173 | + |
| 174 | +哪些列需要写入文件,默认值是从"Transform"或"Source"获取的所有列。 |
| 175 | +字段的顺序决定了文件实际写入的顺序。 |
| 176 | + |
| 177 | +### is_enable_transaction [boolean] |
| 178 | + |
| 179 | +如果`is_enable_transaction`为true,我们将确保数据在写入目标目录时不会丢失或重复。 |
| 180 | + |
| 181 | +请注意,如果`is_enable_transaction`为`true`,我们将自动添加`${transactionId}_`在文件的开头。 |
| 182 | + |
| 183 | +现在只支持"true"。 |
| 184 | + |
| 185 | +### batch_size [int] |
| 186 | + |
| 187 | +文件中的最大行数。对于SeaTunnel引擎,文件中的行数由"batch_size"和"checkpoint.interval"共同决定。如果"checkpoint.interval"的值足够大,sink writer将在文件中写入行,直到文件中的行大于"batch_size"。如果"checkpoint.interval"较小,则接收器写入程序将在新的检查点触发时创建一个新文件。 |
| 188 | + |
| 189 | +### compress_codec [string] |
| 190 | + |
| 191 | +文件的压缩编解码器和支持的详细信息如下所示: |
| 192 | + |
| 193 | +- txt: `lzo` `none` |
| 194 | +- json: `lzo` `none` |
| 195 | +- csv: `lzo` `none` |
| 196 | +- orc: `lzo` `snappy` `lz4` `zlib` `none` |
| 197 | +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` |
| 198 | + |
| 199 | +提示:excel类型不支持任何压缩格式 |
| 200 | + |
| 201 | +### common 选项 |
| 202 | + |
| 203 | +Sink插件常用参数,请参考[Sink common Options](../sink-common-options.md)了解详细信息。 |
| 204 | + |
| 205 | +### max_rows_in_memory [int] |
| 206 | + |
| 207 | +当文件格式为Excel时,内存中可以缓存的最大数据项数。 |
| 208 | + |
| 209 | +### sheet_name [string] |
| 210 | + |
| 211 | +编写工作簿的工作表 |
| 212 | + |
| 213 | +### csv_string_quote_mode [string] |
| 214 | + |
| 215 | +当文件格式为CSV时,CSV的字符串引用模式。 |
| 216 | + |
| 217 | +- ALL: 所有字符串字段都将被引用。 |
| 218 | +- MINIMAL: 引号字段包含特殊字符,如字段分隔符、引号字符或行分隔符字符串中的任何字符。 |
| 219 | +- NONE:从不引用字段。当分隔符出现在数据中时,打印机会用转义符作为前缀。如果未设置转义符,格式验证将抛出异常。 |
| 220 | + |
| 221 | +### xml_root_tag [string] |
| 222 | + |
| 223 | +指定XML文件中根元素的标记名。 |
| 224 | + |
| 225 | +### xml_row_tag [string] |
| 226 | + |
| 227 | +指定XML文件中数据行的标记名称。 |
| 228 | + |
| 229 | +### xml_use_attr_format [boolean] |
| 230 | + |
| 231 | +指定是否使用标记属性格式处理数据。 |
| 232 | + |
| 233 | +### parquet_avro_write_timestamp_as_int96 [boolean] |
| 234 | + |
| 235 | +支持从时间戳写入Parquet INT96,仅适用于拼花地板文件。 |
| 236 | + |
| 237 | +### parquet_avro_write_fixed_as_int96 [array] |
| 238 | + |
| 239 | +支持从12字节字段写入Parquet INT96,仅适用于拼花地板文件。 |
| 240 | + |
| 241 | +### enable_header_write [boolean] |
| 242 | + |
| 243 | +仅在以下情况下使用 file_format_type是文本,csv。否:不写标头,true:写标头。 |
| 244 | + |
| 245 | +### encoding [string] |
| 246 | + |
| 247 | +仅在以下情况下使用 file_format_type是json、文本、csv、xml。 |
| 248 | +要写入的文件的编码。此参数将由解析 `Charset.forName(encoding)`. |
| 249 | + |
| 250 | +### schema_save_mode [string] |
| 251 | + |
| 252 | +现有的目录处理方法。 |
| 253 | + |
| 254 | +- RECREATE_SCHEMA: 当目录不存在时创建,当目录存在时删除并重新创建 |
| 255 | +- CREATE_SCHEMA_WHEN_NOT_EXIST: 将在目录不存在时创建,在目录存在时跳过 |
| 256 | +- ERROR_WHEN_SCHEMA_NOT_EXIST: 当目录不存在时,将报告错误 |
| 257 | +- IGNORE :忽略表的处理 |
| 258 | + |
| 259 | +### data_save_mode [string] |
| 260 | + |
| 261 | +现有的数据处理方法。 |
| 262 | + |
| 263 | +- DROP_DATA: 保留目录并删除数据文件 |
| 264 | +- APPEND_DATA: 保存目录,保存数据文件 |
| 265 | +- ERROR_WHEN_DATA_EXISTS: 当有数据文件时,会报告错误 |
| 266 | + |
| 267 | +## 示例 |
| 268 | + |
| 269 | +用于文本文件格式的简单配置 |
| 270 | + |
| 271 | +```bash |
| 272 | + |
| 273 | +FtpFile { |
| 274 | + host = "xxx.xxx.xxx.xxx" |
| 275 | + port = 21 |
| 276 | + user = "username" |
| 277 | + password = "password" |
| 278 | + path = "/data/ftp" |
| 279 | + file_format_type = "text" |
| 280 | + field_delimiter = "\t" |
| 281 | + row_delimiter = "\n" |
| 282 | + sink_columns = ["name","age"] |
| 283 | +} |
| 284 | + |
| 285 | +``` |
| 286 | + |
| 287 | +用于文本文件格式 `have_partition` 和 `custom_filename` 和 `sink_columns` |
| 288 | + |
| 289 | +```bash |
| 290 | + |
| 291 | +FtpFile { |
| 292 | + host = "xxx.xxx.xxx.xxx" |
| 293 | + port = 21 |
| 294 | + user = "username" |
| 295 | + password = "password" |
| 296 | + path = "/data/ftp/seatunnel/job1" |
| 297 | + tmp_path = "/data/ftp/seatunnel/tmp" |
| 298 | + file_format_type = "text" |
| 299 | + field_delimiter = "\t" |
| 300 | + row_delimiter = "\n" |
| 301 | + have_partition = true |
| 302 | + partition_by = ["age"] |
| 303 | + partition_dir_expression = "${k0}=${v0}" |
| 304 | + is_partition_field_write_in_file = true |
| 305 | + custom_filename = true |
| 306 | + file_name_expression = "${transactionId}_${now}" |
| 307 | + sink_columns = ["name","age"] |
| 308 | + filename_time_format = "yyyy.MM.dd" |
| 309 | +} |
| 310 | + |
| 311 | +``` |
| 312 | + |
| 313 | +当我们的源端是多个表,并且希望不同的表达式到不同的目录时,我们可以这样配置 |
| 314 | + |
| 315 | +```hocon |
| 316 | +
|
| 317 | +FtpFile { |
| 318 | + host = "xxx.xxx.xxx.xxx" |
| 319 | + port = 21 |
| 320 | + user = "username" |
| 321 | + password = "password" |
| 322 | + path = "/data/ftp/seatunnel/job1/${table_name}" |
| 323 | + tmp_path = "/data/ftp/seatunnel/tmp" |
| 324 | + file_format_type = "text" |
| 325 | + field_delimiter = "\t" |
| 326 | + row_delimiter = "\n" |
| 327 | + have_partition = true |
| 328 | + partition_by = ["age"] |
| 329 | + partition_dir_expression = "${k0}=${v0}" |
| 330 | + is_partition_field_write_in_file = true |
| 331 | + custom_filename = true |
| 332 | + file_name_expression = "${transactionId}_${now}" |
| 333 | + sink_columns = ["name","age"] |
| 334 | + filename_time_format = "yyyy.MM.dd" |
| 335 | + schema_save_mode=RECREATE_SCHEMA |
| 336 | + data_save_mode=DROP_DATA |
| 337 | +} |
| 338 | +
|
| 339 | +``` |
| 340 | + |
| 341 | +## 更改日志 |
| 342 | + |
| 343 | +### 2.2.0-beta 2022-09-26 |
| 344 | + |
| 345 | +- 添加Ftp文件接收器连接器 |
| 346 | + |
| 347 | +### 2.3.0-beta 2022-10-20 |
| 348 | + |
| 349 | +- [BugFix] 修复windows环境下路径错误的bug ([2980](https://github.com/apache/seatunnel/pull/2980)) |
| 350 | +- [BugFix] 修复文件系统获取错误 ([3117](https://github.com/apache/seatunnel/pull/3117)) |
| 351 | +- [BugFix] 解决了无法从配置文件中将"\t"解析为分隔符的错误 ([3083](https://github.com/apache/seatunnel/pull/3083)) |
| 352 | + |
| 353 | +### 随后版本 |
| 354 | + |
| 355 | +- [BugFix] 修复了以下无法将数据写入文件的错误 ([3258](https://github.com/apache/seatunnel/pull/3258)) |
| 356 | + - 当上游的字段为空时,它将抛出NullPointerException |
| 357 | + - 接收器列映射失败 |
| 358 | + - 当从状态还原写入程序时,直接获取事务失败 |
| 359 | +- [Improve] 支持为每个文件设置批量大小 ([3625](https://github.com/apache/seatunnel/pull/3625)) |
| 360 | +- [Improve] 支持文件压缩 ([3899](https://github.com/apache/seatunnel/pull/3899)) |
| 361 | + |
0 commit comments