Skip to content

Commit 7d39de6

Browse files
committed
[rpc] Remove describeLakeStorage; obtain LakeStorageInfo from table properties instead
1 parent 83aef68 commit 7d39de6

File tree

16 files changed

+100
-96
lines changed

16 files changed

+100
-96
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.alibaba.fluss.exception.TableAlreadyExistException;
3535
import com.alibaba.fluss.exception.TableNotExistException;
3636
import com.alibaba.fluss.exception.TableNotPartitionedException;
37-
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
3837
import com.alibaba.fluss.metadata.DatabaseDescriptor;
3938
import com.alibaba.fluss.metadata.DatabaseInfo;
4039
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -325,7 +324,4 @@ ListOffsetsResult listOffsets(
325324
PhysicalTablePath physicalTablePath,
326325
Collection<Integer> buckets,
327326
OffsetSpec offsetSpec);
328-
329-
/** Describe the lake used for lakehouse storage. */
330-
CompletableFuture<LakeStorageInfo> describeLakeStorage();
331327
}

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
2424
import com.alibaba.fluss.cluster.Cluster;
2525
import com.alibaba.fluss.cluster.ServerNode;
26-
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
2726
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2827
import com.alibaba.fluss.metadata.DatabaseInfo;
2928
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -42,7 +41,6 @@
4241
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
4342
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
4443
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
45-
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
4644
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
4745
import com.alibaba.fluss.rpc.messages.DropTableRequest;
4846
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
@@ -341,12 +339,6 @@ public ListOffsetsResult listOffsets(
341339
return new ListOffsetsResult(bucketToOffsetMap);
342340
}
343341

344-
@Override
345-
public CompletableFuture<LakeStorageInfo> describeLakeStorage() {
346-
return gateway.describeLakeStorage(new DescribeLakeStorageRequest())
347-
.thenApply(ClientRpcMessageUtils::toLakeStorageInfo);
348-
}
349-
350342
@Override
351343
public void close() {
352344
// nothing to do yet

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
3838
import com.alibaba.fluss.remote.RemoteLogSegment;
3939
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
40-
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
4140
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
4241
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
4342
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
@@ -387,10 +386,6 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
387386
.collect(Collectors.toList());
388387
}
389388

390-
public static LakeStorageInfo toLakeStorageInfo(DescribeLakeStorageResponse response) {
391-
return toLakeStorageInfo(response.getLakehouseStorageInfo());
392-
}
393-
394389
private static LakeStorageInfo toLakeStorageInfo(PbLakeStorageInfo pbLakeStorageInfo) {
395390
Map<String, String> dataLakeCatalogConfig =
396391
toKeyValueMap(pbLakeStorageInfo.getCatalogPropertiesList());

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.fluss.connector.flink.lakehouse.LakeCatalog;
2525
import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils;
2626
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
27+
import com.alibaba.fluss.connector.flink.utils.LakeStorageInfoUtils;
2728
import com.alibaba.fluss.exception.FlussRuntimeException;
2829
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2930
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -262,7 +263,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
262263
objectPath.getDatabaseName(),
263264
tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
264265
}
265-
return getLakeTable(objectPath.getDatabaseName(), tableName);
266+
return getLakeTable(objectPath.getDatabaseName(), tableName, tableInfo);
266267
} else {
267268
tableInfo = admin.getTableInfo(tablePath).get();
268269
}
@@ -284,9 +285,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
284285
}
285286
}
286287

