Skip to content

Commit b7d28f4

Browse files
committed
[client] Introduce client.metadata.retry.times/client.metadata.retry.interval configurations
1 parent b9b92d4 commit b7d28f4

File tree

4 files changed

+59
-34
lines changed

4 files changed

+59
-34
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,12 @@
6565
public class MetadataUpdater {
6666
private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class);
6767

68-
private static final int MAX_RETRY_TIMES = 3;
69-
private static final int RETRY_INTERVAL_MS = 100;
70-
7168
private final Configuration conf;
7269
private final RpcClient rpcClient;
7370
private final Set<Integer> unavailableTabletServerIds = new CopyOnWriteArraySet<>();
7471
protected volatile Cluster cluster;
72+
private final int retryTimes;
73+
private final int retryInterval;
7574

7675
public MetadataUpdater(Configuration conf, RpcClient rpcClient) {
7776
this(rpcClient, conf, initializeCluster(conf, rpcClient));
@@ -82,6 +81,8 @@ public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster cluster)
8281
this.rpcClient = rpcClient;
8382
this.conf = conf;
8483
this.cluster = cluster;
84+
this.retryTimes = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_TIMES);
85+
this.retryInterval = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL);
8586
}
8687

8788
public Cluster getCluster() {
@@ -107,7 +108,7 @@ public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) {
107108
public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
108109
Integer serverNode = cluster.leaderFor(tableBucket);
109110
if (serverNode == null) {
110-
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
111+
for (int i = 0; i < this.retryTimes; i++) {
111112
// check if bucket is for a partition
112113
if (tableBucket.getPartitionId() != null) {
113114
updateMetadata(
@@ -126,7 +127,7 @@ public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
126127
if (serverNode == null) {
127128
throw new FlussRuntimeException(
128129
"Leader not found after retry "
129-
+ MAX_RETRY_TIMES
130+
+ this.retryTimes
130131
+ " times for table bucket: "
131132
+ tableBucket);
132133
}
@@ -300,6 +301,9 @@ public void updateMetadata(
300301
private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient) {
301302
List<InetSocketAddress> inetSocketAddresses =
302303
ClientUtils.parseAndValidateAddresses(conf.get(ConfigOptions.BOOTSTRAP_SERVERS));
304+
Integer retryTimes = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_TIMES);
305+
Integer retryInterval = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL);
306+
303307
Cluster cluster = null;
304308
Exception lastException = null;
305309
for (InetSocketAddress address : inetSocketAddresses) {
@@ -319,7 +323,11 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
319323
// if there is only one bootstrap server, we can retry to connect to it.
320324
cluster =
321325
tryToInitializeClusterWithRetries(
322-
rpcClient, serverNode, adminReadOnlyGateway, MAX_RETRY_TIMES);
326+
rpcClient,
327+
serverNode,
328+
adminReadOnlyGateway,
329+
retryTimes,
330+
retryInterval);
323331
} else {
324332
cluster = tryToInitializeCluster(adminReadOnlyGateway);
325333
break;
@@ -356,7 +364,8 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
356364
RpcClient rpcClient,
357365
ServerNode serverNode,
358366
AdminReadOnlyGateway gateway,
359-
int maxRetryTimes)
367+
int maxRetryTimes,
368+
int retryInterval)
360369
throws Exception {
361370
int retryCount = 0;
362371
while (retryCount <= maxRetryTimes) {
@@ -376,7 +385,7 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
376385
// retry can rebuild the connection.
377386
rpcClient.disconnect(serverNode.uid());
378387

379-
long delayMs = (long) (RETRY_INTERVAL_MS * Math.pow(2, retryCount));
388+
long delayMs = (long) (retryInterval * Math.pow(2, retryCount));
380389
LOG.warn(
381390
"Failed to connect to bootstrap server: {} (retry {}/{}). Retrying in {} ms.",
382391
serverNode,

fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ void testInitializeClusterWithRetries() throws Exception {
5555
// retry lower than max retry count.
5656
AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2);
5757
Cluster cluster =
58-
MetadataUpdater.tryToInitializeClusterWithRetries(rpcClient, CS_NODE, gateway, 3);
58+
MetadataUpdater.tryToInitializeClusterWithRetries(
59+
rpcClient, CS_NODE, gateway, 3, 100);
5960
assertThat(cluster).isNotNull();
6061
assertThat(cluster.getCoordinatorServer()).isEqualTo(CS_NODE);
6162
assertThat(cluster.getAliveTabletServerList()).containsExactly(TS_NODE);
@@ -65,7 +66,7 @@ void testInitializeClusterWithRetries() throws Exception {
6566
assertThatThrownBy(
6667
() ->
6768
MetadataUpdater.tryToInitializeClusterWithRetries(
68-
rpcClient, CS_NODE, gateway2, 3))
69+
rpcClient, CS_NODE, gateway2, 3, 100))
6970
.isInstanceOf(StaleMetadataException.class)
7071
.hasMessageContaining("The metadata is stale.");
7172
}

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,20 @@ public class ConfigOptions {
873873
+ "Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), "
874874
+ "this list need not contain the full set of servers (you may want more than one, though, in case a server is down) ");
875875

876+
/**
 * Integer client option {@code client.metadata.retry.times} (default 3).
 *
 * <p>In this change, {@code MetadataUpdater} reads it as the retry bound in two places: the
 * metadata refresh loop in {@code leaderFor}, and the connection attempts made when only a single
 * bootstrap server is configured.
 */
public static final ConfigOption<Integer> CLIENT_METADATA_RETRY_TIMES =
877+
key("client.metadata.retry.times")
878+
.intType()
879+
.defaultValue(3)
880+
.withDescription(
881+
"The number of times the client reconnects to retrieve metadata, default is 3.");
882+
883+
/**
 * Integer client option {@code client.metadata.retry.interval} (default 100).
 *
 * <p>{@code MetadataUpdater} uses it as the base delay, in milliseconds, between attempts to
 * reconnect to the bootstrap server. The actual delay is {@code interval * 2^retryCount}, so the
 * wait doubles with each retry.
 */
public static final ConfigOption<Integer> CLIENT_METADATA_RETRY_INTERVAL =
884+
key("client.metadata.retry.interval")
885+
.intType()
886+
.defaultValue(100)
887+
.withDescription(
888+
"The base interval in milliseconds that the client waits before reconnecting to retrieve metadata; the wait doubles with each subsequent retry. The default is 100.");
889+
876890
public static final ConfigOption<MemorySize> CLIENT_WRITER_BUFFER_MEMORY_SIZE =
877891
key("client.writer.buffer.memory-size")
878892
.memoryType()

0 commit comments

Comments
 (0)