Skip to content

Commit 5d97e49

Browse files
authored
[paimon] Paimon source supports filter push down (#1523)
1 parent fb6f6d5 commit 5d97e49

File tree

11 files changed

+539
-20
lines changed

11 files changed

+539
-20
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.alibaba.fluss.lake.paimon.source;
2020

2121
import com.alibaba.fluss.config.Configuration;
22+
import com.alibaba.fluss.lake.paimon.utils.FlussToPaimonPredicateConverter;
2223
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
2324
import com.alibaba.fluss.lake.source.LakeSource;
2425
import com.alibaba.fluss.lake.source.Planner;
@@ -30,13 +31,16 @@
3031
import org.apache.paimon.catalog.CatalogContext;
3132
import org.apache.paimon.catalog.CatalogFactory;
3233
import org.apache.paimon.options.Options;
34+
import org.apache.paimon.predicate.PredicateBuilder;
3335
import org.apache.paimon.table.FileStoreTable;
36+
import org.apache.paimon.types.RowType;
3437

3538
import javax.annotation.Nullable;
3639

3740
import java.io.IOException;
38-
import java.util.Collections;
41+
import java.util.ArrayList;
3942
import java.util.List;
43+
import java.util.Optional;
4044

4145
import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
4246

@@ -69,7 +73,23 @@ public void withLimit(int limit) {
6973

7074
@Override
7175
public FilterPushDownResult withFilters(List<Predicate> predicates) {
72-
return FilterPushDownResult.of(Collections.emptyList(), predicates);
76+
List<Predicate> unConsumedPredicates = new ArrayList<>();
77+
List<Predicate> consumedPredicates = new ArrayList<>();
78+
List<org.apache.paimon.predicate.Predicate> converted = new ArrayList<>();
79+
for (Predicate predicate : predicates) {
80+
Optional<org.apache.paimon.predicate.Predicate> optPredicate =
81+
FlussToPaimonPredicateConverter.convert(getRowType(tablePath), predicate);
82+
if (optPredicate.isPresent()) {
83+
consumedPredicates.add(predicate);
84+
converted.add(optPredicate.get());
85+
} else {
86+
unConsumedPredicates.add(predicate);
87+
}
88+
}
89+
if (!converted.isEmpty()) {
90+
predicate = PredicateBuilder.and(converted);
91+
}
92+
return FilterPushDownResult.of(consumedPredicates, unConsumedPredicates);
7393
}
7494

7595
@Override
@@ -107,4 +127,13 @@ private Catalog getCatalog() {
107127
private FileStoreTable getTable(Catalog catalog, TablePath tablePath) throws Exception {
108128
return (FileStoreTable) catalog.getTable(toPaimon(tablePath));
109129
}
130+
131+
private RowType getRowType(TablePath tablePath) {
132+
try (Catalog catalog = getCatalog()) {
133+
FileStoreTable fileStoreTable = getTable(catalog, tablePath);
134+
return fileStoreTable.rowType();
135+
} catch (Exception e) {
136+
throw new RuntimeException("Fail to get row type of " + tablePath, e);
137+
}
138+
}
110139
}

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public PaimonRecordReader(
6767
readBuilder.withFilter(predicate);
6868
}
6969

70-
TableRead tableRead = readBuilder.newRead();
70+
TableRead tableRead = readBuilder.newRead().executeFilter();
7171
paimonRowType = readBuilder.readType();
7272

7373
org.apache.paimon.reader.RecordReader<InternalRow> recordReader =

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@ public List<PaimonSplit> plan() {
6666
List<PaimonSplit> splits = new ArrayList<>();
6767
try (Catalog catalog = getCatalog()) {
6868
FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId);
69-
// TODO: support filter .withFilter(predicate)
7069
InnerTableScan tableScan = fileStoreTable.newScan();
70+
if (predicate != null) {
71+
tableScan = tableScan.withFilter(predicate);
72+
}
7173
for (Split split : tableScan.plan().splits()) {
7274
DataSplit dataSplit = (DataSplit) split;
7375
splits.add(new PaimonSplit(dataSplit));
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.lake.paimon.utils;
18+
19+
import com.alibaba.fluss.predicate.And;
20+
import com.alibaba.fluss.predicate.CompoundPredicate;
21+
import com.alibaba.fluss.predicate.FieldRef;
22+
import com.alibaba.fluss.predicate.FunctionVisitor;
23+
import com.alibaba.fluss.predicate.LeafPredicate;
24+
import com.alibaba.fluss.predicate.Or;
25+
import com.alibaba.fluss.predicate.PredicateVisitor;
26+
27+
import org.apache.paimon.predicate.Predicate;
28+
import org.apache.paimon.predicate.PredicateBuilder;
29+
import org.apache.paimon.types.RowType;
30+
31+
import java.util.List;
32+
import java.util.Optional;
33+
import java.util.stream.Collectors;
34+
35+
/**
36+
* Converts a Fluss {@link com.alibaba.fluss.predicate.Predicate} into a Paimon {@link Predicate}.
37+
*
38+
* <p>This class implements the {@link PredicateVisitor} pattern to traverse a tree of Fluss
39+
* predicates. It handles both leaf-level conditions (like equals, greater than) and compound
40+
* conditions (AND, OR).
41+
*/
42+
public class FlussToPaimonPredicateConverter implements PredicateVisitor<Predicate> {
43+
44+
private final PredicateBuilder builder;
45+
private final LeafFunctionConverter converter = new LeafFunctionConverter();
46+
47+
public FlussToPaimonPredicateConverter(RowType rowType) {
48+
this.builder = new PredicateBuilder(rowType);
49+
}
50+
51+
public static Optional<Predicate> convert(
52+
RowType rowType, com.alibaba.fluss.predicate.Predicate flussPredicate) {
53+
try {
54+
return Optional.of(flussPredicate.visit(new FlussToPaimonPredicateConverter(rowType)));
55+
} catch (UnsupportedOperationException e) {
56+
return Optional.empty();
57+
}
58+
}
59+
60+
@Override
61+
public Predicate visit(LeafPredicate predicate) {
62+
// Delegate the conversion of the specific function to a dedicated visitor.
63+
// This avoids a long chain of 'if-instanceof' checks.
64+
return predicate.visit(converter);
65+
}
66+
67+
@Override
68+
public Predicate visit(CompoundPredicate predicate) {
69+
List<Predicate> children =
70+
predicate.children().stream().map(p -> p.visit(this)).collect(Collectors.toList());
71+
CompoundPredicate.Function function = predicate.function();
72+
if (function instanceof And) {
73+
return PredicateBuilder.and(children);
74+
} else if (function instanceof Or) {
75+
return PredicateBuilder.or(children);
76+
} else {
77+
throw new UnsupportedOperationException(
78+
"Unsupported fluss compound predicate function: " + predicate.function());
79+
}
80+
}
81+
82+
/**
83+
* A visitor that implements the logic to convert each type of {@link
84+
* com.alibaba.fluss.predicate.LeafFunction} to a Paimon {@link Predicate}.
85+
*/
86+
private class LeafFunctionConverter implements FunctionVisitor<Predicate> {
87+
88+
@Override
89+
public Predicate visitIsNotNull(FieldRef fieldRef) {
90+
return builder.isNotNull(fieldRef.index());
91+
}
92+
93+
@Override
94+
public Predicate visitIsNull(FieldRef fieldRef) {
95+
return builder.isNull(fieldRef.index());
96+
}
97+
98+
@Override
99+
public Predicate visitStartsWith(FieldRef fieldRef, Object literal) {
100+
return builder.startsWith(fieldRef.index(), literal);
101+
}
102+
103+
@Override
104+
public Predicate visitEndsWith(FieldRef fieldRef, Object literal) {
105+
return builder.endsWith(fieldRef.index(), literal);
106+
}
107+
108+
@Override
109+
public Predicate visitContains(FieldRef fieldRef, Object literal) {
110+
return builder.contains(fieldRef.index(), literal);
111+
}
112+
113+
@Override
114+
public Predicate visitLessThan(FieldRef fieldRef, Object literal) {
115+
return builder.lessThan(fieldRef.index(), literal);
116+
}
117+
118+
@Override
119+
public Predicate visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
120+
return builder.greaterOrEqual(fieldRef.index(), literal);
121+
}
122+
123+
@Override
124+
public Predicate visitNotEqual(FieldRef fieldRef, Object literal) {
125+
return builder.notEqual(fieldRef.index(), literal);
126+
}
127+
128+
@Override
129+
public Predicate visitLessOrEqual(FieldRef fieldRef, Object literal) {
130+
return builder.lessOrEqual(fieldRef.index(), literal);
131+
}
132+
133+
@Override
134+
public Predicate visitEqual(FieldRef fieldRef, Object literal) {
135+
return builder.equal(fieldRef.index(), literal);
136+
}
137+
138+
@Override
139+
public Predicate visitGreaterThan(FieldRef fieldRef, Object literal) {
140+
return builder.greaterThan(fieldRef.index(), literal);
141+
}
142+
143+
@Override
144+
public Predicate visitIn(FieldRef fieldRef, List<Object> literals) {
145+
return builder.in(fieldRef.index(), literals);
146+
}
147+
148+
@Override
149+
public Predicate visitNotIn(FieldRef fieldRef, List<Object> literals) {
150+
return builder.notIn(fieldRef.index(), literals);
151+
}
152+
153+
@Override
154+
public Predicate visitAnd(List<Predicate> children) {
155+
// shouldn't come to here
156+
throw new UnsupportedOperationException("Unsupported visitAnd method.");
157+
}
158+
159+
@Override
160+
public Predicate visitOr(List<Predicate> children) {
161+
// shouldn't come to here
162+
throw new UnsupportedOperationException("Unsupported visitOr method.");
163+
}
164+
}
165+
}

0 commit comments

Comments
 (0)