287-
protected CatalogBaseTable getLakeTable(String databaseName, String tableName)
288+
protected CatalogBaseTable getLakeTable(
289+
String databaseName, String tableName, TableInfo tableInfo)
288290
throws TableNotExistException, CatalogException {
289-
mayInitLakeCatalogCatalog();
291+
mayInitLakeCatalogCatalog(tableInfo);
290292
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
291293
if (tableComponents.length == 1) {
292294
// should be pattern like table_name$lake
@@ -530,13 +532,14 @@ protected TablePath toTablePath(ObjectPath objectPath) {
530532
return TablePath.of(objectPath.getDatabaseName(), objectPath.getObjectName());
531533
}
532534

533-
private void mayInitLakeCatalogCatalog() {
535+
private void mayInitLakeCatalogCatalog(TableInfo tableInfo) {
534536
if (lakeCatalog == null) {
535537
synchronized (this) {
536538
if (lakeCatalog == null) {
537539
try {
538540
Map<String, String> catalogProperties =
539-
admin.describeLakeStorage().get().getCatalogProperties();
541+
LakeStorageInfoUtils.getLakeStorageInfo(tableInfo)
542+
.getCatalogProperties();
540543
lakeCatalog = new LakeCatalog(catalogName, catalogProperties, classLoader);
541544
} catch (Exception e) {
542545
throw new FlussRuntimeException("Failed to init paimon catalog.", e);

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/lakehouse/LakeSplitReaderGenerator.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@
1717
package com.alibaba.fluss.connector.flink.lakehouse;
1818

1919
import com.alibaba.fluss.client.Connection;
20-
import com.alibaba.fluss.client.admin.Admin;
2120
import com.alibaba.fluss.client.table.Table;
2221
import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
2322
import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
2423
import com.alibaba.fluss.connector.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
2524
import com.alibaba.fluss.connector.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
2625
import com.alibaba.fluss.connector.flink.source.reader.BoundedSplitReader;
2726
import com.alibaba.fluss.connector.flink.source.split.SourceSplitBase;
28-
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
27+
import com.alibaba.fluss.connector.flink.utils.LakeStorageInfoUtils;
2928
import com.alibaba.fluss.metadata.TablePath;
3029

3130
import org.apache.flink.util.ExceptionUtils;
@@ -120,18 +119,16 @@ private FileStoreTable getFileStoreTable() {
120119
return fileStoreTable;
121120
}
122121

123-
try (Admin admin = connection.getAdmin()) {
124-
LakeStorageInfo dataLakeInfo = admin.describeLakeStorage().get();
125-
try (Catalog paimonCatalog =
126-
FlinkCatalogFactory.createPaimonCatalog(
127-
Options.fromMap(dataLakeInfo.getCatalogProperties()))) {
128-
fileStoreTable =
129-
(FileStoreTable)
130-
paimonCatalog.getTable(
131-
Identifier.create(
132-
tablePath.getDatabaseName(),
133-
tablePath.getTableName()));
134-
}
122+
try (Catalog paimonCatalog =
123+
FlinkCatalogFactory.createPaimonCatalog(
124+
Options.fromMap(
125+
LakeStorageInfoUtils.getLakeStorageInfo(table.getTableInfo())
126+
.getCatalogProperties()))) {
127+
fileStoreTable =
128+
(FileStoreTable)
129+
paimonCatalog.getTable(
130+
Identifier.create(
131+
tablePath.getDatabaseName(), tablePath.getTableName()));
135132
return fileStoreTable;
136133
} catch (Exception e) {
137134
throw new FlinkRuntimeException(
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2024 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.connector.flink.utils;
18+
19+
import com.alibaba.fluss.config.ConfigOptions;
20+
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
21+
import com.alibaba.fluss.metadata.DataLakeFormat;
22+
import com.alibaba.fluss.metadata.TableInfo;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
/** Utility class for {@link LakeStorageInfo}. */
28+
public class LakeStorageInfoUtils {
29+
30+
private static final String TABLE_DATALAKE_PAIMON_PREFIX = "table.datalake.paimon.";
31+
32+
public static LakeStorageInfo getLakeStorageInfo(TableInfo tableInfo) {
33+
DataLakeFormat datalakeFormat =
34+
tableInfo.getProperties().get(ConfigOptions.TABLE_DATALAKE_FORMAT);
35+
if (datalakeFormat == null) {
36+
throw new IllegalArgumentException(
37+
String.format(
38+
"The lakehouse storage is not set, please set it by %s",
39+
ConfigOptions.TABLE_DATALAKE_FORMAT.key()));
40+
}
41+
42+
if (datalakeFormat != DataLakeFormat.PAIMON) {
43+
throw new UnsupportedOperationException(
44+
String.format(
45+
"The lakehouse storage %s "
46+
+ " is not supported. Only %s is supported.",
47+
datalakeFormat, DataLakeFormat.PAIMON));
48+
}
49+
50+
// currently, extract catalog config
51+
Map<String, String> datalakeConfig = new HashMap<>();
52+
Map<String, String> flussConfig = tableInfo.getCustomProperties().toMap();
53+
for (Map.Entry<String, String> configEntry : flussConfig.entrySet()) {
54+
String configKey = configEntry.getKey();
55+
String configValue = configEntry.getValue();
56+
if (configKey.startsWith(TABLE_DATALAKE_PAIMON_PREFIX)) {
57+
datalakeConfig.put(
58+
configKey.substring(TABLE_DATALAKE_PAIMON_PREFIX.length()), configValue);
59+
}
60+
}
61+
return new LakeStorageInfo(datalakeFormat.toString(), datalakeConfig);
62+
}
63+
}

fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
2020
import com.alibaba.fluss.config.ConfigOptions;
2121
import com.alibaba.fluss.lakehouse.paimon.sink.PaimonDataBaseSyncSinkBuilder;
22+
import com.alibaba.fluss.metadata.DataLakeFormat;
2223
import com.alibaba.fluss.metadata.Schema;
2324
import com.alibaba.fluss.metadata.TableBucket;
2425
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -148,7 +149,10 @@ void testPrimaryKeyTable(boolean isPartitioned) throws Exception {
148149
batchTEnv.executeSql(
149150
String.format("select * from %s$lake$options", tableName)));
150151
assertThat(paimonOptionsRows.toString())
151-
.isEqualTo("[+I[bucket, 3], +I[bucket-key, a], +I[changelog-producer, input]]");
152+
.isEqualTo(
153+
String.format(
154+
"[+I[bucket, 3], +I[bucket-key, a], +I[changelog-producer, input], +I[table.datalake.paimon.metastore, filesystem], +I[table.datalake.paimon.warehouse, %s]]",
155+
warehousePath));
152156

153157
// stop sync database job
154158
jobClient.cancel().get();
@@ -306,7 +310,10 @@ protected long createPkTable(TablePath tablePath, int bucketNum, boolean isParti
306310
TableDescriptor.Builder tableBuilder =
307311
TableDescriptor.builder()
308312
.distributedBy(bucketNum)
309-
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
313+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
314+
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
315+
.customProperty("table.datalake.paimon.metastore", "filesystem")
316+
.customProperty("table.datalake.paimon.warehouse", warehousePath);
310317

311318
if (isPartitioned) {
312319
schemaBuilder.column("c", DataTypes.STRING());

fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/FlinkPaimonTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class FlinkPaimonTestBase {
6666
protected static Connection conn;
6767
protected static Admin admin;
6868
protected static Configuration clientConf;
69-
private static String warehousePath;
69+
protected static String warehousePath;
7070

7171
private static Configuration initConfig() {
7272
Configuration conf = new Configuration();

fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/PaimonSyncTestBase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alibaba.fluss.lakehouse.paimon.sink.NewTablesAddedPaimonListener;
2424
import com.alibaba.fluss.lakehouse.paimon.sink.PaimonDataBaseSyncSinkBuilder;
2525
import com.alibaba.fluss.lakehouse.paimon.source.FlussDatabaseSyncSource;
26+
import com.alibaba.fluss.metadata.DataLakeFormat;
2627
import com.alibaba.fluss.metadata.Schema;
2728
import com.alibaba.fluss.metadata.TableBucket;
2829
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -122,7 +123,10 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
122123
TableDescriptor.Builder tableBuilder =
123124
TableDescriptor.builder()
124125
.distributedBy(bucketNum, "a")
125-
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
126+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
127+
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
128+
.customProperty("table.datalake.paimon.metastore", "filesystem")
129+
.customProperty("table.datalake.paimon.warehouse", warehousePath);
126130

127131
if (isPartitioned) {
128132
schemaBuilder.column("c", DataTypes.STRING());
@@ -150,6 +154,9 @@ protected long createPkTable(TablePath tablePath, int bucketNum) throws Exceptio
150154
.build())
151155
.distributedBy(bucketNum)
152156
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
157+
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.PAIMON)
158+
.customProperty("table.datalake.paimon.metastore", "filesystem")
159+
.customProperty("table.datalake.paimon.warehouse", warehousePath)
153160
.build();
154161
return createTable(tablePath, table1Descriptor);
155162
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import com.alibaba.fluss.rpc.RpcGateway;
2020
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
2121
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
22-
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
23-
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
2422
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
2523
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
2624
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -179,15 +177,6 @@ CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecurityToken
179177
CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
180178
ListPartitionInfosRequest request);
181179

182-
/**
183-
* Describe the lake storage used for Fluss.
184-
*
185-
* @return a future returns lake storage info
186-
*/
187-
@RPC(api = ApiKeys.DESCRIBE_LAKE_STORAGE)
188-
CompletableFuture<DescribeLakeStorageResponse> describeLakeStorage(
189-
DescribeLakeStorageRequest request);
190-
191180
/**
192181
* Get the latest lake snapshot for the given table.
193182
*

0 commit comments

Comments (0)