Skip to content

Commit 9e750c1

Browse files
committed
WIP
1 parent 7c7f39d commit 9e750c1

File tree

2 files changed

+15
-25
lines changed

2 files changed

+15
-25
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@
9292
import java.util.stream.Collectors;
9393

9494
import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
95+
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
96+
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
9597
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
9698
import static org.apache.fluss.utils.Preconditions.checkNotNull;
9799
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -142,11 +144,11 @@ public class FlinkTableSource
142144
@Nullable private RowLevelModificationType modificationScanType;
143145

144146
// count(*) push down
145-
protected boolean selectRowCount = false;
147+
private boolean selectRowCount = false;
146148

147149
private long limit = -1;
148150

149-
@Nullable protected Predicate partitionFilters;
151+
@Nullable private Predicate partitionFilters;
150152

151153
private final Map<String, String> tableOptions;
152154

@@ -479,17 +481,17 @@ && hasPrimaryKey()
479481
&& filters.size() == primaryKeyIndexes.length) {
480482

481483
Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
482-
List<PushdownUtils.FieldEqual> fieldEquals =
483-
PushdownUtils.extractFieldEquals(
484+
List<FieldEqual> fieldEquals =
485+
extractFieldEquals(
484486
filters,
485487
primaryKeyTypes,
486488
acceptedFilters,
487489
remainingFilters,
488-
PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE);
490+
FLINK_INTERNAL_VALUE);
489491
int[] keyRowProjection = getKeyRowProjection();
490492
HashSet<Integer> visitedPkFields = new HashSet<>();
491493
GenericRowData lookupRow = new GenericRowData(primaryKeyIndexes.length);
492-
for (PushdownUtils.FieldEqual fieldEqual : fieldEquals) {
494+
for (FieldEqual fieldEqual : fieldEquals) {
493495
lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue);
494496
visitedPkFields.add(fieldEqual.fieldIndex);
495497
}
@@ -527,7 +529,7 @@ && hasPrimaryKey()
527529
partitionKeyTypes, partitionKeys.toArray(new String[0])),
528530
filter);
529531

530-
if (!predicateOptional.isPresent()) {
532+
if (predicateOptional.isEmpty()) {
531533
remainingFilters.add(filter);
532534
} else {
533535
Predicate p = predicateOptional.get();
@@ -542,19 +544,11 @@ && hasPrimaryKey()
542544
partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted);
543545
// lake source is not null
544546
if (lakeSource != null) {
545-
PredicateVisitor<Boolean> lakePredicateVisitor =
546-
new PartitionPredicateVisitor(tableOutputType.getFieldNames());
547-
548547
List<Predicate> lakePredicates = new ArrayList<>();
549548
for (ResolvedExpression filter : filters) {
550-
551549
Optional<Predicate> predicateOptional =
552550
PredicateConverter.convert(tableOutputType, filter);
553-
554-
if (predicateOptional.isPresent()) {
555-
Predicate p = predicateOptional.get();
556-
lakePredicates.add(p);
557-
}
551+
predicateOptional.ifPresent(lakePredicates::add);
558552
}
559553

560554
if (!lakePredicates.isEmpty()) {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -406,18 +406,14 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
406406
return;
407407
}
408408

409-
if (LOG.isDebugEnabled()) {
410-
LOG.debug(
411-
"Checking partition changes for table {}, found {} partitions",
412-
tablePath,
413-
partitionInfos.size());
414-
}
409+
LOG.debug(
410+
"Checking partition changes for table {}, found {} partitions",
411+
tablePath,
412+
partitionInfos.size());
415413

416414
final PartitionChange partitionChange = getPartitionChange(partitionInfos);
417415
if (partitionChange.isEmpty()) {
418-
if (LOG.isDebugEnabled()) {
419-
LOG.debug("No partition changes detected for table {}", tablePath);
420-
}
416+
LOG.debug("No partition changes detected for table {}", tablePath);
421417
return;
422418
}
423419

0 commit comments

Comments
 (0)