Skip to content

Commit 3bbf724

Browse files
committed
Merge remote-tracking branch 'apache/master' into trace_improve_3
2 parents 1e03168 + 6250d93 commit 3bbf724

30 files changed

+1368
-373
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ConfigurationContextUtil.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.eventmesh.common.config.CommonConfiguration;
2121

2222
import java.util.List;
23+
import java.util.Objects;
2324
import java.util.concurrent.ConcurrentHashMap;
2425

2526
import com.google.common.collect.Lists;
@@ -31,10 +32,10 @@ public class ConfigurationContextUtil {
3132

3233
private static final ConcurrentHashMap<String, CommonConfiguration> CONFIGURATION_MAP = new ConcurrentHashMap<>();
3334

34-
public static final String HTTP = "http";
35+
public static final String HTTP = "HTTP";
3536

36-
public static final String TCP = "tcp";
37-
public static final String GRPC = "grpc";
37+
public static final String TCP = "TCP";
38+
public static final String GRPC = "GRPC";
3839

3940
public static final List<String> KEYS = Lists.newArrayList(HTTP, TCP, GRPC);
4041

@@ -46,6 +47,9 @@ public class ConfigurationContextUtil {
4647
* @param configuration
4748
*/
4849
public static void putIfAbsent(String key, CommonConfiguration configuration) {
50+
if (Objects.isNull(configuration)) {
51+
return;
52+
}
4953
CONFIGURATION_MAP.putIfAbsent(key, configuration);
5054
}
5155

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/RegistryService.java

+4
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,13 @@ public interface RegistryService {
4141

4242
List<EventMeshDataInfo> findEventMeshInfoByCluster(String clusterName) throws RegistryException;
4343

44+
List<EventMeshDataInfo> findAllEventMeshInfo() throws RegistryException;
45+
4446
Map<String/*eventMeshName*/, Map<String/*purpose*/, Integer/*num*/>> findEventMeshClientDistributionData(
4547
String clusterName, String group, String purpose) throws RegistryException;
4648

49+
void registerMetadata(Map<String, String> metadataMap);
50+
4751
boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws RegistryException;
4852

4953
boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws RegistryException;

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/dto/EventMeshDataInfo.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.eventmesh.api.registry.dto;
1919

20+
import java.util.Map;
21+
2022
/**
2123
* EventMeshDataInfo
2224
*/
@@ -26,11 +28,15 @@ public class EventMeshDataInfo {
2628
private String endpoint;
2729
private long lastUpdateTimestamp;
2830

29-
public EventMeshDataInfo(String eventMeshClusterName, String eventMeshName, String endpoint, long lastUpdateTimestamp) {
31+
private Map<String, String> metadata;
32+
33+
public EventMeshDataInfo(String eventMeshClusterName, String eventMeshName, String endpoint, long lastUpdateTimestamp,
34+
Map<String, String> metadata) {
3035
this.eventMeshClusterName = eventMeshClusterName;
3136
this.eventMeshName = eventMeshName;
3237
this.endpoint = endpoint;
3338
this.lastUpdateTimestamp = lastUpdateTimestamp;
39+
this.metadata = metadata;
3440
}
3541

3642
public String getEventMeshClusterName() {
@@ -64,4 +70,12 @@ public long getLastUpdateTimestamp() {
6470
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
6571
this.lastUpdateTimestamp = lastUpdateTimestamp;
6672
}
73+
74+
public Map<String, String> getMetadata() {
75+
return metadata;
76+
}
77+
78+
public void setMetadata(Map<String, String> metadata) {
79+
this.metadata = metadata;
80+
}
6781
}

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/dto/EventMeshRegisterInfo.java

+20
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public class EventMeshRegisterInfo {
2828
private String endPoint;
2929
private Map<String, Map<String, Integer>> eventMeshInstanceNumMap;
3030

31+
private Map<String, String> metadata;
32+
33+
private String protocolType;
34+
3135
public String getEventMeshClusterName() {
3236
return eventMeshClusterName;
3337
}
@@ -59,4 +63,20 @@ public Map<String, Map<String, Integer>> getEventMeshInstanceNumMap() {
5963
public void setEventMeshInstanceNumMap(Map<String, Map<String, Integer>> eventMeshInstanceNumMap) {
6064
this.eventMeshInstanceNumMap = eventMeshInstanceNumMap;
6165
}
66+
67+
public Map<String, String> getMetadata() {
68+
return metadata;
69+
}
70+
71+
public void setMetadata(Map<String, String> metadata) {
72+
this.metadata = metadata;
73+
}
74+
75+
public String getProtocolType() {
76+
return protocolType;
77+
}
78+
79+
public void setProtocolType(String protocolType) {
80+
this.protocolType = protocolType;
81+
}
6282
}

eventmesh-registry-plugin/eventmesh-registry-api/src/main/java/org/apache/eventmesh/api/registry/dto/EventMeshUnRegisterInfo.java

+10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class EventMeshUnRegisterInfo {
2626

2727
private String endPoint;
2828

29+
private String protocolType;
30+
2931
public String getEventMeshClusterName() {
3032
return eventMeshClusterName;
3133
}
@@ -49,4 +51,12 @@ public String getEndPoint() {
4951
public void setEndPoint(String endPoint) {
5052
this.endPoint = endPoint;
5153
}
54+
55+
public String getProtocolType() {
56+
return protocolType;
57+
}
58+
59+
public void setProtocolType(String protocolType) {
60+
this.protocolType = protocolType;
61+
}
5262
}

eventmesh-registry-plugin/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/constant/NacosConstant.java

+3
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ public class NacosConstant {
3333
public static final String IP_PORT_SEPARATOR = ":";
3434

3535
public static final String DEFAULT_GROUP = "DEFAULT_GROUP";
36+
37+
public static final String GROUP = "GROUP";
38+
3639
}

eventmesh-registry-plugin/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/service/NacosRegistryService.java

+55-7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.ArrayList;
3232
import java.util.Collections;
33+
import java.util.HashMap;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.Objects;
@@ -61,12 +62,15 @@ public class NacosRegistryService implements RegistryService {
6162

6263
private NamingService namingService;
6364

65+
private Map<String, EventMeshRegisterInfo> eventMeshRegisterInfoMap;
66+
6467
@Override
6568
public void init() throws RegistryException {
6669
boolean update = INIT_STATUS.compareAndSet(false, true);
6770
if (!update) {
6871
return;
6972
}
73+
eventMeshRegisterInfoMap = new HashMap<>(ConfigurationContextUtil.KEYS.size());
7074
for (String key : ConfigurationContextUtil.KEYS) {
7175
CommonConfiguration commonConfiguration = ConfigurationContextUtil.get(key);
7276
if (null == commonConfiguration) {
@@ -124,15 +128,46 @@ public List<EventMeshDataInfo> findEventMeshInfoByCluster(String clusterName) th
124128
String eventMeshName = configuration.eventMeshName;
125129
try {
126130
List<Instance> instances =
127-
namingService.selectInstances(eventMeshName + "-" + key, NacosConstant.DEFAULT_GROUP, Collections.singletonList(clusterName),
131+
namingService.selectInstances(eventMeshName + "-" + key, configuration.eventMeshCluster, Collections.singletonList(clusterName),
128132
true);
129133
if (CollectionUtils.isEmpty(instances)) {
130134
continue;
131135
}
132136
for (Instance instance : instances) {
133137
EventMeshDataInfo eventMeshDataInfo =
134138
new EventMeshDataInfo(instance.getClusterName(), instance.getServiceName(),
135-
instance.getIp() + ":" + instance.getPort(), 0L);
139+
instance.getIp() + ":" + instance.getPort(), 0L, instance.getMetadata());
140+
eventMeshDataInfoList.add(eventMeshDataInfo);
141+
}
142+
} catch (NacosException e) {
143+
logger.error("[NacosRegistryService][findEventMeshInfoByCluster] error", e);
144+
throw new RegistryException(e.getMessage());
145+
}
146+
147+
}
148+
return eventMeshDataInfoList;
149+
}
150+
151+
@Override
152+
public List<EventMeshDataInfo> findAllEventMeshInfo() throws RegistryException {
153+
List<EventMeshDataInfo> eventMeshDataInfoList = new ArrayList<>();
154+
for (String key : ConfigurationContextUtil.KEYS) {
155+
CommonConfiguration configuration = ConfigurationContextUtil.get(key);
156+
if (Objects.isNull(configuration)) {
157+
continue;
158+
}
159+
String eventMeshName = configuration.eventMeshName;
160+
try {
161+
List<Instance> instances =
162+
namingService.selectInstances(eventMeshName + "-" + key, key + "-" + NacosConstant.GROUP, null,
163+
true);
164+
if (CollectionUtils.isEmpty(instances)) {
165+
continue;
166+
}
167+
for (Instance instance : instances) {
168+
EventMeshDataInfo eventMeshDataInfo =
169+
new EventMeshDataInfo(instance.getClusterName(), instance.getServiceName(),
170+
instance.getIp() + ":" + instance.getPort(), 0L, instance.getMetadata());
136171
eventMeshDataInfoList.add(eventMeshDataInfo);
137172
}
138173
} catch (NacosException e) {
@@ -151,19 +186,32 @@ public Map<String, Map<String, Integer>> findEventMeshClientDistributionData(Str
151186
return null;
152187
}
153188

189+
@Override
190+
public void registerMetadata(Map<String, String> metadataMap) {
191+
for (Map.Entry<String, EventMeshRegisterInfo> eventMeshRegisterInfo : eventMeshRegisterInfoMap.entrySet()) {
192+
EventMeshRegisterInfo registerInfo = eventMeshRegisterInfo.getValue();
193+
registerInfo.setMetadata(metadataMap);
194+
this.register(registerInfo);
195+
}
196+
}
197+
154198
@Override
155199
public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws RegistryException {
156200
try {
157201
String[] ipPort = eventMeshRegisterInfo.getEndPoint().split(NacosConstant.IP_PORT_SEPARATOR);
202+
String eventMeshClusterName = eventMeshRegisterInfo.getEventMeshClusterName();
203+
Map<String, String> metadata = eventMeshRegisterInfo.getMetadata();
204+
158205
Instance instance = new Instance();
159206
instance.setIp(ipPort[0]);
160207
instance.setPort(Integer.parseInt(ipPort[1]));
161208
instance.setWeight(1.0);
162-
String eventMeshName = eventMeshRegisterInfo.getEventMeshName();
163-
String eventMeshClusterName = eventMeshRegisterInfo.getEventMeshClusterName();
164209
instance.setClusterName(eventMeshClusterName);
165-
// todo save metadata
166-
namingService.registerInstance(eventMeshName, NacosConstant.DEFAULT_GROUP, instance);
210+
instance.setMetadata(metadata);
211+
212+
String eventMeshName = eventMeshRegisterInfo.getEventMeshName();
213+
namingService.registerInstance(eventMeshName, eventMeshRegisterInfo.getProtocolType() + "-" + NacosConstant.GROUP, instance);
214+
eventMeshRegisterInfoMap.put(eventMeshName, eventMeshRegisterInfo);
167215
} catch (NacosException e) {
168216
logger.error("[NacosRegistryService][register] error", e);
169217
throw new RegistryException(e.getMessage());
@@ -182,7 +230,7 @@ public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throw
182230
String eventMeshName = eventMeshUnRegisterInfo.getEventMeshName();
183231
String eventMeshClusterName = eventMeshUnRegisterInfo.getEventMeshClusterName();
184232
instance.setClusterName(eventMeshClusterName);
185-
namingService.deregisterInstance(eventMeshName, NacosConstant.DEFAULT_GROUP, instance);
233+
namingService.deregisterInstance(eventMeshName, eventMeshUnRegisterInfo.getProtocolType() + "-" + NacosConstant.GROUP, instance);
186234
} catch (NacosException e) {
187235
logger.error("[NacosRegistryService][unRegister] error", e);
188236
throw new RegistryException(e.getMessage());

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshGrpcServer.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,12 @@ public boolean register() {
155155
try {
156156
String endPoints = IPUtils.getLocalAddress()
157157
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshGrpcConfiguration.grpcServerPort;
158-
EventMeshRegisterInfo self = new EventMeshRegisterInfo();
159-
self.setEventMeshClusterName(eventMeshGrpcConfiguration.eventMeshCluster);
160-
self.setEventMeshName(eventMeshGrpcConfiguration.eventMeshName + "-" + ConfigurationContextUtil.GRPC);
161-
self.setEndPoint(endPoints);
162-
registerResult = registry.register(self);
158+
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
159+
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.eventMeshCluster);
160+
eventMeshRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.eventMeshName + "-" + ConfigurationContextUtil.GRPC);
161+
eventMeshRegisterInfo.setEndPoint(endPoints);
162+
eventMeshRegisterInfo.setProtocolType(ConfigurationContextUtil.GRPC);
163+
registerResult = registry.register(eventMeshRegisterInfo);
163164
} catch (Exception e) {
164165
logger.warn("eventMesh register to registry failed", e);
165166
}
@@ -174,6 +175,7 @@ private void unRegister() throws Exception {
174175
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshGrpcConfiguration.eventMeshCluster);
175176
eventMeshUnRegisterInfo.setEventMeshName(eventMeshGrpcConfiguration.eventMeshName);
176177
eventMeshUnRegisterInfo.setEndPoint(endPoints);
178+
eventMeshUnRegisterInfo.setProtocolType(ConfigurationContextUtil.GRPC);
177179
boolean registerResult = registry.unRegister(eventMeshUnRegisterInfo);
178180
if (!registerResult) {
179181
throw new EventMeshException("eventMesh fail to unRegister");

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ public RateLimiter getBatchRateLimiter() {
213213
return batchRateLimiter;
214214
}
215215

216+
public Registry getRegistry() {
217+
return registry;
218+
}
219+
216220
public void init() throws Exception {
217221
logger.info("==================EventMeshHTTPServer Initialing==================");
218222
super.init("eventMesh-http");
@@ -294,11 +298,12 @@ public boolean register() {
294298
try {
295299
String endPoints = IPUtils.getLocalAddress()
296300
+ EventMeshConstants.IP_PORT_SEPARATOR + eventMeshHttpConfiguration.httpServerPort;
297-
EventMeshRegisterInfo self = new EventMeshRegisterInfo();
298-
self.setEventMeshClusterName(eventMeshHttpConfiguration.eventMeshCluster);
299-
self.setEventMeshName(eventMeshHttpConfiguration.eventMeshName + "-" + ConfigurationContextUtil.HTTP);
300-
self.setEndPoint(endPoints);
301-
registerResult = registry.register(self);
301+
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
302+
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshHttpConfiguration.eventMeshCluster);
303+
eventMeshRegisterInfo.setEventMeshName(eventMeshHttpConfiguration.eventMeshName + "-" + ConfigurationContextUtil.HTTP);
304+
eventMeshRegisterInfo.setEndPoint(endPoints);
305+
eventMeshRegisterInfo.setProtocolType(ConfigurationContextUtil.HTTP);
306+
registerResult = registry.register(eventMeshRegisterInfo);
302307
} catch (Exception e) {
303308
logger.warn("eventMesh register to registry failed", e);
304309
}
@@ -313,6 +318,7 @@ private void unRegister() throws Exception {
313318
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshHttpConfiguration.eventMeshCluster);
314319
eventMeshUnRegisterInfo.setEventMeshName(eventMeshHttpConfiguration.eventMeshName);
315320
eventMeshUnRegisterInfo.setEndPoint(endPoints);
321+
eventMeshUnRegisterInfo.setProtocolType(ConfigurationContextUtil.HTTP);
316322
boolean registerResult = registry.unRegister(eventMeshUnRegisterInfo);
317323
if (!registerResult) {
318324
throw new EventMeshException("eventMesh fail to unRegister");

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.eventmesh.runtime.connector.ConnectorResource;
2727
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
2828
import org.apache.eventmesh.runtime.registry.Registry;
29+
import org.apache.eventmesh.runtime.trace.Trace;
2930

3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -50,6 +51,8 @@ public class EventMeshServer {
5051

5152
private Registry registry;
5253

54+
private static Trace trace;
55+
5356
private ConnectorResource connectorResource;
5457

5558
private ServiceState serviceState;
@@ -62,6 +65,7 @@ public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
6265
this.eventMeshGrpcConfiguration = eventMeshGrpcConfiguration;
6366
this.acl = new Acl();
6467
this.registry = new Registry();
68+
this.trace = new Trace(eventMeshHttpConfiguration.eventMeshServerTraceEnable);
6569
this.connectorResource = new ConnectorResource();
6670

6771
ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.TCP, eventMeshTcpConfiguration);
@@ -70,7 +74,6 @@ public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
7074
}
7175

7276
public void init() throws Exception {
73-
7477
if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
7578
acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType);
7679
}
@@ -90,6 +93,10 @@ public void init() throws Exception {
9093
registry.init(eventMeshHttpConfiguration.eventMeshRegistryPluginType);
9194
}
9295

96+
if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) {
97+
trace.init(eventMeshHttpConfiguration.eventMeshTracePluginType);
98+
}
99+
93100
connectorResource.init(eventMeshHttpConfiguration.eventMeshConnectorPluginType);
94101

95102
// server init
@@ -167,6 +174,11 @@ public void shutdown() throws Exception {
167174
acl.shutdown();
168175
}
169176

177+
if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) {
178+
trace.shutdown();
179+
}
180+
181+
170182
ConfigurationContextUtil.clear();
171183
serviceState = ServiceState.STOPED;
172184
logger.info("server state:{}", serviceState);
@@ -184,6 +196,10 @@ public EventMeshTCPServer getEventMeshTCPServer() {
184196
return eventMeshTCPServer;
185197
}
186198

199+
public static Trace getTrace() {
200+
return trace;
201+
}
202+
187203
public ServiceState getServiceState() {
188204
return serviceState;
189205
}

0 commit comments

Comments
 (0)