Skip to content

Commit 6d7cbbb

Browse files
committed
reactor registry
1 parent 859ad8d commit 6d7cbbb

File tree

10 files changed

+177
-126
lines changed

10 files changed

+177
-126
lines changed

Diff for: eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java

-14
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.apache.eventmesh.registry;
2+
3+
import lombok.Getter;
4+
5+
import java.util.List;
6+
7+
public class NotifyEvent {
8+
9+
public NotifyEvent(){
10+
11+
}
12+
13+
public NotifyEvent(List<RegisterServerInfo> instances) {
14+
this(instances, false);
15+
}
16+
17+
public NotifyEvent(List<RegisterServerInfo> instances, boolean isIncrement) {
18+
this.isIncrement = isIncrement;
19+
this.instances = instances;
20+
}
21+
22+
23+
24+
// means whether it is an increment data
25+
@Getter
26+
private boolean isIncrement;
27+
28+
@Getter
29+
private List<RegisterServerInfo> instances;
30+
}

Diff for: eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegisterServerInfo.java

+13
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,17 @@ public void setMetadata(Map<String, String> metadata) {
3838
public void addMetadata(String key, String value) {
3939
this.metadata.put(key, value);
4040
}
41+
42+
public void setExtFields(Map<String, Object> extFields) {
43+
if (extFields == null) {
44+
this.extFields.clear();
45+
return;
46+
}
47+
48+
this.extFields = extFields;
49+
}
50+
51+
public void addExtFields(String key, Object value) {
52+
this.extFields.put(key, value);
53+
}
4154
}

Diff for: eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/Registry.java

