
Commit 6c38e4e

[rpc] Remove describeLakeStorage RPC to get lake catalog from table properties (#503)
1 parent f29018e commit 6c38e4e
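
In short, this commit removes the dedicated describeLakeStorage RPC: clients now derive the lake catalog configuration from the properties of the table itself. A minimal sketch of the migration for client code, assuming an Admin handle and a TablePath are already in scope:

// Before this commit: an extra RPC round-trip just to learn the catalog config.
// Map<String, String> catalogProps =
//         admin.describeLakeStorage().get().getCatalogProperties();

// After this commit: the config travels with the table metadata.
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
Map<String, String> catalogProps =
        DataLakeUtils.extractLakeCatalogProperties(tableInfo.getProperties());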

33 files changed (+178 -325 lines)

fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java

Lines changed: 0 additions & 4 deletions
@@ -36,7 +36,6 @@
 import com.alibaba.fluss.exception.TableAlreadyExistException;
 import com.alibaba.fluss.exception.TableNotExistException;
 import com.alibaba.fluss.exception.TableNotPartitionedException;
-import com.alibaba.fluss.lakehouse.LakeStorageInfo;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.DatabaseInfo;
 import com.alibaba.fluss.metadata.PartitionInfo;
@@ -372,7 +371,4 @@ ListOffsetsResult listOffsets(
             PhysicalTablePath physicalTablePath,
             Collection<Integer> buckets,
             OffsetSpec offsetSpec);
-
-    /** Describe the lake used for lakehouse storage. */
-    CompletableFuture<LakeStorageInfo> describeLakeStorage();
 }

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 0 additions & 8 deletions
@@ -23,7 +23,6 @@
 import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
 import com.alibaba.fluss.cluster.Cluster;
 import com.alibaba.fluss.cluster.ServerNode;
-import com.alibaba.fluss.lakehouse.LakeStorageInfo;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.DatabaseInfo;
 import com.alibaba.fluss.metadata.PartitionInfo;
@@ -43,7 +42,6 @@
 import com.alibaba.fluss.rpc.messages.CreateTableRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
-import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
 import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
 import com.alibaba.fluss.rpc.messages.DropTableRequest;
 import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
@@ -360,12 +358,6 @@ public ListOffsetsResult listOffsets(
         return new ListOffsetsResult(bucketToOffsetMap);
     }

-    @Override
-    public CompletableFuture<LakeStorageInfo> describeLakeStorage() {
-        return gateway.describeLakeStorage(new DescribeLakeStorageRequest())
-                .thenApply(ClientRpcMessageUtils::toLakeStorageInfo);
-    }
-
     @Override
     public void close() {
         // nothing to do yet

fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java

Lines changed: 1 addition & 13 deletions
@@ -17,33 +17,25 @@
 package com.alibaba.fluss.client.metadata;

 import com.alibaba.fluss.annotation.PublicEvolving;
-import com.alibaba.fluss.lakehouse.LakeStorageInfo;
 import com.alibaba.fluss.metadata.TableBucket;

 import java.util.Map;

 /**
  * A class representing the lake snapshot information of a table. It contains:
- * <li>The lake storage info
  * <li>The snapshot id and the log offset for each bucket.
  *
  * @since 0.3
  */
 @PublicEvolving
 public class LakeSnapshot {

-    private final LakeStorageInfo lakeStorageInfo;
-
     private final long snapshotId;

     // the specific log offset of the snapshot
     private final Map<TableBucket, Long> tableBucketsOffset;

-    public LakeSnapshot(
-            LakeStorageInfo lakeStorageInfo,
-            long snapshotId,
-            Map<TableBucket, Long> tableBucketsOffset) {
-        this.lakeStorageInfo = lakeStorageInfo;
+    public LakeSnapshot(long snapshotId, Map<TableBucket, Long> tableBucketsOffset) {
         this.snapshotId = snapshotId;
         this.tableBucketsOffset = tableBucketsOffset;
     }
@@ -52,10 +44,6 @@ public long getSnapshotId() {
         return snapshotId;
     }

-    public LakeStorageInfo getLakeStorageInfo() {
-        return lakeStorageInfo;
-    }
-
     public Map<TableBucket, Long> getTableBucketsOffset() {
         return tableBucketsOffset;
     }
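
With the storage info gone, a LakeSnapshot is just a snapshot id plus per-bucket log offsets. A minimal usage sketch of the new constructor (the ids and offsets below are illustrative only):

Map<TableBucket, Long> bucketOffsets = new HashMap<>();
// table id 1, non-partitioned table (null partition id), bucket 0 read up to offset 100
bucketOffsets.put(new TableBucket(1L, null, 0), 100L);
LakeSnapshot snapshot = new LakeSnapshot(3L, bucketOffsets);
long snapshotId = snapshot.getSnapshotId(); // 3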

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 1 addition & 15 deletions
@@ -27,7 +27,6 @@
 import com.alibaba.fluss.fs.FsPath;
 import com.alibaba.fluss.fs.FsPathAndFileName;
 import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
-import com.alibaba.fluss.lakehouse.LakeStorageInfo;
 import com.alibaba.fluss.metadata.PartitionInfo;
 import com.alibaba.fluss.metadata.PartitionSpec;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
@@ -39,7 +38,6 @@
 import com.alibaba.fluss.remote.RemoteLogSegment;
 import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
 import com.alibaba.fluss.rpc.messages.CreatePartitionRequest;
-import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse;
 import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
 import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
 import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
@@ -53,7 +51,6 @@
 import com.alibaba.fluss.rpc.messages.PbKeyValue;
 import com.alibaba.fluss.rpc.messages.PbKvSnapshot;
 import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket;
-import com.alibaba.fluss.rpc.messages.PbLakeStorageInfo;
 import com.alibaba.fluss.rpc.messages.PbLookupReqForBucket;
 import com.alibaba.fluss.rpc.messages.PbPhysicalTablePath;
 import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket;
@@ -284,7 +281,6 @@ public static KvSnapshotMetadata toKvSnapshotMetadata(GetKvSnapshotMetadataRespo
     }

     public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse response) {
-        LakeStorageInfo lakeStorageInfo = toLakeStorageInfo(response.getLakehouseStorageInfo());
         long tableId = response.getTableId();
         long snapshotId = response.getSnapshotId();
         Map<TableBucket, Long> tableBucketsOffset =
@@ -298,7 +294,7 @@ public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse
                     new TableBucket(tableId, partitionId, pbLakeSnapshotForBucket.getBucketId());
             tableBucketsOffset.put(tableBucket, pbLakeSnapshotForBucket.getLogOffset());
         }
-        return new LakeSnapshot(lakeStorageInfo, snapshotId, tableBucketsOffset);
+        return new LakeSnapshot(snapshotId, tableBucketsOffset);
     }

     public static List<FsPathAndFileName> toFsPathAndFileName(
@@ -428,16 +424,6 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
                 .collect(Collectors.toList());
     }

-    public static LakeStorageInfo toLakeStorageInfo(DescribeLakeStorageResponse response) {
-        return toLakeStorageInfo(response.getLakehouseStorageInfo());
-    }
-
-    private static LakeStorageInfo toLakeStorageInfo(PbLakeStorageInfo pbLakeStorageInfo) {
-        Map<String, String> dataLakeCatalogConfig =
-                toKeyValueMap(pbLakeStorageInfo.getCatalogPropertiesList());
-        return new LakeStorageInfo(pbLakeStorageInfo.getLakeStorageType(), dataLakeCatalogConfig);
-    }
-
     public static Map<String, String> toKeyValueMap(List<PbKeyValue> pbKeyValues) {
         return pbKeyValues.stream()
                 .collect(

fluss-common/src/main/java/com/alibaba/fluss/lakehouse/LakeStorageInfo.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java

Lines changed: 12 additions & 5 deletions
@@ -23,6 +23,7 @@
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.connector.flink.lakehouse.LakeCatalog;
 import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils;
+import com.alibaba.fluss.connector.flink.utils.DataLakeUtils;
 import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
@@ -270,7 +271,8 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
                                 objectPath.getDatabaseName(),
                                 tableName.split("\\" + LAKE_TABLE_SPLITTER)[0])));
             }
-            return getLakeTable(objectPath.getDatabaseName(), tableName);
+            return getLakeTable(
+                    objectPath.getDatabaseName(), tableName, tableInfo.getProperties());
         } else {
             tableInfo = admin.getTableInfo(tablePath).get();
         }
@@ -292,9 +294,10 @@ public CatalogBaseTable getTable(ObjectPath objectPath)
         }
     }

-    protected CatalogBaseTable getLakeTable(String databaseName, String tableName)
+    protected CatalogBaseTable getLakeTable(
+            String databaseName, String tableName, Configuration properties)
             throws TableNotExistException, CatalogException {
-        mayInitLakeCatalogCatalog();
+        mayInitLakeCatalogCatalog(properties);
         String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
         if (tableComponents.length == 1) {
             // should be pattern like table_name$lake
@@ -629,13 +632,17 @@ protected TablePath toTablePath(ObjectPath objectPath) {
         return TablePath.of(objectPath.getDatabaseName(), objectPath.getObjectName());
     }

-    private void mayInitLakeCatalogCatalog() {
+    private void mayInitLakeCatalogCatalog(Configuration tableOptions) {
+        // TODO: Currently, a Fluss cluster only supports a single DataLake storage. However, in the
+        // future, it may support multiple DataLakes. The following code assumes that a single
+        // lakeCatalog is shared across multiple tables, which will no longer be valid in such
+        // cases and should be updated accordingly.
         if (lakeCatalog == null) {
             synchronized (this) {
                 if (lakeCatalog == null) {
                     try {
                         Map<String, String> catalogProperties =
-                                admin.describeLakeStorage().get().getCatalogProperties();
+                                DataLakeUtils.extractLakeCatalogProperties(tableOptions);
                         lakeCatalog = new LakeCatalog(catalogName, catalogProperties, classLoader);
                     } catch (Exception e) {
                         throw new FlussRuntimeException("Failed to init paimon catalog.", e);
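
DataLakeUtils.extractLakeCatalogProperties itself is not shown in this diff. As a rough idea of the shape of such a helper, here is a hypothetical sketch that assumes lake catalog settings live in the table options under a datalake key prefix (the prefix and the Configuration#toMap accessor are assumptions, not the project's actual code):

// Hypothetical sketch only -- not the real DataLakeUtils from this commit.
public static Map<String, String> extractLakeCatalogProperties(Configuration tableOptions) {
    String prefix = "table.datalake."; // assumed key prefix for lake catalog settings
    Map<String, String> catalogProps = new HashMap<>();
    for (Map.Entry<String, String> entry : tableOptions.toMap().entrySet()) {
        if (entry.getKey().startsWith(prefix)) {
            // strip the prefix so the remainder can be handed to the lake catalog as-is
            catalogProps.put(entry.getKey().substring(prefix.length()), entry.getValue());
        }
    }
    return catalogProps;
}

Note the double-checked locking around lakeCatalog is unchanged: the catalog is still initialized once, from whichever table is resolved first, which is exactly the single-DataLake assumption the new TODO calls out.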

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/lakehouse/LakeSplitGenerator.java

Lines changed: 18 additions & 15 deletions
@@ -25,7 +25,7 @@
 import com.alibaba.fluss.connector.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.metadata.PartitionInfo;
 import com.alibaba.fluss.metadata.TableBucket;
-import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metadata.TableInfo;

 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
@@ -48,6 +48,7 @@
 import java.util.stream.IntStream;

 import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.connector.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
 import static com.alibaba.fluss.utils.Preconditions.checkState;

 /**
@@ -57,22 +58,19 @@
  */
 public class LakeSplitGenerator {

-    private final long tableId;
-    private final TablePath tablePath;
+    private final TableInfo tableInfo;
     private final Admin flussAdmin;
     private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
     private final OffsetsInitializer stoppingOffsetInitializer;
     private final int bucketCount;

     public LakeSplitGenerator(
-            long tableId,
-            TablePath tablePath,
+            TableInfo tableInfo,
             Admin flussAdmin,
             OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
             OffsetsInitializer stoppingOffsetInitializer,
             int bucketCount) {
-        this.tableId = tableId;
-        this.tablePath = tablePath;
+        this.tableInfo = tableInfo;
         this.flussAdmin = flussAdmin;
         this.bucketOffsetsRetriever = bucketOffsetsRetriever;
         this.stoppingOffsetInitializer = stoppingOffsetInitializer;
@@ -81,16 +79,18 @@ public LakeSplitGenerator(

     public List<SourceSplitBase> generateLakeSplits() throws Exception {
         // get the file store
-        LakeSnapshot lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tablePath).get();
+        LakeSnapshot lakeSnapshotInfo =
+                flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
         FileStoreTable fileStoreTable =
                 getTable(
                         lakeSnapshotInfo.getSnapshotId(),
-                        lakeSnapshotInfo.getLakeStorageInfo().getCatalogProperties());
+                        extractLakeCatalogProperties(tableInfo.getProperties()));
         boolean isLogTable = fileStoreTable.schema().primaryKeys().isEmpty();
         boolean isPartitioned = !fileStoreTable.schema().partitionKeys().isEmpty();

         if (isPartitioned) {
-            List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
+            List<PartitionInfo> partitionInfos =
+                    flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
             Map<Long, String> partitionNameById =
                     partitionInfos.stream()
                             .collect(
@@ -152,7 +152,8 @@ private List<SourceSplitBase> generateSplit(
                         generateSplitForLogSnapshot(
                                 fileStoreTable, splitGenerator, partitionId, partitionName));
                 for (int bucket = 0; bucket < bucketCount; bucket++) {
-                    TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
+                    TableBucket tableBucket =
+                            new TableBucket(tableInfo.getTableId(), partitionId, bucket);
                     Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
                     long stoppingOffset = bucketEndOffset.get(bucket);
                     if (snapshotLogOffset == null) {
@@ -175,7 +176,8 @@ private List<SourceSplitBase> generateSplit(
         } else {
             // it's primary key table
             for (int bucket = 0; bucket < bucketCount; bucket++) {
-                TableBucket tableBucket = new TableBucket(tableId, partitionId, bucket);
+                TableBucket tableBucket =
+                        new TableBucket(tableInfo.getTableId(), partitionId, bucket);
                 Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
                 long stoppingOffset = bucketEndOffset.get(bucket);
                 splits.add(
@@ -205,7 +207,7 @@ private List<SourceSplitBase> generateSplitForLogSnapshot(
         }
         // for snapshot splits, we always use bucket = -1 ad the bucket since we can't get bucket in
        // paimon's log table
-        TableBucket tableBucket = new TableBucket(tableId, partitionId, -1);
+        TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, -1);
        // snapshot splits + one log split
         for (FileStoreSourceSplit fileStoreSourceSplit : splitGenerator.createSplits(scan.plan())) {
             splits.add(new PaimonSnapshotSplit(tableBucket, partitionName, fileStoreSourceSplit));
@@ -219,7 +221,7 @@ private Map<String, String> getPartitionSpec(
         checkState(
                 partitionKeys.size() == 1,
                 "Must only one partition key for paimon table %, but got %s, the partition keys are: ",
-                tablePath,
+                tableInfo.getTablePath(),
                 partitionKeys.size(),
                 partitionKeys.size());
         return Collections.singletonMap(partitionKeys.get(0), partitionName);
@@ -297,7 +299,8 @@ private FileStoreTable getTable(long snapshotId, Map<String, String> catalogProp
         return (FileStoreTable)
                 catalog.getTable(
                                 Identifier.create(
-                                        tablePath.getDatabaseName(), tablePath.getTableName()))
+                                        tableInfo.getTablePath().getDatabaseName(),
+                                        tableInfo.getTablePath().getTableName()))
                         .copy(
                                 Collections.singletonMap(
                                         CoreOptions.SCAN_SNAPSHOT_ID.key(),
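
Call sites now hand the generator one TableInfo instead of the former (tableId, tablePath) pair, so the id, the path, and the lake properties travel together. A sketch of the updated construction, assuming the retriever, initializer, and bucket count are already available at the call site:

TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
LakeSplitGenerator generator =
        new LakeSplitGenerator(
                tableInfo,                 // replaces the former (tableId, tablePath) pair
                flussAdmin,
                bucketOffsetsRetriever,    // assumed in scope
                stoppingOffsetInitializer, // assumed in scope
                bucketCount);              // assumed in scope
List<SourceSplitBase> splits = generator.generateLakeSplits();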

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/lakehouse/LakeSplitReaderGenerator.java

Lines changed: 11 additions & 14 deletions
@@ -17,15 +17,14 @@
 package com.alibaba.fluss.connector.flink.lakehouse;

 import com.alibaba.fluss.client.Connection;
-import com.alibaba.fluss.client.admin.Admin;
 import com.alibaba.fluss.client.table.Table;
 import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
 import com.alibaba.fluss.connector.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
 import com.alibaba.fluss.connector.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.connector.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
 import com.alibaba.fluss.connector.flink.source.reader.BoundedSplitReader;
 import com.alibaba.fluss.connector.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.lakehouse.LakeStorageInfo;
+import com.alibaba.fluss.connector.flink.utils.DataLakeUtils;
 import com.alibaba.fluss.metadata.TablePath;

 import org.apache.flink.util.ExceptionUtils;
@@ -120,18 +119,16 @@ private FileStoreTable getFileStoreTable() {
             return fileStoreTable;
         }

-        try (Admin admin = connection.getAdmin()) {
-            LakeStorageInfo dataLakeInfo = admin.describeLakeStorage().get();
-            try (Catalog paimonCatalog =
-                    FlinkCatalogFactory.createPaimonCatalog(
-                            Options.fromMap(dataLakeInfo.getCatalogProperties()))) {
-                fileStoreTable =
-                        (FileStoreTable)
-                                paimonCatalog.getTable(
-                                        Identifier.create(
-                                                tablePath.getDatabaseName(),
-                                                tablePath.getTableName()));
-            }
+        try (Catalog paimonCatalog =
+                FlinkCatalogFactory.createPaimonCatalog(
+                        Options.fromMap(
+                                DataLakeUtils.extractLakeCatalogProperties(
+                                        table.getTableInfo().getProperties())))) {
+            fileStoreTable =
+                    (FileStoreTable)
+                            paimonCatalog.getTable(
+                                    Identifier.create(
+                                            tablePath.getDatabaseName(), tablePath.getTableName()));
             return fileStoreTable;
         } catch (Exception e) {
             throw new FlinkRuntimeException(
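
Design note: the split reader no longer opens a throwaway Admin and blocks on an RPC just to build the Paimon catalog; the Options it needs now come straight from the Table handle the reader already holds.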
