Skip to content

Commit 4eefb80

Browse files
committed
improve code
1 parent f14578e commit 4eefb80

File tree

14 files changed

+26
-93
lines changed

14 files changed

+26
-93
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.alibaba.fluss.fs.FsPath;
2828
import com.alibaba.fluss.fs.FsPathAndFileName;
2929
import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
30-
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
3130
import com.alibaba.fluss.metadata.PartitionInfo;
3231
import com.alibaba.fluss.metadata.PartitionSpec;
3332
import com.alibaba.fluss.metadata.PhysicalTablePath;
@@ -52,7 +51,6 @@
5251
import com.alibaba.fluss.rpc.messages.PbKeyValue;
5352
import com.alibaba.fluss.rpc.messages.PbKvSnapshot;
5453
import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket;
55-
import com.alibaba.fluss.rpc.messages.PbLakeStorageInfo;
5654
import com.alibaba.fluss.rpc.messages.PbLookupReqForBucket;
5755
import com.alibaba.fluss.rpc.messages.PbPhysicalTablePath;
5856
import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket;
@@ -283,7 +281,6 @@ public static KvSnapshotMetadata toKvSnapshotMetadata(GetKvSnapshotMetadataRespo
283281
}
284282

285283
public static LakeSnapshot toLakeTableSnapshotInfo(GetLatestLakeSnapshotResponse response) {
286-
LakeStorageInfo lakeStorageInfo = toLakeStorageInfo(response.getLakehouseStorageInfo());
287284
long tableId = response.getTableId();
288285
long snapshotId = response.getSnapshotId();
289286
Map<TableBucket, Long> tableBucketsOffset =
@@ -427,12 +424,6 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
427424
.collect(Collectors.toList());
428425
}
429426

