20
20
import com .alibaba .fluss .cluster .Cluster ;
21
21
import com .alibaba .fluss .cluster .ServerNode ;
22
22
import com .alibaba .fluss .cluster .ServerType ;
23
+ import com .alibaba .fluss .config .ConfigOptions ;
24
+ import com .alibaba .fluss .config .Configuration ;
23
25
import com .alibaba .fluss .exception .FlussRuntimeException ;
24
26
import com .alibaba .fluss .metadata .PhysicalTablePath ;
25
27
import com .alibaba .fluss .metadata .TableBucket ;
29
31
import com .alibaba .fluss .rpc .GatewayClientProxy ;
30
32
import com .alibaba .fluss .rpc .RpcClient ;
31
33
import com .alibaba .fluss .rpc .gateway .AdminReadOnlyGateway ;
34
+ import com .alibaba .fluss .rpc .gateway .CoordinatorGateway ;
32
35
import com .alibaba .fluss .rpc .messages .MetadataRequest ;
33
36
import com .alibaba .fluss .rpc .messages .MetadataResponse ;
34
37
import com .alibaba .fluss .rpc .messages .PbBucketMetadata ;
57
60
public class MetadataUtils {
58
61
private static final Logger LOG = LoggerFactory .getLogger (MetadataUtils .class );
59
62
60
- private static final int MAX_RETRY_TIMES = 5 ;
61
-
62
63
private static final Random randOffset = new Random ();
63
64
64
65
/**
@@ -81,13 +82,14 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
81
82
public static Cluster sendMetadataRequestAndRebuildCluster (
82
83
Cluster cluster ,
83
84
RpcClient client ,
85
+ Configuration configuration ,
84
86
@ Nullable Set <TablePath > tablePaths ,
85
87
@ Nullable Collection <PhysicalTablePath > tablePartitionNames ,
86
88
@ Nullable Collection <Long > tablePartitionIds )
87
89
throws ExecutionException , InterruptedException , TimeoutException {
88
90
AdminReadOnlyGateway gateway =
89
91
GatewayClientProxy .createGatewayProxy (
90
- () -> getOneAvailableTabletServerNode (cluster ),
92
+ () -> getOneAvailableTabletServerNode (cluster , client , configuration ),
91
93
client ,
92
94
AdminReadOnlyGateway .class );
93
95
return sendMetadataRequestAndRebuildCluster (
@@ -260,24 +262,37 @@ public NewTableMetadata(
260
262
}
261
263
}
262
264
263
- public static ServerNode getOneAvailableTabletServerNode (Cluster cluster ) {
265
+ public static ServerNode getOneAvailableTabletServerNode (
266
+ Cluster cluster , RpcClient rpcClient , Configuration configuration ) {
264
267
List <ServerNode > aliveTabletServers = null ;
265
- for (int retryTimes = 0 ; retryTimes <= MAX_RETRY_TIMES ; retryTimes ++) {
268
+ int maxRetryTimes =
269
+ configuration .getInt (ConfigOptions .CLIENT_GET_TABLET_SERVER_NODE_MAX_RETRY_TIMES );
270
+ for (int retryTimes = 0 ; retryTimes <= maxRetryTimes ; retryTimes ++) {
266
271
aliveTabletServers = cluster .getAliveTabletServerList ();
267
272
if (aliveTabletServers .isEmpty ()) {
268
- LOG .error ("Fluss create gateway proxy error, retry times = {}." , retryTimes );
269
- if (retryTimes >= MAX_RETRY_TIMES ) {
273
+ LOG .error (
274
+ "Fluss get one available tablet server node failed retry times = {}." ,
275
+ retryTimes );
276
+ if (retryTimes >= maxRetryTimes ) {
270
277
String exceptionMsg =
271
278
String .format (
272
- "Execution of Fluss get one available tablet failed, no alive tablet server in cluster, retry times = %d." ,
273
- retryTimes );
279
+ "Execution of Fluss get one available tablet server node failed, no alive tablet server in cluster, retry times = %d." ,
280
+ maxRetryTimes );
274
281
throw new FlussRuntimeException (exceptionMsg );
275
282
} else {
276
283
try {
284
+ GatewayClientProxy .createGatewayProxy (
285
+ cluster ::getCoordinatorServer ,
286
+ rpcClient ,
287
+ CoordinatorGateway .class )
288
+ .metadata (new MetadataRequest ())
289
+ .get (1 , TimeUnit .MINUTES );
277
290
Thread .sleep (1000L * retryTimes );
278
- } catch (InterruptedException interruptedException ) {
291
+ } catch (ExecutionException | InterruptedException | TimeoutException e ) {
279
292
Thread .currentThread ().interrupt ();
280
- throw new RuntimeException (interruptedException );
293
+ throw new RuntimeException (
294
+ "Execution of Fluss get one available tablet server node failed" ,
295
+ e );
281
296
}
282
297
}
283
298
} else {
0 commit comments