Skip to content

Commit 85bb178

Browse files
committed
[example] Add rpc example
1 parent 74d0d49 commit 85bb178

File tree

2 files changed

+353
-0
lines changed

2 files changed

+353
-0
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.client.example.rpc;
18+
19+
import com.alibaba.fluss.cluster.Endpoint;
20+
import com.alibaba.fluss.cluster.ServerType;
21+
import com.alibaba.fluss.config.Configuration;
22+
import com.alibaba.fluss.metrics.CharacterFilter;
23+
import com.alibaba.fluss.metrics.groups.AbstractMetricGroup;
24+
import com.alibaba.fluss.metrics.registry.MetricRegistry;
25+
import com.alibaba.fluss.rpc.RpcGatewayService;
26+
import com.alibaba.fluss.rpc.RpcServer;
27+
import com.alibaba.fluss.rpc.gateway.AdminGateway;
28+
import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
29+
import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
30+
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
31+
import com.alibaba.fluss.rpc.messages.CreateAclsResponse;
32+
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
33+
import com.alibaba.fluss.rpc.messages.CreateDatabaseResponse;
34+
import com.alibaba.fluss.rpc.messages.CreatePartitionRequest;
35+
import com.alibaba.fluss.rpc.messages.CreatePartitionResponse;
36+
import com.alibaba.fluss.rpc.messages.CreateTableRequest;
37+
import com.alibaba.fluss.rpc.messages.CreateTableResponse;
38+
import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
39+
import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
40+
import com.alibaba.fluss.rpc.messages.DropAclsRequest;
41+
import com.alibaba.fluss.rpc.messages.DropAclsResponse;
42+
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
43+
import com.alibaba.fluss.rpc.messages.DropDatabaseResponse;
44+
import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
45+
import com.alibaba.fluss.rpc.messages.DropPartitionResponse;
46+
import com.alibaba.fluss.rpc.messages.DropTableRequest;
47+
import com.alibaba.fluss.rpc.messages.DropTableResponse;
48+
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
49+
import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
50+
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
51+
import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
52+
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
53+
import com.alibaba.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
54+
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
55+
import com.alibaba.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
56+
import com.alibaba.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
57+
import com.alibaba.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
58+
import com.alibaba.fluss.rpc.messages.GetTableInfoRequest;
59+
import com.alibaba.fluss.rpc.messages.GetTableInfoResponse;
60+
import com.alibaba.fluss.rpc.messages.GetTableSchemaRequest;
61+
import com.alibaba.fluss.rpc.messages.GetTableSchemaResponse;
62+
import com.alibaba.fluss.rpc.messages.ListAclsRequest;
63+
import com.alibaba.fluss.rpc.messages.ListAclsResponse;
64+
import com.alibaba.fluss.rpc.messages.ListDatabasesRequest;
65+
import com.alibaba.fluss.rpc.messages.ListDatabasesResponse;
66+
import com.alibaba.fluss.rpc.messages.ListPartitionInfosRequest;
67+
import com.alibaba.fluss.rpc.messages.ListPartitionInfosResponse;
68+
import com.alibaba.fluss.rpc.messages.ListTablesRequest;
69+
import com.alibaba.fluss.rpc.messages.ListTablesResponse;
70+
import com.alibaba.fluss.rpc.messages.MetadataRequest;
71+
import com.alibaba.fluss.rpc.messages.MetadataResponse;
72+
import com.alibaba.fluss.rpc.messages.PbApiVersion;
73+
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
74+
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
75+
import com.alibaba.fluss.rpc.netty.server.RequestsMetrics;
76+
import com.alibaba.fluss.rpc.protocol.ApiKeys;
77+
78+
import org.slf4j.Logger;
79+
import org.slf4j.LoggerFactory;
80+
81+
import java.util.ArrayList;
82+
import java.util.Collections;
83+
import java.util.List;
84+
import java.util.concurrent.CompletableFuture;
85+
import java.util.concurrent.ExecutorService;
86+
import java.util.concurrent.Executors;
87+
88+
/** RpcGateway example. */
89+
public class CoordinatorServer extends RpcGatewayService implements AdminGateway {
90+
91+
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServer.class);
92+
93+
private final List<String> databases = new ArrayList<>();
94+
95+
private RpcServer rpcServer;
96+
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
97+
98+
public CoordinatorServer() {}
99+
100+
@Override
101+
public CompletableFuture<ApiVersionsResponse> apiVersions(ApiVersionsRequest request) {
102+
ApiVersionsResponse response = new ApiVersionsResponse();
103+
104+
PbApiVersion apiVersion = new PbApiVersion();
105+
apiVersion.setApiKey(ApiKeys.CREATE_DATABASE.id);
106+
apiVersion.setMinVersion(ApiKeys.CREATE_DATABASE.lowestSupportedVersion);
107+
apiVersion.setMaxVersion(ApiKeys.CREATE_DATABASE.highestSupportedVersion);
108+
response.addAllApiVersions(Collections.singletonList(apiVersion));
109+
110+
return CompletableFuture.completedFuture(response);
111+
}
112+
113+
@Override
114+
public CompletableFuture<CreateDatabaseResponse> createDatabase(CreateDatabaseRequest request) {
115+
String databaseName = request.getDatabaseName();
116+
LOG.info("Create database: {}", databaseName);
117+
databases.add(databaseName + ", " + Thread.currentThread().getName());
118+
119+
CreateDatabaseResponse response = new CreateDatabaseResponse();
120+
return CompletableFuture.completedFuture(response);
121+
}
122+
123+
@Override
124+
public ServerType providerType() {
125+
return ServerType.COORDINATOR;
126+
}
127+
128+
@Override
129+
public String name() {
130+
return "Coordinator";
131+
}
132+
133+
@Override
134+
public void shutdown() {
135+
try {
136+
rpcServer.close();
137+
} catch (Exception e) {
138+
throw new RuntimeException(e);
139+
}
140+
}
141+
142+
public void start() {
143+
List<Endpoint> endpoints = new ArrayList<>();
144+
endpoints.add(new Endpoint("localhost", 8088, "FLUSS"));
145+
146+
try {
147+
148+
MetricRegistry registry = MetricRegistry.create(new Configuration(), null);
149+
AbstractMetricGroup group =
150+
new AbstractMetricGroup(registry, new String[0], null) {
151+
152+
@Override
153+
protected String getGroupName(CharacterFilter filter) {
154+
return "test";
155+
}
156+
};
157+
158+
rpcServer =
159+
RpcServer.create(
160+
new Configuration(),
161+
endpoints,
162+
this,
163+
group,
164+
RequestsMetrics.createCoordinatorServerRequestMetrics(group));
165+
rpcServer.start();
166+
167+
executorService.submit(
168+
() -> {
169+
while (true) {
170+
LOG.info("Database size: {}, {}", databases.size(), databases);
171+
Thread.sleep(1000);
172+
}
173+
});
174+
} catch (Exception e) {
175+
throw new RuntimeException(e);
176+
}
177+
}
178+
179+
public static void main(String[] args) throws InterruptedException {
180+
CoordinatorServer server = new CoordinatorServer();
181+
server.start();
182+
}
183+
184+
@Override
185+
public CompletableFuture<DropDatabaseResponse> dropDatabase(DropDatabaseRequest request) {
186+
return null;
187+
}
188+
189+
@Override
190+
public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) {
191+
return null;
192+
}
193+
194+
@Override
195+
public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request) {
196+
return null;
197+
}
198+
199+
@Override
200+
public CompletableFuture<CreatePartitionResponse> createPartition(
201+
CreatePartitionRequest request) {
202+
return null;
203+
}
204+
205+
@Override
206+
public CompletableFuture<DropPartitionResponse> dropPartition(DropPartitionRequest request) {
207+
return null;
208+
}
209+
210+
@Override
211+
public CompletableFuture<CreateAclsResponse> createAcls(CreateAclsRequest request) {
212+
return null;
213+
}
214+
215+
@Override
216+
public CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest request) {
217+
return null;
218+
}
219+
220+
@Override
221+
public CompletableFuture<ListDatabasesResponse> listDatabases(ListDatabasesRequest request) {
222+
return null;
223+
}
224+
225+
@Override
226+
public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(
227+
GetDatabaseInfoRequest request) {
228+
return null;
229+
}
230+
231+
@Override
232+
public CompletableFuture<DatabaseExistsResponse> databaseExists(DatabaseExistsRequest request) {
233+
return null;
234+
}
235+
236+
@Override
237+
public CompletableFuture<ListTablesResponse> listTables(ListTablesRequest request) {
238+
return null;
239+
}
240+
241+
@Override
242+
public CompletableFuture<GetTableInfoResponse> getTableInfo(GetTableInfoRequest request) {
243+
return null;
244+
}
245+
246+
@Override
247+
public CompletableFuture<GetTableSchemaResponse> getTableSchema(GetTableSchemaRequest request) {
248+
return null;
249+
}
250+
251+
@Override
252+
public CompletableFuture<TableExistsResponse> tableExists(TableExistsRequest request) {
253+
return null;
254+
}
255+
256+
@Override
257+
public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
258+
return null;
259+
}
260+
261+
@Override
262+
public CompletableFuture<GetLatestKvSnapshotsResponse> getLatestKvSnapshots(
263+
GetLatestKvSnapshotsRequest request) {
264+
return null;
265+
}
266+
267+
@Override
268+
public CompletableFuture<GetKvSnapshotMetadataResponse> getKvSnapshotMetadata(
269+
GetKvSnapshotMetadataRequest request) {
270+
return null;
271+
}
272+
273+
@Override
274+
public CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecurityToken(
275+
GetFileSystemSecurityTokenRequest request) {
276+
return null;
277+
}
278+
279+
@Override
280+
public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
281+
ListPartitionInfosRequest request) {
282+
return null;
283+
}
284+
285+
@Override
286+
public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
287+
GetLatestLakeSnapshotRequest request) {
288+
return null;
289+
}
290+
291+
@Override
292+
public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request) {
293+
return null;
294+
}
295+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.alibaba.fluss.client.example.rpc;
2+
3+
import com.alibaba.fluss.cluster.ServerNode;
4+
import com.alibaba.fluss.cluster.ServerType;
5+
import com.alibaba.fluss.config.Configuration;
6+
import com.alibaba.fluss.metrics.registry.MetricRegistry;
7+
import com.alibaba.fluss.rpc.GatewayClientProxy;
8+
import com.alibaba.fluss.rpc.RpcClient;
9+
import com.alibaba.fluss.rpc.gateway.AdminGateway;
10+
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
11+
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
12+
13+
import java.util.concurrent.ExecutionException;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
17+
/** RpcClient example. */
18+
public class RpcClientExample {
19+
20+
public static void main(String[] args) throws ExecutionException, InterruptedException {
21+
ExecutorService executorService = Executors.newFixedThreadPool(8);
22+
for (int i = 0; i < 8; i++) {
23+
executorService.submit(new CreateDatabaseTask(i * 100));
24+
}
25+
}
26+
27+
/** Create database task. */
28+
public static class CreateDatabaseTask implements Runnable {
29+
30+
private final AdminGateway gateway;
31+
private final int startId;
32+
33+
public CreateDatabaseTask(int startId) {
34+
this.startId = startId;
35+
36+
MetricRegistry registry = MetricRegistry.create(new Configuration(), null);
37+
RpcClient client =
38+
RpcClient.create(
39+
new Configuration(), new ClientMetricGroup(registry, "Client"), false);
40+
gateway =
41+
GatewayClientProxy.createGatewayProxy(
42+
() -> new ServerNode(1, "127.0.0.1", 8088, ServerType.COORDINATOR),
43+
client,
44+
AdminGateway.class);
45+
}
46+
47+
@Override
48+
public void run() {
49+
for (int i = startId; i < startId + 100; i++) {
50+
CreateDatabaseRequest request = new CreateDatabaseRequest();
51+
request.setDatabaseName("mydb" + i);
52+
request.setIgnoreIfExists(false);
53+
54+
gateway.createDatabase(request);
55+
}
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)