diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
index 4b48a4ef45..3593d1f106 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.row;
 
+import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
@@ -24,8 +25,10 @@ import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
 
 /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
 public class FlinkAsFlussRow implements InternalRow {
@@ -132,4 +135,12 @@ public byte[] getBinary(int pos, int length) {
     public byte[] getBytes(int pos) {
         return flinkRow.getBinary(pos);
     }
+
+    public static Object fromFlinkObject(Object o, DataType type) {
+        if (o == null) {
+            return null;
+        }
+        return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
+                .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o)));
+    }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index 9687b5efc8..aee2dde054 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -30,10 +30,10 @@ import org.apache.fluss.flink.source.split.SourceSplitSerializer;
 import org.apache.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
 import org.apache.fluss.flink.source.state.SourceEnumeratorState;
-import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.types.RowType;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,10 +46,6 @@ import javax.annotation.Nullable;
 
-import java.util.List;
-
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
 /** Flink source for Fluss. */
 public class FlinkSource<OUT>
         implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable<OUT> {
@@ -70,7 +66,7 @@ public class FlinkSource<OUT>
     private final boolean streaming;
     private final FlussDeserializationSchema<OUT> deserializationSchema;
-    private final List<FieldEqual> partitionFilters;
+    private Predicate partitionFilters;
     private final @Nullable LakeSource<LakeSplit> lakeSource;
@@ -85,7 +81,7 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            Predicate partitionFilters) {
         this(
                 flussConf,
                 tablePath,
@@ -112,7 +108,7 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
+            Predicate partitionFilters,
             LakeSource<LakeSplit> lakeSource) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
@@ -124,7 +120,7 @@ public FlinkSource(
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.deserializationSchema = deserializationSchema;
         this.streaming = streaming;
-        this.partitionFilters = checkNotNull(partitionFilters);
+        this.partitionFilters = partitionFilters;
         this.lakeSource = lakeSource;
     }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 5739381d36..23e4d7b414 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -34,8 +34,10 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.GreaterOrEqual;
 import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.PartitionPredicateVisitor;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.predicate.PredicateVisitor;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
@@ -72,6 +74,7 @@ import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,11 +88,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
-import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
-import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
-import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
@@ -144,7 +146,7 @@ public class FlinkTableSource
     private long limit = -1;
-    private List<FieldEqual> partitionFilters = Collections.emptyList();
+    @Nullable protected Predicate partitionFilters;
     private final Map<String, String> tableOptions;
@@ -475,18 +477,19 @@ public Result applyFilters(List<ResolvedExpression> filters) {
                 && startupOptions.startupMode == FlinkConnectorOptions.ScanStartupMode.FULL
                 && hasPrimaryKey()
                 && filters.size() == primaryKeyIndexes.length) {
+
             Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
-            List<FieldEqual> fieldEquals =
-                    extractFieldEquals(
+            List<PushdownUtils.FieldEqual> fieldEquals =
+                    PushdownUtils.extractFieldEquals(
                             filters,
                             primaryKeyTypes,
                             acceptedFilters,
                             remainingFilters,
-                            FLINK_INTERNAL_VALUE);
+                            PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE);
             int[] keyRowProjection = getKeyRowProjection();
             HashSet<Integer> visitedPkFields = new HashSet<>();
             GenericRowData lookupRow = new GenericRowData(primaryKeyIndexes.length);
-            for (FieldEqual fieldEqual : fieldEquals) {
+            for (PushdownUtils.FieldEqual fieldEqual : fieldEquals) {
                 lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
                 visitedPkFields.add(fieldEqual.fieldIndex);
             }
@@ -496,52 +499,79 @@ && hasPrimaryKey()
             }
             singleRowFilter = lookupRow;
             return Result.of(acceptedFilters, remainingFilters);
-        } else if (isPartitioned()) {
-            // dynamic partition pushdown
-            List<FieldEqual> fieldEquals =
-                    extractFieldEquals(
-                            filters,
-                            getPartitionKeyTypes(),
-                            acceptedFilters,
-                            remainingFilters,
-                            FLUSS_INTERNAL_VALUE);
-
-            // partitions are filtered by string representations, convert the equals to string first
-            partitionFilters = stringifyFieldEquals(fieldEquals);
-
+        } else if (isPartitioned()
+                && !RowLevelModificationType.UPDATE.equals(modificationScanType)) {
+            // apply partition filter pushdown
+            List<Predicate> converted = new ArrayList<>();
+
+            List<String> fieldNames = tableOutputType.getFieldNames();
+            List<String> partitionKeys =
+                    Arrays.stream(partitionKeyIndexes)
+                            .mapToObj(fieldNames::get)
+                            .collect(Collectors.toList());
+
+            PredicateVisitor<Boolean> partitionPredicateVisitor =
+                    new PartitionPredicateVisitor(partitionKeys);
+
+            // TODO after https://github.com/alibaba/fluss/pull/979
+            // replace string type with the real type
+            LogicalType[] partitionKeyTypes =
+                    partitionKeys.stream()
+                            .map(key -> VarCharType.STRING_TYPE)
+                            .toArray(LogicalType[]::new);
+            for (ResolvedExpression filter : filters) {
+
+                Optional<Predicate> predicateOptional =
+                        PredicateConverter.convert(
+                                org.apache.flink.table.types.logical.RowType.of(
+                                        partitionKeyTypes, partitionKeys.toArray(new String[0])),
+                                filter);
+
+                if (!predicateOptional.isPresent()) {
+                    remainingFilters.add(filter);
+                } else {
+                    Predicate p = predicateOptional.get();
+                    if (!p.visit(partitionPredicateVisitor)) {
+                        remainingFilters.add(filter);
+                    } else {
+                        acceptedFilters.add(filter);
+                    }
+                    converted.add(p);
+                }
+            }
+            partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted);
 
             // lake source is not null
             if (lakeSource != null) {
-                // and exist field equals, push down to lake source
-                if (!fieldEquals.isEmpty()) {
-                    // convert flink row type to fluss row type
-                    RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
-
-                    List<Predicate> lakePredicates = new ArrayList<>();
-                    PredicateBuilder predicateBuilder = new PredicateBuilder(flussRowType);
-
-                    for (FieldEqual fieldEqual : fieldEquals) {
-                        lakePredicates.add(
-                                predicateBuilder.equal(
-                                        fieldEqual.fieldIndex, fieldEqual.equalValue));
+                PredicateVisitor<Boolean> lakePredicateVisitor =
+                        new PartitionPredicateVisitor(tableOutputType.getFieldNames());
+
+                List<Predicate> lakePredicates = new ArrayList<>();
+                for (ResolvedExpression filter : filters) {
+
+                    Optional<Predicate> predicateOptional =
+                            PredicateConverter.convert(tableOutputType, filter);
+
+                    if (predicateOptional.isPresent()) {
+                        Predicate p = predicateOptional.get();
+                        lakePredicates.add(p);
                     }
+                }
 
-                    if (!lakePredicates.isEmpty()) {
-                        final LakeSource.FilterPushDownResult filterPushDownResult =
-                                lakeSource.withFilters(lakePredicates);
-                        if (filterPushDownResult.acceptedPredicates().size()
-                                != lakePredicates.size()) {
-                            LOG.info(
-                                    "LakeSource rejected some partition filters. Falling back to Flink-side filtering.");
-                            // Flink will apply all filters to preserve correctness
-                            return Result.of(Collections.emptyList(), filters);
-                        }
+                if (!lakePredicates.isEmpty()) {
+                    final LakeSource.FilterPushDownResult filterPushDownResult =
+                            lakeSource.withFilters(lakePredicates);
+                    if (filterPushDownResult.acceptedPredicates().size() != lakePredicates.size()) {
+                        LOG.info(
+                                "LakeSource rejected some partition filters. Falling back to Flink-side filtering.");
+                        // Flink will apply all filters to preserve correctness
+                        return Result.of(Collections.emptyList(), filters);
                     }
                 }
             }
             return Result.of(acceptedFilters, remainingFilters);
-        } else {
-            return Result.of(Collections.emptyList(), filters);
         }
+
+        return Result.of(Collections.emptyList(), filters);
     }
 
     @Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
index 428204d83f..4277418349 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
@@ -26,8 +26,6 @@
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
-
 /**
  * A Flink DataStream source implementation for reading data from Fluss tables.
  *
@@ -82,7 +80,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
                 scanPartitionDiscoveryIntervalMs,
                 deserializationSchema,
                 streaming,
-                Collections.emptyList());
+                null);
     }
 
     /**
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/PredicateConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/PredicateConverter.java
new file mode 100644
index 0000000000..73b36c0d15
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/PredicateConverter.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source;
+
+import org.apache.fluss.flink.row.FlinkAsFlussRow;
+import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.predicate.UnsupportedExpression;
+import org.apache.fluss.utils.TypeUtils;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Convert {@link Expression} to {@link Predicate}.
+ *
+ * <p>For {@link FieldReferenceExpression}, use the field name rather than the field index: when
+ * projection pushdown runs before filter pushdown, the indices in the pushed-down filters refer
+ * to the projected row.
+ */
+public class PredicateConverter implements ExpressionVisitor<Predicate> {
+
+    private final PredicateBuilder builder;
+
+    public PredicateConverter(RowType type) {
+        this(new PredicateBuilder(FlinkConversions.toFlussRowType(type)));
+    }
+
+    public PredicateConverter(PredicateBuilder builder) {
+        this.builder = builder;
+    }
+
+    /** Accepts simple LIKE patterns like "abc%". */
+    private static final Pattern BEGIN_PATTERN = Pattern.compile("^[^%_]+%$");
+
+    private static final Pattern END_PATTERN = Pattern.compile("^%[^%_]+$");
+    private static final Pattern CONTAINS_PATTERN = Pattern.compile("^%[^%_]+%$");
+
+    @Override
+    public Predicate visit(CallExpression call) {
+        FunctionDefinition func = call.getFunctionDefinition();
+        List<Expression> children = call.getChildren();
+
+        if (func == BuiltInFunctionDefinitions.AND) {
+            return PredicateBuilder.and(children.get(0).accept(this), children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.OR) {
+            return PredicateBuilder.or(children.get(0).accept(this), children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.EQUALS) {
+            return visitBiFunction(children, builder::equal, builder::equal);
+        } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
+            return visitBiFunction(children, builder::notEqual, builder::notEqual);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN) {
+            return visitBiFunction(children, builder::greaterThan, builder::lessThan);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
+            return visitBiFunction(children, builder::greaterOrEqual, builder::lessOrEqual);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN) {
+            return visitBiFunction(children, builder::lessThan, builder::greaterThan);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
+            return visitBiFunction(children, builder::lessOrEqual, builder::greaterOrEqual);
+        } else if (func == BuiltInFunctionDefinitions.IN) {
+            FieldReferenceExpression fieldRefExpr =
+                    extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+            List<Object> literals = new ArrayList<>();
+            for (int i = 1; i < children.size(); i++) {
+                literals.add(extractLiteral(fieldRefExpr.getOutputDataType(), children.get(i)));
+            }
+            return builder.in(builder.indexOf(fieldRefExpr.getName()), literals);
+        } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
+                    .map(builder::isNull)
+                    .orElseThrow(UnsupportedExpression::new);
+        } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
+                    .map(builder::isNotNull)
+                    .orElseThrow(UnsupportedExpression::new);
+        } else if (func == BuiltInFunctionDefinitions.LIKE) {
+            FieldReferenceExpression fieldRefExpr =
+                    extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+            if (fieldRefExpr
+                            .getOutputDataType()
+                            .getLogicalType()
+                            .getTypeRoot()
+                            .getFamilies()
+                            .contains(LogicalTypeFamily.CHARACTER_STRING)
+                    && builder.indexOf(fieldRefExpr.getName()) != -1) {
+                String sqlPattern =
+                        Objects.requireNonNull(
+                                        extractLiteral(
+                                                fieldRefExpr.getOutputDataType(), children.get(1)))
+                                .toString();
+                String escape =
+                        children.size() <= 2
+                                ? null
+                                : Objects.requireNonNull(
+                                                extractLiteral(
+                                                        fieldRefExpr.getOutputDataType(),
+                                                        children.get(2)))
+                                        .toString();
+
+                if (escape == null) {
+                    if (BEGIN_PATTERN.matcher(sqlPattern).matches()) {
+                        String prefix = sqlPattern.substring(0, sqlPattern.length() - 1);
+                        return builder.startsWith(builder.indexOf(fieldRefExpr.getName()), prefix);
+                    }
+                    if (END_PATTERN.matcher(sqlPattern).matches()) {
+                        String suffix = sqlPattern.substring(1);
+                        return builder.endsWith(builder.indexOf(fieldRefExpr.getName()), suffix);
+                    }
+                    if (CONTAINS_PATTERN.matcher(sqlPattern).matches()
+                            && sqlPattern.indexOf('%', 1) == sqlPattern.length() - 1) {
+                        String mid = sqlPattern.substring(1, sqlPattern.length() - 1);
+                        return builder.contains(builder.indexOf(fieldRefExpr.getName()), mid);
+                    }
+                }
+            }
+        }
+
+        // TODO is_xxx, between_xxx, similar, in, not_in, not?
+
+        throw new UnsupportedExpression();
+    }
+
+    private Predicate visitBiFunction(
+            List<Expression> children,
+            BiFunction<Integer, Object, Predicate> visit1,
+            BiFunction<Integer, Object, Predicate> visit2) {
+        Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0));
+        if (fieldRefExpr.isPresent() && builder.indexOf(fieldRefExpr.get().getName()) != -1) {
+            Object literal =
+                    extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1));
+            return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), literal);
+        } else {
+            fieldRefExpr = extractFieldReference(children.get(1));
+            if (fieldRefExpr.isPresent()) {
+                Object literal =
+                        extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(0));
+                return visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), literal);
+            }
+        }
+
+        throw new UnsupportedExpression();
+    }
+
+    private Optional<FieldReferenceExpression> extractFieldReference(Expression expression) {
+        if (expression instanceof FieldReferenceExpression) {
+            return Optional.of((FieldReferenceExpression) expression);
+        }
+        return Optional.empty();
+    }
+
+    private Object extractLiteral(DataType expectedType, Expression expression) {
+        LogicalType expectedLogicalType = expectedType.getLogicalType();
+        if (!supportsPredicate(expectedLogicalType)) {
+            throw new UnsupportedExpression();
+        }
+
+        if (expression instanceof ValueLiteralExpression) {
+            ValueLiteralExpression valueExpression = (ValueLiteralExpression) expression;
+            if (valueExpression.isNull()) {
+                return null;
+            }
+
+            DataType actualType = valueExpression.getOutputDataType();
+            LogicalType actualLogicalType = actualType.getLogicalType();
+            Optional<?> valueOpt = valueExpression.getValueAs(actualType.getConversionClass());
+            if (valueOpt.isPresent()) {
+                Object value = valueOpt.get();
+                if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())
+                        && !isStringType(expectedLogicalType)) {
+                    return FlinkAsFlussRow.fromFlinkObject(value, expectedType);
+                } else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {
+                    try {
+                        return TypeUtils.castFromString(
+                                value.toString(), FlinkConversions.toFlussType(expectedType));
+                    } catch (Exception ignored) {
+                    }
+                }
+            }
+        }
+
+        throw new UnsupportedExpression();
+    }
+
+    private boolean isStringType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private boolean supportsPredicate(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    @Override
+    public Predicate visit(ValueLiteralExpression valueLiteralExpression) {
+        throw new UnsupportedExpression();
+    }
+
+    @Override
+    public Predicate visit(FieldReferenceExpression fieldReferenceExpression) {
+        throw new UnsupportedExpression();
+    }
+
+    @Override
+    public Predicate visit(TypeLiteralExpression typeLiteralExpression) {
+        throw new UnsupportedExpression();
+    }
+
+    @Override
+    public Predicate visit(Expression expression) {
+        throw new UnsupportedExpression();
+    }
+
+    /**
+     * Best-effort conversion of a {@link ResolvedExpression} to a {@link Predicate}.
+     *
+     * @param filter a resolved expression
+     * @return the converted {@link Predicate}, or {@link Optional#empty()} if an {@link
+     *     UnsupportedExpression} is thrown during conversion.
+     */
+    public static Optional<Predicate> convert(RowType rowType, ResolvedExpression filter) {
+        try {
+            return Optional.ofNullable(filter.accept(new PredicateConverter(rowType)));
+        } catch (UnsupportedExpression e) {
+            return Optional.empty();
+        }
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 867ff442f8..f97187c14f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -35,14 +35,16 @@ import org.apache.fluss.flink.source.split.LogSplit;
 import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.flink.source.state.SourceEnumeratorState;
-import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.types.DataField;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.utils.ExceptionUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -133,7 +135,7 @@ public class FlinkSourceEnumerator
     private volatile boolean closed = false;
-    private final List<FieldEqual> partitionFilters;
+    private Predicate partitionFilters;
     @Nullable private final LakeSource<LakeSplit> lakeSource;
@@ -146,7 +148,7 @@ public FlinkSourceEnumerator(
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            Predicate partitionFilters) {
         this(
                 tablePath,
                 flussConf,
@@ -169,7 +171,7 @@ public FlinkSourceEnumerator(
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
+            Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this(
                 tablePath,
@@ -199,7 +201,7 @@ public FlinkSourceEnumerator(
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
+            Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
@@ -216,7 +218,7 @@ public FlinkSourceEnumerator(
                         : new LinkedList<>(pendingHybridLakeFlussSplits);
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.streaming = streaming;
-        this.partitionFilters = checkNotNull(partitionFilters);
+        this.partitionFilters = partitionFilters;
         this.stoppingOffsetsInitializer =
                 streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
         this.lakeSource = lakeSource;
@@ -354,30 +356,49 @@ private Set<PartitionInfo> listPartitions() {
 
     /** Apply partition filter. */
     private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionInfos) {
-        if (!partitionFilters.isEmpty()) {
-            return partitionInfos.stream()
-                    .filter(
-                            partitionInfo -> {
-                                Map<String, String> specMap =
-                                        partitionInfo.getPartitionSpec().getSpecMap();
-                                // use getFields() instead of getFieldNames() to
-                                // avoid collection construction
-                                List<DataField> fields = tableInfo.getRowType().getFields();
-                                for (FieldEqual filter : partitionFilters) {
-                                    String fieldName = fields.get(filter.fieldIndex).getName();
-                                    String partitionValue = specMap.get(fieldName);
-                                    if (partitionValue == null
-                                            || !filter.equalValue
-                                                    .toString()
-                                                    .equals(partitionValue)) {
-                                        return false;
-                                    }
-                                }
-                                return true;
-                            })
-                    .collect(Collectors.toList());
-        }
-        return partitionInfos;
+        if (partitionFilters == null) {
+            return partitionInfos;
+        } else {
+            int originalSize = partitionInfos.size();
+            List<PartitionInfo> filteredPartitionInfos =
+                    partitionInfos.stream()
+                            .filter(
+                                    partitionInfo ->
+                                            partitionFilters.test(
+                                                    convertPartitionInfoToInternalRow(
+                                                            partitionInfo)))
+                            .collect(Collectors.toList());
+
+            int filteredSize = filteredPartitionInfos.size();
+            // log at INFO only when the filter actually removed partitions
+            if (originalSize != filteredSize) {
+                LOG.info(
+                        "Applied partition filter for table {}: {} partitions filtered to {} partitions with predicate: {}",
+                        tablePath,
+                        originalSize,
+                        filteredSize,
+                        partitionFilters);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Filtered partitions: {}", filteredPartitionInfos);
+                }
+            } else if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Partition filter applied for table {} but no partitions were filtered out (total: {})",
+                        tablePath,
+                        originalSize);
+            }
+            return filteredPartitionInfos;
+        }
+    }
+
+    private InternalRow convertPartitionInfoToInternalRow(PartitionInfo partitionInfo) {
+        List<String> partitionValues =
+                partitionInfo.getResolvedPartitionSpec().getPartitionValues();
+        GenericRow genericRow = new GenericRow(partitionValues.size());
+        for (int i = 0; i < partitionValues.size(); i++) {
+            genericRow.setField(i, BinaryString.fromString(partitionValues.get(i)));
+        }
+        return genericRow;
     }
 
     /** Init the splits for Fluss. */
@@ -390,17 +411,43 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
             LOG.error("Failed to list partitions for {}", tablePath, t);
             return;
         }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Checking partition changes for table {}, found {} partitions",
+                    tablePath,
+                    partitionInfos.size());
+        }
+
         final PartitionChange partitionChange = getPartitionChange(partitionInfos);
         if (partitionChange.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No partition changes detected for table {}", tablePath);
+            }
             return;
         }
 
         // handle removed partitions
-        handlePartitionsRemoved(partitionChange.removedPartitions);
+        if (!partitionChange.removedPartitions.isEmpty()) {
+            LOG.info(
+                    "Handling {} removed partitions for table {}: {}",
+                    partitionChange.removedPartitions.size(),
+                    tablePath,
+                    partitionChange.removedPartitions);
+            handlePartitionsRemoved(partitionChange.removedPartitions);
+        }
 
         // handle new partitions
-        context.callAsync(
-                () -> initPartitionedSplits(partitionChange.newPartitions), this::handleSplitsAdd);
+        if (!partitionChange.newPartitions.isEmpty()) {
+            LOG.info(
+                    "Handling {} new partitions for table {}: {}",
+                    partitionChange.newPartitions.size(),
+                    tablePath,
+                    partitionChange.newPartitions);
+            context.callAsync(
+                    () -> initPartitionedSplits(partitionChange.newPartitions),
+                    this::handleSplitsAdd);
+        }
     }
 
     private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionInfos) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 24757cb05f..04b43693f1 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -1025,7 +1025,7 @@ void testStreamingReadMultiPartitionPushDown() throws Exception {
                         .filter(s -> s.contains("2025"))
                         .collect(Collectors.toList());
         waitUntilAllBucketFinishSnapshot(
-                admin, tablePath, Arrays.asList("2025$1", "2025$2", "2025$2"));
+                admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
 
         String plan = tEnv.explainSql("select * from multi_partitioned_table where c ='2025'");
         assertThat(plan)
@@ -1067,7 +1067,7 @@ void testStreamingReadMultiPartitionPushDown() throws Exception {
     }
 
     @Test
-    void testStreamingReadWithCombinedFilters() throws Exception {
+    void testStreamingReadWithCombinedFilters1() throws Exception {
         tEnv.executeSql(
                 "create table combined_filters_table"
                         + " (a int not null, b varchar, c string, d int, primary key (a, c) NOT ENFORCED) partitioned by (c) ");
@@ -1076,12 +1076,16 @@ void testStreamingReadWithCombinedFilters() throws Exception {
         tEnv.executeSql("alter table combined_filters_table add partition (c=2026)");
 
         List<InternalRow> rows = new ArrayList<>();
-        List<String> expectedRowValues = new ArrayList<>();
+        List<String> expectedRowValues1 = new ArrayList<>();
+        List<String> expectedRowValues2 = new ArrayList<>();
 
         for (int i = 0; i < 10; i++) {
             rows.add(row(i, "v" + i, "2025", i * 100));
             if (i % 2 == 0) {
-                expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100));
+                expectedRowValues1.add(String.format("+I[%d, 2025, %d]", i, i * 100));
+            }
+            if (i == 2) {
+                expectedRowValues2.add(String.format("+I[%d, 2025, %d]", i, i * 100));
             }
         }
         writeRows(conn, tablePath, rows, false);
@@ -1108,7 +1112,24 @@ void testStreamingReadWithCombinedFilters() throws Exception {
                                 "select a,c,d from combined_filters_table where c ='2025' and d % 200 = 0")
                         .collect();
 
-        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+        assertResultsIgnoreOrder(rowIter, expectedRowValues1, true);
+
+        plan =
+                tEnv.explainSql(
+                        "select a,c,d from combined_filters_table where c ='2025' and d = 200");
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table, "
+                                + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
+                                + "project=[a, d]]], fields=[a, d])");
+
+        // test column filter, partition filter, and Flink runtime filter
+        rowIter =
+                tEnv.executeSql(
+                                "select a,c,d from combined_filters_table where c ='2025' and d = 200")
+                        .collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues2, true);
     }
 
     @Test
@@ -1141,8 +1162,9 @@ private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<String>
     private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<String>
+        List<String> expectedRowValues =
+                writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026", "2027"))
+                        .stream()
+                        .filter(s -> s.contains("2025") || s.contains("2026"))
+                        .collect(Collectors.toList());
+        waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027"));
+
+        String plan =
+                tEnv.explainSql("select * from partitioned_table_in where c in ('2025','2026')");
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))]]], fields=[a, b, c])");
+
+        org.apache.flink.util.CloseableIterator<Row> rowIter =
+                tEnv.executeSql("select * from partitioned_table_in where c in ('2025','2026')")
+                        .collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+
+        plan = tEnv.explainSql("select * from partitioned_table_in where c ='2025' or c ='2026'");
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), =(c, _UTF-16LE'2026':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], fields=[a, b, c])");
+
+        rowIter =
+                tEnv.executeSql("select * from partitioned_table_in where c ='2025' or c ='2026'")
+                        .collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+    }
+
+    @Test
+    void testStreamingReadWithCombinedFiltersAndInExpr() throws Exception {
+        tEnv.executeSql(
+                "create table combined_filters_table_in"
+                        + " (a int not null, b varchar, c string, d int, primary key (a, c) NOT ENFORCED) partitioned by (c) ");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "combined_filters_table_in");
+        tEnv.executeSql("alter table combined_filters_table_in add partition (c=2025)");
+        tEnv.executeSql("alter table combined_filters_table_in add partition (c=2026)");
+        tEnv.executeSql("alter table combined_filters_table_in add partition (c=2027)");
+
+        List<InternalRow> rows = new ArrayList<>();
+        List<String> expectedRowValues = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            rows.add(row(i, "v" + i, "2025", i * 100));
+            if (i % 2 == 0) {
+                expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100));
+            }
+        }
+        for (int i = 0; i < 10; i++) {
+            rows.add(row(i, "v" + i, "2026", i * 100));
+            if (i % 2 == 0) {
+                expectedRowValues.add(String.format("+I[%d, 2026, %d]", i, i * 100));
+            }
+        }
+        writeRows(conn, tablePath, rows, false);
+
+        for (int i = 0; i < 10; i++) {
+            rows.add(row(i, "v" + i, "2027", i * 100));
+        }
+
+        writeRows(conn, tablePath, rows, false);
+        waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027"));
"2027")); + + String plan = + tEnv.explainSql( + "select a,c,d from combined_filters_table_in where c in ('2025','2026') and d % 200 = 0"); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))], project=[a, c, d]]], fields=[a, c, d])"); + + // test column filter、partition filter and flink runtime filter + org.apache.flink.util.CloseableIterator rowIter = + tEnv.executeSql( + "select a,c,d from combined_filters_table_in where c in ('2025','2026') " + + "and d % 200 = 0") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + + rowIter = + tEnv.executeSql( + "select a,c,d from combined_filters_table_in where (c ='2025' or c ='2026') " + + "and d % 200 = 0") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + + @Test + void testStreamingReadPartitionPushDownWithLikeExpr() throws Exception { + + tEnv.executeSql( + "create table partitioned_table_like" + + " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_like"); + tEnv.executeSql("alter table partitioned_table_like add partition (c=2025)"); + tEnv.executeSql("alter table partitioned_table_like add partition (c=2026)"); + tEnv.executeSql("alter table partitioned_table_like add partition (c=3026)"); + + List allData = + writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026", "3026")); + List expectedRowValues = + allData.stream() + .filter(s -> s.contains("2025") || s.contains("2026")) + .collect(Collectors.toList()); + waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "3026")); + + String plan = tEnv.explainSql("select * from partitioned_table_like where c like '202%'"); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'202%')]]], fields=[a, b, c])"); + + org.apache.flink.util.CloseableIterator rowIter = + tEnv.executeSql("select * from partitioned_table_like where c like '202%'") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + expectedRowValues = + allData.stream() + .filter(s -> s.contains("2026") || s.contains("3026")) + .collect(Collectors.toList()); + plan = tEnv.explainSql("select * from partitioned_table_like where c like '%026'"); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'%026')]]], fields=[a, b, c])"); + + rowIter = + tEnv.executeSql("select * from partitioned_table_like where c like '%026'") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + + expectedRowValues = + allData.stream().filter(s -> s.contains("3026")).collect(Collectors.toList()); + plan = tEnv.explainSql("select * from partitioned_table_like where c like '%3026%'"); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'%3026%')]]], fields=[a, b, c])"); + + rowIter = + tEnv.executeSql("select * from partitioned_table_like where c like '%3026%'") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + + @Test + void testStreamingReadPartitionComplexPushDown() throws Exception { + + tEnv.executeSql( + "create table partitioned_table_complex" + + " (a int not null, b varchar, c string,d string, primary key (a, c, d) 
NOT ENFORCED) partitioned by (c,d) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_complex"); + tEnv.executeSql("alter table partitioned_table_complex add partition (c=2025,d=1)"); + tEnv.executeSql("alter table partitioned_table_complex add partition (c=2025,d=2)"); + tEnv.executeSql("alter table partitioned_table_complex add partition (c=2026,d=1)"); + + List allData = + writeRowsToTwoPartition( + tablePath, Arrays.asList("c=2025,d=1", "c=2025,d=2", "c=2026,d=1")); + List expectedRowValues = + allData.stream() + .filter(s -> s.contains("v3") && !s.contains("2025, 2")) + .collect(Collectors.toList()); + waitUntilAllBucketFinishSnapshot( + admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1")); + + String plan = + tEnv.explainSql( + "select * from partitioned_table_complex where a = 3\n" + + " and (c in ('2026') or d like '%1%') " + + " and b like '%v3%'"); + assertThat(plan) + .contains( + "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND LIKE(b, '%v3%'))])\n" + + "+- TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d, _UTF-16LE'%1%'))]]], fields=[a, b, c, d])"); + + org.apache.flink.util.CloseableIterator rowIter = + tEnv.executeSql( + "select * from partitioned_table_complex where a = 3\n" + + " and (c in ('2026') or d like '%1%') " + + " and b like '%v3%'") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + private enum Caching { ENABLE_CACHE, DISABLE_CACHE diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 7218f38913..5dc4af4708 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -97,7 +97,7 @@ void testPkTableNoSnapshotSplits() throws Throwable { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -144,7 +144,7 @@ void testPkTableWithSnapshotSplits() throws Throwable { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); // register all read for (int i = 0; i < numSubtasks; i++) { @@ -215,7 +215,7 @@ void testNonPkTable() throws Throwable { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -261,7 +261,7 @@ void testReaderRegistrationTriggerAssignments() throws Throwable { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -297,7 +297,7 @@ void testAddSplitBack() throws Throwable { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -357,7 +357,7 @@ void testRestore() throws Throwable { OffsetsInitializer.earliest(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList(), + null, null); enumerator.start(); @@ -401,7 +401,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, 
streaming, - Collections.emptyList())) { + null)) { Map partitionNameByIds = waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); enumerator.start(); @@ -516,7 +516,7 @@ void testGetSplitOwner() throws Exception { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList())) { + null)) { // test splits for same non-partitioned bucket, should assign to same task TableBucket t1 = new TableBucket(tableId, 0);
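
Reviewer note, not part of the patch: the pushdown now happens in two stages. In FlinkTableSource#applyFilters, each ResolvedExpression is converted to a Fluss Predicate over a VARCHAR-typed partition-key row (partition values are compared as strings until the TODO referencing alibaba/fluss#979 is resolved), PartitionPredicateVisitor accepts only predicates that reference partition keys alone, and the accepted predicates are ANDed into partitionFilters; FlinkSourceEnumerator later evaluates that predicate against each partition's values. Below is a minimal sketch of this flow using only APIs that appear in the diff; the class name, method name, and the single partition key "c" are hypothetical.

import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import org.apache.fluss.flink.source.PredicateConverter;
import org.apache.fluss.predicate.PartitionPredicateVisitor;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

class PartitionPruningSketch {

    /**
     * Returns true if the partition holding the given (string) value for key "c" must be kept.
     * Mirrors FlinkTableSource#applyFilters followed by
     * FlinkSourceEnumerator#applyPartitionFilter in the patch above.
     */
    static boolean keepPartition(ResolvedExpression filter, String partitionValue) {
        List<String> partitionKeys = Collections.singletonList("c");
        // Partition values are compared as strings, hence a VARCHAR-typed row type.
        RowType partitionRowType =
                RowType.of(new LogicalType[] {VarCharType.STRING_TYPE}, new String[] {"c"});

        // Stage 1: convert the Flink expression. Unsupported expressions yield empty and
        // stay in remainingFilters on the Flink side.
        Optional<Predicate> predicate = PredicateConverter.convert(partitionRowType, filter);
        if (!predicate.isPresent()
                || !predicate.get().visit(new PartitionPredicateVisitor(partitionKeys))) {
            // Not pushable: keep the partition; Flink still filters rows for correctness.
            return true;
        }

        // Stage 2 (enumerator side): evaluate the predicate against a one-field row holding
        // the partition value, as convertPartitionInfoToInternalRow does.
        GenericRow row = new GenericRow(1);
        row.setField(0, BinaryString.fromString(partitionValue));
        return predicate.get().test(row);
    }
}

Expressions that fail either stage end up in remainingFilters, so Flink still evaluates them row by row and the pruning never affects correctness, only the number of partitions the enumerator reads.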