[Feature][Connector-V2][HdfsFile] Support true large-file split for parallel read #10332
davidzollo merged 9 commits into apache:dev from
Conversation
Hi, it seems that a Parquet use case is still missing.
Thanks for implementing this important feature! The overall approach looks solid, but I found 2 CRITICAL issues that should be addressed before merging:

1. Poor error observability (GEN-002)
2. Severe performance regression for large file splits (GEN-003)

Note on COR-001: the idempotency concern in
The requested fixes have been completed.
**Issue 1: Binary Compatibility Break - Removal of `LocalFileAccordingToSplitSizeSplitStrategy`**

Severity: MINOR

Improvement Suggestions:

```java
// Keep the old class as a deprecated adapter
@Deprecated
public class LocalFileAccordingToSplitSizeSplitStrategy extends AccordingToSplitSizeSplitStrategy {
    public LocalFileAccordingToSplitSizeSplitStrategy(
            String rowDelimiter, long skipHeaderRowNumber, String encodingName, long splitSize) {
        super(new LocalFileHadoopConfig(), rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
    }
}
```

Rationale: Keep one version as a deprecated adapter to give users time to migrate, then remove it completely in version 2.8.0.

**Issue 2: Boundary Condition Defect - `delimiterBytes.length == 0` causes seek failure**

Location:

```java
long scanStart = Math.max(0, startPos - (delimiterBytes.length - 1));
input.seek(scanStart);
```
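As context for this boundary logic, here is a self-contained sketch (a hypothetical helper operating on an in-memory `byte[]`, not the PR's actual `FSDataInputStream` code) of why the scan must start `delimiterBytes.length - 1` bytes before the split boundary:

```java
// Sketch of the boundary-scan idea: a reader that starts at startPos must begin
// scanning delimiter.length - 1 bytes earlier, otherwise a multi-byte delimiter
// straddling the split boundary is missed.
public class DelimiterScanSketch {

    // Returns the position just past the first delimiter that ends after startPos,
    // or data.length if none is found.
    static long findNextDelimiterEnd(byte[] data, long startPos, byte[] delimiter) {
        if (delimiter.length == 0) {
            throw new IllegalStateException("delimiter must not be empty");
        }
        // Back up so a delimiter straddling startPos is still seen - the same
        // Math.max(0, startPos - (delimiter.length - 1)) trick as in the PR.
        long scanStart = Math.max(0, startPos - (delimiter.length - 1));
        for (long pos = scanStart; pos + delimiter.length <= data.length; pos++) {
            boolean match = true;
            for (int i = 0; i < delimiter.length; i++) {
                if (data[(int) pos + i] != delimiter[i]) {
                    match = false;
                    break;
                }
            }
            // Only accept delimiters ending after startPos so the previous
            // split's final delimiter is not counted twice.
            if (match && pos + delimiter.length > startPos) {
                return pos + delimiter.length;
            }
        }
        return data.length;
    }
}
```

For example, with `\r\n` as the delimiter and a split boundary falling between `\r` and `\n`, backing up one byte lets the scan still recognize the straddling delimiter.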
Problem Description: A more serious problem is: when

Severity: MINOR

Improvement Suggestions:

```java
private long findNextDelimiterWithSeek(FSDataInputStream input, long startPos, long fileSize)
        throws IOException {
    // Explicit assertion to clarify the algorithm's assumptions
    if (delimiterBytes.length == 0) {
        throw new IllegalStateException("delimiterBytes must not be empty");
    }
    // Extract magic numbers as constants
    long scanStart = Math.max(0, startPos - (delimiterBytes.length - 1));
    input.seek(scanStart);
    // ...
}
```

Rationale: Add a defensive assertion to make the algorithm's assumptions explicit even though the constructor has already validated the delimiter; this eases future maintenance.

**Issue 3: API Compatibility - `AccordingToSplitSizeSplitStrategy` changed from abstract to concrete**

Location:

```java
// Old version
public abstract class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy {
    protected abstract InputStream getInputStream(String filePath) throws IOException;
    protected abstract long getFileSize(String filePath) throws IOException;
}

// New version
public class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy, Closeable {
    public AccordingToSplitSizeSplitStrategy(HadoopConf hadoopConf, ...) {
        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
    }
}
```
Problem Description: Although there are no other subclasses within SeaTunnel, if external extensions inherit from this class, they will fail to compile, since the abstract methods they override no longer exist.

Severity: MINOR

Improvement Suggestions: Option 2: keep an abstract base class and add a concrete implementation.

```java
// Keep the old abstract class (deprecated)
@Deprecated
public abstract class LegacyAccordingToSplitSizeSplitStrategy implements FileSplitStrategy {
    // Old abstract methods
    protected abstract InputStream getInputStream(String filePath) throws IOException;
    protected abstract long getFileSize(String filePath) throws IOException;
}

// New concrete implementation
public class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy, Closeable {
    // New implementation
}
```

Rationale: Although there are no internal subclasses at present, SeaTunnel is a framework library and should maintain API stability, at least marking the old class as deprecated.

**Issue 4: Code Duplication - `HdfsFileSource` and `LocalFileSource` have identical initialization logic**
Related Context:

```java
// HdfsFileSource.java
private static FileSplitStrategy initFileSplitStrategy(MultipleTableHdfsFileSourceConfig config) {
    Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
    for (BaseFileSourceConfig fileSourceConfig : config.getFileSourceConfigs()) {
        String tableId = fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
        splitStrategies.put(tableId,
                FileSplitStrategyFactory.initFileSplitStrategy(
                        fileSourceConfig.getBaseFileSourceConfig(),
                        fileSourceConfig.getHadoopConfig()));
    }
    return new MultipleTableFileSplitStrategy(splitStrategies);
}

// LocalFileSource.java - exactly the same!
private static FileSplitStrategy initFileSplitStrategy(MultipleTableLocalFileSourceConfig sourceConfig) {
    Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
    for (BaseFileSourceConfig fileSourceConfig : sourceConfig.getFileSourceConfigs()) {
        String tableId = fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
        splitStrategies.put(tableId,
                FileSplitStrategyFactory.initFileSplitStrategy(
                        fileSourceConfig.getBaseFileSourceConfig(),
                        fileSourceConfig.getHadoopConfig()));
    }
    return new MultipleTableFileSplitStrategy(splitStrategies);
}
```

Severity: MINOR

Improvement Suggestions:

```java
// BaseMultipleTableFileSource.java
public abstract class BaseMultipleTableFileSource {
    protected static FileSplitStrategy initFileSplitStrategy(
            BaseMultipleTableFileSourceConfig sourceConfig) {
        Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
        for (BaseFileSourceConfig fileSourceConfig : sourceConfig.getFileSourceConfigs()) {
            String tableId = fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
            splitStrategies.put(tableId,
                    FileSplitStrategyFactory.initFileSplitStrategy(
                            fileSourceConfig.getBaseFileSourceConfig(),
                            fileSourceConfig.getHadoopConfig()));
        }
        return new MultipleTableFileSplitStrategy(splitStrategies);
    }
}

// HdfsFileSource.java
private HdfsFileSource(MultipleTableHdfsFileSourceConfig sourceConfig) {
    super(sourceConfig, initFileSplitStrategy(sourceConfig)); // call the parent class method
}

// LocalFileSource.java
private LocalFileSource(MultipleTableLocalFileSourceConfig sourceConfig) {
    super(sourceConfig, initFileSplitStrategy(sourceConfig)); // call the parent class method
}
```

Rationale: Extract the common logic into a parent class to reduce duplication and improve maintainability.

**Issue 6: Incomplete Documentation - Missing performance recommendations and limitation notes**
Severity: MINOR

Improvement Suggestions:

### enable_file_split [boolean]

Turn on the file splitting function; the default is false. ...

**Performance Recommendations:**
- **Enable when**: reading a few large files (> 1GB each) with parallelism > 4
- **Disable when**: reading many small files (< 128MB each) or parallelism = 1
- **Not supported for**: compressed files (gzip, bzip2), archive files (zip, tar)

**Limitations:**
- Maximum splits per file: 100,000 (configurable via `seatunnel.source.file.split.max-count`)
- Memory overhead: ~100 bytes per split

### file_split_size [long]

File split size, which can be set when the enable_file_split parameter is true. ...

**Tuning Guidelines:**
- **Recommended**: 128MB - 512MB for HDFS, 64MB - 256MB for local files
- **Too small (< 16MB)**: excessive splits, high overhead
- **Too large (> 1GB)**: insufficient parallelism
- **Formula**: `file_split_size = total_file_size / desired_parallelism`

Rationale: Add performance guidelines and limitation notes to help users use the feature correctly.

**Issue 7: Architectural Complexity - LocalFile requiring HadoopConf leaks a Hadoop configuration concept into the local connector**

Location:

```java
splitStrategies.put(tableId,
        FileSplitStrategyFactory.initFileSplitStrategy(
                fileSourceConfig.getBaseFileSourceConfig(),
                fileSourceConfig.getHadoopConfig())); // ← does LocalFile need HadoopConf?
```
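The tuning formula suggested for `file_split_size` above (`total_file_size / desired_parallelism`) can be sanity-checked with a small arithmetic sketch (hypothetical helper class, not part of the PR):

```java
// Arithmetic sketch of the tuning guidance above: derive file_split_size from
// the desired parallelism, and note that the split count is a ceiling division.
public class SplitSizingSketch {

    // Number of splits a file of fileSize bytes yields at a given split size.
    static long splitCount(long fileSize, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        return (fileSize + splitSize - 1) / splitSize; // ceiling division
    }

    // file_split_size = total_file_size / desired_parallelism
    static long suggestedSplitSize(long totalFileSize, int desiredParallelism) {
        return totalFileSize / desiredParallelism;
    }
}
```

For instance, a single 10GB file at a 256MB split size yields 40 splits, enough to keep a parallelism of 4-8 busy.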
Severity: MINOR

Improvement Suggestions:

Option 1 (Simple): document it, e.g.:

For **LocalFile**, the `hadoop_conf` is optional and defaults to the `file:///` schema.
You rarely need to configure it unless you have custom Hadoop settings.

Option 2 (Complex): abstract a filesystem interface:

```java
public interface FileSystemProxy {
    FSDataInputStream getInputStream(String path) throws IOException;
    FileStatus getFileStatus(String path) throws IOException;
}

public class HadoopFileSystemProxy implements FileSystemProxy { ... }
public class LocalFileSystemProxy implements FileSystemProxy { ... } // pure Java NIO
```

Rationale: Option 1 is simple and effective; Option 2 has a clearer architecture but requires more work. The current solution is acceptable.

**Issue 8: Checkpoint Compatibility Not Fully Validated**

Location:

```java
public FileSourceSplit(String tableId, String filePath, long start, long length) {
    this.tableId = tableId;
    this.filePath = filePath;
    this.start = start;   // ← new field assigned in the constructor
    this.length = length; // ← new field assigned in the constructor
}
```

Validation:

```java
protected static InputStream safeSlice(InputStream in, long start, long length) throws IOException {
    // ...
    return new BoundedInputStream(in, length); // how does BoundedInputStream handle length = -1?
}
```

Apache Commons IO's `BoundedInputStream` behavior for a negative length needs to be verified.

Severity: MAJOR

Improvement Suggestions:

```java
// FileSourceSplit.java
public FileSourceSplit(String tableId, String filePath, long start, long length) {
    this.tableId = tableId;
    this.filePath = filePath;
    this.start = start;
    // Clarify the semantics: -1 means "entire file" (compatible with the old version)
    this.length = length;
}

// AbstractReadStrategy.java
protected static InputStream safeSlice(InputStream in, long start, long length) throws IOException {
    if (start > 0) {
        if (in instanceof Seekable) {
            ((Seekable) in).seek(start);
        } else {
            // ...
        }
    }
    // length = -1 means unlimited (compatible with old checkpoints)
    if (length < 0) {
        return in;
    }
    return new BoundedInputStream(in, length);
}
```

Also add tests:

```java
@Test
void testDeserializeFromOldCheckpoint() {
    // Simulate a FileSourceSplit serialized by the old version
    FileSourceSplit oldSplit = new FileSourceSplit("table", "path"); // start=0, length=-1
    // Verify that the entire file is read
    List<String> records = readBySplit(oldSplit);
    Assertions.assertEquals(fullFileRecords, records);
}
```

Rationale: Clarify the semantics of `length = -1` so splits restored from old checkpoints remain readable.
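To make the suggested `length = -1` semantics concrete, here is a minimal, self-contained sketch (a hypothetical `readSlice` helper using `ByteArrayInputStream` in place of a real `FSDataInputStream`, not SeaTunnel's actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Sketch of the proposed semantics: length < 0 is treated as "read to the end
// of the file", so a split deserialized from an old checkpoint
// (start = 0, length = -1) still covers the whole file.
public class SafeSliceSketch {

    static byte[] readSlice(InputStream in, long start, long length) throws IOException {
        // Stand-in for Seekable.seek(start) on a real filesystem stream.
        long skipped = 0;
        while (skipped < start) {
            long n = in.skip(start - skipped);
            if (n <= 0) {
                break; // reached EOF before start
            }
            skipped += n;
        }
        if (length < 0) {
            // Old-checkpoint compatibility: unbounded read to EOF.
            return in.readAllBytes();
        }
        byte[] buf = new byte[(int) length];
        int read = in.readNBytes(buf, 0, (int) length);
        return Arrays.copyOf(buf, read);
    }
}
```

The real implementation would keep `BoundedInputStream` for the bounded case; the point of the sketch is only the explicit `length < 0` branch rather than relying on whatever a downstream library does with a negative bound.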
The above suggestions have been implemented.


Purpose of this pull request
#10326
HdfsFile source currently uses “one file = one split”, which limits parallelism when there are only a few huge files.
Does this PR introduce any user-facing change?
yes
text/csv/json: split by file_split_size and align split end to the next row_delimiter (HDFS seek-based implementation for large files).
parquet: split by RowGroup (logical split) and read footer metadata using HadoopConf-backed Configuration (works with Kerberos/HA/NameService).
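As a usage illustration only: the option names `enable_file_split` and `file_split_size` come from this PR's documentation, while the source name, cluster address, path, and other settings below are placeholders. A job config exercising the split feature might look like:

```hocon
source {
  HdfsFile {
    # Placeholder cluster address and path
    fs.defaultFS = "hdfs://namenode:8020"
    path = "/data/events"
    file_format_type = "text"
    row_delimiter = "\n"
    # New options introduced by this PR
    enable_file_split = true
    file_split_size = 268435456  # 256 MB, within the recommended 128MB-512MB range for HDFS
  }
}
```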
How was this patch tested?
Unit tests
HdfsFileAccordingToSplitSizeSplitStrategyTest#testReadBySplitsShouldMatchFullRead
E2E
HdfsFileIT#testHdfsTextReadWithFileSplit
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.