Skip to content

Commit 71444ec

Browse files
committed
[core] Introduce minRowId and maxRowId to ManifestList file
1 parent 424dc45 commit 71444ec

File tree

11 files changed

+127
-35
lines changed

11 files changed

+127
-35
lines changed

paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, Manifes
176176
private int maxBucket = Integer.MIN_VALUE;
177177
private int minLevel = Integer.MAX_VALUE;
178178
private int maxLevel = Integer.MIN_VALUE;
179+
private @Nullable RowIdStats rowIdStats = new RowIdStats();
179180

180181
ManifestEntryWriter(FormatWriterFactory factory, Path path, String fileCompression) {
181182
super(
@@ -208,6 +209,14 @@ public void write(ManifestEntry entry) throws IOException {
208209
maxBucket = Math.max(maxBucket, entry.bucket());
209210
minLevel = Math.min(minLevel, entry.level());
210211
maxLevel = Math.max(maxLevel, entry.level());
212+
if (rowIdStats != null) {
213+
Long firstRowId = entry.file().firstRowId();
214+
if (firstRowId == null) {
215+
rowIdStats = null;
216+
} else {
217+
rowIdStats.collect(firstRowId, entry.file().rowCount());
218+
}
219+
}
211220

212221
partitionStatsCollector.collect(entry.partition());
213222
}
@@ -226,7 +235,20 @@ public ManifestFileMeta result() throws IOException {
226235
minBucket,
227236
maxBucket,
228237
minLevel,
229-
maxLevel);
238+
maxLevel,
239+
rowIdStats == null ? null : rowIdStats.minRowId,
240+
rowIdStats == null ? null : rowIdStats.maxRowId);
241+
}
242+
}
243+
244+
private static class RowIdStats {
245+
246+
private long minRowId = Long.MAX_VALUE;
247+
private long maxRowId = Long.MIN_VALUE;
248+
249+
private void collect(long firstRowId, long rowCount) {
250+
minRowId = Math.min(minRowId, firstRowId);
251+
maxRowId = Math.max(maxRowId, firstRowId + rowCount - 1);
230252
}
231253
}
232254

paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public class ManifestFileMeta {
5454
new DataField(6, "_MIN_BUCKET", new IntType(true)),
5555
new DataField(7, "_MAX_BUCKET", new IntType(true)),
5656
new DataField(8, "_MIN_LEVEL", new IntType(true)),
57-
new DataField(9, "_MAX_LEVEL", new IntType(true))));
57+
new DataField(9, "_MAX_LEVEL", new IntType(true)),
58+
new DataField(10, "_MIN_ROW_ID", new BigIntType(true)),
59+
new DataField(11, "_MAX_ROW_ID", new BigIntType(true))));
5860

