6565public class MetadataUpdater {
6666 private static final Logger LOG = LoggerFactory .getLogger (MetadataUpdater .class );
6767
68- private static final int MAX_RETRY_TIMES = 3 ;
69- private static final int RETRY_INTERVAL_MS = 100 ;
70-
7168 private final Configuration conf ;
7269 private final RpcClient rpcClient ;
7370 private final Set <Integer > unavailableTabletServerIds = new CopyOnWriteArraySet <>();
7471 protected volatile Cluster cluster ;
72+ private final int retryTimes ;
73+ private final int retryInterval ;
7574
7675 public MetadataUpdater (Configuration conf , RpcClient rpcClient ) {
7776 this (rpcClient , conf , initializeCluster (conf , rpcClient ));
@@ -82,6 +81,8 @@ public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster cluster)
8281 this .rpcClient = rpcClient ;
8382 this .conf = conf ;
8483 this .cluster = cluster ;
84+ this .retryTimes = conf .get (ConfigOptions .CLIENT_METADATA_RETRY_TIMES );
85+ this .retryInterval = conf .get (ConfigOptions .CLIENT_METADATA_RETRY_INTERVAL );
8586 }
8687
8788 public Cluster getCluster () {
@@ -107,7 +108,7 @@ public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) {
107108 public int leaderFor (TablePath tablePath , TableBucket tableBucket ) {
108109 Integer serverNode = cluster .leaderFor (tableBucket );
109110 if (serverNode == null ) {
110- for (int i = 0 ; i < MAX_RETRY_TIMES ; i ++) {
111+ for (int i = 0 ; i < this . retryTimes ; i ++) {
111112 // check if bucket is for a partition
112113 if (tableBucket .getPartitionId () != null ) {
113114 updateMetadata (
@@ -126,7 +127,7 @@ public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
126127 if (serverNode == null ) {
127128 throw new FlussRuntimeException (
128129 "Leader not found after retry "
129- + MAX_RETRY_TIMES
130+ + this . retryTimes
130131 + " times for table bucket: "
131132 + tableBucket );
132133 }
@@ -300,6 +301,8 @@ public void updateMetadata(
300301 private static Cluster initializeCluster (Configuration conf , RpcClient rpcClient ) {
301302 List <InetSocketAddress > inetSocketAddresses =
302303 ClientUtils .parseAndValidateAddresses (conf .get (ConfigOptions .BOOTSTRAP_SERVERS ));
304+ Integer retryTimes = conf .get (ConfigOptions .CLIENT_METADATA_RETRY_TIMES );
305+ Integer retryInterval = conf .get (ConfigOptions .CLIENT_METADATA_RETRY_INTERVAL );
303306 Cluster cluster = null ;
304307 Exception lastException = null ;
305308 for (InetSocketAddress address : inetSocketAddresses ) {
@@ -319,7 +322,11 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
319322 // if there is only one bootstrap server, we can retry to connect to it.
320323 cluster =
321324 tryToInitializeClusterWithRetries (
322- rpcClient , serverNode , adminReadOnlyGateway , MAX_RETRY_TIMES );
325+ rpcClient ,
326+ serverNode ,
327+ adminReadOnlyGateway ,
328+ retryTimes ,
329+ retryInterval );
323330 } else {
324331 cluster = tryToInitializeCluster (adminReadOnlyGateway );
325332 break ;
@@ -356,7 +363,8 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
356363 RpcClient rpcClient ,
357364 ServerNode serverNode ,
358365 AdminReadOnlyGateway gateway ,
359- int maxRetryTimes )
366+ int maxRetryTimes ,
367+ int retryInterval )
360368 throws Exception {
361369 int retryCount = 0 ;
362370 while (retryCount <= maxRetryTimes ) {
@@ -376,7 +384,7 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
376384 // retry can rebuild the connection.
377385 rpcClient .disconnect (serverNode .uid ());
378386
379- long delayMs = (long ) (RETRY_INTERVAL_MS * Math .pow (2 , retryCount ));
387+ long delayMs = (long ) (retryInterval * Math .pow (2 , retryCount ));
380388 LOG .warn (
381389 "Failed to connect to bootstrap server: {} (retry {}/{}). Retrying in {} ms." ,
382390 serverNode ,
0 commit comments