diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java index 5cb7140445..0431efeeb0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java @@ -65,13 +65,12 @@ public class MetadataUpdater { private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class); - private static final int MAX_RETRY_TIMES = 3; - private static final int RETRY_INTERVAL_MS = 100; - private final Configuration conf; private final RpcClient rpcClient; private final Set unavailableTabletServerIds = new CopyOnWriteArraySet<>(); protected volatile Cluster cluster; + private final int retryTimes; + private final int retryInterval; public MetadataUpdater(Configuration conf, RpcClient rpcClient) { this(rpcClient, conf, initializeCluster(conf, rpcClient)); @@ -82,6 +81,8 @@ public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster cluster) this.rpcClient = rpcClient; this.conf = conf; this.cluster = cluster; + this.retryTimes = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_TIMES); + this.retryInterval = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL); } public Cluster getCluster() { @@ -107,7 +108,7 @@ public Optional getBucketLocation(TableBucket tableBucket) { public int leaderFor(TablePath tablePath, TableBucket tableBucket) { Integer serverNode = cluster.leaderFor(tableBucket); if (serverNode == null) { - for (int i = 0; i < MAX_RETRY_TIMES; i++) { + for (int i = 0; i < this.retryTimes; i++) { // check if bucket is for a partition if (tableBucket.getPartitionId() != null) { updateMetadata( @@ -126,7 +127,7 @@ public int leaderFor(TablePath tablePath, TableBucket tableBucket) { if (serverNode == null) { throw new FlussRuntimeException( "Leader not found after retry " - + MAX_RETRY_TIMES + + this.retryTimes + " times 
for table bucket: " + tableBucket); } @@ -300,6 +301,9 @@ public void updateMetadata( private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient) { List inetSocketAddresses = ClientUtils.parseAndValidateAddresses(conf.get(ConfigOptions.BOOTSTRAP_SERVERS)); + Integer retryTimes = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_TIMES); + Integer retryInterval = conf.get(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL); + Cluster cluster = null; Exception lastException = null; for (InetSocketAddress address : inetSocketAddresses) { @@ -319,7 +323,11 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient // if there is only one bootstrap server, we can retry to connect to it. cluster = tryToInitializeClusterWithRetries( - rpcClient, serverNode, adminReadOnlyGateway, MAX_RETRY_TIMES); + rpcClient, + serverNode, + adminReadOnlyGateway, + retryTimes, + retryInterval); } else { cluster = tryToInitializeCluster(adminReadOnlyGateway); break; @@ -356,7 +364,8 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient RpcClient rpcClient, ServerNode serverNode, AdminReadOnlyGateway gateway, - int maxRetryTimes) + int maxRetryTimes, + int retryInterval) throws Exception { int retryCount = 0; while (retryCount <= maxRetryTimes) { @@ -376,7 +385,7 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient // retry can rebuild the connection. rpcClient.disconnect(serverNode.uid()); - long delayMs = (long) (RETRY_INTERVAL_MS * Math.pow(2, retryCount)); + long delayMs = (long) (retryInterval * Math.pow(2, retryCount)); LOG.warn( "Failed to connect to bootstrap server: {} (retry {}/{}). 
Retrying in {} ms.", serverNode, diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java index 6717cfe5ae..4b1fffd03c 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.StaleMetadataException; import org.apache.fluss.rpc.RpcClient; @@ -51,11 +52,14 @@ void testInitializeClusterWithRetries() throws Exception { Configuration configuration = new Configuration(); RpcClient rpcClient = RpcClient.create(configuration, TestingClientMetricGroup.newInstance(), false); + int times = configuration.getInt(ConfigOptions.CLIENT_METADATA_RETRY_TIMES); + int interval = configuration.getInt(ConfigOptions.CLIENT_METADATA_RETRY_INTERVAL); // retry lower than max retry count. 
AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2); Cluster cluster = - MetadataUpdater.tryToInitializeClusterWithRetries(rpcClient, CS_NODE, gateway, 3); + MetadataUpdater.tryToInitializeClusterWithRetries( + rpcClient, CS_NODE, gateway, times, interval); assertThat(cluster).isNotNull(); assertThat(cluster.getCoordinatorServer()).isEqualTo(CS_NODE); assertThat(cluster.getAliveTabletServerList()).containsExactly(TS_NODE); @@ -65,7 +69,7 @@ void testInitializeClusterWithRetries() throws Exception { assertThatThrownBy( () -> MetadataUpdater.tryToInitializeClusterWithRetries( - rpcClient, CS_NODE, gateway2, 3)) + rpcClient, CS_NODE, gateway2, times, interval)) .isInstanceOf(StaleMetadataException.class) .hasMessageContaining("The metadata is stale."); } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 43c2445544..f99a03857e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -873,6 +873,20 @@ public class ConfigOptions { + "Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), " + "this list need not contain the full set of servers (you may want more than one, though, in case a server is down) "); + public static final ConfigOption CLIENT_METADATA_RETRY_TIMES = + key("client.metadata.retry.times") + .intType() + .defaultValue(3) + .withDescription( + "The number of times the client reconnects to retrieve metadata, default is 3."); + + public static final ConfigOption CLIENT_METADATA_RETRY_INTERVAL = + key("client.metadata.retry.interval") + .intType() + .defaultValue(100) + .withDescription( + "The interval of the client retrieves metadata when reconnecting, default is 100."); + public static final ConfigOption CLIENT_WRITER_BUFFER_MEMORY_SIZE = 
key("client.writer.buffer.memory-size") .memoryType() diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index f6988aacef..1e4d385afa 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -26,30 +26,31 @@ during the Fluss cluster working. ## Common -| Option | Type | Default | Description | -|-----------------------------------------------------|--------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| bind.listeners | String | (None) | The network address and port to which the server binds for accepting connections. This defines the interface and port where the server will listen for incoming requests. The format is `{listener_name}://{host}:{port}`, and multiple addresses can be specified, separated by commas. Use `0.0.0.0` for the `host` to bind to all available interfaces which is dangerous on production and not suggested for production usage. The `listener_name` serves as an identifier for the address in the configuration. For example, `internal.listener.name` specifies the address used for internal server communication. 
If multiple addresses are configured, ensure that the `listener_name` values are unique. | -| advertised.listeners | String | (None) | The externally advertised address and port for client connections. Required in distributed environments when the bind address is not publicly reachable. Format matches `bind.listeners` (`{listener_name}://{host}:{port}`). Defaults to the value of `bind.listeners` if not explicitly configured. | -| internal.listener.name | String | FLUSS | The listener for server internal communication. | -| security.protocol.map | Map | (none) | A map defining the authentication protocol for each listener. The format is `listenerName1:protocol1,listenerName2:protocol2`, e.g., `INTERNAL:PLAINTEXT,CLIENT:GSSAPI`. Each listener can be associated with a specific authentication protocol. Listeners not included in the map will use PLAINTEXT by default, which does not require authentication. | -| `security.${protocol}.*` | String | (none) | Protocol-specific configuration properties. For example, security.sasl.jaas.config for SASL authentication settings. | -| default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. | -| default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. | -| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. | -| remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. 
| -| plugin.classloader.parent-first-patterns.additional | List<String> | (None) | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to `classloader.parent-first-patterns.default`. | -| plugin.classloader.parent-first-patterns.default | String | java.,
org.apache.fluss.,
javax.annotation.,
org.slf4j,
org.apache.log4j,
org.apache.logging,
org.apache.commons.logging,
ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. | -| auto-partition.check.interval | Duration | 10min | The interval of auto partition check. The default value is 10 minutes. | -| allow.create.log.tables | Boolean | true | Whether to allow creation of log tables. When set to false, attempts to create log tables (tables without primary key) will be rejected. The default value is true. | -| allow.create.kv.tables | Boolean | true | Whether to allow creation of kv tables (primary key tables). When set to false, attempts to create kv tables (tables with primary key) will be rejected. The default value is true. | -| max.partition.num | Integer | 1000 | Limits the maximum number of partitions that can be created for a partitioned table to avoid creating too many partitions. | -| max.bucket.num | Integer | 128000 | The maximum number of buckets that can be created for a table. The default value is 128000. | -| acl.notification.expiration-time | Duration | 15min | The duration for which ACL notifications are valid before they expire. This configuration determines the time window during which an ACL notification is considered active. After this duration, the notification will no longer be valid and will be discarded. The default value is 15 minutes. This setting is important to ensure that ACL changes are propagated in a timely manner and do not remain active longer than necessary. | -| authorizer.enabled | Boolean | false | Specifies whether to enable the authorization feature. If enabled, access control is enforced based on the authorization rules defined in the configuration. If disabled, all operations and resources are accessible to all users. 
| -| authorizer.type | String | default | Specifies the type of authorizer to be used for access control. This value corresponds to the identifier of the authorization plugin. The default value is `default`, which indicates the built-in authorizer implementation. Custom authorizers can be implemented by providing a matching plugin identifier. | -| super.users | String | (None) | A semicolon-separated list of superusers who have unrestricted access to all operations and resources. Note that the delimiter is semicolon since SSL user names may contain comma, and each super user should be specified in the format `principal_type:principal_name`, e.g., `User:admin;User:bob`. This configuration is critical for defining administrative privileges in the system. | -| server.io-pool.size | Integer | 10 | The size of the IO thread pool to run blocking operations for both coordinator and tablet servers. This includes discard unnecessary snapshot files, transfer kv snapshot files, and transfer remote log files. Increase this value if you experience slow IO operations. The default value is 10. 
| - +| Option | Type | Default | Description | +|-----------------------------------------------------|--------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| bind.listeners | String | (None) | The network address and port to which the server binds for accepting connections. This defines the interface and port where the server will listen for incoming requests. The format is `{listener_name}://{host}:{port}`, and multiple addresses can be specified, separated by commas. Use `0.0.0.0` for the `host` to bind to all available interfaces which is dangerous on production and not suggested for production usage. The `listener_name` serves as an identifier for the address in the configuration. For example, `internal.listener.name` specifies the address used for internal server communication. If multiple addresses are configured, ensure that the `listener_name` values are unique. | +| advertised.listeners | String | (None) | The externally advertised address and port for client connections. Required in distributed environments when the bind address is not publicly reachable. Format matches `bind.listeners` (`{listener_name}://{host}:{port}`). 
Defaults to the value of `bind.listeners` if not explicitly configured. | +| internal.listener.name | String | FLUSS | The listener for server internal communication. | +| security.protocol.map | Map | (none) | A map defining the authentication protocol for each listener. The format is `listenerName1:protocol1,listenerName2:protocol2`, e.g., `INTERNAL:PLAINTEXT,CLIENT:GSSAPI`. Each listener can be associated with a specific authentication protocol. Listeners not included in the map will use PLAINTEXT by default, which does not require authentication. | +| `security.${protocol}.*` | String | (none) | Protocol-specific configuration properties. For example, security.sasl.jaas.config for SASL authentication settings. | +| default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. | +| default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. | +| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. | +| remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. | +| plugin.classloader.parent-first-patterns.additional | List<String> | (None) | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to `classloader.parent-first-patterns.default`. 
| +| plugin.classloader.parent-first-patterns.default | String | java.,
org.apache.fluss.,
javax.annotation.,
org.slf4j,
org.apache.log4j,
org.apache.logging,
org.apache.commons.logging,
ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. | +| auto-partition.check.interval | Duration | 10min | The interval of auto partition check. The default value is 10 minutes. | +| allow.create.log.tables | Boolean | true | Whether to allow creation of log tables. When set to false, attempts to create log tables (tables without primary key) will be rejected. The default value is true. | +| allow.create.kv.tables | Boolean | true | Whether to allow creation of kv tables (primary key tables). When set to false, attempts to create kv tables (tables with primary key) will be rejected. The default value is true. | +| max.partition.num | Integer | 1000 | Limits the maximum number of partitions that can be created for a partitioned table to avoid creating too many partitions. | +| max.bucket.num | Integer | 128000 | The maximum number of buckets that can be created for a table. The default value is 128000. | +| acl.notification.expiration-time | Duration | 15min | The duration for which ACL notifications are valid before they expire. This configuration determines the time window during which an ACL notification is considered active. After this duration, the notification will no longer be valid and will be discarded. The default value is 15 minutes. This setting is important to ensure that ACL changes are propagated in a timely manner and do not remain active longer than necessary. | +| authorizer.enabled | Boolean | false | Specifies whether to enable the authorization feature. If enabled, access control is enforced based on the authorization rules defined in the configuration. If disabled, all operations and resources are accessible to all users. 
| +| authorizer.type | String | default | Specifies the type of authorizer to be used for access control. This value corresponds to the identifier of the authorization plugin. The default value is `default`, which indicates the built-in authorizer implementation. Custom authorizers can be implemented by providing a matching plugin identifier. | +| super.users | String | (None) | A semicolon-separated list of superusers who have unrestricted access to all operations and resources. Note that the delimiter is semicolon since SSL user names may contain comma, and each super user should be specified in the format `principal_type:principal_name`, e.g., `User:admin;User:bob`. This configuration is critical for defining administrative privileges in the system. | +| server.io-pool.size | Integer | 10 | The size of the IO thread pool to run blocking operations for both coordinator and tablet servers. This includes discard unnecessary snapshot files, transfer kv snapshot files, and transfer remote log files. Increase this value if you experience slow IO operations. The default value is 10. | +| client.metadata.retry.times | Integer | 3 | The maximum number of times the client retries retrieving metadata. The default value is 3. | +| client.metadata.retry.interval | Integer | 100 | The base interval, in milliseconds, that the client waits before retrying to retrieve metadata. When reconnecting to a bootstrap server, the wait doubles after each failed attempt. The default value is 100. | ## CoordinatorServer