
Commit 7f762c4

add log for enumerator
1 parent de9f394 commit 7f762c4

3 files changed: +24 -0 lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 7 additions & 0 deletions
@@ -32,6 +32,9 @@
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.utils.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -52,6 +55,8 @@
 /** A generator for lake splits. */
 public class LakeSplitGenerator {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LakeSplitGenerator.class);
+
     private final TableInfo tableInfo;
     private final Admin flussAdmin;
     private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
@@ -88,6 +93,7 @@ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
         LakeSnapshot lakeSnapshotInfo;
         try {
             lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+            LOG.info("Get lake snapshot info: {}", lakeSnapshotInfo);
         } catch (Exception exception) {
             if (ExceptionUtils.stripExecutionException(exception)
                     instanceof LakeTableSnapshotNotExistException) {
@@ -105,6 +111,7 @@ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
                         .createPlanner(
                                 (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
                         .plan());
+        LOG.info("Group lake splits: {}", lakeSplits);
 
         Map<TableBucket, Long> tableBucketsOffset = lakeSnapshotInfo.getTableBucketsOffset();
         if (isPartitioned) {
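
Note: the hunks above wire a class-level SLF4J logger into LakeSplitGenerator and log the fetched lake snapshot and the grouped lake splits at INFO. For reference, below is a minimal, self-contained sketch of that logging pattern; SnapshotLoggingSketch and its placeholder split list are illustrative only, not Fluss code.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

public class SnapshotLoggingSketch {

    // One static logger per class, keyed by the class itself.
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotLoggingSketch.class);

    public static void main(String[] args) {
        // The string list stands in for the lake splits logged in the real change; the {}
        // placeholder defers formatting until INFO is known to be enabled.
        List<String> lakeSplits = Arrays.asList("split-0", "split-1");
        LOG.info("Group lake splits: {}", lakeSplits);
    }
}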

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 7 additions & 0 deletions
@@ -258,6 +258,7 @@ public void start() {
             // we'll need to consider lake splits
             List<SourceSplitBase> hybridLakeFlussSplits = generateHybridLakeFlussSplits();
             if (hybridLakeFlussSplits != null) {
+                LOG.info("Generated hybrid lake splits: {}", hybridLakeFlussSplits);
                 // handle hybrid lake fluss splits firstly
                 handleSplitsAdd(hybridLakeFlussSplits, null);
             }
@@ -616,6 +617,7 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
         // should be restored from checkpoint, shouldn't
         // list splits again
         if (pendingHybridLakeFlussSplits != null) {
+            LOG.info("Still have pending lake fluss splits, shouldn't list splits again.");
             return pendingHybridLakeFlussSplits;
         }
         try {
@@ -659,6 +661,7 @@ private void handlePartitionsRemoved(Collection<Partition> removedPartitionInfo)
                                 split ->
                                         removedPartitionsMap.containsKey(
                                                 split.getTableBucket().getPartitionId())));
+        LOG.info("Removed partitions {}", removedPartitionsMap);
 
         // send partition removed event to all readers
         PartitionsRemovedEvent event = new PartitionsRemovedEvent(removedPartitionsMap);
@@ -709,6 +712,8 @@ private void addSplitToPendingAssignments(Collection<SourceSplitBase> newSplits)
     private void assignPendingSplits(Set<Integer> pendingReaders) {
         Map<Integer, List<SourceSplitBase>> incrementalAssignment = new HashMap<>();
 
+        LOG.info("Assigning pending splits to readers {}", pendingReaders);
+        LOG.info("Pending split assignment {}", pendingSplitAssignment);
         // Check if there's any pending splits for given readers
         for (int pendingReader : pendingReaders) {
             checkReaderRegistered(pendingReader);
@@ -758,6 +763,8 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
         if (!incrementalAssignment.isEmpty()) {
            LOG.info("Assigning splits to readers {}", incrementalAssignment);
            context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
+        } else {
+            LOG.info("No pending splits to assign to readers {}", pendingReaders);
         }
 
         if (noMoreNewSplits) {
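
Note: these hunks log split generation, partition removal, and pending-split assignment in the enumerator, including an else branch for the case where nothing is assigned. The sketch below mirrors only that assignment-logging shape under simplifying assumptions (String splits instead of SourceSplitBase, no SplitEnumeratorContext); it is not the actual FlinkSourceEnumerator.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignmentLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(AssignmentLoggingSketch.class);

    // Hypothetical stand-in: String splits instead of SourceSplitBase, no enumerator context.
    static void assignPendingSplits(
            Set<Integer> pendingReaders, Map<Integer, List<String>> pendingSplitAssignment) {
        Map<Integer, List<String>> incrementalAssignment = new HashMap<>();

        // Log the input state up front, as the commit does.
        LOG.info("Assigning pending splits to readers {}", pendingReaders);
        LOG.info("Pending split assignment {}", pendingSplitAssignment);

        for (int pendingReader : pendingReaders) {
            List<String> splits = pendingSplitAssignment.remove(pendingReader);
            if (splits != null && !splits.isEmpty()) {
                incrementalAssignment.put(pendingReader, splits);
            }
        }

        if (!incrementalAssignment.isEmpty()) {
            LOG.info("Assigning splits to readers {}", incrementalAssignment);
            // context.assignSplits(new SplitsAssignment<>(incrementalAssignment)) in the real code.
        } else {
            // The new else branch: make it visible that nothing was handed out.
            LOG.info("No pending splits to assign to readers {}", pendingReaders);
        }
    }

    public static void main(String[] args) {
        // With no pending assignment, the else branch logs that nothing was assigned.
        assignPendingSplits(Collections.singleton(0), new HashMap<>());
    }
}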

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java

Lines changed: 10 additions & 0 deletions
@@ -72,4 +72,14 @@ public DataSplit dataSplit() {
     public boolean isBucketUnAware() {
         return isBucketUnAware;
     }
+
+    @Override
+    public String toString() {
+        return "PaimonSplit{"
+                + "dataSplit="
+                + dataSplit
+                + ", isBucketUnAware="
+                + isBucketUnAware
+                + '}';
+    }
 }
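
Note: the toString() override presumably exists so splits rendered by the new {} log placeholders read as field values rather than the default Object form (class name plus hash code). A hedged sketch of that effect, using a made-up FakeSplit class rather than the real PaimonSplit:

public class ToStringSketch {

    // Hypothetical split with the same toString() shape as the change above.
    static final class FakeSplit {
        private final String dataSplit;
        private final boolean isBucketUnAware;

        FakeSplit(String dataSplit, boolean isBucketUnAware) {
            this.dataSplit = dataSplit;
            this.isBucketUnAware = isBucketUnAware;
        }

        @Override
        public String toString() {
            return "FakeSplit{"
                    + "dataSplit="
                    + dataSplit
                    + ", isBucketUnAware="
                    + isBucketUnAware
                    + '}';
        }
    }

    public static void main(String[] args) {
        // Prints: FakeSplit{dataSplit=part-0.parquet, isBucketUnAware=false}
        System.out.println(new FakeSplit("part-0.parquet", false));
    }
}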
