Skip to content

Commit d1c103f

Browse files
committed
[flink]fix and add log
1 parent 82ebb7c commit d1c103f

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,11 +541,21 @@ && hasPrimaryKey()
541541
}
542542
partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted);
543543
// lake source is not null
544-
if (lakeSource != null && partitionFilters != null) {
544+
if (lakeSource != null) {
545+
PredicateVisitor<Boolean> lakePredicateVisitor =
546+
new PartitionPredicateVisitor(tableOutputType.getFieldNames());
545547

546548
List<Predicate> lakePredicates = new ArrayList<>();
549+
for (ResolvedExpression filter : filters) {
547550

548-
lakePredicates.addAll(converted);
551+
Optional<Predicate> predicateOptional =
552+
PredicateConverter.convert(tableOutputType, filter);
553+
554+
if (predicateOptional.isPresent()) {
555+
Predicate p = predicateOptional.get();
556+
lakePredicates.add(p);
557+
}
558+
}
549559

550560
if (!lakePredicates.isEmpty()) {
551561
final LakeSource.FilterPushDownResult filterPushDownResult =

0 commit comments

Comments
 (0)