Skip to content

Commit cdb7cdc

Browse files
committed
add log for enumerator
1 parent de9f394 commit cdb7cdc

File tree

3 files changed

+19
-0
lines changed

3 files changed

+19
-0
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import org.apache.fluss.metadata.TableInfo;
3333
import org.apache.fluss.utils.ExceptionUtils;
3434

35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
3538
import javax.annotation.Nullable;
3639

3740
import java.util.ArrayList;
@@ -52,6 +55,8 @@
5255
/** A generator for lake splits. */
5356
public class LakeSplitGenerator {
5457

58+
private static final Logger LOG = LoggerFactory.getLogger(LakeSplitGenerator.class);
59+
5560
private final TableInfo tableInfo;
5661
private final Admin flussAdmin;
5762
private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
@@ -88,6 +93,7 @@ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
8893
LakeSnapshot lakeSnapshotInfo;
8994
try {
9095
lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
96+
LOG.info("Get lake snapshot info: {}", lakeSnapshotInfo);
9197
} catch (Exception exception) {
9298
if (ExceptionUtils.stripExecutionException(exception)
9399
instanceof LakeTableSnapshotNotExistException) {
@@ -105,6 +111,7 @@ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
105111
.createPlanner(
106112
(LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
107113
.plan());
114+
LOG.info("Group lake splits: {}", lakeSplits);
108115

109116
Map<TableBucket, Long> tableBucketsOffset = lakeSnapshotInfo.getTableBucketsOffset();
110117
if (isPartitioned) {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ public void start() {
258258
// we'll need to consider lake splits
259259
List<SourceSplitBase> hybridLakeFlussSplits = generateHybridLakeFlussSplits();
260260
if (hybridLakeFlussSplits != null) {
261+
LOG.info("Generated hybrid lake splits: {}", hybridLakeFlussSplits);
261262
// handle hybrid lake fluss splits firstly
262263
handleSplitsAdd(hybridLakeFlussSplits, null);
263264
}
@@ -616,6 +617,7 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
616617
// should be restored from checkpoint, shouldn't
617618
// list splits again
618619
if (pendingHybridLakeFlussSplits != null) {
620+
LOG.info("Still have pending lake fluss splits, shouldn't list splits again.");
619621
return pendingHybridLakeFlussSplits;
620622
}
621623
try {

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSplit.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,14 @@ public DataSplit dataSplit() {
7272
public boolean isBucketUnAware() {
7373
return isBucketUnAware;
7474
}
75+
76+
@Override
77+
public String toString() {
78+
return "PaimonSplit{"
79+
+ "dataSplit="
80+
+ dataSplit
81+
+ ", isBucketUnAware="
82+
+ isBucketUnAware
83+
+ '}';
84+
}
7585
}

0 commit comments

Comments
 (0)