14 changes: 14 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -88,6 +88,8 @@ Read data from hdfs file system.
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
| file_filter_modified_start | string | no | - | File modification time filter. The connector only reads files whose last modification time is at or after this start time (inclusive). The default time format is `yyyy-MM-dd HH:mm:ss`. |
| file_filter_modified_end | string | no | - | File modification time filter. The connector only reads files whose last modification time is before this end time (exclusive). The default time format is `yyyy-MM-dd HH:mm:ss`. |
| enable_file_split | boolean | no | false | Enable logical file splitting to improve parallelism for large files. Only supported for `text`/`csv`/`json`/`parquet` files in non-compressed format. |
| file_split_size | long | no | 134217728 | Split size in bytes when `enable_file_split=true`. For `text`/`csv`/`json`, the split end will be aligned to the next `row_delimiter`. For `parquet`, the split unit is RowGroup and will never break a RowGroup. |
| quote_char | string | no | " | A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly. |
| escape_char | string | no | - | A single character that allows the quote or other special characters to appear inside a CSV field without ending the field. |

@@ -254,6 +256,18 @@ Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `che

- `len_mtime`: SKIP only when both `len` and `mtime` are equal, otherwise COPY.
- `checksum`: SKIP only when `len` is equal and Hadoop `getFileChecksum` is equal, otherwise COPY (only valid when `update_strategy=strict`).

### enable_file_split [boolean]

Turn on logical file splitting; the default is `false`. It can only be enabled when the file format is `text`, `csv`, `json`, or `parquet` and the file is not compressed.

- `text`/`csv`/`json`: split by `file_split_size` and align to the next `row_delimiter` to avoid breaking records.
- `parquet`: split by RowGroup (logical split), never breaks a RowGroup.

### file_split_size [long]

The split size in bytes, effective only when `enable_file_split` is `true`. The default value is 134217728 bytes (128 MB).

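For example, a minimal sketch of a source configuration that splits large text files into roughly 64 MB chunks could look like the block below. The `fs.defaultFS` and `path` values are placeholders, and the 64 MB `file_split_size` is just an illustrative override of the default:

```hocon
source {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode001"
    path = "/apps/data/large_text"
    file_format_type = "text"
    enable_file_split = true
    # optional: override the default 128 MB split size (value in bytes)
    file_split_size = 67108864
  }
}
```
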
### quote_char [string]

A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly.
14 changes: 14 additions & 0 deletions docs/zh/connector-v2/source/HdfsFile.md
@@ -88,6 +88,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
| common-options | | 否 | - | 数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情。 |
| file_filter_modified_start | string | 否 | - | 按照最后修改时间过滤文件。 要过滤的开始时间(包括该时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
| file_filter_modified_end | string | 否 | - | 按照最后修改时间过滤文件。 要过滤的结束时间(不包括该时间),时间格式是:`yyyy-MM-dd HH:mm:ss` |
| enable_file_split | boolean | 否 | false | 开启大文件拆分以提升并行度。仅支持 `text`/`csv`/`json`/`parquet` 且非压缩格式(`compress_codec=none` 且 `archive_compress_codec=none`)。 |
| file_split_size | long | 否 | 134217728 | `enable_file_split=true` 时生效,单位字节。`text`/`csv`/`json` 按 `file_split_size` 拆分并对齐到下一个 `row_delimiter`;`parquet` 以 RowGroup 为拆分单位,不会切开 RowGroup。 |
| quote_char | string | 否 | " | 用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。 |
| escape_char | string | 否 | - | 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。 |

@@ -255,6 +257,18 @@ abc.*

- `len_mtime`:`len` 与 `mtime` 都相同才 SKIP,否则 COPY。
- `checksum`:要求 `len` 相同且 Hadoop `getFileChecksum` 相同才 SKIP,否则 COPY(仅在 `update_strategy=strict` 时生效)。

### enable_file_split [boolean]

开启大文件拆分功能,默认 false。仅支持 `csv`/`text`/`json`/`parquet` 且非压缩格式(`compress_codec=none` 且 `archive_compress_codec=none`)。

- `text`/`csv`/`json`:按 `file_split_size` 拆分并对齐到下一个 `row_delimiter`,避免切开一行/一条记录。
- `parquet`:以 RowGroup 为逻辑拆分单位,不会切开 RowGroup。

### file_split_size [long]

`enable_file_split=true` 时生效,单位字节。默认 128MB(134217728)。

### quote_char [string]

用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
@@ -18,18 +18,51 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.MultipleTableHdfsFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.split.HdfsFileSplitStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.split.HdfsMultipleTableFileSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;

public class HdfsFileSource extends BaseMultipleTableFileSource {

private final MultipleTableHdfsFileSourceConfig sourceConfig;
private final FileSplitStrategy fileSplitStrategy;

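// The split strategy is built once from the config and handed to both the base class and the
// enumerators created below, so all of them share the same instance.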
public HdfsFileSource(ReadonlyConfig readonlyConfig) {
super(new MultipleTableHdfsFileSourceConfig(readonlyConfig));
this(
new MultipleTableHdfsFileSourceConfig(readonlyConfig),
HdfsFileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig));
}

private HdfsFileSource(
MultipleTableHdfsFileSourceConfig sourceConfig, FileSplitStrategy fileSplitStrategy) {
super(sourceConfig, fileSplitStrategy);
this.sourceConfig = sourceConfig;
this.fileSplitStrategy = fileSplitStrategy;
}

@Override
public String getPluginName() {
return FileSystemType.HDFS.getFileSystemPluginName();
}

@Override
public SourceSplitEnumerator<FileSourceSplit, FileSourceState> createEnumerator(
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) {
return new HdfsMultipleTableFileSourceSplitEnumerator(
enumeratorContext, sourceConfig, fileSplitStrategy);
}

@Override
public SourceSplitEnumerator<FileSourceSplit, FileSourceState> restoreEnumerator(
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext,
FileSourceState checkpointState) {
return new HdfsMultipleTableFileSourceSplitEnumerator(
enumeratorContext, sourceConfig, fileSplitStrategy, checkpointState);
}
}
@@ -85,6 +85,18 @@ public OptionRule optionRule()
Arrays.asList(
FileFormat.TEXT, FileFormat.JSON, FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
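// enable_file_split is only offered for text/json/csv/parquet sources, and
// file_split_size only takes effect when enable_file_split is set to true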
.conditional(
FileBaseSourceOptions.FILE_FORMAT_TYPE,
Arrays.asList(
FileFormat.TEXT,
FileFormat.JSON,
FileFormat.CSV,
FileFormat.PARQUET),
FileBaseSourceOptions.ENABLE_FILE_SPLIT)
.conditional(
FileBaseSourceOptions.ENABLE_FILE_SPLIT,
Boolean.TRUE,
FileBaseSourceOptions.FILE_SPLIT_SIZE)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
.optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.split;

import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

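/**
 * A {@link FileSplitStrategy} that splits a file into byte-range {@link FileSourceSplit}s of
 * roughly {@code splitSize} bytes, aligning each split boundary to the next row delimiter so
 * that no record is broken across splits. Header rows are skipped before the first split.
 */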
public class HdfsFileAccordingToSplitSizeSplitStrategy implements FileSplitStrategy, Closeable {
Contributor:

Suggested change:
- public class HdfsFileAccordingToSplitSizeSplitStrategy implements FileSplitStrategy, Closeable {
+ public class HdfsFileAccordingToSplitSizeSplitStrategy extends AccordingToSplitSizeSplitStrategy {

1. Modify AccordingToSplitSizeSplitStrategy to introduce HadoopFileSystemProxy
2. Delete LocalFileAccordingToSplitSizeSplitStrategy

Contributor Author:

Okay, so you still suggest reusing the base code as much as possible? My main concern is that it might affect the logic of the localfile code.

Contributor:

Yes, the split logic should be maintained uniformly in the base, and there should not be a lot of code redundancy. It also makes it convenient to add this function to file system connectors such as S3 and OSS.

private static final int BUFFER_SIZE = 64 * 1024;

private final HadoopFileSystemProxy hadoopFileSystemProxy;
private final long skipHeaderRowNumber;
private final long splitSize;
private final byte[] delimiterBytes;

public HdfsFileAccordingToSplitSizeSplitStrategy(
HadoopConf hadoopConf,
String rowDelimiter,
long skipHeaderRowNumber,
String encodingName,
long splitSize) {
if (splitSize <= 0) {
throw new SeaTunnelRuntimeException(
FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
"SplitSizeBytes must be greater than 0");
}
this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
this.skipHeaderRowNumber = skipHeaderRowNumber;
this.splitSize = splitSize;
this.delimiterBytes = rowDelimiter.getBytes(Charset.forName(encodingName));
}

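/**
 * Splits {@code filePath} into byte-range splits. Header rows (if any) are skipped first, then
 * each split ends at the first row delimiter found at or after the tentative
 * {@code currentStart + splitSize} boundary; the final split extends to the end of the file.
 */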
@Override
public List<FileSourceSplit> split(String tableId, String filePath) {
List<FileSourceSplit> splits = new ArrayList<>();
long fileSize = safeGetFileSize(filePath);
if (fileSize == 0) {
return splits;
}
try (FSDataInputStream input = hadoopFileSystemProxy.getInputStream(filePath)) {
long currentStart = 0;
if (skipHeaderRowNumber > 0) {
currentStart = skipLinesUsingBuffer(input, skipHeaderRowNumber);
}
while (currentStart < fileSize) {
long tentativeEnd = currentStart + splitSize;
if (tentativeEnd >= fileSize) {
splits.add(
new FileSourceSplit(
tableId, filePath, currentStart, fileSize - currentStart));
break;
}
long actualEnd = findNextDelimiterWithSeek(input, tentativeEnd, fileSize);
if (actualEnd <= currentStart) {
actualEnd = tentativeEnd;
}
splits.add(
new FileSourceSplit(
tableId, filePath, currentStart, actualEnd - currentStart));
currentStart = actualEnd;
}
return splits;
} catch (IOException e) {
throw new SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
}
}

private long safeGetFileSize(String filePath) {
try {
return hadoopFileSystemProxy.getFileStatus(filePath).getLen();
} catch (IOException e) {
throw new SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
}
}

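/**
 * Scans from the start of the file and returns the byte offset immediately after the
 * {@code skipLines}-th row delimiter, so header rows are excluded from the first split.
 */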
private long skipLinesUsingBuffer(FSDataInputStream input, long skipLines) throws IOException {
input.seek(0);
byte[] buffer = new byte[BUFFER_SIZE];
int matched = 0;
long lines = 0;
long pos = 0;
int n;
while ((n = input.read(buffer)) != -1) {
for (int i = 0; i < n; i++) {
pos++;
if (buffer[i] == delimiterBytes[matched]) {
matched++;
if (matched == delimiterBytes.length) {
matched = 0;
lines++;
if (lines >= skipLines) {
return pos;
}
}
} else {
matched = buffer[i] == delimiterBytes[0] ? 1 : 0;
}
}
}
return pos;
}

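/**
 * Seeks to just before {@code startPos} (in case a delimiter straddles the tentative boundary)
 * and returns the byte offset immediately after the next complete row delimiter at or beyond
 * {@code startPos}, or the end of the scanned data if no further delimiter exists.
 */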
private long findNextDelimiterWithSeek(FSDataInputStream input, long startPos, long fileSize)
throws IOException {
long scanStart = Math.max(0, startPos - (delimiterBytes.length - 1));
input.seek(scanStart);
byte[] buffer = new byte[BUFFER_SIZE];
int matched = 0;
long pos = scanStart;
int n;
while ((n = input.read(buffer)) != -1) {
for (int i = 0; i < n; i++) {
pos++;
if (buffer[i] == delimiterBytes[matched]) {
matched++;
if (matched == delimiterBytes.length) {
long endPos = pos;
if (endPos >= startPos) {
return endPos;
}
matched = 0;
}
} else {
matched = buffer[i] == delimiterBytes[0] ? 1 : 0;
}
}
}
return Math.min(fileSize, pos);
}

@Override
public void close() throws IOException {
hadoopFileSystemProxy.close();
}
}