430-
private static LakeStorageInfo toLakeStorageInfo(PbLakeStorageInfo pbLakeStorageInfo) {
431-
Map<String, String> dataLakeCatalogConfig =
432-
toKeyValueMap(pbLakeStorageInfo.getCatalogPropertiesList());
433-
return new LakeStorageInfo(pbLakeStorageInfo.getLakeStorageType(), dataLakeCatalogConfig);
434-
}
435-
436427
public static Map<String, String> toKeyValueMap(List<PbKeyValue> pbKeyValues) {
437428
return pbKeyValues.stream()
438429
.collect(

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public static Map<Long, String> waitUntilPartitions(
233233
public static Map<Long, String> createPartitions(
234234
ZooKeeperClient zkClient, TablePath tablePath, List<String> partitionsToCreate)
235235
throws Exception {
236-
MetadataManager metadataManager = new MetadataManager(zkClient);
236+
MetadataManager metadataManager = new MetadataManager(zkClient, new Configuration());
237237
TableInfo tableInfo = metadataManager.getTable(tablePath);
238238
Map<Long, String> newPartitionIds = new HashMap<>();
239239
for (String partition : partitionsToCreate) {

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,9 @@ message GetLatestLakeSnapshotRequest {
343343
}
344344

345345
message GetLatestLakeSnapshotResponse {
346-
required PbLakeStorageInfo lakehouse_storage_info = 1;
347-
required int64 table_id = 2;
348-
required int64 snapshotId = 3;
349-
repeated PbLakeSnapshotForBucket bucket_snapshots = 4;
346+
required int64 table_id = 1;
347+
required int64 snapshotId = 2;
348+
repeated PbLakeSnapshotForBucket bucket_snapshots = 3;
350349
}
351350

352351
message GetFileSystemSecurityTokenRequest {
@@ -731,9 +730,4 @@ message PbPartitionInfo {
731730

732731
message PbPartitionSpec {
733732
repeated PbKeyValue partition_key_values = 1;
734-
}
735-
736-
message PbLakeStorageInfo {
737-
required string lake_storage_type = 1;
738-
repeated PbKeyValue catalog_properties = 2;
739733
}

fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,15 @@
1919
import com.alibaba.fluss.cluster.BucketLocation;
2020
import com.alibaba.fluss.cluster.ServerNode;
2121
import com.alibaba.fluss.cluster.ServerType;
22-
import com.alibaba.fluss.config.ConfigOptions;
23-
import com.alibaba.fluss.config.Configuration;
2422
import com.alibaba.fluss.exception.FlussRuntimeException;
2523
import com.alibaba.fluss.exception.KvSnapshotNotExistException;
26-
import com.alibaba.fluss.exception.LakeStorageNotConfiguredException;
2724
import com.alibaba.fluss.exception.LakeTableSnapshotNotExistException;
2825
import com.alibaba.fluss.exception.NonPrimaryKeyTableException;
2926
import com.alibaba.fluss.exception.PartitionNotExistException;
3027
import com.alibaba.fluss.exception.SecurityTokenException;
3128
import com.alibaba.fluss.exception.TableNotPartitionedException;
3229
import com.alibaba.fluss.fs.FileSystem;
3330
import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
34-
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
3531
import com.alibaba.fluss.metadata.DatabaseInfo;
3632
import com.alibaba.fluss.metadata.PhysicalTablePath;
3733
import com.alibaba.fluss.metadata.SchemaInfo;
@@ -84,7 +80,6 @@
8480
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
8581
import com.alibaba.fluss.server.metadata.TableMetadataInfo;
8682
import com.alibaba.fluss.server.tablet.TabletService;
87-
import com.alibaba.fluss.server.utils.LakeStorageUtils;
8883
import com.alibaba.fluss.server.utils.RpcMessageUtils;
8984
import com.alibaba.fluss.server.zk.ZooKeeperClient;
9085
import com.alibaba.fluss.server.zk.data.BucketAssignment;
@@ -133,11 +128,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR
133128
private long tokenLastUpdateTimeMs = 0;
134129
private ObtainedSecurityToken securityToken = null;
135130

136-
private @Nullable final LakeStorageInfo lakeStorageInfo;
137-
private @Nullable final Map<String, String> tableDataLakeProperties;
138-
139131
public RpcServiceBase(
140-
Configuration config,
141132
FileSystem remoteFileSystem,
142133
ServerType provider,
143134
ZooKeeperClient zkClient,
@@ -149,11 +140,6 @@ public RpcServiceBase(
149140
this.zkClient = zkClient;
150141
this.metadataCache = metadataCache;
151142
this.metadataManager = metadataManager;
152-
this.lakeStorageInfo =
153-
config.get(ConfigOptions.DATALAKE_FORMAT) != null
154-
? LakeStorageUtils.getLakeStorageInfo(config)
155-
: null;
156-
this.tableDataLakeProperties = LakeStorageUtils.getTableDataLakeProperties(config);
157143
}
158144

159145
@Override
@@ -216,7 +202,7 @@ public CompletableFuture<ListTablesResponse> listTables(ListTablesRequest reques
216202
public CompletableFuture<GetTableInfoResponse> getTableInfo(GetTableInfoRequest request) {
217203
GetTableInfoResponse response = new GetTableInfoResponse();
218204
TablePath tablePath = toTablePath(request.getTablePath());
219-
TableInfo tableInfo = metadataManager.getTable(tablePath, tableDataLakeProperties);
205+
TableInfo tableInfo = metadataManager.getTable(tablePath);
220206
response.setTableJson(tableInfo.toTableDescriptor().toJsonBytes())
221207
.setSchemaId(tableInfo.getSchemaId())
222208
.setTableId(tableInfo.getTableId())
@@ -413,10 +399,6 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
413399
@Override
414400
public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
415401
GetLatestLakeSnapshotRequest request) {
416-
if (lakeStorageInfo == null) {
417-
throw new LakeStorageNotConfiguredException("Lake storage is not configured.");
418-
}
419-
420402
// get table info
421403
TablePath tablePath = toTablePath(request.getTablePath());
422404
TableInfo tableInfo = metadataManager.getTable(tablePath);
@@ -443,8 +425,7 @@ public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
443425

444426
LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
445427
return CompletableFuture.completedFuture(
446-
RpcMessageUtils.makeGetLatestLakeSnapshotResponse(
447-
tableId, lakeStorageInfo, lakeTableSnapshot));
428+
RpcMessageUtils.makeGetLatestLakeSnapshotResponse(tableId, lakeTableSnapshot));
448429
}
449430

450431
private Set<ServerNode> getAllTabletServerNodes() {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,7 @@ public CoordinatorService(
9898
Supplier<EventManager> eventManagerSupplier,
9999
ServerMetadataCache metadataCache,
100100
MetadataManager metadataManager) {
101-
super(
102-
conf,
103-
remoteFileSystem,
104-
ServerType.COORDINATOR,
105-
zkClient,
106-
metadataCache,
107-
metadataManager);
101+
super(remoteFileSystem, ServerType.COORDINATOR, zkClient, metadataCache, metadataManager);
108102
this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
109103
this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
110104
this.eventManagerSupplier = eventManagerSupplier;

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,12 @@ public class MetadataManager {
6666
private final ZooKeeperClient zookeeperClient;
6767
private @Nullable final Map<String, String> tableDataLakeProperties;
6868

69-
public MetadataManager(ZooKeeperClient zookeeperClient) {
70-
this(zookeeperClient, new Configuration());
71-
}
72-
69+
/**
70+
* Creates a new metadata manager.
71+
*
72+
* @param zookeeperClient the zookeeper client
73+
* @param conf the cluster configuration
74+
*/
7375
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
7476
this.zookeeperClient = zookeeperClient;
7577
this.tableDataLakeProperties = LakeStorageUtils.getTableDataLakeProperties(conf);
@@ -258,12 +260,6 @@ public long createTable(
258260
}
259261

260262
public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
261-
return getTable(tablePath, tableDataLakeProperties);
262-
}
263-
264-
public TableInfo getTable(
265-
TablePath tablePath, @Nullable Map<String, String> additionalProperties)
266-
throws TableNotExistException {
267263
Optional<TableRegistration> optionalTable;
268264
try {
269265
optionalTable = zookeeperClient.getTable(tablePath);
@@ -275,7 +271,7 @@ public TableInfo getTable(
275271
}
276272
TableRegistration tableReg = optionalTable.get();
277273
SchemaInfo schemaInfo = getLatestSchema(tablePath);
278-
return tableReg.toTableInfo(tablePath, schemaInfo, additionalProperties);
274+
return tableReg.toTableInfo(tablePath, schemaInfo, tableDataLakeProperties);
279275
}
280276

281277
public SchemaInfo getLatestSchema(TablePath tablePath) throws SchemaNotExistException {

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ protected void startServices() throws Exception {
196196
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
197197
this.tabletService =
198198
new TabletService(
199-
conf,
200199
serverId,
201200
remoteFileSystem,
202201
zkClient,

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.alibaba.fluss.server.tablet;
1818

1919
import com.alibaba.fluss.cluster.ServerType;
20-
import com.alibaba.fluss.config.Configuration;
2120
import com.alibaba.fluss.fs.FileSystem;
2221
import com.alibaba.fluss.metadata.TableBucket;
2322
import com.alibaba.fluss.record.KvRecordBatch;
@@ -78,20 +77,13 @@ public final class TabletService extends RpcServiceBase implements TabletServerG
7877
private final ReplicaManager replicaManager;
7978

8079
public TabletService(
81-
Configuration config,
8280
int serverId,
8381
FileSystem remoteFileSystem,
8482
ZooKeeperClient zkClient,
8583
ReplicaManager replicaManager,
8684
ServerMetadataCache metadataCache,
8785
MetadataManager metadataManager) {
88-
super(
89-
config,
90-
remoteFileSystem,
91-
ServerType.TABLET_SERVER,
92-
zkClient,
93-
metadataCache,
94-
metadataManager);
86+
super(remoteFileSystem, ServerType.TABLET_SERVER, zkClient, metadataCache, metadataManager);
9587
this.serviceName = "server-" + serverId;
9688
this.replicaManager = replicaManager;
9789
}

fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.alibaba.fluss.cluster.ServerType;
2121
import com.alibaba.fluss.fs.FsPath;
2222
import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
23-
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
2423
import com.alibaba.fluss.metadata.PartitionSpec;
2524
import com.alibaba.fluss.metadata.PhysicalTablePath;
2625
import com.alibaba.fluss.metadata.TableBucket;
@@ -76,7 +75,6 @@
7675
import com.alibaba.fluss.rpc.messages.PbKeyValue;
7776
import com.alibaba.fluss.rpc.messages.PbKvSnapshot;
7877
import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket;
79-
import com.alibaba.fluss.rpc.messages.PbLakeStorageInfo;
8078
import com.alibaba.fluss.rpc.messages.PbLakeTableOffsetForBucket;
8179
import com.alibaba.fluss.rpc.messages.PbLakeTableSnapshotInfo;
8280
import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket;
@@ -1229,10 +1227,9 @@ public static NotifyLakeTableOffsetData getNotifyLakeTableOffset(
12291227
}
12301228

12311229
public static GetLatestLakeSnapshotResponse makeGetLatestLakeSnapshotResponse(
1232-
long tableId, LakeStorageInfo lakeStorageInfo, LakeTableSnapshot lakeTableSnapshot) {
1230+
long tableId, LakeTableSnapshot lakeTableSnapshot) {
12331231
GetLatestLakeSnapshotResponse getLakeTableSnapshotResponse =
1234-
new GetLatestLakeSnapshotResponse()
1235-
.setLakehouseStorageInfo(toPbLakeStorageInfo(lakeStorageInfo));
1232+
new GetLatestLakeSnapshotResponse();
12361233

12371234
getLakeTableSnapshotResponse.setTableId(tableId);
12381235
getLakeTableSnapshotResponse.setSnapshotId(lakeTableSnapshot.getSnapshotId());
@@ -1261,16 +1258,4 @@ public static PartitionSpec getPartitionSpec(PbPartitionSpec pbPartitionSpec) {
12611258
}
12621259
return new PartitionSpec(partitionKeyAndValues);
12631260
}
1264-
1265-
private static PbLakeStorageInfo toPbLakeStorageInfo(LakeStorageInfo lakeStorageInfo) {
1266-
PbLakeStorageInfo pbLakeStorageInfo = new PbLakeStorageInfo();
1267-
pbLakeStorageInfo.setLakeStorageType(lakeStorageInfo.getLakeStorage());
1268-
for (Map.Entry<String, String> entry : lakeStorageInfo.getCatalogProperties().entrySet()) {
1269-
pbLakeStorageInfo
1270-
.addCatalogProperty()
1271-
.setKey(entry.getKey())
1272-
.setValue(entry.getValue());
1273-
}
1274-
return pbLakeStorageInfo;
1275-
}
12761261
}

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ static void beforeAll() {
7171
ZOO_KEEPER_EXTENSION_WRAPPER
7272
.getCustomExtension()
7373
.getZooKeeperClient(NOPErrorHandler.INSTANCE);
74-
metadataManager = new MetadataManager(zookeeperClient);
74+
metadataManager = new MetadataManager(zookeeperClient, new Configuration());
7575
}
7676

7777
@AfterEach
@@ -185,7 +185,7 @@ void testAddPartitionedTable(TestParams params) throws Exception {
185185
AutoPartitionManager autoPartitionManager =
186186
new AutoPartitionManager(
187187
new TestingMetadataCache(3),
188-
new MetadataManager(zookeeperClient),
188+
new MetadataManager(zookeeperClient, new Configuration()),
189189
new Configuration(),
190190
clock,
191191
periodicExecutor);

0 commit comments

Comments
 (0)