Skip to content

Commit 74e4d2b

Browse files
authored
[hotfix] Improve some codes in FlinkTableSource (#1083)
1 parent 6d038d6 commit 74e4d2b

File tree

3 files changed

+13
-11
lines changed

3 files changed

+13
-11
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkTableSink.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import java.util.Set;
5757
import java.util.stream.Collectors;
5858

59+
import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals;
60+
5961
/** A Flink {@link DynamicTableSink}. */
6062
public class FlinkTableSink
6163
implements DynamicTableSink,
@@ -263,7 +265,7 @@ public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
263265
List<ResolvedExpression> remainingFilters = new ArrayList<>();
264266
Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
265267
List<FieldEqual> fieldEquals =
266-
PushdownUtils.extractFieldEquals(
268+
extractFieldEquals(
267269
filters,
268270
primaryKeyTypes,
269271
acceptedFilters,

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.util.Map;
7777

7878
import static com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
79+
import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals;
7980
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
8081

8182
/** Flink table source to scan Fluss data. */
@@ -398,7 +399,7 @@ && hasPrimaryKey()
398399
&& filters.size() == primaryKeyIndexes.length) {
399400
Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
400401
List<FieldEqual> fieldEquals =
401-
PushdownUtils.extractFieldEquals(
402+
extractFieldEquals(
402403
filters,
403404
primaryKeyTypes,
404405
acceptedFilters,
@@ -419,11 +420,10 @@ && hasPrimaryKey()
419420
return Result.of(acceptedFilters, remainingFilters);
420421
} else if (isPartitioned()) {
421422
// dynamic partition pushdown
422-
Map<Integer, LogicalType> partitionKeyTypes = getPartitionKeyTypes();
423423
List<FieldEqual> fieldEquals =
424-
PushdownUtils.extractFieldEquals(
424+
extractFieldEquals(
425425
filters,
426-
partitionKeyTypes,
426+
getPartitionKeyTypes(),
427427
acceptedFilters,
428428
remainingFilters,
429429
FLINK_INTERNAL_VALUE);

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PushdownUtils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class PushdownUtils {
8383
/** Extract field equality information from expressions. */
8484
public static List<FieldEqual> extractFieldEquals(
8585
List<ResolvedExpression> expressions,
86-
Map<Integer, LogicalType> primaryKeyColumn,
86+
Map<Integer, LogicalType> fieldIndexToType,
8787
List<ResolvedExpression> acceptedFiltersResult,
8888
List<ResolvedExpression> remainingFiltersResult,
8989
ValueConversion valueConversion) {
@@ -107,7 +107,7 @@ public static List<FieldEqual> extractFieldEquals(
107107
extractFieldEqual(
108108
leftFieldRef,
109109
rightValue,
110-
primaryKeyColumn,
110+
fieldIndexToType,
111111
valueConversion);
112112
} else if (left instanceof ValueLiteralExpression
113113
&& right instanceof FieldReferenceExpression) {
@@ -117,7 +117,7 @@ public static List<FieldEqual> extractFieldEquals(
117117
extractFieldEqual(
118118
rightFieldRef,
119119
leftValue,
120-
primaryKeyColumn,
120+
fieldIndexToType,
121121
valueConversion);
122122
}
123123

@@ -141,11 +141,11 @@ public static List<FieldEqual> extractFieldEquals(
141141
private static FieldEqual extractFieldEqual(
142142
FieldReferenceExpression fieldsRef,
143143
ValueLiteralExpression valueLiteral,
144-
Map<Integer, LogicalType> primaryKeyColumn,
144+
Map<Integer, LogicalType> fieldIndexToType,
145145
ValueConversion valueConversion) {
146146
int columnIndex = fieldsRef.getFieldIndex();
147-
if (primaryKeyColumn.containsKey(columnIndex)) {
148-
LogicalType expectedType = primaryKeyColumn.get(columnIndex);
147+
if (fieldIndexToType.containsKey(columnIndex)) {
148+
LogicalType expectedType = fieldIndexToType.get(columnIndex);
149149
if (expectedType.getTypeRoot()
150150
!= valueLiteral.getOutputDataType().getLogicalType().getTypeRoot()) {
151151
return null;

0 commit comments

Comments
 (0)