Skip to content

Commit 45aacb1

Browse files
committed
feat: Enhancing the robustness of camellia-discovery code (#478)
1 parent b31b37b commit 45aacb1

6 files changed

Lines changed: 75 additions & 26 deletions

File tree

camellia-core/src/main/java/com/netease/nim/camellia/core/discovery/AbstractCamelliaDiscovery.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
55

6+
import java.util.ArrayList;
67
import java.util.HashSet;
8+
import java.util.List;
79
import java.util.Set;
810

911
/**
@@ -15,7 +17,8 @@ public abstract class AbstractCamelliaDiscovery implements CamelliaDiscovery {
1517
private final Set<Callback> callbackSet = new HashSet<>();
1618

1719
public void invokeAddCallback(ServerNode server) {
18-
for (Callback callback : callbackSet) {
20+
List<Callback> callbacks = getCallbackSnapshot();
21+
for (Callback callback : callbacks) {
1922
try {
2023
callback.add(server);
2124
} catch (Exception e) {
@@ -25,7 +28,8 @@ public void invokeAddCallback(ServerNode server) {
2528
}
2629

2730
public void invokeRemoveCallback(ServerNode server) {
28-
for (Callback callback : callbackSet) {
31+
List<Callback> callbacks = getCallbackSnapshot();
32+
for (Callback callback : callbacks) {
2933
try {
3034
callback.remove(server);
3135
} catch (Exception e) {
@@ -47,4 +51,10 @@ public final void clearCallback(Callback callback) {
4751
callbackSet.remove(callback);
4852
}
4953
}
54+
55+
private List<Callback> getCallbackSnapshot() {
56+
synchronized (callbackSet) {
57+
return new ArrayList<>(callbackSet);
58+
}
59+
}
5060
}

camellia-core/src/main/java/com/netease/nim/camellia/core/discovery/DetectedLocalConfCamelliaDiscovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public DetectedLocalConfCamelliaDiscovery(List<ServerNode> list, int detectInter
3737
}
3838

3939
private void init(List<ServerNode> list, int detectIntervalSeconds) {
40-
this.originalList = list;
40+
this.originalList = new ArrayList<>(list);
4141
checkAndUpdate();
4242
if (aliveList.isEmpty()) {
4343
throw new IllegalArgumentException("all node is not alive");

camellia-core/src/main/java/com/netease/nim/camellia/core/discovery/LocalConfCamelliaDiscovery.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.netease.nim.camellia.core.discovery;
22

3+
import java.util.ArrayList;
34
import java.util.List;
45

56
/**
@@ -9,12 +10,12 @@ public class LocalConfCamelliaDiscovery implements CamelliaDiscovery {
910
private final List<ServerNode> list;
1011

1112
public LocalConfCamelliaDiscovery(List<ServerNode> list) {
12-
this.list = list;
13+
this.list = new ArrayList<>(list);
1314
}
1415

1516
@Override
1617
public List<ServerNode> findAll() {
17-
return list;
18+
return new ArrayList<>(list);
1819
}
1920

2021
@Override

camellia-toolkits/camellia-naming/camellia-naming-nacos/src/main/java/com/netease/nim/camellia/naming/nacos/CamelliaNacosNamingService.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ public String subscribe(String serviceName, ICamelliaNamingCallback callback) {
9090
EventListener eventListener = event -> {
9191
try {
9292
boolean change = false;
93-
synchronized (instanceMap) {
94-
if (event instanceof NamingChangeEvent) {
95-
if (((NamingChangeEvent) event).isAdded()) {
96-
Set<InstanceInfo> set = CamelliaMapUtils.computeIfAbsent(instanceMap, serviceName, k -> new HashSet<>());
97-
Set<InstanceInfo> added = toSet(((NamingChangeEvent) event).getAddedInstances());
98-
set.addAll(added);
93+
if (event instanceof NamingChangeEvent) {
94+
NamingChangeEvent namingChangeEvent = (NamingChangeEvent) event;
95+
if (namingChangeEvent.isAdded()) {
96+
Set<InstanceInfo> added = toSet(namingChangeEvent.getAddedInstances());
97+
if (!added.isEmpty()) {
98+
updateCachedInstances(serviceName, added, true);
9999
if (logger.isDebugEnabled()) {
100100
logger.debug("instance info added, list = {}", JSONObject.toJSON(added));
101101
}
@@ -108,10 +108,11 @@ public String subscribe(String serviceName, ICamelliaNamingCallback callback) {
108108
}
109109
change = true;
110110
}
111-
if (((NamingChangeEvent) event).isRemoved()) {
112-
Set<InstanceInfo> set = CamelliaMapUtils.computeIfAbsent(instanceMap, serviceName, k -> new HashSet<>());
113-
Set<InstanceInfo> removed = toSet(((NamingChangeEvent) event).getRemovedInstances());
114-
set.removeAll(removed);
111+
}
112+
if (namingChangeEvent.isRemoved()) {
113+
Set<InstanceInfo> removed = toSet(namingChangeEvent.getRemovedInstances());
114+
if (!removed.isEmpty()) {
115+
updateCachedInstances(serviceName, removed, false);
115116
if (logger.isDebugEnabled()) {
116117
logger.debug("instance info removed, list = {}", JSONObject.toJSON(removed));
117118
}
@@ -222,7 +223,7 @@ private void reloadAll() {
222223
}
223224

224225
private void reloadAndNotify(String serviceName) throws NacosException {
225-
Set<InstanceInfo> oldSet = new HashSet<>(instanceMap.get(serviceName));
226+
Set<InstanceInfo> oldSet = getInstanceSetSnapshot(serviceName);
226227
Set<InstanceInfo> newSet = reload(serviceName);
227228
Set<InstanceInfo> added = new HashSet<>(newSet);
228229
added.removeAll(oldSet);
@@ -267,6 +268,28 @@ private Set<InstanceInfo> reload(String serviceName) throws NacosException {
267268
return set;
268269
}
269270

271+
private Set<InstanceInfo> getInstanceSetSnapshot(String serviceName) {
272+
Set<InstanceInfo> set = instanceMap.get(serviceName);
273+
if (set == null) {
274+
return new HashSet<>();
275+
}
276+
return new HashSet<>(set);
277+
}
278+
279+
private void updateCachedInstances(String serviceName, Collection<InstanceInfo> instances, boolean add) {
280+
synchronized (instanceMap) {
281+
Set<InstanceInfo> current = instanceMap.get(serviceName);
282+
Set<InstanceInfo> newSet = current == null ? new HashSet<>() : new HashSet<>(current);
283+
if (add) {
284+
newSet.addAll(instances);
285+
} else {
286+
newSet.removeAll(instances);
287+
}
288+
instanceMap.put(serviceName, newSet);
289+
}
290+
timeMap.put(serviceName, System.currentTimeMillis());
291+
}
292+
270293
private static class CallbackItem {
271294
private final ICamelliaNamingCallback callback;
272295
private final EventListener eventListener;

camellia-toolkits/camellia-naming/camellia-naming-zk/src/main/java/com/netease/nim/camellia/naming/zk/CamelliaZkNamingService.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,7 @@ private void initZkWatch(String serviceName) {
234234
logger.warn("child_added, but data is null, serviceName = {}, path = {}", serviceName, path);
235235
} else {
236236
InstanceInfo instanceInfo = JSONObject.parseObject(new String(data, StandardCharsets.UTF_8), InstanceInfo.class);
237-
synchronized (instanceMap) {
238-
Set<InstanceInfo> set = CamelliaMapUtils.computeIfAbsent(instanceMap, serviceName, k -> new HashSet<>());
239-
set.add(instanceInfo);
240-
}
237+
updateCachedInstances(serviceName, Collections.singleton(instanceInfo), true);
241238
if (logger.isDebugEnabled()) {
242239
logger.debug("instance info added, instance = {}", JSONObject.toJSON(instanceInfo));
243240
}
@@ -263,10 +260,7 @@ private void initZkWatch(String serviceName) {
263260
logger.warn("child_removed, but data is null, serviceName = {}, path = {}", serviceName, path);
264261
} else {
265262
InstanceInfo instanceInfo = JSONObject.parseObject(new String(data, StandardCharsets.UTF_8), InstanceInfo.class);
266-
synchronized (instanceMap) {
267-
Set<InstanceInfo> set = CamelliaMapUtils.computeIfAbsent(instanceMap, serviceName, k -> new HashSet<>());
268-
set.remove(instanceInfo);
269-
}
263+
updateCachedInstances(serviceName, Collections.singleton(instanceInfo), false);
270264
if (logger.isDebugEnabled()) {
271265
logger.debug("instance info removed, instance = {}", JSONObject.toJSON(instanceInfo));
272266
}
@@ -311,7 +305,7 @@ private void reloadAll() {
311305
}
312306

313307
private void reloadAndNotify(String serviceName) {
314-
Set<InstanceInfo> oldSet = new HashSet<>(instanceMap.get(serviceName));
308+
Set<InstanceInfo> oldSet = getInstanceSetSnapshot(serviceName);
315309
Set<InstanceInfo> newSet = reload(serviceName);
316310
Set<InstanceInfo> added = new HashSet<>(newSet);
317311
added.removeAll(oldSet);
@@ -375,4 +369,26 @@ private Set<InstanceInfo> reload(String serviceName) {
375369
}
376370
}
377371

372+
private Set<InstanceInfo> getInstanceSetSnapshot(String serviceName) {
373+
Set<InstanceInfo> set = instanceMap.get(serviceName);
374+
if (set == null) {
375+
return new HashSet<>();
376+
}
377+
return new HashSet<>(set);
378+
}
379+
380+
private void updateCachedInstances(String serviceName, Collection<InstanceInfo> instances, boolean add) {
381+
synchronized (instanceMap) {
382+
Set<InstanceInfo> current = instanceMap.get(serviceName);
383+
Set<InstanceInfo> newSet = current == null ? new HashSet<>() : new HashSet<>(current);
384+
if (add) {
385+
newSet.addAll(instances);
386+
} else {
387+
newSet.removeAll(instances);
388+
}
389+
instanceMap.put(serviceName, newSet);
390+
}
391+
timeMap.put(serviceName, System.currentTimeMillis());
392+
}
393+
378394
}

camellia-toolkits/camellia-zk/src/main/java/com/netease/nim/camellia/zk/ZkDiscovery.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.alibaba.fastjson.JSONObject;
44
import com.netease.nim.camellia.core.discovery.AbstractCamelliaDiscovery;
5-
import com.netease.nim.camellia.core.discovery.CamelliaDiscovery;
65
import com.netease.nim.camellia.core.discovery.ServerNode;
76
import com.netease.nim.camellia.tools.executor.CamelliaThreadFactory;
87
import org.apache.curator.framework.CuratorFramework;

0 commit comments

Comments
 (0)