Skip to content

Commit 9c7b8da

Browse files
authored
[server] Add ConfigOption to pass config path to zookeeper client (#909)
1 parent 2232363 commit 9c7b8da

File tree

5 files changed

+85
-0
lines changed

5 files changed

+85
-0
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,15 @@ public class ConfigOptions {
486486
+ "Under certain configurations EnsembleTracking can lead to setting of ZooKeeper connection string "
487487
+ "with unresolvable hostnames.");
488488

489+
public static final ConfigOption<String> ZOOKEEPER_CONFIG_PATH =
490+
key("zookeeper.client.config-path")
491+
.stringType()
492+
.noDefaultValue()
493+
.withDescription(
494+
"The file path from which the ZooKeeper client reads its configuration. "
495+
+ "This allows each ZooKeeper client instance to load its own configuration file, "
496+
+ "instead of relying on shared JVM-level environment settings. "
497+
+ "This enables fine-grained control over ZooKeeper client behavior.");
489498
// ------------------------------------------------------------------------
490499
// ConfigOptions for Log
491500
// ------------------------------------------------------------------------

fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.api.UnhandledErrorListener;
2626
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
2727
import com.alibaba.fluss.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
28+
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.client.ZKClientConfig;
29+
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.server.quorum.QuorumPeerConfig;
2830

2931
import org.apache.commons.lang3.StringUtils;
3032
import org.slf4j.Logger;
3133
import org.slf4j.LoggerFactory;
3234

3335
import java.util.Arrays;
36+
import java.util.Optional;
3437
import java.util.stream.Collectors;
3538

3639
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
@@ -96,6 +99,22 @@ public static ZooKeeperClient startZookeeperClient(
9699
curatorFrameworkBuilder.connectionStateErrorPolicy(
97100
new SessionConnectionStateErrorPolicy());
98101
}
102+
103+
Optional<String> configPath =
104+
configuration.getOptional(ConfigOptions.ZOOKEEPER_CONFIG_PATH);
105+
if (configPath.isPresent()) {
106+
try {
107+
ZKClientConfig zkClientConfig = new ZKClientConfig(configPath.get());
108+
curatorFrameworkBuilder.zkClientConfig(zkClientConfig);
109+
} catch (QuorumPeerConfig.ConfigException e) {
110+
LOG.warn("Fail to load zookeeper client config from path {}", configPath.get(), e);
111+
throw new RuntimeException(
112+
String.format(
113+
"Fail to load zookeeper client config from path %s",
114+
configPath.get()),
115+
e);
116+
}
117+
}
99118
return new ZooKeeperClient(
100119
startZookeeperClient(curatorFrameworkBuilder, fatalErrorHandler));
101120
}

fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.alibaba.fluss.server.zk;
1818

1919
import com.alibaba.fluss.cluster.Endpoint;
20+
import com.alibaba.fluss.config.ConfigOptions;
21+
import com.alibaba.fluss.config.Configuration;
2022
import com.alibaba.fluss.metadata.Schema;
2123
import com.alibaba.fluss.metadata.SchemaInfo;
2224
import com.alibaba.fluss.metadata.TableBucket;
@@ -30,6 +32,10 @@
3032
import com.alibaba.fluss.server.zk.data.TableAssignment;
3133
import com.alibaba.fluss.server.zk.data.TableRegistration;
3234
import com.alibaba.fluss.server.zk.data.TabletServerRegistration;
35+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.CuratorZookeeperClient;
36+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
37+
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper;
38+
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.client.ZKClientConfig;
3339
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
3440
import com.alibaba.fluss.types.DataTypes;
3541
import com.alibaba.fluss.utils.types.Tuple2;
@@ -49,6 +55,7 @@
4955
import java.util.Set;
5056

5157
import static org.assertj.core.api.Assertions.assertThat;
58+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5259

5360
/** Test for {@link ZooKeeperClient}. */
5461
class ZooKeeperClientTest {
@@ -371,4 +378,34 @@ void testPartition() throws Exception {
371378
partitions = zookeeperClient.getPartitions(tablePath);
372379
assertThat(partitions).containsExactly("p2");
373380
}
381+
382+
@Test
383+
void testZookeeperConfigPath() throws Exception {
384+
final Configuration config = new Configuration();
385+
config.setString(
386+
ConfigOptions.ZOOKEEPER_ADDRESS,
387+
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString());
388+
config.setString(ConfigOptions.ZOOKEEPER_CONFIG_PATH, "./no-file.properties");
389+
assertThatThrownBy(
390+
() -> ZooKeeperUtils.startZookeeperClient(config, NOPErrorHandler.INSTANCE))
391+
.isExactlyInstanceOf(RuntimeException.class)
392+
.hasMessageContaining("Fail to load zookeeper client config from path");
393+
394+
config.setString(
395+
ConfigOptions.ZOOKEEPER_CONFIG_PATH,
396+
getClass().getClassLoader().getResource("zk.properties").getPath());
397+
try (ZooKeeperClient zookeeperClient =
398+
ZooKeeperUtils.startZookeeperClient(config, NOPErrorHandler.INSTANCE);
399+
CuratorFramework curatorClient = zookeeperClient.getCuratorClient();
400+
CuratorZookeeperClient curatorZookeeperClient = curatorClient.getZookeeperClient();
401+
ZooKeeper zooKeeper = curatorZookeeperClient.getZooKeeper()) {
402+
ZKClientConfig clientConfig = zooKeeper.getClientConfig();
403+
assertThat(clientConfig.getProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY))
404+
.isEqualTo("true");
405+
assertThat(clientConfig.getProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY))
406+
.isEqualTo("ZookeeperClient");
407+
assertThat(clientConfig.getProperty(ZKClientConfig.ZK_SASL_CLIENT_USERNAME))
408+
.isEqualTo("zookeeper2");
409+
}
410+
}
374411
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
zookeeper.sasl.client=true
18+
zookeeper.sasl.clientconfig=ZookeeperClient
19+
zookeeper.sasl.client.username=zookeeper2

website/docs/maintenance/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ during the Fluss cluster working.
9797
| zookeeper.client.max-retry-attempts | Integer | 3 | Defines the number of connection retries before the client gives up. |
9898
| zookeeper.client.tolerate-suspended-connections | Boolean | false | Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader information to be invalidated or not. In case you set this option to %s, Fluss will wait until a ZooKeeper connection is marked as lost before it revokes the leadership of components. This has the effect that Fluss is more resilient against temporary connection instabilities at the cost of running more likely into timing issues with ZooKeeper. |
9999
| zookeeper.client.ensemble-tracker | Boolean | true | Defines whether Curator should enable ensemble tracker. This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Default Curator EnsembleTracking logic watches CuratorEventType.GET_CONFIG events and changes ZooKeeper connection string. It is not desired behaviour when ZooKeeper is running under the Virtual IPs. Under certain configurations EnsembleTracking can lead to setting of ZooKeeper connection string with unresolvable hostnames. |
100+
| zookeeper.client.config-path | String | (None) | The file path from which the ZooKeeper client reads its configuration. This allows each ZooKeeper client instance to load its own configuration file, instead of relying on shared JVM-level environment settings. This enables fine-grained control over ZooKeeper client behavior. |
100101

101102
## Netty
102103

0 commit comments

Comments
 (0)