docs/en/connectors/source/LocalFile.md (2 changes: 1 addition & 1 deletion)
@@ -420,7 +420,7 @@ File modification time filter. The connector will filter some files based on the

### enable_file_split [boolean]

- Turn on the file splitting function; the default is false. It can be enabled when the file type is csv, text, json or parquet and the file is in a non-compressed format.
+ Turn on the file splitting function; the default is false. It can be enabled when the file type is csv, text, json, parquet or orc and the file is in a non-compressed format.

### file_split_size [long]

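For reference, a minimal LocalFile source configuration using the two options above might look like the sketch below. This is only an illustration: the path is a placeholder and the `file_split_size` value is an assumed example; the option names are the ones documented in this file.

```hocon
source {
  LocalFile {
    path = "/data/orc/"            # placeholder path
    file_format_type = "orc"       # one of the split-capable formats listed above
    enable_file_split = true
    file_split_size = 134217728    # assumed example: 128 MB per split, in bytes
  }
}
```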
docs/zh/connectors/source/LocalFile.md (2 changes: 1 addition & 1 deletion)
@@ -421,7 +421,7 @@ null_format defines which strings can represent null.

### enable_file_split [boolean]

- Enable the file splitting function; the default is false. It can be enabled when the file type is csv, text, json or parquet in a non-compressed format.
+ Enable the file splitting function; the default is false. It can be enabled when the file type is csv, text, json, parquet or orc in a non-compressed format.

### file_split_size [long]

@@ -217,6 +217,7 @@ public boolean supportFileSplit() {
case TEXT:
case JSON:
case PARQUET:
+ case ORC:
return true;
default:
log.info("The {} file type does not support file split", this);
Expand Down
OrcReadStrategy.java
@@ -35,6 +35,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+ import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
@@ -84,13 +85,23 @@ public class OrcReadStrategy extends AbstractReadStrategy {
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
+ this.read(new FileSourceSplit(path), output);
+ }
+
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+ throws FileConnectorException, IOException {
+ String tableId = split.getTableId();
+ String path = split.getFilePath();
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
String.format(
"This file [%s] is not a orc file, please check the format of this file",
path);
throw new FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
}
+ final boolean useSplitRange =
+ enableSplitFile && split.getStart() >= 0 && split.getLength() > 0;
Map<String, String> partitionsMap = parsePartitionsByPath(path);
try (Reader reader =
hadoopFileSystemProxy.doWithHadoopAuth(
@@ -107,6 +118,12 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
}
List<TypeDescription> children = schema.getChildren();
- RecordReader rows = reader.rows(reader.options().schema(schema));
+ Reader.Options options = reader.options().schema(schema);
+ if (useSplitRange) {
+     // Keep the schema while restricting the read to the split's byte range;
+     // ORC aligns the range to whole stripes. This also avoids opening a
+     // second RecordReader and discarding the first one.
+     options = options.range(split.getStart(), split.getLength());
+ }
+ RecordReader rows = reader.rows(options);
VectorizedRowBatch rowBatch = schema.createRowBatch();
while (rows.nextBatch(rowBatch)) {
int num = 0;
AccordingToSplitSizeSplitStrategy.java
@@ -19,12 +19,15 @@
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;

+ import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

+ @Slf4j
public abstract class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy {

private final long skipHeaderRowNumber;
@@ -66,6 +69,7 @@ public List<FileSourceSplit> split(String tableId, String filePath) {
new FileSourceSplit(tableId, filePath, currentStart, actualEnd - currentStart));
currentStart = actualEnd;
}
+ log.info("Split file {} into {} splits.", filePath, splits.size());
return splits;
}

FileSourceSplit.java
@@ -55,7 +55,7 @@ public String splitId() {
if (tableId == null) {
return filePath;
}
- return tableId + "_" + filePath + "_" + start;
+ return tableId + "_" + filePath + "_" + start + "_" + length;
}

@Override
OrcFileSplitStrategy.java (new file, 111 lines)
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.source.split;

import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* {@link OrcFileSplitStrategy} defines a split strategy for ORC files based on ORC stripes.
*
* <p>This strategy uses {@code Stripe} as the minimum indivisible split unit and generates {@link
* FileSourceSplit}s by merging one or more contiguous stripes according to the configured split
* size. A split will never break a stripe, ensuring correctness and compatibility with ORC readers.
*
* <p>The generated split range ({@code start}, {@code length}) represents a byte range covering
* complete stripes. The actual row-level reading and decoding are delegated to the ORC reader
* implementation.
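*
* <p>Worked example (mirroring the unit tests for this class): with a 300-byte split size and
* stripes of 100, 150 and 200 bytes, the first two stripes merge into one 250-byte split and the
* third stripe becomes its own 200-byte split.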
*/
@Slf4j
public class OrcFileSplitStrategy implements FileSplitStrategy {

private final long splitSizeBytes;

public OrcFileSplitStrategy(long splitSizeBytes) {
if (splitSizeBytes <= 0) {
throw new SeaTunnelRuntimeException(
FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
"SplitSizeBytes must be greater than 0");
}
this.splitSizeBytes = splitSizeBytes;
}

@Override
public List<FileSourceSplit> split(String tableId, String filePath) {
try {
return splitByStripes(tableId, filePath, readStripes(filePath));
} catch (IOException e) {
throw new SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_SPLIT_FAIL, e);
}
}

/** Core split logic based on stripe metadata. This method is IO-free and unit-test friendly. */
List<FileSourceSplit> splitByStripes(
String tableId, String filePath, List<StripeInformation> stripes) {
List<FileSourceSplit> splits = new ArrayList<>();
if (stripes == null || stripes.isEmpty()) {
return splits;
}
long currentStart = 0;
long currentLength = 0;
boolean hasOpenSplit = false;
for (StripeInformation stripe : stripes) {
long stripeStart = stripe.getOffset();
long stripeSize = stripe.getLength();
if (!hasOpenSplit) {
currentStart = stripeStart;
currentLength = stripeSize;
hasOpenSplit = true;
continue;
}
if (currentLength + stripeSize > splitSizeBytes) {
splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
currentStart = stripeStart;
currentLength = stripeSize;
} else {
currentLength += stripeSize;
}
}
if (hasOpenSplit && currentLength > 0) {
splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
}
log.info("Split orc file {} into {} splits.", filePath, splits.size());
return splits;
}

private List<StripeInformation> readStripes(String filePath) throws IOException {
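// Reads only the ORC footer and stripe metadata; no row data is decoded here.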
Path path = new Path(filePath);
Configuration conf = new Configuration();
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
try (Reader reader = OrcFile.createReader(path, readerOptions)) {
return reader.getStripes();
}
}
}
ParquetFileSplitStrategy.java
@@ -26,6 +26,8 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

+ import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -46,6 +48,7 @@
* <p>This design enables efficient parallel reading of Parquet files while preserving Parquet
* format semantics and avoiding invalid byte-level splits.
*/
+ @Slf4j
public class ParquetFileSplitStrategy implements FileSplitStrategy {

private final long splitSizeBytes;
@@ -104,6 +107,7 @@ List<FileSourceSplit> splitByRowGroups(
if (hasOpenSplit && currentLength > 0) {
splits.add(new FileSourceSplit(tableId, filePath, currentStart, currentLength));
}
+ log.info("Split parquet file {} into {} splits.", filePath, splits.size());
return splits;
}

OrcFileSplitStrategyTest.java (new file, 93 lines)
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.file.source.split;

import org.apache.orc.StripeInformation;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.when;

public class OrcFileSplitStrategyTest {

private static final String TABLE_ID = "test.test_table";
private static final String FILE_PATH = "/tmp/test.orc";

@Test
void testSplitByStripesEmpty() {
OrcFileSplitStrategy strategy = new OrcFileSplitStrategy(100);
List<FileSourceSplit> splits =
strategy.splitByStripes(TABLE_ID, FILE_PATH, Collections.emptyList());
Assertions.assertTrue(splits.isEmpty());
}

@Test
void testSplitByStripesSingleStripe() {
OrcFileSplitStrategy strategy = new OrcFileSplitStrategy(1000);
List<StripeInformation> stripes = new ArrayList<>();
stripes.add(mockStripe(0, 200));
List<FileSourceSplit> splits = strategy.splitByStripes(TABLE_ID, FILE_PATH, stripes);
Assertions.assertEquals(1, splits.size());
FileSourceSplit split = splits.get(0);
Assertions.assertEquals(0, split.getStart());
Assertions.assertEquals(200, split.getLength());
}

@Test
void testSplitByStripesMergeStripes() {
OrcFileSplitStrategy strategy = new OrcFileSplitStrategy(500);
List<StripeInformation> stripes = new ArrayList<>();
stripes.add(mockStripe(0, 100));
stripes.add(mockStripe(100, 150));
stripes.add(mockStripe(250, 200));
List<FileSourceSplit> splits = strategy.splitByStripes(TABLE_ID, FILE_PATH, stripes);
Assertions.assertEquals(1, splits.size());
FileSourceSplit split = splits.get(0);
Assertions.assertEquals(0, split.getStart());
Assertions.assertEquals(450, split.getLength());
}

@Test
void testSplitByStripesSplitWhenExceedsThreshold() {
OrcFileSplitStrategy strategy = new OrcFileSplitStrategy(300);
List<StripeInformation> stripes = new ArrayList<>();
stripes.add(mockStripe(0, 100));
stripes.add(mockStripe(100, 150));
stripes.add(mockStripe(250, 200));
List<FileSourceSplit> splits = strategy.splitByStripes(TABLE_ID, FILE_PATH, stripes);
Assertions.assertEquals(2, splits.size());
FileSourceSplit first = splits.get(0);
Assertions.assertEquals(0, first.getStart());
Assertions.assertEquals(250, first.getLength());
FileSourceSplit second = splits.get(1);
Assertions.assertEquals(250, second.getStart());
Assertions.assertEquals(200, second.getLength());
}

private StripeInformation mockStripe(long offset, long length) {
StripeInformation stripe = Mockito.mock(StripeInformation.class);
when(stripe.getOffset()).thenReturn(offset);
when(stripe.getLength()).thenReturn(length);
return stripe;
}
}
LocalFileSplitStrategyFactory.java
@@ -23,6 +23,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+ import org.apache.seatunnel.connectors.seatunnel.file.source.split.OrcFileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;

import static org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
@@ -45,6 +46,9 @@ public static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig readonlyConfig) {
if (FileFormat.PARQUET == readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
return new ParquetFileSplitStrategy(fileSplitSize);
}
+ if (FileFormat.ORC == readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
+ return new OrcFileSplitStrategy(fileSplitSize);
+ }
String rowDelimiter =
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
? DEFAULT_ROW_DELIMITER
@@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+ import org.apache.seatunnel.connectors.seatunnel.file.source.split.OrcFileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;

import org.junit.jupiter.api.Assertions;
@@ -43,7 +44,7 @@ void testInitFileSplitStrategy() {
map.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
FileSplitStrategy fileSplitStrategy =
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map));
- Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, fileSplitStrategy);
+ Assertions.assertInstanceOf(OrcFileSplitStrategy.class, fileSplitStrategy);
// test text, no split
Map<String, Object> map1 = new HashMap<>();
map1.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.TEXT);
@@ -99,5 +100,12 @@ void testInitFileSplitStrategy() {
fileSplitStrategy =
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map7));
Assertions.assertInstanceOf(ParquetFileSplitStrategy.class, fileSplitStrategy);
+ // test BINARY, which has no format-specific strategy and falls back to the default
+ Map<String, Object> map8 = new HashMap<>();
+ map8.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.BINARY);
+ map8.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+ LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map8));
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, fileSplitStrategy);
}
}
@@ -362,6 +362,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
helper.execute("/orc/fake_to_local_file_orc.conf");
// test read local orc file
helper.execute("/orc/local_file_orc_to_assert.conf");
helper.execute("/orc/local_file_orc_enable_split_to_assert.conf");
// test read local orc file with projection
helper.execute("/orc/local_file_orc_projection_to_assert.conf");
// test read local orc file with projection and type cast
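The referenced /orc/local_file_orc_enable_split_to_assert.conf is not included in this diff; a hypothetical sketch of such a job config, assuming the usual fake-source-to-Assert e2e layout and the options documented above, could look like this (paths and values are assumptions, not the actual test file):

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  LocalFile {
    path = "/seatunnel/read/orc"   # assumed path written by fake_to_local_file_orc.conf
    file_format_type = "orc"
    enable_file_split = true
    file_split_size = 1048576      # assumed small value to force multiple splits
  }
}

sink {
  Assert {
    rules {
      row-rules = [
        {
          rule_type = MIN_ROW
          rule_value = 1           # assumed minimal row-count assertion
        }
      ]
    }
  }
}
```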