
Commit f0e7af6

Jark's improvement
1 parent 2a3892b commit f0e7af6

File tree: 7 files changed (+77 -111 lines)


LICENSE

Lines changed: 1 addition & 0 deletions
@@ -359,6 +359,7 @@ Apache Kafka
 ./fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java

 Apache Paimon
+./fluss-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/And.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/CompareUtils.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/CompoundPredicate.java

fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java

Lines changed: 6 additions & 0 deletions
@@ -21,10 +21,12 @@
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.TimestampType;

 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.util.TimeZone;

 /** Type related helper functions. */
 public class TypeUtils {
@@ -62,6 +64,10 @@ public static Object castFromString(String s, DataType type) {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 TimestampType timestampType = (TimestampType) type;
                 return BinaryStringUtils.toTimestampNtz(str, timestampType.getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
+                return BinaryStringUtils.toTimestampLtz(
+                        str, localZonedTimestampType.getPrecision(), TimeZone.getDefault());
             default:
                 throw new UnsupportedOperationException("Unsupported type " + type);
 }
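
For orientation, a minimal usage sketch of the new cast branch. This is hypothetical harness code, not part of the commit; in particular the LocalZonedTimestampType(6) constructor is an assumption, since the diff only shows getPrecision():

    import org.apache.fluss.types.DataType;
    import org.apache.fluss.types.LocalZonedTimestampType;
    import org.apache.fluss.utils.TypeUtils;

    public class CastSketch {
        public static void main(String[] args) {
            // Assumed precision constructor; only getPrecision() appears in the diff.
            DataType ltzType = new LocalZonedTimestampType(6);
            // The new TIMESTAMP_WITH_LOCAL_TIME_ZONE branch parses the string in
            // TimeZone.getDefault(), so the result depends on the JVM's local zone.
            Object ts = TypeUtils.castFromString("2024-05-01 12:30:00", ltzType);
            System.out.println(ts);
        }
    }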

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 5 additions & 7 deletions
@@ -65,10 +65,8 @@ public class FlinkSource<OUT>
     protected final long scanPartitionDiscoveryIntervalMs;
     private final boolean streaming;
     private final FlussDeserializationSchema<OUT> deserializationSchema;
-
-    private Predicate partitionFilters;
-
-    private final @Nullable LakeSource<LakeSplit> lakeSource;
+    @Nullable private final Predicate partitionFilters;
+    @Nullable private final LakeSource<LakeSplit> lakeSource;

     public FlinkSource(
             Configuration flussConf,
@@ -81,7 +79,7 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            Predicate partitionFilters) {
+            @Nullable Predicate partitionFilters) {
         this(
                 flussConf,
                 tablePath,
@@ -108,8 +106,8 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            Predicate partitionFilters,
-            LakeSource<LakeSplit> lakeSource) {
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
         this.hasPrimaryKey = hasPrimaryKey;
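
The fields become final and explicitly @Nullable, documenting that a source may be built without a partition filter or lake source. A minimal sketch of the resulting null-handling contract, using a hypothetical class rather than the real FlinkSource internals:

    import java.util.function.Predicate;
    import javax.annotation.Nullable;

    class FilteredSource {
        // Final and nullable: "no filter configured" is a legitimate, immutable state.
        @Nullable private final Predicate<String> partitionFilter;

        FilteredSource(@Nullable Predicate<String> partitionFilter) {
            this.partitionFilter = partitionFilter;
        }

        boolean accepts(String partition) {
            // A null filter means "accept every partition".
            return partitionFilter == null || partitionFilter.test(partition);
        }
    }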

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 13 additions & 18 deletions
@@ -26,6 +26,7 @@
 import org.apache.fluss.flink.source.lookup.LookupNormalizer;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.flink.utils.PredicateConverter;
 import org.apache.fluss.flink.utils.PushdownUtils;
 import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
@@ -92,6 +93,8 @@
 import java.util.stream.Collectors;

 import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
+import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
+import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
@@ -142,11 +145,11 @@ public class FlinkTableSource
     @Nullable private RowLevelModificationType modificationScanType;

     // count(*) push down
-    protected boolean selectRowCount = false;
+    private boolean selectRowCount = false;

     private long limit = -1;

-    @Nullable protected Predicate partitionFilters;
+    @Nullable private Predicate partitionFilters;

     private final Map<String, String> tableOptions;

@@ -479,17 +482,17 @@ && hasPrimaryKey()
                 && filters.size() == primaryKeyIndexes.length) {

             Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
-            List<PushdownUtils.FieldEqual> fieldEquals =
-                    PushdownUtils.extractFieldEquals(
+            List<FieldEqual> fieldEquals =
+                    extractFieldEquals(
                             filters,
                             primaryKeyTypes,
                             acceptedFilters,
                             remainingFilters,
-                            PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE);
+                            FLINK_INTERNAL_VALUE);
             int[] keyRowProjection = getKeyRowProjection();
             HashSet<Integer> visitedPkFields = new HashSet<>();
             GenericRowData lookupRow = new GenericRowData(primaryKeyIndexes.length);
-            for (PushdownUtils.FieldEqual fieldEqual : fieldEquals) {
+            for (FieldEqual fieldEqual : fieldEquals) {
                 lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
                 visitedPkFields.add(fieldEqual.fieldIndex);
             }
@@ -527,34 +530,26 @@ && hasPrimaryKey()
                             partitionKeyTypes, partitionKeys.toArray(new String[0])),
                     filter);

-            if (!predicateOptional.isPresent()) {
-                remainingFilters.add(filter);
-            } else {
+            if (predicateOptional.isPresent()) {
                 Predicate p = predicateOptional.get();
                 if (!p.visit(partitionPredicateVisitor)) {
                     remainingFilters.add(filter);
                 } else {
                     acceptedFilters.add(filter);
                 }
                 converted.add(p);
+            } else {
+                remainingFilters.add(filter);
             }
         }
         partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted);
         // lake source is not null
         if (lakeSource != null) {
-            PredicateVisitor<Boolean> lakePredicateVisitor =
-                    new PartitionPredicateVisitor(tableOutputType.getFieldNames());
-
             List<Predicate> lakePredicates = new ArrayList<>();
             for (ResolvedExpression filter : filters) {
-
                 Optional<Predicate> predicateOptional =
                         PredicateConverter.convert(tableOutputType, filter);
-
-                if (predicateOptional.isPresent()) {
-                    Predicate p = predicateOptional.get();
-                    lakePredicates.add(p);
-                }
+                predicateOptional.ifPresent(lakePredicates::add);
             }

             if (!lakePredicates.isEmpty()) {
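
The last hunk replaces an explicit isPresent()/get() block with Optional.ifPresent. As a standalone illustration of the idiom (generic Java, not Fluss code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;

    public class IfPresentSketch {
        public static void main(String[] args) {
            List<String> lakePredicates = new ArrayList<>();
            Optional<String> converted = Optional.of("partition = 'p0'");

            // Before: check, unwrap, then use the value.
            if (converted.isPresent()) {
                String p = converted.get();
                lakePredicates.add(p);
            }

            // After: the consumer runs only when a value is present.
            converted.ifPresent(lakePredicates::add);

            System.out.println(lakePredicates);
        }
    }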

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 18 additions & 28 deletions
@@ -135,7 +135,7 @@ public class FlinkSourceEnumerator

     private volatile boolean closed = false;

-    private Predicate partitionFilters;
+    @Nullable private final Predicate partitionFilters;

     @Nullable private final LakeSource<LakeSplit> lakeSource;

@@ -148,7 +148,7 @@ public FlinkSourceEnumerator(
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            Predicate partitionFilters) {
+            @Nullable Predicate partitionFilters) {
         this(
                 tablePath,
                 flussConf,
@@ -171,7 +171,7 @@ public FlinkSourceEnumerator(
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            Predicate partitionFilters,
+            @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this(
                 tablePath,
@@ -201,7 +201,7 @@ public FlinkSourceEnumerator(
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            Predicate partitionFilters,
+            @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
@@ -362,36 +362,30 @@ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionIn
             int originalSize = partitionInfos.size();
             List<PartitionInfo> filteredPartitionInfos =
                     partitionInfos.stream()
-                            .filter(
-                                    partitionInfo ->
-                                            partitionFilters.test(
-                                                    convertPartitionInfoToInternalRow(
-                                                            partitionInfo)))
+                            .filter(partition -> partitionFilters.test(toInternalRow(partition)))
                             .collect(Collectors.toList());

             int filteredSize = filteredPartitionInfos.size();
-            // Only log when there's actual filtering happening or when it's the first time
             if (originalSize != filteredSize) {
-                LOG.info(
-                        "Applied partition filter for table {}: {} partitions filtered to {} partitions with predicate: {}",
+                LOG.debug(
+                        "Applied partition filter for table {}: {} partitions filtered down to {} "
+                                + "matching partitions with predicate: {}. Matching partitions after filtering: {}",
                         tablePath,
                         originalSize,
                         filteredSize,
-                        partitionFilters);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Filtered partitions: {}", filteredPartitionInfos);
-                }
-            } else if (LOG.isDebugEnabled()) {
+                        partitionFilters,
+                        filteredPartitionInfos);
+            } else {
                 LOG.debug(
-                        "Partition filter applied for table {} but no partitions were filtered out (total: {})",
+                        "Partition filter applied for table {}, but all {} partitions matched the predicate",
                         tablePath,
                         originalSize);
             }
             return filteredPartitionInfos;
         }
     }

-    private InternalRow convertPartitionInfoToInternalRow(PartitionInfo partitionInfo) {
+    private static InternalRow toInternalRow(PartitionInfo partitionInfo) {
         List<String> partitionValues =
                 partitionInfo.getResolvedPartitionSpec().getPartitionValues();
         GenericRow genericRow = new GenericRow(partitionValues.size());
@@ -412,18 +406,14 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
             return;
         }

-        if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                    "Checking partition changes for table {}, found {} partitions",
-                    tablePath,
-                    partitionInfos.size());
-        }
+        LOG.debug(
+                "Checking partition changes for table {}, found {} partitions",
+                tablePath,
+                partitionInfos.size());

         final PartitionChange partitionChange = getPartitionChange(partitionInfos);
         if (partitionChange.isEmpty()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("No partition changes detected for table {}", tablePath);
-            }
+            LOG.debug("No partition changes detected for table {}", tablePath);
             return;
         }

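
These hunks also remove the LOG.isDebugEnabled() guards. With SLF4J's parameterized logging, message formatting is deferred until the level check passes, so a guard only pays off when computing an argument is itself expensive. A generic sketch of that trade-off (standalone, not the enumerator code):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            int total = 10;
            int matched = 7;

            // Parameterized form: the message string is only built if DEBUG is
            // enabled, so cheap arguments need no isDebugEnabled() guard.
            LOG.debug("{} of {} partitions matched the filter", matched, total);

            // A guard is still worthwhile when the argument itself is expensive.
            if (LOG.isDebugEnabled()) {
                LOG.debug("partition summary: {}", buildExpensiveSummary());
            }
        }

        private static String buildExpensiveSummary() {
            return "(placeholder for a costly computation)";
        }
    }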

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/PredicateConverter.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java

Lines changed: 9 additions & 10 deletions
@@ -1,11 +1,10 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -16,10 +15,9 @@
  * limitations under the License.
  */

-package org.apache.fluss.flink.source;
+package org.apache.fluss.flink.utils;

 import org.apache.fluss.flink.row.FlinkAsFlussRow;
-import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.predicate.UnsupportedExpression;
@@ -51,8 +49,9 @@
 /* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
+
 /**
- * Convert {@link Expression} to {@link Predicate}.
+ * Convert Flink {@link Expression} to Fluss {@link Predicate}.
  *
  * <p>For {@link FieldReferenceExpression}, please use name instead of index, if the project
  * pushdown is before and the filter pushdown is after, the index of the filter will be projected.
