Skip to content

Commit 4823a8a

Browse files
committed
[flink] Fix partition filter pushdown (use Predicate instead of List&lt;FieldEqual&gt;) and add partition-change logging
1 parent cecf4d0 commit 4823a8a

File tree

5 files changed

+70
-53
lines changed

5 files changed

+70
-53
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.fluss.flink.source.split.SourceSplitSerializer;
3131
import org.apache.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
3232
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
33-
import org.apache.fluss.metadata.TablePath;
3433
import org.apache.fluss.lake.source.LakeSource;
3534
import org.apache.fluss.lake.source.LakeSplit;
35+
import org.apache.fluss.metadata.TablePath;
3636
import org.apache.fluss.predicate.Predicate;
3737
import org.apache.fluss.types.RowType;
3838

@@ -50,10 +50,6 @@
5050

5151
import javax.annotation.Nullable;
5252

53-
import java.util.List;
54-
55-
import static org.apache.fluss.utils.Preconditions.checkNotNull;
56-
5753
/** Flink source for Fluss. */
5854
public class FlinkSource<OUT>
5955
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {
@@ -112,7 +108,7 @@ public FlinkSource(
112108
long scanPartitionDiscoveryIntervalMs,
113109
FlussDeserializationSchema<OUT> deserializationSchema,
114110
boolean streaming,
115-
List<FieldEqual> partitionFilters,
111+
Predicate partitionFilters,
116112
LakeSource<LakeSplit> lakeSource) {
117113
this.flussConf = flussConf;
118114
this.tablePath = tablePath;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
import org.apache.fluss.metadata.TablePath;
3535
import org.apache.fluss.predicate.GreaterOrEqual;
3636
import org.apache.fluss.predicate.LeafPredicate;
37+
import org.apache.fluss.predicate.PartitionPredicateVisitor;
3738
import org.apache.fluss.predicate.Predicate;
3839
import org.apache.fluss.predicate.PredicateBuilder;
40+
import org.apache.fluss.predicate.PredicateVisitor;
3941
import org.apache.fluss.row.TimestampLtz;
4042
import org.apache.fluss.types.DataTypes;
4143
import org.apache.fluss.types.RowType;
@@ -72,6 +74,7 @@
7274
import org.apache.flink.table.functions.LookupFunction;
7375
import org.apache.flink.table.types.DataType;
7476
import org.apache.flink.table.types.logical.LogicalType;
77+
import org.apache.flink.table.types.logical.VarCharType;
7578
import org.slf4j.Logger;
7679
import org.slf4j.LoggerFactory;
7780

@@ -89,9 +92,6 @@
8992
import java.util.stream.Collectors;
9093

9194
import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
92-
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
93-
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
94-
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
9595
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
9696
import static org.apache.fluss.utils.Preconditions.checkNotNull;
9797
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -541,31 +541,20 @@ && hasPrimaryKey()
541541
}
542542
partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted);
543543
// lake source is not null
544-
if (lakeSource != null) {
545-
// and exist field equals, push down to lake source
546-
if (!fieldEquals.isEmpty()) {
547-
// convert flink row type to fluss row type
548-
RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
549-
550-
List<Predicate> lakePredicates = new ArrayList<>();
551-
PredicateBuilder predicateBuilder = new PredicateBuilder(flussRowType);
552-
553-
for (FieldEqual fieldEqual : fieldEquals) {
554-
lakePredicates.add(
555-
predicateBuilder.equal(
556-
fieldEqual.fieldIndex, fieldEqual.equalValue));
557-
}
544+
if (lakeSource != null && partitionFilters != null) {
545+
546+
List<Predicate> lakePredicates = new ArrayList<>();
547+
548+
lakePredicates.addAll(converted);
558549

559-
if (!lakePredicates.isEmpty()) {
560-
final LakeSource.FilterPushDownResult filterPushDownResult =
561-
lakeSource.withFilters(lakePredicates);
562-
if (filterPushDownResult.acceptedPredicates().size()
563-
!= lakePredicates.size()) {
564-
LOG.info(
565-
"LakeSource rejected some partition filters. Falling back to Flink-side filtering.");
566-
// Flink will apply all filters to preserve correctness
567-
return Result.of(Collections.emptyList(), filters);
568-
}
550+
if (!lakePredicates.isEmpty()) {
551+
final LakeSource.FilterPushDownResult filterPushDownResult =
552+
lakeSource.withFilters(lakePredicates);
553+
if (filterPushDownResult.acceptedPredicates().size() != lakePredicates.size()) {
554+
LOG.info(
555+
"LakeSource rejected some partition filters. Falling back to Flink-side filtering.");
556+
// Flink will apply all filters to preserve correctness
557+
return Result.of(Collections.emptyList(), filters);
569558
}
570559
}
571560
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/PredicateConverter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,6 @@ private Object extractLiteral(DataType expectedType, Expression expression) {
211211
if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())
212212
&& !isStringType(expectedLogicalType)) {
213213
return FlinkAsFlussRow.fromFlinkObject(value, expectedType);
214-
} else if (isStringType(actualLogicalType) || isStringType(expectedLogicalType)) {
215-
return value.toString();
216214
} else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {
217215
try {
218216
return TypeUtils.castFromString(

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,16 @@
3535
import org.apache.fluss.flink.source.split.LogSplit;
3636
import org.apache.fluss.flink.source.split.SourceSplitBase;
3737
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
38-
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
3938
import org.apache.fluss.lake.source.LakeSource;
4039
import org.apache.fluss.lake.source.LakeSplit;
4140
import org.apache.fluss.metadata.PartitionInfo;
4241
import org.apache.fluss.metadata.TableBucket;
4342
import org.apache.fluss.metadata.TableInfo;
4443
import org.apache.fluss.metadata.TablePath;
45-
import org.apache.fluss.types.DataField;
44+
import org.apache.fluss.predicate.Predicate;
45+
import org.apache.fluss.row.BinaryString;
46+
import org.apache.fluss.row.GenericRow;
47+
import org.apache.fluss.row.InternalRow;
4648
import org.apache.fluss.utils.ExceptionUtils;
4749

4850
import org.apache.flink.annotation.VisibleForTesting;
@@ -133,7 +135,7 @@ public class FlinkSourceEnumerator
133135

134136
private volatile boolean closed = false;
135137

136-
private Predicate predicate;
138+
private Predicate partitionFilters;
137139

138140
@Nullable private final LakeSource<LakeSplit> lakeSource;
139141

@@ -146,7 +148,7 @@ public FlinkSourceEnumerator(
146148
OffsetsInitializer startingOffsetsInitializer,
147149
long scanPartitionDiscoveryIntervalMs,
148150
boolean streaming,
149-
Predicate predicate) {
151+
Predicate partitionFilters) {
150152
this(
151153
tablePath,
152154
flussConf,
@@ -169,7 +171,7 @@ public FlinkSourceEnumerator(
169171
OffsetsInitializer startingOffsetsInitializer,
170172
long scanPartitionDiscoveryIntervalMs,
171173
boolean streaming,
172-
List<FieldEqual> partitionFilters,
174+
Predicate partitionFilters,
173175
@Nullable LakeSource<LakeSplit> lakeSource) {
174176
this(
175177
tablePath,
@@ -199,7 +201,7 @@ public FlinkSourceEnumerator(
199201
OffsetsInitializer startingOffsetsInitializer,
200202
long scanPartitionDiscoveryIntervalMs,
201203
boolean streaming,
202-
List<FieldEqual> partitionFilters,
204+
Predicate partitionFilters,
203205
@Nullable LakeSource<LakeSplit> lakeSource) {
204206
this.tablePath = checkNotNull(tablePath);
205207
this.flussConf = checkNotNull(flussConf);
@@ -216,7 +218,7 @@ public FlinkSourceEnumerator(
216218
: new LinkedList<>(pendingHybridLakeFlussSplits);
217219
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
218220
this.streaming = streaming;
219-
this.partitionFilters = checkNotNull(partitionFilters);
221+
this.partitionFilters = partitionFilters;
220222
this.stoppingOffsetsInitializer =
221223
streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
222224
this.lakeSource = lakeSource;
@@ -354,22 +356,37 @@ private Set<PartitionInfo> listPartitions() {
354356

355357
/** Apply partition filter. */
356358
private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionInfos) {
357-
if (predicate == null) {
359+
if (partitionFilters == null) {
358360
return partitionInfos;
359361
} else {
362+
int originalSize = partitionInfos.size();
360363
List<PartitionInfo> filteredPartitionInfos =
361364
partitionInfos.stream()
362365
.filter(
363366
partitionInfo ->
364-
predicate.test(
367+
partitionFilters.test(
365368
convertPartitionInfoToInternalRow(
366369
partitionInfo)))
367370
.collect(Collectors.toList());
368-
LOG.info(
369-
"Filtered partitions {} for table {} with predicate: {}",
370-
filteredPartitionInfos,
371-
tablePath,
372-
predicate);
371+
372+
int filteredSize = filteredPartitionInfos.size();
373+
// Only log when there's actual filtering happening or when it's the first time
374+
if (originalSize != filteredSize) {
375+
LOG.info(
376+
"Applied partition filter for table {}: {} partitions filtered to {} partitions with predicate: {}",
377+
tablePath,
378+
originalSize,
379+
filteredSize,
380+
partitionFilters);
381+
if (LOG.isDebugEnabled()) {
382+
LOG.debug("Filtered partitions: {}", filteredPartitionInfos);
383+
}
384+
} else if (LOG.isDebugEnabled()) {
385+
LOG.debug(
386+
"Partition filter applied for table {} but no partitions were filtered out (total: {})",
387+
tablePath,
388+
originalSize);
389+
}
373390
return filteredPartitionInfos;
374391
}
375392
}
@@ -394,17 +411,34 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
394411
LOG.error("Failed to list partitions for {}", tablePath, t);
395412
return;
396413
}
414+
415+
if (LOG.isDebugEnabled()) {
416+
LOG.debug("Checking partition changes for table {}, found {} partitions",
417+
tablePath, partitionInfos.size());
418+
}
419+
397420
final PartitionChange partitionChange = getPartitionChange(partitionInfos);
398421
if (partitionChange.isEmpty()) {
422+
if (LOG.isDebugEnabled()) {
423+
LOG.debug("No partition changes detected for table {}", tablePath);
424+
}
399425
return;
400426
}
401427

402428
// handle removed partitions
403-
handlePartitionsRemoved(partitionChange.removedPartitions);
429+
if (!partitionChange.removedPartitions.isEmpty()) {
430+
LOG.info("Handling {} removed partitions for table {}: {}",
431+
partitionChange.removedPartitions.size(), tablePath, partitionChange.removedPartitions);
432+
handlePartitionsRemoved(partitionChange.removedPartitions);
433+
}
404434

405435
// handle new partitions
406-
context.callAsync(
407-
() -> initPartitionedSplits(partitionChange.newPartitions), this::handleSplitsAdd);
436+
if (!partitionChange.newPartitions.isEmpty()) {
437+
LOG.info("Handling {} new partitions for table {}: {}",
438+
partitionChange.newPartitions.size(), tablePath, partitionChange.newPartitions);
439+
context.callAsync(
440+
() -> initPartitionedSplits(partitionChange.newPartitions), this::handleSplitsAdd);
441+
}
408442
}
409443

410444
private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionInfos) {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ void testRestore() throws Throwable {
357357
OffsetsInitializer.earliest(),
358358
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
359359
streaming,
360-
Collections.emptyList(),
360+
null,
361361
null);
362362

363363
enumerator.start();

0 commit comments

Comments (0)