Skip to content

Commit 4eb3d90

Browse files
Fluss admin use tablet server for read only operation to reduce the cost of coordinator. (#1303)
* Fluss admin use tablet server for read only operation to reduce the cost of coordinator. * fix test * modified based on cr
1 parent 780a32f commit 4eb3d90

File tree

7 files changed

+250
-192
lines changed

7 files changed

+250
-192
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.alibaba.fluss.rpc.GatewayClientProxy;
3939
import com.alibaba.fluss.rpc.RpcClient;
4040
import com.alibaba.fluss.rpc.gateway.AdminGateway;
41+
import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
4142
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
4243
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
4344
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
@@ -99,13 +100,17 @@
99100
public class FlussAdmin implements Admin {
100101

101102
private final AdminGateway gateway;
103+
private final AdminReadOnlyGateway readOnlyGateway;
102104
private final MetadataUpdater metadataUpdater;
103105
private final RpcClient client;
104106

105107
public FlussAdmin(RpcClient client, MetadataUpdater metadataUpdater) {
106108
this.gateway =
107109
GatewayClientProxy.createGatewayProxy(
108110
metadataUpdater::getCoordinatorServer, client, AdminGateway.class);
111+
this.readOnlyGateway =
112+
GatewayClientProxy.createGatewayProxy(
113+
metadataUpdater::getRandomTabletServer, client, AdminGateway.class);
109114
this.metadataUpdater = metadataUpdater;
110115
this.client = client;
111116
}
@@ -119,7 +124,7 @@ public CompletableFuture<List<ServerNode>> getServerNodes() {
119124
List<ServerNode> serverNodeList = new ArrayList<>();
120125
Cluster cluster =
121126
sendMetadataRequestAndRebuildCluster(
122-
gateway,
127+
readOnlyGateway,
123128
false,
124129
metadataUpdater.getCluster(),
125130
null,
@@ -142,7 +147,8 @@ public CompletableFuture<SchemaInfo> getTableSchema(TablePath tablePath) {
142147
request.setTablePath()
143148
.setDatabaseName(tablePath.getDatabaseName())
144149
.setTableName(tablePath.getTableName());
145-
return gateway.getTableSchema(request)
150+
return readOnlyGateway
151+
.getTableSchema(request)
146152
.thenApply(
147153
r ->
148154
new SchemaInfo(
@@ -157,7 +163,8 @@ public CompletableFuture<SchemaInfo> getTableSchema(TablePath tablePath, int sch
157163
.setTablePath()
158164
.setDatabaseName(tablePath.getDatabaseName())
159165
.setTableName(tablePath.getTableName());
160-
return gateway.getTableSchema(request)
166+
return readOnlyGateway
167+
.getTableSchema(request)
161168
.thenApply(
162169
r ->
163170
new SchemaInfo(
@@ -179,7 +186,8 @@ public CompletableFuture<Void> createDatabase(
179186
public CompletableFuture<DatabaseInfo> getDatabaseInfo(String databaseName) {
180187
GetDatabaseInfoRequest request = new GetDatabaseInfoRequest();
181188
request.setDatabaseName(databaseName);
182-
return gateway.getDatabaseInfo(request)
189+
return readOnlyGateway
190+
.getDatabaseInfo(request)
183191
.thenApply(
184192
r ->
185193
new DatabaseInfo(
@@ -204,13 +212,14 @@ public CompletableFuture<Void> dropDatabase(
204212
public CompletableFuture<Boolean> databaseExists(String databaseName) {
205213
DatabaseExistsRequest request = new DatabaseExistsRequest();
206214
request.setDatabaseName(databaseName);
207-
return gateway.databaseExists(request).thenApply(DatabaseExistsResponse::isExists);
215+
return readOnlyGateway.databaseExists(request).thenApply(DatabaseExistsResponse::isExists);
208216
}
209217

210218
@Override
211219
public CompletableFuture<List<String>> listDatabases() {
212220
ListDatabasesRequest request = new ListDatabasesRequest();
213-
return gateway.listDatabases(request)
221+
return readOnlyGateway
222+
.listDatabases(request)
214223
.thenApply(ListDatabasesResponse::getDatabaseNamesList);
215224
}
216225

@@ -233,7 +242,8 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
233242
request.setTablePath()
234243
.setDatabaseName(tablePath.getDatabaseName())
235244
.setTableName(tablePath.getTableName());
236-
return gateway.getTableInfo(request)
245+
return readOnlyGateway
246+
.getTableInfo(request)
237247
.thenApply(
238248
r ->
239249
TableInfo.of(
@@ -261,14 +271,14 @@ public CompletableFuture<Boolean> tableExists(TablePath tablePath) {
261271
request.setTablePath()
262272
.setDatabaseName(tablePath.getDatabaseName())
263273
.setTableName(tablePath.getTableName());
264-
return gateway.tableExists(request).thenApply(TableExistsResponse::isExists);
274+
return readOnlyGateway.tableExists(request).thenApply(TableExistsResponse::isExists);
265275
}
266276

267277
@Override
268278
public CompletableFuture<List<String>> listTables(String databaseName) {
269279
ListTablesRequest request = new ListTablesRequest();
270280
request.setDatabaseName(databaseName);
271-
return gateway.listTables(request).thenApply(ListTablesResponse::getTableNamesList);
281+
return readOnlyGateway.listTables(request).thenApply(ListTablesResponse::getTableNamesList);
272282
}
273283

274284
@Override
@@ -289,7 +299,8 @@ public CompletableFuture<List<PartitionInfo>> listPartitionInfos(
289299
PbPartitionSpec pbPartitionSpec = makePbPartitionSpec(partitionSpec);
290300
request.setPartialPartitionSpec(pbPartitionSpec);
291301
}
292-
return gateway.listPartitionInfos(request)
302+
return readOnlyGateway
303+
.listPartitionInfos(request)
293304
.thenApply(ClientRpcMessageUtils::toPartitionInfos);
294305
}
295306

@@ -315,7 +326,8 @@ public CompletableFuture<KvSnapshots> getLatestKvSnapshots(TablePath tablePath)
315326
request.setTablePath()
316327
.setDatabaseName(tablePath.getDatabaseName())
317328
.setTableName(tablePath.getTableName());
318-
return gateway.getLatestKvSnapshots(request)
329+
return readOnlyGateway
330+
.getLatestKvSnapshots(request)
319331
.thenApply(ClientRpcMessageUtils::toKvSnapshots);
320332
}
321333

@@ -328,7 +340,8 @@ public CompletableFuture<KvSnapshots> getLatestKvSnapshots(
328340
.setDatabaseName(tablePath.getDatabaseName())
329341
.setTableName(tablePath.getTableName());
330342
request.setPartitionName(partitionName);
331-
return gateway.getLatestKvSnapshots(request)
343+
return readOnlyGateway
344+
.getLatestKvSnapshots(request)
332345
.thenApply(ClientRpcMessageUtils::toKvSnapshots);
333346
}
334347

@@ -342,7 +355,8 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
342355
request.setTableId(bucket.getTableId())
343356
.setBucketId(bucket.getBucket())
344357
.setSnapshotId(snapshotId);
345-
return gateway.getKvSnapshotMetadata(request)
358+
return readOnlyGateway
359+
.getKvSnapshotMetadata(request)
346360
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
347361
}
348362

@@ -353,7 +367,8 @@ public CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath
353367
.setDatabaseName(tablePath.getDatabaseName())
354368
.setTableName(tablePath.getTableName());
355369

356-
return gateway.getLatestLakeSnapshot(request)
370+
return readOnlyGateway
371+
.getLatestLakeSnapshot(request)
357372
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
358373
}
359374

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,13 @@ void testMultiClient() throws Exception {
130130
void testGetDatabaseInfo() throws Exception {
131131
long timestampBeforeCreate = System.currentTimeMillis();
132132
admin.createDatabase(
133-
"test_db_2",
134-
DatabaseDescriptor.builder()
135-
.comment("test comment")
136-
.customProperty("key1", "value1")
137-
.build(),
138-
false);
133+
"test_db_2",
134+
DatabaseDescriptor.builder()
135+
.comment("test comment")
136+
.customProperty("key1", "value1")
137+
.build(),
138+
false)
139+
.get();
139140
DatabaseInfo databaseInfo = admin.getDatabaseInfo("test_db_2").get();
140141
long timestampAfterCreate = System.currentTimeMillis();
141142
assertThat(databaseInfo.getCreatedTime()).isEqualTo(databaseInfo.getModifiedTime());
@@ -181,7 +182,7 @@ void testGetTableInfoAndSchema() throws Exception {
181182
// create and get a new table
182183
long timestampBeforeCreate = System.currentTimeMillis();
183184
TablePath tablePath = TablePath.of("test_db", "table_2");
184-
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
185+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
185186
tableInfo = admin.getTableInfo(tablePath).get();
186187
timestampAfterCreate = System.currentTimeMillis();
187188
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
@@ -378,16 +379,19 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
378379

379380
// assert the cluster should have tablet server number to be 3
380381
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
381-
FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
382382

383383
// we can create the table now
384384
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
385-
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
386-
assertThat(tableInfo.toTableDescriptor())
387-
.isEqualTo(
388-
DEFAULT_TABLE_DESCRIPTOR
389-
.withReplicationFactor(3)
390-
.withDataLakeFormat(DataLakeFormat.PAIMON));
385+
// recreate the connection because the metadata of tablet server has changed
386+
try (Connection conn = ConnectionFactory.createConnection(clientConf);
387+
Admin admin = conn.getAdmin()) {
388+
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
389+
assertThat(tableInfo.toTableDescriptor())
390+
.isEqualTo(
391+
DEFAULT_TABLE_DESCRIPTOR
392+
.withReplicationFactor(3)
393+
.withDataLakeFormat(DataLakeFormat.PAIMON));
394+
}
391395
}
392396

393397
@Test

0 commit comments

Comments
 (0)