5961
private final String fileName;
6062
private final long fileSize;
@@ -66,6 +68,8 @@ public class ManifestFileMeta {
6668
private final @Nullable Integer maxBucket;
6769
private final @Nullable Integer minLevel;
6870
private final @Nullable Integer maxLevel;
71+
private final @Nullable Long minRowId;
72+
private final @Nullable Long maxRowId;
6973

7074
public ManifestFileMeta(
7175
String fileName,
@@ -77,7 +81,9 @@ public ManifestFileMeta(
7781
@Nullable Integer minBucket,
7882
@Nullable Integer maxBucket,
7983
@Nullable Integer minLevel,
80-
@Nullable Integer maxLevel) {
84+
@Nullable Integer maxLevel,
85+
@Nullable Long minRowId,
86+
@Nullable Long maxRowId) {
8187
this.fileName = fileName;
8288
this.fileSize = fileSize;
8389
this.numAddedFiles = numAddedFiles;
@@ -88,6 +94,8 @@ public ManifestFileMeta(
8894
this.maxBucket = maxBucket;
8995
this.minLevel = minLevel;
9096
this.maxLevel = maxLevel;
97+
this.minRowId = minRowId;
98+
this.maxRowId = maxRowId;
9199
}
92100

93101
public String fileName() {
@@ -130,6 +138,14 @@ public long schemaId() {
130138
return maxLevel;
131139
}
132140

141+
public @Nullable Long minRowId() {
142+
return minRowId;
143+
}
144+
145+
public @Nullable Long maxRowId() {
146+
return maxRowId;
147+
}
148+
133149
@Override
134150
public boolean equals(Object o) {
135151
if (!(o instanceof ManifestFileMeta)) {
@@ -145,7 +161,9 @@ public boolean equals(Object o) {
145161
&& Objects.equals(minBucket, that.minBucket)
146162
&& Objects.equals(maxBucket, that.maxBucket)
147163
&& Objects.equals(minLevel, that.minLevel)
148-
&& Objects.equals(maxLevel, that.maxLevel);
164+
&& Objects.equals(maxLevel, that.maxLevel)
165+
&& Objects.equals(minRowId, that.minRowId)
166+
&& Objects.equals(maxRowId, that.maxRowId);
149167
}
150168

151169
@Override
@@ -160,13 +178,15 @@ public int hashCode() {
160178
minBucket,
161179
maxBucket,
162180
minLevel,
163-
maxLevel);
181+
maxLevel,
182+
minRowId,
183+
maxRowId);
164184
}
165185

166186
@Override
167187
public String toString() {
168188
return String.format(
169-
"{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s}",
189+
"{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s, %s, %s}",
170190
fileName,
171191
fileSize,
172192
numAddedFiles,
@@ -176,7 +196,9 @@ public String toString() {
176196
minBucket,
177197
maxBucket,
178198
minLevel,
179-
maxLevel);
199+
maxLevel,
200+
minRowId,
201+
maxRowId);
180202
}
181203

182204
// ----------------------- Serialization -----------------------------

paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ public InternalRow convertTo(ManifestFileMeta meta) {
5050
meta.minBucket(),
5151
meta.maxBucket(),
5252
meta.minLevel(),
53-
meta.maxLevel());
53+
meta.maxLevel(),
54+
meta.minRowId(),
55+
meta.maxRowId());
5456
}
5557

5658
@Override
@@ -75,6 +77,8 @@ public ManifestFileMeta convertFrom(int version, InternalRow row) {
7577
row.isNullAt(6) ? null : row.getInt(6),
7678
row.isNullAt(7) ? null : row.getInt(7),
7779
row.isNullAt(8) ? null : row.getInt(8),
78-
row.isNullAt(9) ? null : row.getInt(9));
80+
row.isNullAt(9) ? null : row.getInt(9),
81+
row.isNullAt(10) ? null : row.getLong(10),
82+
row.isNullAt(11) ? null : row.getLong(11));
7983
}
8084
}

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
8989

9090
private ScanMetrics scanMetrics = null;
9191
private boolean dropStats;
92+
@Nullable protected List<Long> rowIdList;
9293

