Skip to content

Commit 8526f01

Browse files
authored
[core] Introduce mergedRowCount in Split (apache#7090)
1 parent f08c561 commit 8526f01

30 files changed

+193
-34
lines changed

paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Arrays;
3636
import java.util.List;
3737
import java.util.Objects;
38+
import java.util.OptionalLong;
3839

3940
/** Indexed split for global index. */
4041
public class IndexedSplit implements Split {
@@ -71,6 +72,11 @@ public long rowCount() {
7172
return rowRanges.stream().mapToLong(r -> r.to - r.from + 1).sum();
7273
}
7374

75+
@Override
76+
public OptionalLong mergedRowCount() {
77+
return OptionalLong.of(rowCount());
78+
}
79+
7480
@Override
7581
public boolean equals(Object o) {
7682
if (this == o) {

paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.Map;
6464
import java.util.Objects;
6565
import java.util.Optional;
66+
import java.util.OptionalLong;
6667
import java.util.Set;
6768
import java.util.stream.Collectors;
6869

@@ -249,6 +250,7 @@ private boolean sameRowTypeIgnoreNullable(RowType mainRowType, RowType fallbackR
249250

250251
/** Split for fallback read. */
251252
public interface FallbackSplit extends Split {
253+
252254
boolean isFallback();
253255

254256
Split wrapped();
@@ -281,6 +283,11 @@ public Split wrapped() {
281283
public long rowCount() {
282284
return split.rowCount();
283285
}
286+
287+
@Override
288+
public OptionalLong mergedRowCount() {
289+
return split.mergedRowCount();
290+
}
284291
}
285292

286293
/** DataSplit fallback implementation. */

paimon-core/src/main/java/org/apache/paimon/table/format/FormatDataSplit.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import javax.annotation.Nullable;
2626

2727
import java.util.Objects;
28+
import java.util.OptionalLong;
2829

2930
/** {@link FormatDataSplit} for format table. */
3031
public class FormatDataSplit implements Split {
@@ -85,6 +86,11 @@ public long rowCount() {
8586
return -1;
8687
}
8788

89+
@Override
90+
public OptionalLong mergedRowCount() {
91+
return OptionalLong.empty();
92+
}
93+
8894
@Override
8995
public boolean equals(Object o) {
9096
if (this == o) {

paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTableImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Map;
5252
import java.util.Objects;
5353
import java.util.Optional;
54+
import java.util.OptionalLong;
5455

5556
import static org.apache.paimon.data.BinaryString.fromString;
5657

@@ -189,6 +190,11 @@ public boolean equals(Object o) {
189190
public int hashCode() {
190191
return Objects.hash(location);
191192
}
193+
194+
@Override
195+
public OptionalLong mergedRowCount() {
196+
return OptionalLong.empty();
197+
}
192198
}
193199

194200
private static class ObjectRead implements InnerTableRead {

paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import java.util.Objects;
38+
import java.util.OptionalLong;
3839

3940
/**
4041
* A split describes chain table read scope. It follows DataSplit's custom serialization pattern and
@@ -87,6 +88,11 @@ public long rowCount() {
8788
return sum;
8889
}
8990

91+
@Override
92+
public OptionalLong mergedRowCount() {
93+
return OptionalLong.empty();
94+
}
95+
9096
@Override
9197
public boolean equals(Object o) {
9298
if (this == o) {

paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555

5656
import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
5757
import static org.apache.paimon.utils.Preconditions.checkArgument;
58-
import static org.apache.paimon.utils.Preconditions.checkState;
5958

6059
/** Input splits. Needed by most batch computation engines. */
6160
public class DataSplit implements Split {
@@ -143,17 +142,31 @@ public long rowCount() {
143142
return rowCount;
144143
}
145144

146-
/** Whether it is possible to calculate the merged row count. */
147-
public boolean mergedRowCountAvailable() {
148-
return rawConvertible
149-
&& (dataDeletionFiles == null
150-
|| dataDeletionFiles.stream()
151-
.allMatch(f -> f == null || f.cardinality() != null));
152-
}
145+
@Override
146+
public OptionalLong mergedRowCount() {
147+
if (!rawConvertible
148+
|| (dataDeletionFiles != null
149+
&& !dataDeletionFiles.stream()
150+
.allMatch(f -> f == null || f.cardinality() != null))) {
151+
return OptionalLong.empty();
152+
}
153153

154-
public long mergedRowCount() {
155-
checkState(mergedRowCountAvailable());
156-
return partialMergedRowCount();
154+
long sum = 0L;
155+
List<RawFile> rawFiles = convertToRawFiles().orElse(null);
156+
if (rawFiles != null) {
157+
for (int i = 0; i < rawFiles.size(); i++) {
158+
RawFile rawFile = rawFiles.get(i);
159+
DeletionFile deletionFile =
160+
dataDeletionFiles == null ? null : dataDeletionFiles.get(i);
161+
Long cardinality = deletionFile == null ? null : deletionFile.cardinality();
162+
if (deletionFile == null) {
163+
sum += rawFile.rowCount();
164+
} else if (cardinality != null) {
165+
sum += rawFile.rowCount() - cardinality;
166+
}
167+
}
168+
}
169+
return OptionalLong.of(sum);
157170
}
158171

159172
public Object minValue(int fieldIndex, DataField dataField, SimpleStatsEvolutions evolutions) {

paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.ArrayList;
3838
import java.util.List;
3939
import java.util.Optional;
40+
import java.util.OptionalLong;
4041

4142
import static org.apache.paimon.table.source.PushDownUtils.minmaxAvailable;
4243

@@ -144,10 +145,10 @@ private Optional<StartingScanner.Result> applyPushDownLimit() {
144145

145146
List<Split> limitedSplits = new ArrayList<>();
146147
for (DataSplit dataSplit : splits) {
147-
if (dataSplit.rawConvertible()) {
148-
long partialMergedRowCount = dataSplit.partialMergedRowCount();
148+
OptionalLong mergedRowCount = dataSplit.mergedRowCount();
149+
if (mergedRowCount.isPresent()) {
149150
limitedSplits.add(dataSplit);
150-
scannedRowCount += partialMergedRowCount;
151+
scannedRowCount += mergedRowCount.getAsLong();
151152
if (scannedRowCount >= pushDownLimit) {
152153
SnapshotReader.Plan newPlan =
153154
new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);

paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import javax.annotation.Nullable;
2424

25+
import java.util.OptionalLong;
26+
2527
/** A wrapper class for {@link Split} that adds query authorization information. */
2628
public class QueryAuthSplit implements Split {
2729

@@ -48,4 +50,9 @@ public TableQueryAuthResult authResult() {
4850
public long rowCount() {
4951
return split.rowCount();
5052
}
53+
54+
@Override
55+
public OptionalLong mergedRowCount() {
56+
return split.mergedRowCount();
57+
}
5158
}

paimon-core/src/main/java/org/apache/paimon/table/source/Split.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.Serializable;
2424
import java.util.List;
2525
import java.util.Optional;
26+
import java.util.OptionalLong;
2627

2728
/**
2829
* An input split for reading.
@@ -32,8 +33,20 @@
3233
@Public
3334
public interface Split extends Serializable {
3435

36+
/**
37+
* The row count in files, may be duplicated, such as in the primary key table and the
38+
* Data-Evolution Append table.
39+
*/
3540
long rowCount();
3641

42+
/**
43+
* Return the merged row count of data files. For example, when the delete vector is enabled in
44+
* the primary key table, the number of rows that have been deleted will be subtracted from the
45+
* returned result. In the Data Evolution mode of the Append table, the actual number of rows
46+
* will be returned.
47+
*/
48+
OptionalLong mergedRowCount();
49+
3750
/**
3851
* If all files in this split can be read without merging, returns an {@link Optional} wrapping
3952
* a list of {@link RawFile}s to be read without merging. Otherwise, returns {@link

paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.List;
5454
import java.util.Map;
5555
import java.util.Objects;
56+
import java.util.OptionalLong;
5657
import java.util.function.Function;
5758

5859
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -160,6 +161,11 @@ public boolean equals(Object o) {
160161
public int hashCode() {
161162
return Objects.hash(location);
162163
}
164+
165+
@Override
166+
public OptionalLong mergedRowCount() {
167+
return OptionalLong.empty();
168+
}
163169
}
164170

165171
/** {@link TableRead} implementation for {@link AggregationFieldsTable}. */

0 commit comments

Comments
 (0)