FlinkAsFlussRow.java
@@ -17,15 +17,18 @@
 
 package org.apache.fluss.flink.row;
 
+import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
 
 /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
 public class FlinkAsFlussRow implements InternalRow {
@@ -132,4 +135,12 @@ public byte[] getBinary(int pos, int length) {
     public byte[] getBytes(int pos) {
         return flinkRow.getBinary(pos);
     }
+
+    public static Object fromFlinkObject(Object o, DataType type) {
+        if (o == null) {
+            return null;
+        }
+        return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
+                .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o)));
+    }
 }
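For orientation, a minimal usage sketch of the new helper (not part of the diff). The demo class, the sample value, and the expected result type are illustrative assumptions; FlinkAsFlussRow.fromFlinkObject, FlinkConversions.toFlussType, and the Flink DataTypes/StringData APIs are the pieces the change itself relies on.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;

import org.apache.fluss.flink.row.FlinkAsFlussRow;

// Hypothetical demo class, not part of the PR.
public class FromFlinkObjectDemo {
    public static void main(String[] args) {
        // A Flink-internal VARCHAR value, as RowData would carry it.
        Object flinkValue = StringData.fromString("us-east-1");
        // The helper wraps the value in a single-field GenericRowData,
        // adapts it through FlinkAsFlussRow, and reads it back with a
        // Fluss field getter for the converted type; for VARCHAR this
        // should yield a Fluss BinaryString.
        Object flussValue = FlinkAsFlussRow.fromFlinkObject(flinkValue, DataTypes.STRING());
        System.out.println(flussValue);
    }
}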
FlinkSource.java
@@ -30,10 +30,10 @@
 import org.apache.fluss.flink.source.split.SourceSplitSerializer;
 import org.apache.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
 import org.apache.fluss.flink.source.state.SourceEnumeratorState;
-import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.types.RowType;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,10 +50,6 @@
 
 import javax.annotation.Nullable;
 
-import java.util.List;
-
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
 /** Flink source for Fluss. */
 public class FlinkSource<OUT>
         implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {
@@ -70,7 +66,7 @@ public class FlinkSource<OUT>
     private final boolean streaming;
     private final FlussDeserializationSchema<OUT> deserializationSchema;
 
-    private final List<FieldEqual> partitionFilters;
+    private Predicate partitionFilters;
 
     private final @Nullable LakeSource<LakeSplit> lakeSource;
 
@@ -85,7 +81,7 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            Predicate partitionFilters) {
         this(
                 flussConf,
                 tablePath,
@@ -112,7 +108,7 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
+            Predicate partitionFilters,
             LakeSource<LakeSplit> lakeSource) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
@@ -124,7 +120,7 @@
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.deserializationSchema = deserializationSchema;
         this.streaming = streaming;
-        this.partitionFilters = checkNotNull(partitionFilters);
+        this.partitionFilters = partitionFilters;
         this.lakeSource = lakeSource;
     }
 
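The constructors now take a nullable org.apache.fluss.predicate.Predicate instead of a non-null List<FieldEqual>, so callers that previously passed Collections.emptyList() pass null (see the FlussSource change below). A sketch of how such a predicate could be built; only the PredicateBuilder calls that FlinkTableSource itself uses are relied on, while the helper method and the BinaryString.fromString conversion are assumptions:

import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.predicate.PredicateBuilder;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.types.RowType;

// Hypothetical helper, not part of the PR.
public class PartitionFilterSketch {
    /** Builds "field at fieldIndex == value" against the table's Fluss row type. */
    public static Predicate partitionEquals(RowType rowType, int fieldIndex, String value) {
        PredicateBuilder builder = new PredicateBuilder(rowType);
        // Partition values are strings; BinaryString is assumed to be
        // Fluss's internal string representation for predicate literals.
        return builder.equal(fieldIndex, BinaryString.fromString(value));
    }
}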
FlinkTableSource.java
@@ -34,8 +34,10 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.GreaterOrEqual;
 import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.PartitionPredicateVisitor;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.predicate.PredicateVisitor;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
@@ -72,6 +74,7 @@
 import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,11 +88,10 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
-import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
-import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
-import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
@@ -144,7 +146,7 @@ public class FlinkTableSource
 
     private long limit = -1;
 
-    private List<FieldEqual> partitionFilters = Collections.emptyList();
+    @Nullable protected Predicate partitionFilters;
 
     private final Map<String, String> tableOptions;
 
@@ -475,18 +477,19 @@ public Result applyFilters(List<ResolvedExpression> filters) {
                 && startupOptions.startupMode == FlinkConnectorOptions.ScanStartupMode.FULL
                 && hasPrimaryKey()
                 && filters.size() == primaryKeyIndexes.length) {
+
             Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
-            List<FieldEqual> fieldEquals =
-                    extractFieldEquals(
+            List<PushdownUtils.FieldEqual> fieldEquals =
+                    PushdownUtils.extractFieldEquals(
                             filters,
                             primaryKeyTypes,
                             acceptedFilters,
                             remainingFilters,
-                            FLINK_INTERNAL_VALUE);
+                            PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE);
             int[] keyRowProjection = getKeyRowProjection();
             HashSet<Integer> visitedPkFields = new HashSet<>();
             GenericRowData lookupRow = new GenericRowData(primaryKeyIndexes.length);
-            for (FieldEqual fieldEqual : fieldEquals) {
+            for (PushdownUtils.FieldEqual fieldEqual : fieldEquals) {
                 lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
                 visitedPkFields.add(fieldEqual.fieldIndex);
             }
@@ -496,52 +499,79 @@ && hasPrimaryKey()
             }
             singleRowFilter = lookupRow;
             return Result.of(acceptedFilters, remainingFilters);
-        } else if (isPartitioned()) {
-            // dynamic partition pushdown
-            List<FieldEqual> fieldEquals =
-                    extractFieldEquals(
-                            filters,
-                            getPartitionKeyTypes(),
-                            acceptedFilters,
-                            remainingFilters,
-                            FLUSS_INTERNAL_VALUE);
-
-            // partitions are filtered by string representations, convert the equals to string first
-            partitionFilters = stringifyFieldEquals(fieldEquals);
-
+        } else if (isPartitioned()
+                && !RowLevelModificationType.UPDATE.equals(modificationScanType)) {
+            // apply partition filter pushdown
+            List<Predicate> converted = new ArrayList<>();
+
+            List<String> fieldNames = tableOutputType.getFieldNames();
+            List<String> partitionKeys =
+                    Arrays.stream(partitionKeyIndexes)
+                            .mapToObj(fieldNames::get)
+                            .collect(Collectors.toList());
+
+            PredicateVisitor<Boolean> partitionPredicateVisitor =
+                    new PartitionPredicateVisitor(partitionKeys);
+
+            // TODO after https://github.com/alibaba/fluss/pull/979
+            //  replace string type with the real type
+            LogicalType[] partitionKeyTypes =
+                    partitionKeys.stream()
+                            .map(key -> VarCharType.STRING_TYPE)
+                            .toArray(LogicalType[]::new);
+            for (ResolvedExpression filter : filters) {
+
+                Optional<Predicate> predicateOptional =
+                        PredicateConverter.convert(
+                                org.apache.flink.table.types.logical.RowType.of(
+                                        partitionKeyTypes, partitionKeys.toArray(new String[0])),
+                                filter);
+
+                if (!predicateOptional.isPresent()) {
+                    remainingFilters.add(filter);
+                } else {
+                    Predicate p = predicateOptional.get();
+                    if (!p.visit(partitionPredicateVisitor)) {
+                        remainingFilters.add(filter);
+                    } else {
+                        acceptedFilters.add(filter);
+                    }
+                    converted.add(p);
+                }
+            }
+            partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted);
             // lake source is not null
             if (lakeSource != null) {
-                // and exist field equals, push down to lake source
-                if (!fieldEquals.isEmpty()) {
-                    // convert flink row type to fluss row type
-                    RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
-
-                    List<Predicate> lakePredicates = new ArrayList<>();
-                    PredicateBuilder predicateBuilder = new PredicateBuilder(flussRowType);
-
-                    for (FieldEqual fieldEqual : fieldEquals) {
-                        lakePredicates.add(
-                                predicateBuilder.equal(
-                                        fieldEqual.fieldIndex, fieldEqual.equalValue));
-                    }
-
-                    if (!lakePredicates.isEmpty()) {
-                        final LakeSource.FilterPushDownResult filterPushDownResult =
-                                lakeSource.withFilters(lakePredicates);
-                        if (filterPushDownResult.acceptedPredicates().size()
-                                != lakePredicates.size()) {
-                            LOG.info(
-                                    "LakeSource rejected some partition filters. Falling back to Flink-side filtering.");
-                            // Flink will apply all filters to preserve correctness
-                            return Result.of(Collections.emptyList(), filters);
-                        }
-                    }
-                }
+                PredicateVisitor<Boolean> lakePredicateVisitor =
+                        new PartitionPredicateVisitor(tableOutputType.getFieldNames());
+
+                List<Predicate> lakePredicates = new ArrayList<>();
+                for (ResolvedExpression filter : filters) {
+
+                    Optional<Predicate> predicateOptional =
+                            PredicateConverter.convert(tableOutputType, filter);
+
+                    if (predicateOptional.isPresent()) {
+                        Predicate p = predicateOptional.get();
+                        lakePredicates.add(p);
+                    }
+                }
+
+                if (!lakePredicates.isEmpty()) {
+                    final LakeSource.FilterPushDownResult filterPushDownResult =
+                            lakeSource.withFilters(lakePredicates);
+                    if (filterPushDownResult.acceptedPredicates().size() != lakePredicates.size()) {
+                        LOG.info(
+                                "LakeSource rejected some partition filters. Falling back to Flink-side filtering.");
+                        // Flink will apply all filters to preserve correctness
+                        return Result.of(Collections.emptyList(), filters);
+                    }
+                }
             }
             return Result.of(acceptedFilters, remainingFilters);
-        } else {
-            return Result.of(Collections.emptyList(), filters);
         }
+
+        return Result.of(Collections.emptyList(), filters);
     }
 
     @Override
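The new branch models every partition key as STRING (per the TODO referencing PR #979), converts each Flink ResolvedExpression into a Fluss Predicate over that synthetic row type, and keeps only the filters that PartitionPredicateVisitor confirms touch partition keys alone; the accepted predicates are then ANDed into partitionFilters. A self-contained sketch of that accept/reject decision; PredicateConverter's import is outside the visible hunk, so its package here is an assumption, and the helper class itself is hypothetical:

import java.util.List;
import java.util.Optional;

import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import org.apache.fluss.flink.utils.PredicateConverter; // assumed location
import org.apache.fluss.predicate.PartitionPredicateVisitor;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.predicate.PredicateVisitor;

// Hypothetical helper mirroring the branch above, not part of the PR.
public class PartitionPushdownSketch {
    /** Returns the converted predicate only if the filter references partition keys alone. */
    public static Optional<Predicate> convertIfPartitionOnly(
            List<String> partitionKeys, ResolvedExpression filter) {
        // Until PR #979, every partition key is treated as STRING.
        LogicalType[] types =
                partitionKeys.stream()
                        .map(k -> VarCharType.STRING_TYPE)
                        .toArray(LogicalType[]::new);
        RowType keyRowType = RowType.of(types, partitionKeys.toArray(new String[0]));
        // Convert the Flink expression to a Fluss predicate over the key row type,
        // then let the visitor check that only partition fields are referenced.
        Optional<Predicate> converted = PredicateConverter.convert(keyRowType, filter);
        PredicateVisitor<Boolean> visitor = new PartitionPredicateVisitor(partitionKeys);
        return converted.filter(p -> p.visit(visitor));
    }
}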
FlussSource.java
@@ -26,8 +26,6 @@
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
-
 /**
  * A Flink DataStream source implementation for reading data from Fluss tables.
  *
@@ -82,7 +80,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
                 scanPartitionDiscoveryIntervalMs,
                 deserializationSchema,
                 streaming,
-                Collections.emptyList());
+                null);
     }
 
     /**