Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,12 @@
public class MetadataUpdater {
private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class);

private static final int MAX_RETRY_TIMES = 3;
private static final int RETRY_INTERVAL_MS = 100;

private final Configuration conf;
private final RpcClient rpcClient;
private final Set<Integer> unavailableTabletServerIds = new CopyOnWriteArraySet<>();
protected volatile Cluster cluster;
private final int retryTimes;
private final int retryInterval;

public MetadataUpdater(Configuration conf, RpcClient rpcClient) {
this(rpcClient, conf, initializeCluster(conf, rpcClient));
Expand All @@ -82,6 +81,8 @@ public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster cluster)
this.rpcClient = rpcClient;
this.conf = conf;
this.cluster = cluster;
this.retryTimes = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_TIMES);
this.retryInterval = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL);
}

public Cluster getCluster() {
Expand All @@ -107,7 +108,7 @@ public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) {
public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
Integer serverNode = cluster.leaderFor(tableBucket);
if (serverNode == null) {
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
for (int i = 0; i < this.retryTimes; i++) {
// check if bucket is for a partition
if (tableBucket.getPartitionId() != null) {
updateMetadata(
Expand All @@ -126,7 +127,7 @@ public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
if (serverNode == null) {
throw new FlussRuntimeException(
"Leader not found after retry "
+ MAX_RETRY_TIMES
+ this.retryTimes
+ " times for table bucket: "
+ tableBucket);
}
Expand Down Expand Up @@ -300,6 +301,9 @@ public void updateMetadata(
private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient) {
List<InetSocketAddress> inetSocketAddresses =
ClientUtils.parseAndValidateAddresses(conf.get(ConfigOptions.BOOTSTRAP_SERVERS));
Integer retryTimes = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_TIMES);
Integer retryInterval = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL);

Cluster cluster = null;
Exception lastException = null;
for (InetSocketAddress address : inetSocketAddresses) {
Expand All @@ -319,7 +323,11 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
// if there is only one bootstrap server, we can retry to connect to it.
cluster =
tryToInitializeClusterWithRetries(
rpcClient, serverNode, adminReadOnlyGateway, MAX_RETRY_TIMES);
rpcClient,
serverNode,
adminReadOnlyGateway,
retryTimes,
retryInterval);
} else {
cluster = tryToInitializeCluster(adminReadOnlyGateway);
break;
Expand Down Expand Up @@ -356,7 +364,8 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
RpcClient rpcClient,
ServerNode serverNode,
AdminReadOnlyGateway gateway,
int maxRetryTimes)
int maxRetryTimes,
int retryInterval)
throws Exception {
int retryCount = 0;
while (retryCount <= maxRetryTimes) {
Expand All @@ -376,7 +385,7 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
// retry can rebuild the connection.
rpcClient.disconnect(serverNode.uid());

long delayMs = (long) (RETRY_INTERVAL_MS * Math.pow(2, retryCount));
long delayMs = (long) (retryInterval * Math.pow(2, retryCount));
LOG.warn(
"Failed to connect to bootstrap server: {} (retry {}/{}). Retrying in {} ms.",
serverNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.rpc.RpcClient;
Expand Down Expand Up @@ -51,11 +52,14 @@ void testInitializeClusterWithRetries() throws Exception {
Configuration configuration = new Configuration();
RpcClient rpcClient =
RpcClient.create(configuration, TestingClientMetricGroup.newInstance(), false);
int times = configuration.getInt(ConfigOptions.CLIENT_METADATA_RETRY_TIMES);
int interval = configuration.getInt(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL);

// retry lower than max retry count.
AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2);
Cluster cluster =
MetadataUpdater.tryToInitializeClusterWithRetries(rpcClient, CS_NODE, gateway, 3);
MetadataUpdater.tryToInitializeClusterWithRetries(
rpcClient, CS_NODE, gateway, times, interval);
assertThat(cluster).isNotNull();
assertThat(cluster.getCoordinatorServer()).isEqualTo(CS_NODE);
assertThat(cluster.getAliveTabletServerList()).containsExactly(TS_NODE);
Expand All @@ -65,7 +69,7 @@ void testInitializeClusterWithRetries() throws Exception {
assertThatThrownBy(
() ->
MetadataUpdater.tryToInitializeClusterWithRetries(
rpcClient, CS_NODE, gateway2, 3))
rpcClient, CS_NODE, gateway2, times, interval))
.isInstanceOf(StaleMetadataException.class)
.hasMessageContaining("The metadata is stale.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,20 @@ public class ConfigOptions {
+ "Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), "
+ "this list need not contain the full set of servers (you may want more than one, though, in case a server is down) ");

public static final ConfigOption<Integer> CLIENT_METADATA_RETRY_TIMES =
key("client.metadata.retry.times")
.intType()
.defaultValue(3)
.withDescription(
"The number of times the client reconnects to retrieve metadata, default is 3.");

public static final ConfigOption<Integer> CLIENT_METADATA_RETRY_INTERVAL =
key("client.metadata.retry.interval")
.intType()
.defaultValue(100)
.withDescription(
"The interval of the client retrieves metadata when reconnecting, default is 100.");

public static final ConfigOption<MemorySize> CLIENT_WRITER_BUFFER_MEMORY_SIZE =
key("client.writer.buffer.memory-size")
.memoryType()
Expand Down
Loading