Skip to content

Commit 6250d93

Browse files
authored
Merge pull request #960 from lrhkobe/trace_improve_2
[ISSUE #956] add trace for tcp protocol in eventmesh-runtime close #956
2 parents 81859b1 + 81a5b71 commit 6250d93

File tree

12 files changed

+851
-217
lines changed

12 files changed

+851
-217
lines changed

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.eventmesh.runtime.connector.ConnectorResource;
2727
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
2828
import org.apache.eventmesh.runtime.registry.Registry;
29+
import org.apache.eventmesh.runtime.trace.Trace;
2930

3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -50,6 +51,8 @@ public class EventMeshServer {
5051

5152
private Registry registry;
5253

54+
private static Trace trace;
55+
5356
private ConnectorResource connectorResource;
5457

5558
private ServiceState serviceState;
@@ -62,6 +65,7 @@ public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
6265
this.eventMeshGrpcConfiguration = eventMeshGrpcConfiguration;
6366
this.acl = new Acl();
6467
this.registry = new Registry();
68+
this.trace = new Trace(eventMeshHttpConfiguration.eventMeshServerTraceEnable);
6569
this.connectorResource = new ConnectorResource();
6670

6771
ConfigurationContextUtil.putIfAbsent(ConfigurationContextUtil.TCP, eventMeshTcpConfiguration);
@@ -70,7 +74,6 @@ public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
7074
}
7175

7276
public void init() throws Exception {
73-
7477
if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
7578
acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType);
7679
}
@@ -90,6 +93,10 @@ public void init() throws Exception {
9093
registry.init(eventMeshHttpConfiguration.eventMeshRegistryPluginType);
9194
}
9295

96+
if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) {
97+
trace.init(eventMeshHttpConfiguration.eventMeshTracePluginType);
98+
}
99+
93100
connectorResource.init(eventMeshHttpConfiguration.eventMeshConnectorPluginType);
94101

95102
// server init
@@ -167,6 +174,11 @@ public void shutdown() throws Exception {
167174
acl.shutdown();
168175
}
169176

177+
if (eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerTraceEnable) {
178+
trace.shutdown();
179+
}
180+
181+
170182
ConfigurationContextUtil.clear();
171183
serviceState = ServiceState.STOPED;
172184
logger.info("server state:{}", serviceState);
@@ -184,6 +196,10 @@ public EventMeshTCPServer getEventMeshTCPServer() {
184196
return eventMeshTCPServer;
185197
}
186198

199+
public static Trace getTrace() {
200+
return trace;
201+
}
202+
187203
public ServiceState getServiceState() {
188204
return serviceState;
189205
}

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java

+11
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ public class EventMeshConstants {
5353
public static final String RSP_SEND_EVENTMESH_IP = "rspsendeventmeship";
5454
public static final String RSP_RECEIVE_EVENTMESH_IP = "rspreceiveeventmeship";
5555

56+
public static final String RSP_SYS = "rsp0sys";
57+
public static final String RSP_IP = "rsp0ip";
58+
public static final String RSP_IDC = "rsp0idc";
59+
public static final String RSP_GROUP = "rsp0group";
60+
public static final String RSP_URL = "rsp0url";
61+
62+
public static final String REQ_SYS = "req0sys";
63+
public static final String REQ_IP = "req0ip";
64+
public static final String REQ_IDC = "req0idc";
65+
public static final String REQ_GROUP = "req0group";
66+
5667
//default TTL 4 hours
5768
public static final Integer DEFAULT_MSG_TTL_MILLS = 14400000;
5869

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java

+57-12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
2424
import org.apache.eventmesh.common.protocol.tcp.Package;
2525
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
26+
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
27+
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
2628
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
2729
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.GoodbyeTask;
2830
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.HeartBeatTask;
@@ -33,13 +35,18 @@
3335
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.RecommendTask;
3436
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.SubscribeTask;
3537
import org.apache.eventmesh.runtime.core.protocol.tcp.client.task.UnSubscribeTask;
38+
import org.apache.eventmesh.runtime.trace.TraceUtils;
3639
import org.apache.eventmesh.runtime.util.EventMeshUtil;
40+
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
41+
42+
import java.util.concurrent.TimeUnit;
3743

3844
import org.slf4j.Logger;
3945
import org.slf4j.LoggerFactory;
4046

4147
import io.netty.channel.ChannelHandlerContext;
4248
import io.netty.channel.SimpleChannelInboundHandler;
49+
import io.opentelemetry.api.trace.Span;
4350

4451
public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<Package> {
4552

@@ -55,11 +62,27 @@ public EventMeshTcpMessageDispatcher(EventMeshTCPServer eventMeshTCPServer) {
5562
protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Exception {
5663
long startTime = System.currentTimeMillis();
5764
validateMsg(pkg);
58-
eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics().getClient2eventMeshMsgNum().incrementAndGet();
59-
Command cmd = null;
65+
66+
eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics()
67+
.getClient2eventMeshMsgNum().incrementAndGet();
68+
69+
Command cmd = pkg.getHeader().getCmd();
6070
try {
6171
Runnable task;
62-
cmd = pkg.getHeader().getCmd();
72+
73+
if (isNeedTrace(cmd)) {
74+
pkg.getHeader().getProperties()
75+
.put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime);
76+
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SEND_EVENTMESH_IP,
77+
eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp);
78+
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
79+
80+
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SYS, session.getClient().getSubsystem());
81+
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IP, session.getClient().getHost());
82+
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_IDC, session.getClient().getIdc());
83+
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_GROUP, session.getClient().getGroup());
84+
}
85+
6386
if (cmd.equals(Command.RECOMMEND_REQUEST)) {
6487
messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
6588
task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer);
@@ -80,22 +103,43 @@ protected void channelRead0(ChannelHandlerContext ctx, Package pkg) throws Excep
80103

