Skip to content

Commit cad38a0

Browse files
committed
[core] Introduce IncrementalSplit to simplify DataSplit
1 parent 8526f01 commit cad38a0

File tree

18 files changed

+386
-195
lines changed

18 files changed

+386
-195
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -229,10 +229,6 @@ public RecordReader<KeyValue> createReader(Split split) throws IOException {
229229
}
230230

231231
public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
232-
if (!split.beforeFiles().isEmpty()) {
233-
throw new IllegalArgumentException("This read cannot accept split with before files.");
234-
}
235-
236232
if (split.isStreaming() || split.bucket() == BucketMode.POSTPONE_BUCKET) {
237233
return createNoMergeReader(
238234
split.partition(),

paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java

Lines changed: 0 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -53,9 +53,6 @@
5353
import org.apache.paimon.utils.IOExceptionSupplier;
5454
import org.apache.paimon.utils.RoaringBitmap32;
5555

56-
import org.slf4j.Logger;
57-
import org.slf4j.LoggerFactory;
58-
5956
import javax.annotation.Nullable;
6057

6158
import java.io.IOException;
@@ -70,8 +67,6 @@
7067
/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
7168
public class RawFileSplitRead implements SplitRead<InternalRow> {
7269

73-
private static final Logger LOG = LoggerFactory.getLogger(RawFileSplitRead.class);
74-
7570
private final FileIO fileIO;
7671
private final SchemaManager schemaManager;
7772
private final TableSchema schema;
@@ -145,10 +140,6 @@ public SplitRead<InternalRow> withLimit(@Nullable Integer limit) {
145140
@Override
146141
public RecordReader<InternalRow> createReader(Split s) throws IOException {
147142
DataSplit split = (DataSplit) s;
148-
if (!split.beforeFiles().isEmpty()) {
149-
LOG.info("Ignore split before files: {}", split.beforeFiles());
150-
}
151-
152143
List<DataFileMeta> files = split.dataFiles();
153144
DeletionVector.Factory dvFactory =
154145
DeletionVector.factory(fileIO, files, split.deletionFiles().orElse(null));

paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java

Lines changed: 8 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -69,9 +69,6 @@ public class DataSplit implements Split {
6969
private String bucketPath;
7070
@Nullable private Integer totalBuckets;
7171

72-
private List<DataFileMeta> beforeFiles = new ArrayList<>();
73-
@Nullable private List<DeletionFile> beforeDeletionFiles;
74-
7572
private List<DataFileMeta> dataFiles;
7673
@Nullable private List<DeletionFile> dataDeletionFiles;
7774

@@ -100,14 +97,6 @@ public String bucketPath() {
10097
return totalBuckets;
10198
}
10299

103-
public List<DataFileMeta> beforeFiles() {
104-
return beforeFiles;
105-
}
106-
107-
public Optional<List<DeletionFile>> beforeDeletionFiles() {
108-
return Optional.ofNullable(beforeDeletionFiles);
109-
}
110-
111100
public List<DataFileMeta> dataFiles() {
112101
return dataFiles;
113102
}
@@ -125,10 +114,6 @@ public boolean rawConvertible() {
125114
return rawConvertible;
126115
}
127116

128-
public OptionalLong latestFileCreationEpochMillis() {
129-
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
130-
}
131-
132117
public OptionalLong earliestFileCreationEpochMillis() {
133118
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).min();
134119
}
@@ -227,32 +212,6 @@ public Long nullCount(int fieldIndex, SimpleStatsEvolutions evolutions) {
227212
return sum;
228213
}
229214

230-
/**
231-
* Obtain merged row count as much as possible. There are two scenarios where accurate row count
232-
* can be calculated:
233-
*
234-
* <p>1. raw file and no deletion file.
235-
*
236-
* <p>2. raw file + deletion file with cardinality.
237-
*/
238-
public long partialMergedRowCount() {
239-
long sum = 0L;
240-
if (rawConvertible) {
241-
List<RawFile> rawFiles = convertToRawFiles().orElse(null);
242-
if (rawFiles != null) {
243-
for (int i = 0; i < rawFiles.size(); i++) {
244-
RawFile rawFile = rawFiles.get(i);
245-
if (dataDeletionFiles == null || dataDeletionFiles.get(i) == null) {
246-
sum += rawFile.rowCount();
247-
} else if (dataDeletionFiles.get(i).cardinality() != null) {
248-
sum += rawFile.rowCount() - dataDeletionFiles.get(i).cardinality();
249-
}
250-
}
251-
}
252-
}
253-
return sum;
254-
}
255-
256215
@Override
257216
public Optional<List<RawFile>> convertToRawFiles() {
258217
if (rawConvertible) {
@@ -319,8 +278,6 @@ public boolean equals(Object o) {
319278
&& Objects.equals(partition, dataSplit.partition)
320279
&& Objects.equals(bucketPath, dataSplit.bucketPath)
321280
&& Objects.equals(totalBuckets, dataSplit.totalBuckets)
322-
&& Objects.equals(beforeFiles, dataSplit.beforeFiles)
323-
&& Objects.equals(beforeDeletionFiles, dataSplit.beforeDeletionFiles)
324281
&& Objects.equals(dataFiles, dataSplit.dataFiles)
325282
&& Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles);
326283
}
@@ -333,8 +290,6 @@ public int hashCode() {
333290
bucket,
334291
bucketPath,
335292
totalBuckets,
336-
beforeFiles,
337-
beforeDeletionFiles,
338293
dataFiles,
339294
dataDeletionFiles,
340295
isStreaming,
@@ -371,8 +326,6 @@ protected void assign(DataSplit other) {
371326
this.bucket = other.bucket;
372327
this.bucketPath = other.bucketPath;
373328
this.totalBuckets = other.totalBuckets;
374-
this.beforeFiles = other.beforeFiles;
375-
this.beforeDeletionFiles = other.beforeDeletionFiles;
376329
this.dataFiles = other.dataFiles;
377330
this.dataDeletionFiles = other.dataDeletionFiles;
378331
this.isStreaming = other.isStreaming;
@@ -394,12 +347,10 @@ public void serialize(DataOutputView out) throws IOException {
394347
}
395348

396349
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
397-
out.writeInt(beforeFiles.size());
398-
for (DataFileMeta file : beforeFiles) {
399-
dataFileSer.serialize(file, out);
400-
}
401350

402-
DeletionFile.serializeList(out, beforeDeletionFiles);
351+
// compatible with old beforeFiles
352+
out.writeInt(0);
353+
DeletionFile.serializeList(out, null);
403354

404355
out.writeInt(dataFiles.size());
405356
for (DataFileMeta file : dataFiles) {
@@ -428,13 +379,15 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
428379
FunctionWithIOException<DataInputView, DeletionFile> deletionFileSerde =
429380
getDeletionFileSerde(version);
430381
int beforeNumber = in.readInt();
431-
List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
432-
for (int i = 0; i < beforeNumber; i++) {
433-
beforeFiles.add(dataFileSer.apply(in));
382+
if (beforeNumber > 0) {
383+
throw new RuntimeException("Cannot deserialize data split with before files.");
434384
}
435385

436386
List<DeletionFile> beforeDeletionFiles =
437387
DeletionFile.deserializeList(in, deletionFileSerde);
388+
if (beforeDeletionFiles != null) {
389+
throw new RuntimeException("Cannot deserialize data split with before deletion files.");
390+
}
438391

439392
int fileNumber = in.readInt();
440393
List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
@@ -454,14 +407,10 @@ public static DataSplit deserialize(DataInputView in) throws IOException {
454407
.withBucket(bucket)
455408
.withBucketPath(bucketPath)
456409
.withTotalBuckets(totalBuckets)
457-
.withBeforeFiles(beforeFiles)
458410
.withDataFiles(dataFiles)
459411
.isStreaming(isStreaming)
460412
.rawConvertible(rawConvertible);
461413

462-
if (beforeDeletionFiles != null) {
463-
builder.withBeforeDeletionFiles(beforeDeletionFiles);
464-
}
465414
if (dataDeletionFiles != null) {
466415
builder.withDataDeletionFiles(dataDeletionFiles);
467416
}
@@ -539,16 +488,6 @@ public Builder withTotalBuckets(Integer totalBuckets) {
539488
return this;
540489
}
541490

542-
public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
543-
this.split.beforeFiles = new ArrayList<>(beforeFiles);
544-
return this;
545-
}
546-
547-
public Builder withBeforeDeletionFiles(List<DeletionFile> beforeDeletionFiles) {
548-
this.split.beforeDeletionFiles = new ArrayList<>(beforeDeletionFiles);
549-
return this;
550-
}
551-
552491
public Builder withDataFiles(List<DataFileMeta> dataFiles) {
553492
this.split.dataFiles = new ArrayList<>(dataFiles);
554493
return this;

0 commit comments

Comments
 (0)