Skip to content

Commit 16f48d9

Browse files
authored
Merge pull request #963 from lrhkobe/trace_improve_3
[ISSUE #957] add trace for http protocol in eventmesh-runtime close #957
2 parents 6250d93 + 2d90271 commit 16f48d9

File tree

7 files changed

+456
-302
lines changed

7 files changed

+456
-302
lines changed

Diff for: eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/http/SendMessageRequestProtocolResolver.java

+55-44
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.net.URI;
3131
import java.nio.charset.StandardCharsets;
32+
import java.util.Map;
3233

3334
import io.cloudevents.CloudEvent;
3435
import io.cloudevents.SpecVersion;
@@ -64,60 +65,70 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
6465
cloudEventBuilder = CloudEventBuilder.v1();
6566

6667
cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
67-
.withSubject(sendMessageRequestBody.getTopic())
68-
.withType("eventmeshmessage")
69-
.withSource(URI.create("/"))
70-
.withData(content.getBytes(StandardCharsets.UTF_8))
71-
.withExtension(ProtocolKey.REQUEST_CODE, code)
72-
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
73-
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
74-
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
75-
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
76-
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
77-
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
78-
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
79-
.withExtension(ProtocolKey.VERSION, version.getVersion())
80-
.withExtension(ProtocolKey.LANGUAGE, language)
81-
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
82-
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
83-
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
84-
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
85-
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
86-
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
87-
sendMessageRequestBody.getProducerGroup())
88-
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
68+
.withSubject(sendMessageRequestBody.getTopic())
69+
.withType("eventmeshmessage")
70+
.withSource(URI.create("/"))
71+
.withData(content.getBytes(StandardCharsets.UTF_8))
72+
.withExtension(ProtocolKey.REQUEST_CODE, code)
73+
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
74+
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
75+
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
76+
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
77+
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
78+
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
79+
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
80+
.withExtension(ProtocolKey.VERSION, version.getVersion())
81+
.withExtension(ProtocolKey.LANGUAGE, language)
82+
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
83+
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
84+
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
85+
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
86+
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
87+
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
88+
sendMessageRequestBody.getProducerGroup())
89+
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
8990
if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) {
9091
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag());
9192
}
93+
if (sendMessageRequestBody.getExtFields() != null && sendMessageRequestBody.getExtFields().size() > 0) {
94+
for (Map.Entry<String, String> entry : sendMessageRequestBody.getExtFields().entrySet()) {
95+
cloudEventBuilder = cloudEventBuilder.withExtension(entry.getKey(), entry.getValue());
96+
}
97+
}
9298
event = cloudEventBuilder.build();
9399
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
94100
cloudEventBuilder = CloudEventBuilder.v03();
95101
cloudEventBuilder = cloudEventBuilder.withId(sendMessageRequestBody.getBizSeqNo())
96-
.withSubject(sendMessageRequestBody.getTopic())
97-
.withType("eventmeshmessage")
98-
.withSource(URI.create("/"))
99-
.withData(content.getBytes(StandardCharsets.UTF_8))
100-
.withExtension(ProtocolKey.REQUEST_CODE, code)
101-
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
102-
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
103-
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
104-
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
105-
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
106-
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
107-
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
108-
.withExtension(ProtocolKey.VERSION, version.getVersion())
109-
.withExtension(ProtocolKey.LANGUAGE, language)
110-
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
111-
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
112-
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
113-
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
114-
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
115-
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
116-
sendMessageRequestBody.getProducerGroup())
117-
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
102+
.withSubject(sendMessageRequestBody.getTopic())
103+
.withType("eventmeshmessage")
104+
.withSource(URI.create("/"))
105+
.withData(content.getBytes(StandardCharsets.UTF_8))
106+
.withExtension(ProtocolKey.REQUEST_CODE, code)
107+
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
108+
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
109+
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
110+
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
111+
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
112+
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
113+
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
114+
.withExtension(ProtocolKey.VERSION, version.getVersion())
115+
.withExtension(ProtocolKey.LANGUAGE, language)
116+
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
117+
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
118+
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
119+
.withExtension(SendMessageRequestBody.BIZSEQNO, sendMessageRequestBody.getBizSeqNo())
120+
.withExtension(SendMessageRequestBody.UNIQUEID, sendMessageRequestBody.getUniqueId())
121+
.withExtension(SendMessageRequestBody.PRODUCERGROUP,
122+
sendMessageRequestBody.getProducerGroup())
123+
.withExtension(SendMessageRequestBody.TTL, sendMessageRequestBody.getTtl());
118124
if (StringUtils.isNotEmpty(sendMessageRequestBody.getTag())) {
119125
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageRequestBody.getTag());
120126
}
127+
if (sendMessageRequestBody.getExtFields() != null && sendMessageRequestBody.getExtFields().size() > 0) {
128+
for (Map.Entry<String, String> entry : sendMessageRequestBody.getExtFields().entrySet()) {
129+
cloudEventBuilder = cloudEventBuilder.withExtension(entry.getKey(), entry.getValue());
130+
}
131+
}
121132
event = cloudEventBuilder.build();
122133
}
123134
return event;

0 commit comments

Comments
 (0)