81104
logMessageFlow(ctx, pkg, cmd);
82105

83-
if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getSessionState() == SessionState.CLOSED) {
84-
throw new Exception("this eventMesh tcp session will be closed, may be reboot or version change!");
106+
if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx)
107+
.getSessionState() == SessionState.CLOSED) {
108+
throw new Exception(
109+
"this eventMesh tcp session will be closed, may be reboot or version change!");
85110
}
86111

87112
dispatch(ctx, pkg, startTime, cmd);
88113
} catch (Exception e) {
89114
logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
115+
116+
if (isNeedTrace(cmd)) {
117+
Span span = TraceUtils.prepareServerSpan(pkg.getHeader().getProperties(),
118+
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime,
119+
TimeUnit.MILLISECONDS, false);
120+
TraceUtils.finishSpanWithException(span, pkg.getHeader().getProperties(),
121+
"exception occurred while dispatch pkg", e);
122+
}
123+
90124
writeToClient(cmd, pkg, ctx, e);
91125
}
92126
}
93127

128+
private boolean isNeedTrace(Command cmd) {
129+
if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerTraceEnable
130+
&& cmd != null && (Command.REQUEST_TO_SERVER == cmd
131+
|| Command.ASYNC_MESSAGE_TO_SERVER == cmd
132+
|| Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) {
133+
return true;
134+
}
135+
return false;
136+
}
137+
94138
private void writeToClient(Command cmd, Package pkg, ChannelHandlerContext ctx, Exception e) {
95139
try {
96140
Package res = new Package();
97-
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader()
98-
.getSeq()));
141+
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(),
142+
pkg.getHeader().getSeq()));
99143
ctx.writeAndFlush(res);
100144
} catch (Exception ex) {
101145
logger.warn("writeToClient failed", ex);
@@ -131,11 +175,12 @@ private Command getReplyCommand(Command cmd) {
131175

132176
private void logMessageFlow(ChannelHandlerContext ctx, Package pkg, Command cmd) {
133177
if (pkg.getBody() instanceof EventMeshMessage) {
134-
messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd, EventMeshUtil.printMqMessage((EventMeshMessage) pkg
135-
.getBody()), eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
178+
messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd,
179+
EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()),
180+
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
136181
} else {
137182
messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg,
138-
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
183+
eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
139184
}
140185
}
141186

@@ -153,8 +198,8 @@ private void validateMsg(Package pkg) throws Exception {
153198
}
154199
}
155200

156-
private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd) throws
157-
Exception {
201+
private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd)
202+
throws Exception {
158203
Runnable task;
159204
switch (cmd) {
160205
case HEARTBEAT_REQUEST:

0 commit comments

Comments
 (0)