Skip to content

Commit 81859b1

Browse files
authored
Merge pull request #943 from li-xiao-shuang/feature_registry_metadata
[ISSUE #942] Support eventmesh server cross-cluster service discovery
2 parents 7b60b22 + 24b43f5 commit 81859b1

File tree

18 files changed

+517
-156
lines changed

18 files changed

+517
-156
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.eventmesh.common.config.CommonConfiguration;
2121

2222
import java.util.List;
23+
import java.util.Objects;
2324
import java.util.concurrent.ConcurrentHashMap;
2425

2526
import com.google.common.collect.Lists;
@@ -31,10 +32,10 @@ public class ConfigurationContextUtil {
3132

3233
private static final ConcurrentHashMap<String, CommonConfiguration> CONFIGURATION_MAP = new ConcurrentHashMap<>();
3334

34-
public static final String HTTP = "http";
35+
public static final String HTTP = "HTTP";
3536

36-
public static final String TCP = "tcp";
37-
public static final String GRPC = "grpc";
37+
public static final String TCP = "TCP";
38+
public static final String GRPC = "GRPC";
3839

3940
public static final List<String> KEYS = Lists.newArrayList(HTTP, TCP, GRPC);
4041

@@ -46,6 +47,9 @@ public class ConfigurationContextUtil {
4647
* @param configuration
4748
*/
4849
public static void putIfAbsent(String key, CommonConfiguration configuration) {
50+
if (Objects.isNull(configuration)) {
51+
return;
52+
}
4953
CONFIGURATION_MAP.putIfAbsent(key, configuration);
5054
}
5155

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/RegistryService.java

+4
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,13 @@ public interface RegistryService {
4141

4242
List<EventMeshDataInfo> findEventMeshInfoByCluster(String clusterName) throws RegistryException;
4343

44+
List<EventMeshDataInfo> findAllEventMeshInfo() throws RegistryException;
45+
4446
Map<String/*eventMeshName*/, Map<String/*purpose*/, Integer/*num*/>> findEventMeshClientDistributionData(
4547
String clusterName, String group, String purpose) throws RegistryException;
4648

49+
void registerMetadata(Map<String, String> metadataMap);
50+
4751
boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws RegistryException;
4852

4953
boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws RegistryException;

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/dto/EventMeshDataInfo.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.eventmesh.api.registry.dto;
1919

20+
import java.util.Map;
21+
2022
/**
2123
* EventMeshDataInfo
2224
*/
@@ -26,11 +28,15 @@ public class EventMeshDataInfo {
2628
private String endpoint;
2729
private long lastUpdateTimestamp;
2830

29-
public EventMeshDataInfo(String eventMeshClusterName, String eventMeshName, String endpoint, long lastUpdateTimestamp) {
31+
private Map<String, String> metadata;
32+
33+
public EventMeshDataInfo(String eventMeshClusterName, String eventMeshName, String endpoint, long lastUpdateTimestamp,
34+
Map<String, String> metadata) {
3035
this.eventMeshClusterName = eventMeshClusterName;
3136
this.eventMeshName = eventMeshName;
3237
this.endpoint = endpoint;
3338
this.lastUpdateTimestamp = lastUpdateTimestamp;
39+
this.metadata = metadata;
3440
}
3541

3642
public String getEventMeshClusterName() {
@@ -64,4 +70,12 @@ public long getLastUpdateTimestamp() {
6470
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
6571
this.lastUpdateTimestamp = lastUpdateTimestamp;
6672
}
73+
74+
public Map<String, String> getMetadata() {
75+
return metadata;
76+
}
77+
78+
public void setMetadata(Map<String, String> metadata) {
79+
this.metadata = metadata;
80+
}
6781
}

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/dto/EventMeshRegisterInfo.java

+20
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public class EventMeshRegisterInfo {
2828
private String endPoint;
2929
private Map<String, Map<String, Integer>> eventMeshInstanceNumMap;
3030

31+
private Map<String, String> metadata;
32+
33+
private String protocolType;
34+
3135
public String getEventMeshClusterName() {
3236
return eventMeshClusterName;
3337
}
@@ -59,4 +63,20 @@ public Map<String, Map<String, Integer>> getEventMeshInstanceNumMap() {
5963
public void setEventMeshInstanceNumMap(Map<String, Map<String, Integer>> eventMeshInstanceNumMap) {
6064
this.eventMeshInstanceNumMap = eventMeshInstanceNumMap;
6165
}
66+
67+
public Map<String, String> getMetadata() {
68+
return metadata;
69+
}
70+
71+
public void setMetadata(Map<String, String> metadata) {
72+
this.metadata = metadata;
73+
}
74+
75+
public String getProtocolType() {
76+
return protocolType;
77+
}
78+
79+
public void setProtocolType(String protocolType) {
80+
this.protocolType = protocolType;
81+
}
6282
}

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/dto/EventMeshUnRegisterInfo.java

+10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class EventMeshUnRegisterInfo {
2626

2727
private String endPoint;
2828

29+
private String protocolType;
30+
2931
public String getEventMeshClusterName() {
3032
return eventMeshClusterName;
3133
}
@@ -49,4 +51,12 @@ public String getEndPoint() {
4951
public void setEndPoint(String endPoint) {
5052
this.endPoint = endPoint;
5153
}
54+
55+
public String getProtocolType() {
56+
return protocolType;
57+
}
58+
59+
public void setProtocolType(String protocolType) {
60+
this.protocolType = protocolType;
61+
}
5262
}

eventmesh-registry-plugin/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/constant/NacosConstant.java

+3
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ public class NacosConstant {
3333
public static final String IP_PORT_SEPARATOR = ":";
3434

3535
public static final String DEFAULT_GROUP = "DEFAULT_GROUP";
36+
37+
public static final String GROUP = "GROUP";
38+
3639
}

eventmesh-registry-plugin/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/service/NacosRegistryService.java

+55-7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.ArrayList;
3232
import java.util.Collections;
33+
import java.util.HashMap;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.Objects;
@@ -61,12 +62,15 @@ public class NacosRegistryService implements RegistryService {
6162

6263
private NamingService namingService;
6364

65+
private Map<String, EventMeshRegisterInfo> eventMeshRegisterInfoMap;
66+
6467
@Override
6568
public void init() throws RegistryException {
6669
boolean update = INIT_STATUS.compareAndSet(false, true);
6770
if (!update) {
6871
return;
6972
}
73+
eventMeshRegisterInfoMap = new HashMap<>(ConfigurationContextUtil.KEYS.size());
7074
for (String key : ConfigurationContextUtil.KEYS) {
7175
CommonConfiguration commonConfiguration = ConfigurationContextUtil.get(key);
7276
if (null == commonConfiguration) {
@@ -124,15 +128,46 @@ public List<EventMeshDataInfo> findEventMeshInfoByCluster(String clusterName) th
124128
String eventMeshName = configuration.eventMeshName;
125129
try {
126130
List<Instance> instances =
127-
namingService.selectInstances(eventMeshName + "-" + key, NacosConstant.DEFAULT_GROUP, Collections.singletonList(clusterName),
131+
namingService.selectInstances(eventMeshName + "-" + key, configuration.eventMeshCluster, Collections.singletonList(clusterName),
128132
true);
129133
if (CollectionUtils.isEmpty(instances)) {
130134
continue;
131135
}
132136
for (Instance instance : instances) {
133137
EventMeshDataInfo eventMeshDataInfo =
134138
new EventMeshDataInfo(instance.getClusterName(), instance.getServiceName(),
135-
instance.getIp() + ":" + instance.getPort(), 0L);
139+
instance.getIp() + ":" + instance.getPort(), 0L, instance.getMetadata());
140+
eventMeshDataInfoList.add(eventMeshDataInfo);
141+
}
142+
} catch (NacosException e) {
143+
logger.error("[NacosRegistryService][findEventMeshInfoByCluster] error", e);
144+
throw new RegistryException(e.getMessage());
145+
}
146+
147+
}
148+
return eventMeshDataInfoList;
149+
}
150+
151+
@Override
152+
public List<EventMeshDataInfo> findAllEventMeshInfo() throws RegistryException {
153+
List<EventMeshDataInfo> eventMeshDataInfoList = new ArrayList<>();
154+
for (String key : ConfigurationContextUtil.KEYS) {
155+
CommonConfiguration configuration = ConfigurationContextUtil.get(key);
156+
if (Objects.isNull(configuration)) {
157+
continue;
158+
}
159+
String eventMeshName = configuration.eventMeshName;
160+
try {
161+
List<Instance> instances =
162+
namingService.selectInstances(eventMeshName + "-" + key, key + "-" + NacosConstant.GROUP, null,
163+
true);
164+
if (CollectionUtils.isEmpty(instances)) {
165+
continue;
166+
}
167+
for (Instance instance : instances) {
168+
EventMeshDataInfo eventMeshDataInfo =
169+
new EventMeshDataInfo(instance.getClusterName(), instance.getServiceName(),
170+
instance.getIp() + ":" + instance.getPort(), 0L, instance.getMetadata());
136171
eventMeshDataInfoList.add(eventMeshDataInfo);
137172
}
138173
} catch (NacosException e) {
@@ -151,19 +186,32 @@ public Map<String, Map<String, Integer>> findEventMeshClientDistributionData(Str
151186
return null;
152187
}
153188

189+
@Override
190+
public void registerMetadata(Map<String, String> metadataMap) {
191+
for (Map.Entry<String, EventMeshRegisterInfo> eventMeshRegisterInfo : eventMeshRegisterInfoMap.entrySet()) {
192+
EventMeshRegisterInfo registerInfo = eventMeshRegisterInfo.getValue();
193+
registerInfo.setMetadata(metadataMap);
194+
this.register(registerInfo);
195+
}
196+
}
197+
154198
@Override
155199
public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws RegistryException {
156200
try {
157201
String[] ipPort = eventMeshRegisterInfo.getEndPoint().split(NacosConstant.IP_PORT_SEPARATOR);
202+
String eventMeshClusterName = eventMeshRegisterInfo.getEventMeshClusterName();
203+
Map<String, String> metadata = eventMeshRegisterInfo.getMetadata();
204+
158205
Instance instance = new Instance();
159206
instance.setIp(ipPort[0]);
160207
instance.setPort(Integer.parseInt(ipPort[1]));
161208
instance.setWeight(1.0);
162-
String eventMeshName = eventMeshRegisterInfo.getEventMeshName();
163-
String eventMeshClusterName = eventMeshRegisterInfo.getEventMeshClusterName();
164209
instance.setClusterName(eventMeshClusterName);
165-
// todo save metadata
166-
namingService.registerInstance(eventMeshName, NacosConstant.DEFAULT_GROUP, instance);
210+
instance.setMetadata(metadata);
211+
212+
String eventMeshName = eventMeshRegisterInfo.getEventMeshName();
213+
namingService.registerInstance(eventMeshName, eventMeshRegisterInfo.getProtocolType() + "-" + NacosConstant.GROUP, instance);
214+
eventMeshRegisterInfoMap.put(eventMeshName, eventMeshRegisterInfo);
167215
} catch (NacosException e) {
168216
logger.error("[NacosRegistryService][register] error", e);
169217
throw new RegistryException(e.getMessage());
@@ -182,7 +230,7 @@ public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throw
182230
String eventMeshName = eventMeshUnRegisterInfo.getEventMeshName();
183231
String eventMeshClusterName = eventMeshUnRegisterInfo.getEventMeshClusterName();
184232
instance.setClusterName(eventMeshClusterName);
185-
namingService.deregisterInstance(eventMeshName, NacosConstant.DEFAULT_GROUP, instance);
233+
namingService.deregisterInstance(eventMeshName, eventMeshUnRegisterInfo.getProtocolType() + "-" + NacosConstant.GROUP, instance);
186234
} catch (NacosException e) {
187235
logger.error("[NacosRegistryService][unRegister] error", e);
188236
throw new RegistryException(e.getMessage());

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,12 @@ public boolean register() {
155155
try {
156156
String endPoints = IPUtils.getLocalAddress()
157157
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.grpcServerPort;
158-
EventMeshRegisterInfo self = new EventMeshRegisterInfo();
159-
self.setEventMeshClusterName(eventMeshGrpcConfiguration.eventMeshCluster);
160-
self.setEventMeshName(eventMeshGrpcConfiguration.eventMeshName + "-" + ConfigurationContextUtil.GRPC);
161-
self.setEndPoint(endPoints);
162-
registerResult = registry.register(self);
158+
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
159+
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.eventMeshCluster);
160+
eventMeshRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.eventMeshName + "-" + ConfigurationContextUtil.GRPC);
161+
eventMeshRegisterInfo.setEndPoint(endPoints);
162+
eventMeshRegisterInfo.setProtocolType(ConfigurationContextUtil.GRPC);
163+
registerResult = registry.register(eventMeshRegisterInfo);
163164
} catch (Exception e) {
164165
logger.warn("eventMesh register to registry failed", e);
165166
}
@@ -174,6 +175,7 @@ private void unRegister() throws Exception {
174175
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.eventMeshCluster);
175176
eventMeshUnRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.eventMeshName);
176177
eventMeshUnRegisterInfo.setEndPoint(endPoints);
178+
eventMeshUnRegisterInfo.setProtocolType(ConfigurationContextUtil.GRPC);
177179
boolean registerResult = registry.unRegister(eventMeshUnRegisterInfo);
178180
if (!registerResult) {
179181
throw new EventMeshException("eventMesh fail to unRegister");

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ public RateLimiter getBatchRateLimiter() {
216216
return batchRateLimiter;
217217
}
218218

219+
public Registry getRegistry() {
220+
return registry;
221+
}
222+
219223
public void init() throws Exception {
220224
logger.info("==================EventMeshHTTPServer Initialing==================");
221225
super.init("eventMesh-http");
@@ -297,11 +301,12 @@ public boolean register() {
297301
try {
298302
String endPoints = IPUtils.getLocalAddress()
299303
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshHttpConfiguration.httpServerPort;
300-
EventMeshRegisterInfo self = new EventMeshRegisterInfo();
301-
self.setEventMeshClusterName(eventMeshHttpConfiguration.eventMeshCluster);
302-
self.setEventMeshName(eventMeshHttpConfiguration.eventMeshName + "-" + ConfigurationContextUtil.HTTP);
303-
self.setEndPoint(endPoints);
304-
registerResult = registry.register(self);
304+
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
305+
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshHttpConfiguration.eventMeshCluster);
306+
eventMeshRegisterInfo.setEventMeshName(eventMeshHttpConfiguration.eventMeshName + "-" + ConfigurationContextUtil.HTTP);
307+
eventMeshRegisterInfo.setEndPoint(endPoints);
308+
eventMeshRegisterInfo.setProtocolType(ConfigurationContextUtil.HTTP);
309+
registerResult = registry.register(eventMeshRegisterInfo);
305310
} catch (Exception e) {
306311
logger.warn("eventMesh register to registry failed", e);
307312
}
@@ -316,6 +321,7 @@ private void unRegister() throws Exception {
316321
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshHttpConfiguration.eventMeshCluster);
317322
eventMeshUnRegisterInfo.setEventMeshName(eventMeshHttpConfiguration.eventMeshName);
318323
eventMeshUnRegisterInfo.setEndPoint(endPoints);
324+
eventMeshUnRegisterInfo.setProtocolType(ConfigurationContextUtil.HTTP);
319325
boolean registerResult = registry.unRegister(eventMeshUnRegisterInfo);
320326
if (!registerResult) {
321327
throw new EventMeshException("eventMesh fail to unRegister");

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -298,12 +298,13 @@ public boolean register() {
298298
try {
299299
String endPoints = IPUtils.getLocalAddress()
300300
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
301-
EventMeshRegisterInfo self = new EventMeshRegisterInfo();
302-
self.setEventMeshClusterName(eventMeshTCPConfiguration.eventMeshCluster);
303-
self.setEventMeshName(eventMeshTCPConfiguration.eventMeshName + "-" + ConfigurationContextUtil.TCP);
304-
self.setEndPoint(endPoints);
305-
self.setEventMeshInstanceNumMap(clientSessionGroupMapping.prepareProxyClientDistributionData());
306-
registerResult = registry.register(self);
301+
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
302+
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.eventMeshCluster);
303+
eventMeshRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.eventMeshName + "-" + ConfigurationContextUtil.TCP);
304+
eventMeshRegisterInfo.setEndPoint(endPoints);
305+
eventMeshRegisterInfo.setEventMeshInstanceNumMap(clientSessionGroupMapping.prepareProxyClientDistributionData());
306+
eventMeshRegisterInfo.setProtocolType(ConfigurationContextUtil.TCP);
307+
registerResult = registry.register(eventMeshRegisterInfo);
307308
} catch (Exception e) {
308309
logger.warn("eventMesh register to registry failed", e);
309310
}
@@ -318,6 +319,7 @@ private void unRegister() throws Exception {
318319
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.eventMeshCluster);
319320
eventMeshUnRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.eventMeshName);
320321
eventMeshUnRegisterInfo.setEndPoint(endPoints);
322+
eventMeshUnRegisterInfo.setProtocolType(ConfigurationContextUtil.TCP);
321323
boolean registerResult = registry.unRegister(eventMeshUnRegisterInfo);
322324
if (!registerResult) {
323325
throw new EventMeshException("eventMesh fail to unRegister");

0 commit comments

Comments
 (0)