-76
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.apache.eventmesh.registry;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
5+
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
9+
@Slf4j
10+
public class RegistryFactory {
11+
private static final Map<String, RegistryService> META_CACHE = new HashMap<>(16);
12+
13+
public static RegistryService getInstance(String registryPluginType) {
14+
return META_CACHE.computeIfAbsent(registryPluginType, RegistryFactory::registryBuilder);
15+
}
16+
17+
private static RegistryService registryBuilder(String registryPluginType) {
18+
RegistryService registryServiceExt = EventMeshExtensionFactory.getExtension(RegistryService.class, registryPluginType);
19+
if (registryServiceExt == null) {
20+
String errorMsg = "can't load the registry plugin, please check.";
21+
log.error(errorMsg);
22+
throw new RuntimeException(errorMsg);
23+
}
24+
log.info("build registry plugin [{}] by type [{}] success", registryServiceExt.getClass().getSimpleName(),
25+
registryPluginType);
26+
return registryServiceExt;
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package org.apache.eventmesh.registry;
22

33
public interface RegistryListener {
4-
void onChange(Object data);
4+
void onChange(NotifyEvent event) throws Exception;
55
}

Diff for: eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryService.java

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
1111
public interface RegistryService {
12-
String ConfigurationKey = "registry";
1312
void init() throws RegistryException;
1413

1514
void shutdown() throws RegistryException;

Diff for: eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java

+77-23
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@
44
import com.alibaba.nacos.api.PropertyKeyConst;
55
import com.alibaba.nacos.api.exception.NacosException;
66
import com.alibaba.nacos.api.naming.NamingService;
7+
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
8+
import com.alibaba.nacos.api.naming.listener.Event;
79
import com.alibaba.nacos.api.naming.listener.EventListener;
10+
import com.alibaba.nacos.api.naming.listener.NamingEvent;
811
import com.alibaba.nacos.api.naming.pojo.Instance;
912
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
1013
import com.alibaba.nacos.api.naming.utils.NamingUtils;
1114
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
1215
import lombok.extern.slf4j.Slf4j;
1316
import org.apache.commons.lang3.StringUtils;
14-
import org.apache.eventmesh.common.config.CommonConfiguration;
1517
import org.apache.eventmesh.common.config.ConfigService;
16-
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
18+
import org.apache.eventmesh.registry.NotifyEvent;
1719
import org.apache.eventmesh.registry.QueryInstances;
1820
import org.apache.eventmesh.registry.RegisterServerInfo;
1921
import org.apache.eventmesh.registry.RegistryListener;
@@ -28,35 +30,43 @@
2830
import java.util.Objects;
2931
import java.util.Optional;
3032
import java.util.Properties;
33+
import java.util.concurrent.Executor;
34+
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.ThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
3137
import java.util.concurrent.atomic.AtomicBoolean;
3238
import java.util.concurrent.locks.Lock;
3339
import java.util.concurrent.locks.ReentrantLock;
3440
import java.util.stream.Collectors;
3541

3642
@Slf4j
3743
public class NacosDiscoveryService implements RegistryService {
38-
private final AtomicBoolean initFlag = new AtomicBoolean(false);
3944

40-
private CommonConfiguration configuration;
45+
private final AtomicBoolean initFlag = new AtomicBoolean(false);
4146

4247
private NacosRegistryConfiguration nacosConf;
4348

4449
private NamingService namingService;
4550

4651
private final Map<String, Map<RegistryListener, EventListener>> listeners = new HashMap<>();
4752

53+
private static final Executor notifyExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
54+
new LinkedBlockingQueue<>(20), r -> {
55+
Thread t = new Thread(r);
56+
t.setName("org.apache.eventmesh.registry.nacos.executor");
57+
t.setDaemon(true);
58+
return t;
59+
}, new ThreadPoolExecutor.DiscardOldestPolicy()
60+
);
61+
4862
private final Lock lock = new ReentrantLock();
49-
private static final String GROUP_NAME = "admin";
63+
5064

5165
@Override
5266
public void init() throws RegistryException {
5367
if (!initFlag.compareAndSet(false, true)) {
5468
return;
5569
}
56-
configuration = ConfigurationContextUtil.get(RegistryService.ConfigurationKey);
57-
if (configuration == null ) {
58-
throw new RegistryException("registry config instance is null");
59-
}
6070
nacosConf = ConfigService.getInstance().buildConfigInstance(NacosRegistryConfiguration.class);
6171
if (nacosConf == null) {
6272
log.info("nacos registry configuration is null");
@@ -73,12 +83,13 @@ public void init() throws RegistryException {
7383

7484
private Properties buildProperties() {
7585
Properties properties = new Properties();
76-
properties.setProperty(PropertyKeyConst.SERVER_ADDR, configuration.getRegistryAddr());
77-
properties.setProperty(PropertyKeyConst.USERNAME, configuration.getEventMeshRegistryPluginUsername());
78-
properties.setProperty(PropertyKeyConst.PASSWORD, configuration.getEventMeshRegistryPluginPassword());
7986
if (nacosConf == null) {
8087
return properties;
8188
}
89+
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosConf.getRegistryAddr());
90+
properties.setProperty(PropertyKeyConst.USERNAME, nacosConf.getEventMeshRegistryPluginUsername());
91+
properties.setProperty(PropertyKeyConst.PASSWORD, nacosConf.getEventMeshRegistryPluginPassword());
92+
8293
String endpoint = nacosConf.getEndpoint();
8394
if (Objects.nonNull(endpoint) && endpoint.contains(":")) {
8495
int index = endpoint.indexOf(":");
@@ -87,7 +98,8 @@ private Properties buildProperties() {
8798
} else {
8899
Optional.ofNullable(endpoint).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT, endpoint));
89100
String endpointPort = nacosConf.getEndpointPort();
90-
Optional.ofNullable(endpointPort).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT_PORT, endpointPort));
101+
Optional.ofNullable(endpointPort).ifPresent(value -> properties.put(PropertyKeyConst.ENDPOINT_PORT,
102+
endpointPort));
91103
}
92104
String accessKey = nacosConf.getAccessKey();
93105
Optional.ofNullable(accessKey).ifPresent(value -> properties.put(PropertyKeyConst.ACCESS_KEY, accessKey));
@@ -96,7 +108,8 @@ private Properties buildProperties() {
96108
String clusterName = nacosConf.getClusterName();
97109
Optional.ofNullable(clusterName).ifPresent(value -> properties.put(PropertyKeyConst.CLUSTER_NAME, clusterName));
98110
String logFileName = nacosConf.getLogFileName();
99-
Optional.ofNullable(logFileName).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME, logFileName));
111+
Optional.ofNullable(logFileName).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME,
112+
logFileName));
100113
String logLevel = nacosConf.getLogLevel();
101114
Optional.ofNullable(logLevel).ifPresent(value -> properties.put(UtilAndComs.NACOS_NAMING_LOG_LEVEL, logLevel));
102115
Integer pollingThreadCount = nacosConf.getPollingThreadCount();
@@ -122,19 +135,54 @@ public void subscribe(RegistryListener listener, String serviceName) {
122135
lock.lock();
123136
try {
124137
ServiceInfo serviceInfo = ServiceInfo.fromKey(serviceName);
125-
Map<RegistryListener, EventListener> eventListenerMap = listeners.computeIfAbsent(serviceName, k -> new HashMap<>());
138+
Map<RegistryListener, EventListener> eventListenerMap = listeners.computeIfAbsent(serviceName,
139+
k -> new HashMap<>());
126140
if (eventListenerMap.containsKey(listener)) {
127-
log.warn("already use same listener subscribe service name {}" ,serviceName);
141+
log.warn("already use same listener subscribe service name {}", serviceName);
128142
return;
129143
}
130-
EventListener eventListener = listener::onChange;
131-
List<String> clusters ;
144+
EventListener eventListener = new AbstractEventListener() {
145+
@Override
146+
public Executor getExecutor() {
147+
return notifyExecutor;
148+
}
149+
150+
@Override
151+
public void onEvent(Event event) {
152+
if (!(event instanceof NamingEvent)) {
153+
log.warn("received notify event type isn't not as expected");
154+
return;
155+
}
156+
try {
157+
NamingEvent namingEvent = (NamingEvent) event;
158+
List<Instance> instances = namingEvent.getInstances();
159+
List<RegisterServerInfo> list = new ArrayList<>();
160+
if (instances != null) {
161+
for (Instance instance : instances) {
162+
RegisterServerInfo info = new RegisterServerInfo();
163+
info.setAddress(instance.getIp() + ":" + instance.getPort());
164+
info.setMetadata(instance.getMetadata());
165+
info.setHealth(instance.isHealthy());
166+
info.setServiceName(
167+
ServiceInfo.getKey(NamingUtils.getGroupedName(namingEvent.getServiceName(),
168+
namingEvent.getGroupName()),
169+
namingEvent.getClusters()));
170+
list.add(info);
171+
}
172+
}
173+
listener.onChange(new NotifyEvent(list));
174+
} catch (Exception e) {
175+
log.warn("");
176+
}
177+
}
178+
};
179+
List<String> clusters;
132180
if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) {
133181
clusters = new ArrayList<>();
134182
} else {
135183
clusters = Arrays.stream(serviceInfo.getClusters().split(",")).collect(Collectors.toList());
136184
}
137-
namingService.subscribe(serviceInfo.getName(),serviceInfo.getGroupName(), clusters, eventListener);
185+
namingService.subscribe(serviceInfo.getName(), serviceInfo.getGroupName(), clusters, eventListener);
138186
eventListenerMap.put(listener, eventListener);
139187
} catch (Exception e) {
140188
log.error("subscribe service name {} fail", serviceName, e);
@@ -152,7 +200,7 @@ public void unsubscribe(RegistryListener registryListener, String serviceName) {
152200
if (map == null) {
153201
return;
154202
}
155-
List<String> clusters ;
203+
List<String> clusters;
156204
if (serviceInfo.getClusters() == null || serviceInfo.getClusters().isEmpty()) {
157205
clusters = new ArrayList<>();
158206
} else {
@@ -177,14 +225,18 @@ public List<RegisterServerInfo> selectInstances(QueryInstances queryInstances) {
177225
if (StringUtils.isNotBlank(serviceInfo.getClusters())) {
178226
clusters.addAll(Arrays.asList(serviceInfo.getClusters().split(",")));
179227
}
180-
List<Instance> instances = namingService.selectInstances(serviceInfo.getName(), serviceInfo.getGroupName(), clusters, queryInstances.isHealth());
228+
List<Instance> instances = namingService.selectInstances(serviceInfo.getName(),
229+
serviceInfo.getGroupName(), clusters,
230+
queryInstances.isHealth());
181231
if (instances != null) {
182232
instances.forEach(x -> {
183233
RegisterServerInfo instanceInfo = new RegisterServerInfo();
184234
instanceInfo.setMetadata(x.getMetadata());
185235
instanceInfo.setHealth(x.isHealthy());
186236
instanceInfo.setAddress(x.getIp() + ":" + x.getPort());
187-
instanceInfo.setServiceName(ServiceInfo.getKey(NamingUtils.getGroupedName(x.getServiceName(), serviceInfo.getGroupName()), x.getClusterName()));
237+
instanceInfo.setServiceName(
238+
ServiceInfo.getKey(NamingUtils.getGroupedName(x.getServiceName(),
239+
serviceInfo.getGroupName()), x.getClusterName()));
188240
list.add(instanceInfo);
189241
});
190242
}
@@ -228,7 +280,9 @@ public boolean unRegister(RegisterServerInfo eventMeshRegisterInfo) throws Regis
228280
return false;
229281
}
230282
ServiceInfo serviceInfo = ServiceInfo.fromKey(eventMeshRegisterInfo.getServiceName());
231-
namingService.deregisterInstance(serviceInfo.getName(), serviceInfo.getGroupName(), ipPort[0], Integer.parseInt(ipPort[1]), serviceInfo.getClusters());
283+
namingService.deregisterInstance(serviceInfo.getName(), serviceInfo.getGroupName(), ipPort[0],
284+
Integer.parseInt(ipPort[1]),
285+
serviceInfo.getClusters());
232286
return true;
233287
} catch (Exception e) {
234288
log.error("unregister instance service {} fail", eventMeshRegisterInfo, e);

0 commit comments

Comments
 (0)