@@ -104,12 +104,13 @@ public static ZooKeeperClient startZookeeperClient(
104104 new SessionConnectionStateErrorPolicy ());
105105 }
106106
107+ // Configure ZKClientConfig with jute.maxbuffer
108+ ZKClientConfig zkClientConfig ;
107109 Optional <String > configPath =
108110 configuration .getOptional (ConfigOptions .ZOOKEEPER_CONFIG_PATH );
109111 if (configPath .isPresent ()) {
110112 try {
111- ZKClientConfig zkClientConfig = new ZKClientConfig (configPath .get ());
112- curatorFrameworkBuilder .zkClientConfig (zkClientConfig );
113+ zkClientConfig = new ZKClientConfig (configPath .get ());
113114 } catch (QuorumPeerConfig .ConfigException e ) {
114115 LOG .warn ("Fail to load zookeeper client config from path {}" , configPath .get (), e );
115116 throw new RuntimeException (
@@ -118,7 +119,15 @@ public static ZooKeeperClient startZookeeperClient(
118119 configPath .get ()),
119120 e );
120121 }
122+ } else {
123+ zkClientConfig = new ZKClientConfig ();
121124 }
125+
126+ // Set jute.maxbuffer to match RPC frame length limit
127+ int maxBufferSize = configuration .getInt (ConfigOptions .ZOOKEEPER_MAX_BUFFER_SIZE );
128+ zkClientConfig .setProperty ("jute.maxbuffer" , String .valueOf (maxBufferSize ));
129+ LOG .debug ("Set ZooKeeper jute.maxbuffer to {} bytes" , maxBufferSize );
130+ curatorFrameworkBuilder .zkClientConfig (zkClientConfig );
122131 return new ZooKeeperClient (
123132 startZookeeperClient (curatorFrameworkBuilder , fatalErrorHandler ), configuration );
124133 }
0 commit comments