Skip to content

Commit 3144200

Browse files
committed
[core] Correct merged row count for Data Evolution Split
1 parent 8526f01 commit 3144200

File tree

2 files changed

+69
-28
lines changed

2 files changed

+69
-28
lines changed

paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.paimon.types.DataTypes;
4040
import org.apache.paimon.utils.FunctionWithIOException;
4141
import org.apache.paimon.utils.InternalRowUtils;
42+
import org.apache.paimon.utils.RangeHelper;
4243
import org.apache.paimon.utils.SerializationUtils;
4344

4445
import javax.annotation.Nullable;
@@ -144,29 +145,61 @@ public long rowCount() {
144145

145146
@Override
146147
public OptionalLong mergedRowCount() {
147-
if (!rawConvertible
148-
|| (dataDeletionFiles != null
149-
&& !dataDeletionFiles.stream()
150-
.allMatch(f -> f == null || f.cardinality() != null))) {
151-
return OptionalLong.empty();
148+
if (rawMergedRowCountAvailable()) {
149+
return OptionalLong.of(rawMergedRowCount());
152150
}
151+
if (dataEvolutionRowCountAvailable()) {
152+
return OptionalLong.of(dataEvolutionMergedRowCount());
153+
}
154+
return OptionalLong.empty();
155+
}
153156

157+
private boolean rawMergedRowCountAvailable() {
158+
return rawConvertible
159+
&& (dataDeletionFiles == null
160+
|| dataDeletionFiles.stream()
161+
.allMatch(f -> f == null || f.cardinality() != null));
162+
}
163+
164+
private long rawMergedRowCount() {
154165
long sum = 0L;
155-
List<RawFile> rawFiles = convertToRawFiles().orElse(null);
156-
if (rawFiles != null) {
157-
for (int i = 0; i < rawFiles.size(); i++) {
158-
RawFile rawFile = rawFiles.get(i);
159-
DeletionFile deletionFile =
160-
dataDeletionFiles == null ? null : dataDeletionFiles.get(i);
161-
Long cardinality = deletionFile == null ? null : deletionFile.cardinality();
162-
if (deletionFile == null) {
163-
sum += rawFile.rowCount();
164-
} else if (cardinality != null) {
165-
sum += rawFile.rowCount() - cardinality;
166-
}
166+
for (int i = 0; i < dataFiles.size(); i++) {
167+
DataFileMeta file = dataFiles.get(i);
168+
DeletionFile deletionFile = dataDeletionFiles == null ? null : dataDeletionFiles.get(i);
169+
Long cardinality = deletionFile == null ? null : deletionFile.cardinality();
170+
if (deletionFile == null) {
171+
sum += file.rowCount();
172+
} else if (cardinality != null) {
173+
sum += file.rowCount() - cardinality;
167174
}
168175
}
169-
return OptionalLong.of(sum);
176+
return sum;
177+
}
178+
179+
private boolean dataEvolutionRowCountAvailable() {
180+
for (DataFileMeta file : dataFiles) {
181+
if (file.firstRowId() == null) {
182+
return false;
183+
}
184+
}
185+
return true;
186+
}
187+
188+
private long dataEvolutionMergedRowCount() {
189+
long sum = 0L;
190+
RangeHelper<DataFileMeta> rangeHelper =
191+
new RangeHelper<>(
192+
DataFileMeta::nonNullFirstRowId,
193+
f -> f.nonNullFirstRowId() + f.rowCount() - 1);
194+
List<List<DataFileMeta>> ranges = rangeHelper.mergeOverlappingRanges(dataFiles);
195+
for (List<DataFileMeta> group : ranges) {
196+
long maxCount = 0;
197+
for (DataFileMeta file : group) {
198+
maxCount = Math.max(maxCount, file.rowCount());
199+
}
200+
sum += maxCount;
201+
}
202+
return sum;
170203
}
171204

172205
public Object minValue(int fieldIndex, DataField dataField, SimpleStatsEvolutions evolutions) {

paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.paimon.table.source.EndOfScanException;
4545
import org.apache.paimon.table.source.ReadBuilder;
4646
import org.apache.paimon.table.source.Split;
47+
import org.apache.paimon.table.source.TableScan;
4748
import org.apache.paimon.types.DataTypes;
4849
import org.apache.paimon.types.RowType;
4950
import org.apache.paimon.utils.Range;
@@ -55,6 +56,7 @@
5556
import java.util.Collections;
5657
import java.util.Iterator;
5758
import java.util.List;
59+
import java.util.OptionalLong;
5860
import java.util.concurrent.atomic.AtomicInteger;
5961
import java.util.stream.Collectors;
6062

@@ -87,8 +89,20 @@ public void testBasic() throws Exception {
8789
}
8890

8991
ReadBuilder readBuilder = getTableDefault().newReadBuilder();
90-
RecordReader<InternalRow> reader =
91-
readBuilder.newRead().createReader(readBuilder.newScan().plan());
92+
TableScan.Plan plan = readBuilder.newScan().plan();
93+
94+
// assert merged row count
95+
long noMergedRowCount = plan.splits().stream().mapToLong(Split::rowCount).sum();
96+
long mergedRowCount =
97+
plan.splits().stream()
98+
.map(Split::mergedRowCount)
99+
.filter(OptionalLong::isPresent)
100+
.mapToLong(OptionalLong::getAsLong)
101+
.sum();
102+
assertThat(noMergedRowCount).isEqualTo(2);
103+
assertThat(mergedRowCount).isEqualTo(1);
104+
105+
RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan);
92106
assertThat(reader).isInstanceOf(DataEvolutionFileReader.class);
93107
reader.forEachRemaining(
94108
r -> {
@@ -105,10 +119,7 @@ public void testBasic() throws Exception {
105119
.newRead()
106120
.createReader(readBuilder.newScan().plan());
107121
AtomicInteger cnt = new AtomicInteger(0);
108-
reader.forEachRemaining(
109-
r -> {
110-
cnt.incrementAndGet();
111-
});
122+
reader.forEachRemaining(r -> cnt.incrementAndGet());
112123
assertThat(cnt.get()).isEqualTo(1);
113124

114125
// projection with an empty read type
@@ -119,10 +130,7 @@ public void testBasic() throws Exception {
119130
.newRead()
120131
.createReader(readBuilder.newScan().plan());
121132
AtomicInteger cnt1 = new AtomicInteger(0);
122-
reader.forEachRemaining(
123-
r -> {
124-
cnt1.incrementAndGet();
125-
});
133+
reader.forEachRemaining(r -> cnt1.incrementAndGet());
126134
assertThat(cnt1.get()).isEqualTo(1);
127135
}
128136

0 commit comments

Comments
 (0)