Skip to content

Commit ed42b36

Browse files
authored
[server] Re-register server-node when zookeeper connection is reconnected (#1004)
1 parent 13115e4 commit ed42b36

File tree

5 files changed

+131
-1
lines changed

5 files changed

+131
-1
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.alibaba.fluss.server.zk.ZooKeeperClient;
4343
import com.alibaba.fluss.server.zk.ZooKeeperUtils;
4444
import com.alibaba.fluss.server.zk.data.CoordinatorAddress;
45+
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
4546
import com.alibaba.fluss.utils.ExceptionUtils;
4647
import com.alibaba.fluss.utils.ExecutorUtils;
4748
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
@@ -53,6 +54,7 @@
5354
import javax.annotation.Nullable;
5455
import javax.annotation.concurrent.GuardedBy;
5556

57+
import java.time.Duration;
5658
import java.util.ArrayList;
5759
import java.util.Collection;
5860
import java.util.List;
@@ -204,6 +206,7 @@ protected void startServices() throws Exception {
204206
rpcServer.start();
205207

206208
registerCoordinatorLeader();
209+
registerZookeeperClientReconnectedListener();
207210

208211
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME);
209212
this.rpcClient = RpcClient.create(conf, clientMetricGroup);
@@ -282,6 +285,47 @@ private void registerCoordinatorLeader() throws Exception {
282285
zkClient.registerCoordinatorLeader(coordinatorAddress);
283286
}
284287

288+
private void registerZookeeperClientReconnectedListener() {
289+
ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
290+
zkClient,
291+
() -> {
292+
// we need to retry to register since although
293+
// zkClient reconnect, the ephemeral node may still exist
294+
// for a while time, retry to wait the ephemeral node removed
295+
// see ZOOKEEPER-2985
296+
long startTime = System.currentTimeMillis();
297+
long retryWaitIntervalMs = Duration.ofSeconds(3).toMillis();
298+
long retryTotalWaitTimeMs = Duration.ofMinutes(1).toMillis();
299+
while (true) {
300+
try {
301+
this.registerCoordinatorLeader();
302+
break;
303+
} catch (KeeperException.NodeExistsException nodeExistsException) {
304+
long elapsedTime = System.currentTimeMillis() - startTime;
305+
if (elapsedTime >= retryTotalWaitTimeMs) {
306+
LOG.error(
307+
"Coordinator Server register to Zookeeper exceeded total retry time of {} ms. "
308+
+ "Aborting registration attempts.",
309+
retryTotalWaitTimeMs);
310+
throw nodeExistsException;
311+
}
312+
313+
LOG.warn(
314+
"Coordinator server already registered in Zookeeper. "
315+
+ "retrying register after {} ms....",
316+
retryWaitIntervalMs);
317+
try {
318+
Thread.sleep(retryWaitIntervalMs);
319+
} catch (InterruptedException interruptedException) {
320+
Thread.currentThread().interrupt();
321+
break;
322+
}
323+
}
324+
}
325+
},
326+
this);
327+
}
328+
285329
private void createDefaultDatabase() {
286330
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
287331
List<String> databases = metadataManager.listDatabases();

fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ protected void startServices() throws Exception {
246246
rpcServer.start();
247247

248248
registerTabletServer();
249+
// when init session, register tablet server again
250+
ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
251+
zkClient, this::registerTabletServer, this);
249252
}
250253
}
251254

fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ public List<String> getChildren(String path) throws Exception {
783783
public Optional<Stat> getStat(String path) throws Exception {
784784
try {
785785
Stat stat = zkClient.checkExists().forPath(path);
786-
return Optional.of(stat);
786+
return Optional.ofNullable(stat);
787787
} catch (KeeperException.NoNodeException e) {
788788
return Optional.empty();
789789
}

fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperUtils.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
2424
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.CuratorFrameworkFactory;
2525
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.api.UnhandledErrorListener;
26+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
27+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.state.ConnectionStateListener;
2628
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
2729
import com.alibaba.fluss.shaded.curator5.org.apache.curator.retry.ExponentialBackoffRetry;
2830
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.client.ZKClientConfig;
2931
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.server.quorum.QuorumPeerConfig;
32+
import com.alibaba.fluss.utils.function.ThrowingRunnable;
3033

3134
import org.apache.commons.lang3.StringUtils;
3235
import org.slf4j.Logger;
@@ -148,6 +151,53 @@ public static CuratorFrameworkWithUnhandledErrorListener startZookeeperClient(
148151
return new CuratorFrameworkWithUnhandledErrorListener(cf, unhandledErrorListener);
149152
}
150153

154+
public static void registerZookeeperClientReInitSessionListener(
155+
ZooKeeperClient zooKeeperClient,
156+
ThrowingRunnable<Exception> reInitSessionCallback,
157+
FatalErrorHandler fatalErrorHandler) {
158+
zooKeeperClient
159+
.getCuratorClient()
160+
.getConnectionStateListenable()
161+
.addListener(
162+
new ZookeeperClientSessionReInitListener(
163+
reInitSessionCallback, fatalErrorHandler));
164+
}
165+
166+
private static class ZookeeperClientSessionReInitListener implements ConnectionStateListener {
167+
private final ThrowingRunnable<Exception> reInitSessionCallback;
168+
private final FatalErrorHandler fatalErrorHandler;
169+
private volatile boolean sessionExpired = false;
170+
171+
public ZookeeperClientSessionReInitListener(
172+
ThrowingRunnable<Exception> reInitSessionCallback,
173+
FatalErrorHandler fatalErrorHandler) {
174+
this.reInitSessionCallback = reInitSessionCallback;
175+
this.fatalErrorHandler = fatalErrorHandler;
176+
}
177+
178+
public void stateChanged(
179+
CuratorFramework curatorFramework, ConnectionState connectionState) {
180+
switch (connectionState) {
181+
case LOST:
182+
sessionExpired = true;
183+
break;
184+
case RECONNECTED:
185+
if (sessionExpired) {
186+
LOG.info("Zookeeper session re-initialized.");
187+
try {
188+
reInitSessionCallback.run();
189+
} catch (Exception e) {
190+
fatalErrorHandler.onFatalError(e);
191+
}
192+
sessionExpired = false;
193+
}
194+
break;
195+
default:
196+
break;
197+
}
198+
}
199+
}
200+
151201
/** Creates a ZooKeeper path of the form "/a/b/.../z". */
152202
public static String generateZookeeperPath(String... paths) {
153203
return Arrays.stream(paths)

fluss-server/src/test/java/com/alibaba/fluss/server/ServerTestBase.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,20 @@
2424
import com.alibaba.fluss.server.zk.NOPErrorHandler;
2525
import com.alibaba.fluss.server.zk.ZooKeeperClient;
2626
import com.alibaba.fluss.server.zk.ZooKeeperExtension;
27+
import com.alibaba.fluss.server.zk.data.ZkData.CoordinatorZNode;
28+
import com.alibaba.fluss.server.zk.data.ZkData.ServerIdZNode;
29+
import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
2730
import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
2831

2932
import org.junit.jupiter.api.BeforeAll;
3033
import org.junit.jupiter.api.Test;
3134
import org.junit.jupiter.api.extension.RegisterExtension;
3235

36+
import java.time.Duration;
3337
import java.util.List;
38+
import java.util.Optional;
3439

40+
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
3541
import static org.assertj.core.api.Assertions.assertThat;
3642
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3743

@@ -82,6 +88,28 @@ void testExceptionWhenRunServer() throws Exception {
8288
server.close();
8389
}
8490

91+
@Test
92+
void registerServerNodeWhenZkClientReInitSession() throws Exception {
93+
ServerBase server = getServer();
94+
// get the EPHEMERAL node of server
95+
String path =
96+
server instanceof CoordinatorServer
97+
? CoordinatorZNode.path()
98+
: ServerIdZNode.path(server.conf.getInt(ConfigOptions.TABLET_SERVER_ID));
99+
100+
long oldNodeCtime = zookeeperClient.getStat(path).get().getCtime();
101+
// let's restart zk to mock zk client re-init session
102+
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().restart();
103+
retry(
104+
Duration.ofMinutes(2),
105+
() -> {
106+
Optional<Stat> optionalStat = zookeeperClient.getStat(path);
107+
assertThat(optionalStat).isPresent();
108+
Stat stat = optionalStat.get();
109+
assertThat(stat.getCtime()).isGreaterThan(oldNodeCtime);
110+
});
111+
}
112+
85113
/** Create a configuration with Zookeeper address setting. */
86114
protected static Configuration createConfiguration() {
87115
Configuration configuration = new Configuration();
@@ -92,6 +120,11 @@ protected static Configuration createConfiguration() {
92120
ConfigOptions.BIND_LISTENERS, "CLIENT://localhost:0,FLUSS://localhost:0");
93121
configuration.setString(ConfigOptions.ADVERTISED_LISTENERS, "CLIENT://198.168.0.1:100");
94122
configuration.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
123+
124+
// set to small timout to verify the case that zk session is timeout
125+
configuration.set(ConfigOptions.ZOOKEEPER_SESSION_TIMEOUT, Duration.ofMillis(500));
126+
configuration.set(ConfigOptions.ZOOKEEPER_CONNECTION_TIMEOUT, Duration.ofMillis(500));
127+
configuration.set(ConfigOptions.ZOOKEEPER_RETRY_WAIT, Duration.ofMillis(500));
95128
return configuration;
96129
}
97130

0 commit comments

Comments
 (0)