Skip to content

Commit cb11779

Browse files
authored
[flink] Disable count push down for lake enabled table and catch exception when create lake source (#1679)
1 parent cf5d24e commit cb11779

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,10 @@ public FlinkTableSource(
185185
this.mergeEngineType = mergeEngineType;
186186
this.tableOptions = tableOptions;
187187
if (isDataLakeEnabled) {
188-
this.lakeSource = createLakeSource(tablePath, tableOptions);
188+
this.lakeSource =
189+
checkNotNull(
190+
createLakeSource(tablePath, tableOptions),
191+
"LakeSource must not be null if enable datalake");
189192
}
190193
}
191194

@@ -567,7 +570,11 @@ public boolean applyAggregates(
567570
|| aggregateExpressions.size() != 1
568571
|| hasPrimaryKey()
569572
|| groupingSets.size() > 1
570-
|| (groupingSets.size() == 1 && groupingSets.get(0).length > 0)) {
573+
|| (groupingSets.size() == 1 && groupingSets.get(0).length > 0)
574+
// The count pushdown feature is not supported when the data lake is enabled.
575+
// Otherwise, it'll cause miss count data in lake. But In the future, we can push
576+
// down count into lake.
577+
|| isDataLakeEnabled) {
571578
return false;
572579
}
573580

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,20 @@ public static LakeSource<LakeSplit> createLakeSource(
6060
try {
6161
lakeStoragePlugin = LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
6262
} catch (UnsupportedOperationException e) {
63-
LOG.info(
64-
"No LakeStoragePlugin can be found for datalake format: {}, return null to disable reading from lake source.",
65-
dataLake);
66-
return null;
63+
throw new UnsupportedOperationException(
64+
String.format(
65+
"No LakeStoragePlugin available for data lake format: %s. "
66+
+ "To resolve this, ensure fluss-lake-%s.jar is in the classpath.",
67+
dataLake, dataLake.toLowerCase()));
6768
}
6869
LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
6970
try {
7071
return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
7172
} catch (UnsupportedOperationException e) {
72-
LOG.info(
73-
"method createLakeSource throw UnsupportedOperationException for datalake format {}, return null as lakeSource to disable reading from lake source.",
74-
dataLake);
75-
return null;
73+
throw new UnsupportedOperationException(
74+
String.format(
75+
"Table using '%s' data lake format cannot be used as historical data in Fluss.",
76+
dataLake));
7677
}
7778
}
7879
}

0 commit comments

Comments
 (0)