14 changes: 14 additions & 0 deletions docs/en/connectors/source/HdfsFile.md
@@ -88,6 +88,8 @@ Read data from hdfs file system.
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
| file_filter_modified_start | string | no | - | File modification time filter. The connector filters files based on their last modification time; only files modified at or after this start time are read (start time inclusive). The default time format is `yyyy-MM-dd HH:mm:ss`. |
| file_filter_modified_end | string | no | - | File modification time filter. The connector filters files based on their last modification time; only files modified before this end time are read (end time exclusive). The default time format is `yyyy-MM-dd HH:mm:ss`. |
| enable_file_split | boolean | no | false | Enables logical file splitting to improve parallelism for large files. Only supported for `text`/`csv`/`json`/`parquet` in non-compressed format. |
| file_split_size | long | no | 134217728 | Split size in bytes when `enable_file_split=true`. For `text`/`csv`/`json`, the split end will be aligned to the next `row_delimiter`. For `parquet`, the split unit is RowGroup and will never break a RowGroup. |
| quote_char | string | no | " | A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly. |
| escape_char | string | no | - | A single character that allows the quote or other special characters to appear inside a CSV field without ending the field. |

@@ -254,6 +256,18 @@ Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `checksum`.

- `len_mtime`: SKIP only when both `len` and `mtime` are equal, otherwise COPY.
- `checksum`: SKIP only when `len` is equal and Hadoop `getFileChecksum` is equal, otherwise COPY (only valid when `update_strategy=strict`).
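
The `len_mtime` decision above can be sketched as follows (a minimal illustration; the class and method names are hypothetical, not the connector's actual API):

```java
public class UpdateDecision {
    /** len_mtime strategy: SKIP only when both length and mtime match; otherwise COPY. */
    public static boolean shouldSkip(long srcLen, long srcMtime, long dstLen, long dstMtime) {
        return srcLen == dstLen && srcMtime == dstMtime;
    }
}
```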

### enable_file_split [boolean]

Enables the file splitting feature; defaults to false. Available only when the file type is `text`/`csv`/`json`/`parquet` and the file is not compressed.

- `text`/`csv`/`json`: split by `file_split_size` and align to the next `row_delimiter` to avoid breaking records.
- `parquet`: split by RowGroup (logical split), never breaks a RowGroup.
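
The delimiter-alignment rule for `text`/`csv`/`json` splits can be sketched as follows (a simplified in-memory illustration; the class and method names are hypothetical, and the real connector operates on streams rather than byte arrays):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPlanner {
    /** Returns [start, end) byte ranges; each end is extended to just past the next delimiter. */
    public static List<long[]> planSplits(byte[] file, long splitSize, byte delimiter) {
        List<long[]> splits = new ArrayList<>();
        long start = 0;
        while (start < file.length) {
            long end = Math.min(start + splitSize, file.length);
            // extend the split end to the byte after the next delimiter so no record is cut
            while (end < file.length && file[(int) end - 1] != delimiter) {
                end++;
            }
            splits.add(new long[] {start, end});
            start = end;
        }
        return splits;
    }
}
```

With a split size of 3 bytes and `\n` as the delimiter, the file `aaaa\nbb\ncc\n` yields splits `[0,5)`, `[5,8)`, `[8,11)`: each split ends on a record boundary even though the raw size is exceeded.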

### file_split_size [long]

Split size in bytes; effective only when `enable_file_split` is true. The default is 128 MB (134217728 bytes).
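
A minimal sketch of a source block using these options (the path, format, and cluster address are illustrative):

```hocon
source {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:9000"
    path = "/data/events"
    file_format_type = "text"
    enable_file_split = true
    # 64 MB splits instead of the 128 MB default
    file_split_size = 67108864
  }
}
```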

### quote_char [string]

A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly.
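
The interaction of `quote_char` and `escape_char` can be illustrated with a simplified field parser (a hypothetical helper, not the connector's actual CSV reader; quote doubling is ignored for brevity):

```java
import java.util.ArrayList;
import java.util.List;

public class CsvFieldParser {
    /** Splits one CSV record into fields, honoring quoteChar and escapeChar. */
    public static List<String> parseLine(String line, char quoteChar, char escapeChar) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == escapeChar && i + 1 < line.length()) {
                cur.append(line.charAt(++i)); // escaped character is taken literally
            } else if (c == quoteChar) {
                inQuotes = !inQuotes; // toggle quoted mode; quote itself is not emitted
            } else if (c == ',' && !inQuotes) {
                fields.add(cur.toString()); // unquoted comma ends the field
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }
}
```

For example, with `"` as the quote and `\` as the escape, the record `a,"b,c",d` parses into three fields `a`, `b,c`, `d`.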
14 changes: 14 additions & 0 deletions docs/zh/connectors/source/HdfsFile.md
@@ -88,6 +88,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
| common-options | | no | - | Source plugin common parameters; see [Source Common Options](../source-common-options.md) for details. |
| file_filter_modified_start | string | no | - | Filters files by last modification time. Start of the time range (inclusive); time format: `yyyy-MM-dd HH:mm:ss` |
| file_filter_modified_end | string | no | - | Filters files by last modification time. End of the time range (exclusive); time format: `yyyy-MM-dd HH:mm:ss` |
| enable_file_split | boolean | no | false | Enables large-file splitting to improve parallelism. Only supported for `text`/`csv`/`json`/`parquet` in non-compressed format (`compress_codec=none` and `archive_compress_codec=none`). |
| file_split_size | long | no | 134217728 | Effective when `enable_file_split=true`; unit is bytes. `text`/`csv`/`json` files are split by `file_split_size` and aligned to the next `row_delimiter`; `parquet` splits at RowGroup granularity and never breaks a RowGroup. |
| quote_char | string | no | " | A single character that encloses CSV fields, so fields containing commas, line breaks, or quotes are parsed correctly. |
| escape_char | string | no | - | A single character used to escape quotes or other special characters inside a CSV field so they do not terminate the field. |

@@ -255,6 +257,18 @@ abc.*

- `len_mtime`: SKIP only when both `len` and `mtime` match; otherwise COPY.
- `checksum`: SKIP only when `len` matches and the Hadoop `getFileChecksum` results match; otherwise COPY (only effective when `update_strategy=strict`).

### enable_file_split [boolean]

Enables large-file splitting; defaults to false. Only supported for `csv`/`text`/`json`/`parquet` in non-compressed format (`compress_codec=none` and `archive_compress_codec=none`).

- `text`/`csv`/`json`: split by `file_split_size` and aligned to the next `row_delimiter`, so a row/record is never cut in half.
- `parquet`: split logically at RowGroup granularity; a RowGroup is never broken.

### file_split_size [long]

Effective when `enable_file_split=true`; unit is bytes. Defaults to 128 MB (134217728).

### quote_char [string]

A single character that encloses CSV fields, so fields containing commas, line breaks, or quotes are parsed correctly.
@@ -30,7 +30,10 @@ public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
FILE_READ_FAILED("FILE-08", "File read failed"),
BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order abnormality"),
FILE_SPLIT_SIZE_ILLEGAL("FILE-10", "SplitSizeBytes must be greater than 0"),
-    FILE_SPLIT_FAIL("FILE-11", "File split fail");
+    FILE_SPLIT_FAIL("FILE-11", "File split fail"),
FILE_NOT_FOUND("FILE-12", "File not found"),
FILE_ACCESS_DENIED("FILE-13", "File access denied"),
FILE_IO_TIMEOUT("FILE-14", "File IO timeout");

private final String code;
private final String description;
@@ -91,6 +91,9 @@ public SourceSplitEnumerator<FileSourceSplit, FileSourceState> restoreEnumerator
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext,
FileSourceState checkpointState) {
return new MultipleTableFileSourceSplitEnumerator(
-                enumeratorContext, baseMultipleTableFileSourceConfig, checkpointState);
+                enumeratorContext,
+                baseMultipleTableFileSourceConfig,
+                fileSplitStrategy,
+                checkpointState);
}
}
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;

import lombok.extern.slf4j.Slf4j;

@@ -509,13 +510,19 @@ protected boolean checkFileType(String fileName, FileFormat fileFormat) {

protected static InputStream safeSlice(InputStream in, long start, long length)
throws IOException {
-        long toSkip = start;
-        while (toSkip > 0) {
-            long skipped = in.skip(toSkip);
-            if (skipped <= 0) {
-                throw new SeaTunnelException("skipped error");
-            }
-            toSkip -= skipped;
-        }
+        if (start > 0) {
+            if (in instanceof Seekable) {
+                ((Seekable) in).seek(start);
+            } else {
+                long toSkip = start;
+                while (toSkip > 0) {
+                    long skipped = in.skip(toSkip);
+                    if (skipped <= 0) {
+                        throw new SeaTunnelException("skipped error");
+                    }
+                    toSkip -= skipped;
+                }
+            }
+        }
return new BoundedInputStream(in, length);
}
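
The skip-loop fallback above (used when the stream is not `Seekable`) can be illustrated in isolation. This is a standalone sketch: the inline bounded wrapper stands in for the `BoundedInputStream` used in the real code, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;

public class SliceDemo {
    /** Bounds a non-seekable stream to the byte range [start, start + length). */
    static InputStream slice(InputStream in, long start, long length) throws IOException {
        long toSkip = start;
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                // skip() made no progress; the split start is unreachable
                throw new IOException("unable to skip to split start");
            }
            toSkip -= skipped;
        }
        // stand-in for commons-io BoundedInputStream: read at most `length` bytes
        return new InputStream() {
            long remaining = length;

            @Override
            public int read() throws IOException {
                if (remaining <= 0) {
                    return -1;
                }
                int b = in.read();
                if (b >= 0) {
                    remaining--;
                }
                return b;
            }
        };
    }
}
```

Seeking via `Seekable` is preferred when available because `seek` positions the stream directly, while `skip` may have to read and discard every byte up to the split start.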