Skip to content

Commit 3d64955

Browse files
committed
[core] Refactor row id pushdown to DataEvolutionFileStoreScan
1 parent 4b34e36 commit 3d64955

File tree

14 files changed

+161
-251
lines changed

14 files changed

+161
-251
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,12 +1037,6 @@
10371037
<td>String</td>
10381038
<td>Time field for record level expire. It supports the following types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.</td>
10391039
</tr>
1040-
<tr>
1041-
<td><h5>row-id-push-down.enabled</h5></td>
1042-
<td style="word-wrap: break-word;">false</td>
1043-
<td>Boolean</td>
1044-
<td>Whether to enable row id push down for scan. Currently, only the data evolution table supports row id push down.</td>
1045-
</tr>
10461040
<tr>
10471041
<td><h5>row-tracking.enabled</h5></td>
10481042
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,14 +2146,6 @@ public InlineElement getDescription() {
21462146
.withDescription(
21472147
"Whether to try upgrading the data files after overwriting a primary key table.");
21482148

2149-
public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
2150-
key("row-id-push-down.enabled")
2151-
.booleanType()
2152-
.defaultValue(false)
2153-
.withDescription(
2154-
"Whether to enable row id push down for scan."
2155-
+ " Currently, only the data evolution table supports row id push down.");
2156-
21572149
private final Options options;
21582150

21592151
public CoreOptions(Map<String, String> options) {
@@ -3338,10 +3330,6 @@ public boolean overwriteUpgrade() {
33383330
return options.get(OVERWRITE_UPGRADE);
33393331
}
33403332

3341-
public boolean rowIdPushDownEnabled() {
3342-
return options.get(ROW_ID_PUSH_DOWN_ENABLED);
3343-
}
3344-
33453333
/** Specifies the merge engine for table with primary key. */
33463334
public enum MergeEngine implements DescribedEnum {
33473335
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.Optional;
2526

2627
import static org.apache.paimon.table.SpecialFields.ROW_ID;
2728

@@ -40,10 +41,10 @@
4041
* AND _ROW_ID IN (1, 2)}).
4142
* </ul>
4243
*/
43-
public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {
44+
public class RowIdPredicateVisitor implements PredicateVisitor<Optional<List<Range>>> {
4445

4546
@Override
46-
public List<Range> visit(LeafPredicate predicate) {
47+
public Optional<List<Range>> visit(LeafPredicate predicate) {
4748
if (ROW_ID.name().equals(predicate.fieldName())) {
4849
LeafFunction function = predicate.function();
4950
if (function instanceof Equal || function instanceof In) {
@@ -53,57 +54,55 @@ public List<Range> visit(LeafPredicate predicate) {
5354
}
5455
// The list output by getRangesFromList is already sorted,
5556
// and has no overlap
56-
return Range.getRangesFromList(rowIds);
57+
return Optional.of(Range.toRanges(rowIds));
5758
}
5859
}
59-
return null;
60+
return Optional.empty();
6061
}
6162

6263
@Override
63-
public List<Range> visit(CompoundPredicate predicate) {
64+
public Optional<List<Range>> visit(CompoundPredicate predicate) {
6465
CompoundPredicate.Function function = predicate.function();
65-
List<Range> rowIds = null;
66+
Optional<List<Range>> rowIds = Optional.empty();
6667
// `And` means we should get the intersection of all children.
6768
if (function instanceof And) {
6869
for (Predicate child : predicate.children()) {
69-
List<Range> childList = child.visit(this);
70-
if (childList == null) {
70+
Optional<List<Range>> childList = child.visit(this);
71+
if (!childList.isPresent()) {
7172
continue;
7273
}
7374

74-
if (rowIds == null) {
75-
rowIds = childList;
76-
} else {
77-
rowIds = Range.and(rowIds, childList);
78-
}
75+
rowIds =
76+
rowIds.map(ranges -> Optional.of(Range.and(ranges, childList.get())))
77+
.orElse(childList);
7978

8079
// shortcut for intersection
81-
if (rowIds.isEmpty()) {
80+
if (rowIds.get().isEmpty()) {
8281
return rowIds;
8382
}
8483
}
8584
} else if (function instanceof Or) {
8685
// `Or` means we should get the union of all children
87-
rowIds = new ArrayList<>();
86+
rowIds = Optional.of(new ArrayList<>());
8887
for (Predicate child : predicate.children()) {
89-
List<Range> childList = child.visit(this);
90-
if (childList == null) {
91-
return null;
88+
Optional<List<Range>> childList = child.visit(this);
89+
if (!childList.isPresent()) {
90+
return Optional.empty();
9291
}
9392

94-
rowIds.addAll(childList);
95-
rowIds = Range.sortAndMergeOverlap(rowIds, true);
93+
rowIds.get().addAll(childList.get());
94+
rowIds = Optional.of(Range.sortAndMergeOverlap(rowIds.get(), true));
9695
}
9796
} else {
98-
// unexpected function type, just return null
99-
return null;
97+
// unexpected function type, just return empty
98+
return Optional.empty();
10099
}
101100
return rowIds;
102101
}
103102

104103
@Override
105-
public List<Range> visit(TransformPredicate predicate) {
104+
public Optional<List<Range>> visit(TransformPredicate predicate) {
106105
// do not support transform predicate now.
107-
return null;
106+
return Optional.empty();
108107
}
109108
}

paimon-common/src/main/java/org/apache/paimon/utils/Range.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import java.util.ArrayList;
2323
import java.util.Collections;
2424
import java.util.Comparator;
25+
import java.util.Iterator;
2526
import java.util.List;
2627
import java.util.Objects;
27-
import java.util.stream.Collectors;
2828

2929
/** Range represents from (inclusive) and to (inclusive). */
3030
public class Range implements Serializable {
@@ -178,31 +178,29 @@ public static List<Range> mergeSortedAsPossible(List<Range> ranges) {
178178
return result;
179179
}
180180

181-
public static List<Range> getRangesFromList(List<Long> origLongs) {
182-
if (origLongs == null || origLongs.isEmpty()) {
183-
return Collections.emptyList();
181+
public static List<Range> toRanges(Iterable<Long> ids) {
182+
List<Range> ranges = new ArrayList<>();
183+
Iterator<Long> iterator = ids.iterator();
184+
185+
if (!iterator.hasNext()) {
186+
return ranges;
184187
}
185188

186-
List<Long> longs = origLongs.stream().distinct().sorted().collect(Collectors.toList());
187-
188-
ArrayList<Range> ranges = new ArrayList<>();
189-
Long rangeStart = null;
190-
Long rangeEnd = null;
191-
for (Long cur : longs) {
192-
if (rangeStart == null) {
193-
rangeStart = cur;
194-
rangeEnd = cur;
195-
} else if (rangeEnd == cur - 1) {
196-
rangeEnd = cur;
197-
} else {
189+
long rangeStart = iterator.next();
190+
long rangeEnd = rangeStart;
191+
192+
while (iterator.hasNext()) {
193+
long current = iterator.next();
194+
if (current != rangeEnd + 1) {
195+
// Save the current range and start a new one
198196
ranges.add(new Range(rangeStart, rangeEnd));
199-
rangeStart = cur;
200-
rangeEnd = cur;
197+
rangeStart = current;
201198
}
199+
rangeEnd = current;
202200
}
203-
if (rangeStart != null) {
204-
ranges.add(new Range(rangeStart, rangeEnd));
205-
}
201+
// Add the last range
202+
ranges.add(new Range(rangeStart, rangeEnd));
203+
206204
return ranges;
207205
}
208206

paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,13 @@
1818

1919
package org.apache.paimon.utils;
2020

21-
import org.apache.paimon.annotation.VisibleForTesting;
22-
2321
import org.roaringbitmap.longlong.Roaring64NavigableMap;
2422

2523
import java.io.ByteArrayInputStream;
2624
import java.io.ByteArrayOutputStream;
2725
import java.io.DataInputStream;
2826
import java.io.DataOutputStream;
2927
import java.io.IOException;
30-
import java.util.ArrayList;
3128
import java.util.Iterator;
3229
import java.util.List;
3330
import java.util.Objects;
@@ -135,42 +132,16 @@ public void deserialize(byte[] rbmBytes) throws IOException {
135132
}
136133
}
137134

138-
@VisibleForTesting
139135
/**
140136
* Converts this bitmap to a list of contiguous ranges.
141137
*
142138
* <p>This is useful for interoperability with APIs that expect List&lt;Range&gt;.
143139
*/
144140
public List<Range> toRangeList() {
145-
List<Range> ranges = new ArrayList<>();
146-
Iterator<Long> iterator = roaring64NavigableMap.iterator();
147-
148-
if (!iterator.hasNext()) {
149-
return ranges;
150-
}
151-
152-
long rangeStart = iterator.next();
153-
long rangeEnd = rangeStart;
154-
155-
while (iterator.hasNext()) {
156-
long current = iterator.next();
157-
if (current == rangeEnd + 1) {
158-
// Extend the current range
159-
rangeEnd = current;
160-
} else {
161-
// Save the current range and start a new one
162-
ranges.add(new Range(rangeStart, rangeEnd));
163-
rangeStart = current;
164-
rangeEnd = current;
165-
}
166-
}
167-
// Add the last range
168-
ranges.add(new Range(rangeStart, rangeEnd));
169-
170-
return ranges;
141+
// TODO Optimize this to avoid iterator all ids
142+
return Range.toRanges(roaring64NavigableMap::iterator);
171143
}
172144

173-
@VisibleForTesting
174145
public static RoaringNavigableMap64 bitmapOf(long... dat) {
175146
RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
176147
for (long ele : dat) {

paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424
import org.apache.paimon.manifest.PartitionEntry;
2525
import org.apache.paimon.metrics.MetricRegistry;
2626
import org.apache.paimon.partition.PartitionPredicate;
27+
import org.apache.paimon.predicate.CompoundPredicate;
28+
import org.apache.paimon.predicate.LeafPredicate;
2729
import org.apache.paimon.predicate.Predicate;
30+
import org.apache.paimon.predicate.RowIdPredicateVisitor;
2831
import org.apache.paimon.predicate.TopN;
2932
import org.apache.paimon.predicate.VectorSearch;
3033
import org.apache.paimon.table.FileStoreTable;
@@ -47,6 +50,7 @@
4750
import java.util.Optional;
4851

4952
import static org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
53+
import static org.apache.paimon.table.SpecialFields.ROW_ID;
5054

5155
/** Scan for data evolution table. */
5256
public class DataEvolutionBatchScan implements DataTableScan {
@@ -71,11 +75,43 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta
7175

7276
@Override
7377
public InnerTableScan withFilter(Predicate predicate) {
78+
if (predicate == null) {
79+
return this;
80+
}
81+
82+
predicate.visit(new RowIdPredicateVisitor()).ifPresent(this::withRowRanges);
83+
predicate = removeRowIdFilter(predicate);
7484
this.filter = predicate;
7585
batchScan.withFilter(predicate);
7686
return this;
7787
}
7888

89+
private Predicate removeRowIdFilter(Predicate filter) {
90+
if (filter instanceof LeafPredicate
91+
&& ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) {
92+
return null;
93+
} else if (filter instanceof CompoundPredicate) {
94+
CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
95+
96+
List<Predicate> newChildren = new ArrayList<>();
97+
for (Predicate child : compoundPredicate.children()) {
98+
Predicate newChild = removeRowIdFilter(child);
99+
if (newChild != null) {
100+
newChildren.add(newChild);
101+
}
102+
}
103+
104+
if (newChildren.isEmpty()) {
105+
return null;
106+
} else if (newChildren.size() == 1) {
107+
return newChildren.get(0);
108+
} else {
109+
return new CompoundPredicate(compoundPredicate.function(), newChildren);
110+
}
111+
}
112+
return filter;
113+
}
114+
79115
@Override
80116
public InnerTableScan withVectorSearch(VectorSearch vectorSearch) {
81117
this.vectorSearch = vectorSearch;
@@ -157,9 +193,13 @@ public InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
157193

158194
@Override
159195
public InnerTableScan withRowRanges(List<Range> rowRanges) {
196+
if (rowRanges == null) {
197+
return this;
198+
}
199+
160200
this.pushedRowRanges = rowRanges;
161201
if (globalIndexResult != null) {
162-
throw new IllegalStateException("");
202+
throw new IllegalStateException("Cannot push row ranges after global index eval.");
163203
}
164204
return this;
165205
}

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ public FileStoreScan keepStats() {
243243
@Override
244244
public FileStoreScan withRowRanges(List<Range> rowRanges) {
245245
this.rowRanges = rowRanges;
246+
manifestsReader.withRowRanges(rowRanges);
246247
return this;
247248
}
248249

@@ -274,7 +275,6 @@ public Plan plan() {
274275
ManifestsReader.Result manifestsResult = readManifests();
275276
Snapshot snapshot = manifestsResult.snapshot;
276277
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
277-
manifests = postFilterManifests(manifests);
278278

279279
Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
280280
if (supportsLimitPushManifestEntries()) {
@@ -434,10 +434,6 @@ protected TableSchema scanTableSchema(long id) {
434434
/** Note: Keep this thread-safe. */
435435
protected abstract boolean filterByStats(ManifestEntry entry);
436436

437-
protected List<ManifestFileMeta> postFilterManifests(List<ManifestFileMeta> manifests) {
438-
return manifests;
439-
}
440-
441437
protected boolean postFilterManifestEntriesEnabled() {
442438
return false;
443439
}

0 commit comments

Comments
 (0)