Skip to content

Commit 82ebb7c

Browse files
committed
[flink]fix and add log
1 parent 4823a8a commit 82ebb7c

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionIn
368368
convertPartitionInfoToInternalRow(
369369
partitionInfo)))
370370
.collect(Collectors.toList());
371-
371+
372372
int filteredSize = filteredPartitionInfos.size();
373373
// Only log when there's actual filtering happening or when it's the first time
374374
if (originalSize != filteredSize) {
@@ -411,12 +411,14 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
411411
LOG.error("Failed to list partitions for {}", tablePath, t);
412412
return;
413413
}
414-
414+
415415
if (LOG.isDebugEnabled()) {
416-
LOG.debug("Checking partition changes for table {}, found {} partitions",
417-
tablePath, partitionInfos.size());
416+
LOG.debug(
417+
"Checking partition changes for table {}, found {} partitions",
418+
tablePath,
419+
partitionInfos.size());
418420
}
419-
421+
420422
final PartitionChange partitionChange = getPartitionChange(partitionInfos);
421423
if (partitionChange.isEmpty()) {
422424
if (LOG.isDebugEnabled()) {
@@ -427,17 +429,24 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
427429

428430
// handle removed partitions
429431
if (!partitionChange.removedPartitions.isEmpty()) {
430-
LOG.info("Handling {} removed partitions for table {}: {}",
431-
partitionChange.removedPartitions.size(), tablePath, partitionChange.removedPartitions);
432+
LOG.info(
433+
"Handling {} removed partitions for table {}: {}",
434+
partitionChange.removedPartitions.size(),
435+
tablePath,
436+
partitionChange.removedPartitions);
432437
handlePartitionsRemoved(partitionChange.removedPartitions);
433438
}
434439

435440
// handle new partitions
436441
if (!partitionChange.newPartitions.isEmpty()) {
437-
LOG.info("Handling {} new partitions for table {}: {}",
438-
partitionChange.newPartitions.size(), tablePath, partitionChange.newPartitions);
442+
LOG.info(
443+
"Handling {} new partitions for table {}: {}",
444+
partitionChange.newPartitions.size(),
445+
tablePath,
446+
partitionChange.newPartitions);
439447
context.callAsync(
440-
() -> initPartitionedSplits(partitionChange.newPartitions), this::handleSplitsAdd);
448+
() -> initPartitionedSplits(partitionChange.newPartitions),
449+
this::handleSplitsAdd);
441450
}
442451
}
443452

0 commit comments

Comments
 (0)