Skip to content

Commit 23042f3

Browse files
authored
[zk] Allow to set jute.maxbuffer for zk client (#2048)
1 parent 168a092 commit 23042f3

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,16 @@ public class ConfigOptions {
572572
+ "This allows each ZooKeeper client instance to load its own configuration file, "
573573
+ "instead of relying on shared JVM-level environment settings. "
574574
+ "This enables fine-grained control over ZooKeeper client behavior.");
575+
576+
public static final ConfigOption<Integer> ZOOKEEPER_MAX_BUFFER_SIZE =
577+
key("zookeeper.client.max-buffer-size")
578+
.intType()
579+
.defaultValue(100 * 1024 * 1024) // 100MB
580+
.withDescription(
581+
"The maximum buffer size (in bytes) for ZooKeeper client. "
582+
+ "This corresponds to the jute.maxbuffer property. "
583+
+ "Default is 100MB to match the RPC frame length limit.");
584+
575585
// ------------------------------------------------------------------------
576586
// ConfigOptions for Log
577587
// ------------------------------------------------------------------------

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Optional;
4141
import java.util.stream.Collectors;
4242

43+
import static org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.common.ZKConfig.JUTE_MAXBUFFER;
4344
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4445

4546
/** Class containing helper functions to interact with ZooKeeper. */
@@ -104,11 +105,12 @@ public static ZooKeeperClient startZookeeperClient(
104105
new SessionConnectionStateErrorPolicy());
105106
}
106107

108+
ZKClientConfig zkClientConfig;
107109
Optional<String> configPath =
108110
configuration.getOptional(ConfigOptions.ZOOKEEPER_CONFIG_PATH);
109111
if (configPath.isPresent()) {
110112
try {
111-
ZKClientConfig zkClientConfig = new ZKClientConfig(configPath.get());
113+
zkClientConfig = new ZKClientConfig(configPath.get());
112114
curatorFrameworkBuilder.zkClientConfig(zkClientConfig);
113115
} catch (QuorumPeerConfig.ConfigException e) {
114116
LOG.warn("Fail to load zookeeper client config from path {}", configPath.get(), e);
@@ -118,7 +120,14 @@ public static ZooKeeperClient startZookeeperClient(
118120
configPath.get()),
119121
e);
120122
}
123+
} else {
124+
zkClientConfig = new ZKClientConfig();
121125
}
126+
// set jute.max buffer
127+
zkClientConfig.setProperty(
128+
JUTE_MAXBUFFER,
129+
String.valueOf(configuration.getInt(ConfigOptions.ZOOKEEPER_MAX_BUFFER_SIZE)));
130+
122131
return new ZooKeeperClient(
123132
startZookeeperClient(curatorFrameworkBuilder, fatalErrorHandler), configuration);
124133
}

0 commit comments

Comments
 (0)