9394
public AbstractFileStoreScan(
9495
ManifestsReader manifestsReader,
@@ -238,8 +239,8 @@ public FileStoreScan keepStats() {
238239
}
239240

240241
@Override
241-
public FileStoreScan withRowIds(List<Long> indices) {
242-
// do nothing by default
242+
public FileStoreScan withRowIds(List<Long> rowIdList) {
243+
this.rowIdList = rowIdList;
243244
return this;
244245
}
245246

@@ -260,14 +261,15 @@ public Plan plan() {
260261
ManifestsReader.Result manifestsResult = readManifests();
261262
Snapshot snapshot = manifestsResult.snapshot;
262263
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
264+
manifests = postFilterManifests(manifests);
263265

264266
Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
265267
List<ManifestEntry> files = new ArrayList<>();
266268
while (iterator.hasNext()) {
267269
files.add(iterator.next());
268270
}
269271

270-
files = postFilter(files);
272+
files = postFilterManifestEntries(files);
271273

272274
if (wholeBucketFilterEnabled()) {
273275
// We group files by bucket here, and filter them by the whole bucket filter.
@@ -437,7 +439,13 @@ protected TableSchema scanTableSchema(long id) {
437439
/** Note: Keep this thread-safe. */
438440
protected abstract boolean filterByStats(ManifestEntry entry);
439441

440-
protected abstract List<ManifestEntry> postFilter(List<ManifestEntry> entries);
442+
protected List<ManifestFileMeta> postFilterManifests(List<ManifestFileMeta> manifests) {
443+
return manifests;
444+
}
445+
446+
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
447+
return entries;
448+
}
441449

442450
protected boolean wholeBucketFilterEnabled() {
443451
return false;

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import javax.annotation.Nullable;
3434

3535
import java.io.IOException;
36-
import java.util.List;
3736
import java.util.Map;
3837
import java.util.concurrent.ConcurrentHashMap;
3938

@@ -122,11 +121,6 @@ protected boolean filterByStats(ManifestEntry entry) {
122121
return testFileIndex(entry.file().embeddedIndex(), entry);
123122
}
124123

125-
@Override
126-
protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
127-
return entries;
128-
}
129-
130124
private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
131125
if (embeddedIndexBytes == null) {
132126
return true;

paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.io.DataFileMeta;
2626
import org.apache.paimon.manifest.ManifestEntry;
2727
import org.apache.paimon.manifest.ManifestFile;
28+
import org.apache.paimon.manifest.ManifestFileMeta;
2829
import org.apache.paimon.predicate.Predicate;
2930
import org.apache.paimon.reader.DataEvolutionArray;
3031
import org.apache.paimon.reader.DataEvolutionRow;
@@ -35,8 +36,6 @@
3536
import org.apache.paimon.utils.RangeHelper;
3637
import org.apache.paimon.utils.SnapshotManager;
3738

38-
import javax.annotation.Nullable;
39-
4039
import java.util.Arrays;
4140
import java.util.Collection;
4241
import java.util.Comparator;
@@ -51,7 +50,6 @@
5150
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
5251

5352
private boolean dropStats = false;
54-
@Nullable private List<Long> indices;
5553

5654
public DataEvolutionFileStoreScan(
5755
ManifestsReader manifestsReader,
@@ -74,29 +72,57 @@ public DataEvolutionFileStoreScan(
7472

7573
@Override
7674
public FileStoreScan dropStats() {
75+
// override to keep stats here
76+
// TODO refactor this hack
7777
this.dropStats = true;
7878
return this;
7979
}
8080

8181
@Override
8282
public FileStoreScan keepStats() {
83+
// override to keep stats here
84+
// TODO refactor this hack
8385
this.dropStats = false;
8486
return this;
8587
}
8688

89+
@Override
8790
public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
91+
// override to keep the whole filter here
92+
// TODO refactor this hack
8893
this.inputFilter = predicate;
8994
return this;
9095
}
9196

9297
@Override
93-
public FileStoreScan withRowIds(List<Long> indices) {
94-
this.indices = indices;
95-
return this;
98+
protected List<ManifestFileMeta> postFilterManifests(List<ManifestFileMeta> manifests) {
99+
if (rowIdList == null || rowIdList.isEmpty()) {
100+
return manifests;
101+
}
102+
return manifests.stream().filter(this::filterManifestByRowIds).collect(Collectors.toList());
103+
}
104+
105+
private boolean filterManifestByRowIds(ManifestFileMeta manifest) {
106+
if (rowIdList == null || rowIdList.isEmpty()) {
107+
return true;
108+
}
109+
110+
Long min = manifest.minRowId();
111+
Long max = manifest.maxRowId();
112+
if (min == null || max == null) {
113+
return true;
114+
}
115+
116+
for (long rowId : rowIdList) {
117+
if (rowId >= min && rowId <= max) {
118+
return true;
119+
}
120+
}
121+
return false;
96122
}
97123

98124
@Override
99-
protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
125+
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
100126
if (inputFilter == null) {
101127
return entries;
102128
}
@@ -214,7 +240,7 @@ static EvolutionStats evolutionStats(
214240
@Override
215241
protected boolean filterByStats(ManifestEntry entry) {
216242
// If rowIdList is null, all entries should be kept
217-
if (this.indices == null) {
243+
if (this.rowIdList == null) {
218244
return true;
219245
}
220246

@@ -228,7 +254,7 @@ protected boolean filterByStats(ManifestEntry entry) {
228254
long rowCount = entry.file().rowCount();
229255
long endRowId = firstRowId + rowCount;
230256

231-
for (Long index : this.indices) {
257+
for (Long index : this.rowIdList) {
232258
if (index >= firstRowId && index < endRowId) {
233259
return true;
234260
}

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,11 +153,6 @@ protected boolean filterByStats(ManifestEntry entry) {
153153
file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts());
154154
}
155155

156-
@Override
157-
protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
158-
return entries;
159-
}
160-
161156
@Override
162157
protected ManifestEntry dropStats(ManifestEntry entry) {
163158
if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {

paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public ManifestFileMeta convertFrom(int version, InternalRow row) {
9090
null,
9191
null,
9292
null,
93+
null,
94+
null,
9395
null);
9496
}
9597
}

paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ private List<ManifestFileMeta> getLegacyMetaPaimon10(List<ManifestFileMeta> meta
140140
null,
141141
null,
142142
null,
143+
null,
144+
null,
143145
null));
144146
}
145147
return result;

paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ public ManifestFileMeta createManifestFileMeta(List<ManifestEntry> entries) {
126126
minBucket,
127127
maxBucket,
128128
minLevel,
129-
maxLevel);
129+
maxLevel,
130+
null,
131+
null);
130132
}
131133

132134
private void mergeLevelsIfNeeded(BinaryRow partition, int bucket) {

0 commit comments

Comments
 (0)