Skip to content

Commit 0663f6c

Browse files
authored
Merge pull request #2219 from jackyluo-learning/protocol-amqp-shadow2
[ISSUE #2218]fixed bug in AmqpMessage close #2218
2 parents 560dc3b + 277dbef commit 0663f6c

File tree

3 files changed

+17
-15
lines changed

3 files changed

+17
-15
lines changed

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.eventmesh.common.protocol.amqp;
2121

22+
import com.rabbitmq.client.impl.AMQContentHeader;
2223
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
2324
import org.apache.eventmesh.common.protocol.amqp.common.ProtocolKey;
2425

@@ -35,7 +36,7 @@
3536
*/
3637
@Data
3738
public class AmqpMessage implements ProtocolTransportObject {
38-
private ContentHeader contentHeader;
39+
private AMQContentHeader contentHeader;
3940

4041
private byte[] contentBody;
4142

@@ -45,7 +46,7 @@ public AmqpMessage() {
4546
this(null, null, null);
4647
}
4748

48-
public AmqpMessage(ContentHeader contentHeader, byte[] contentBody, Map<String, Object> extendInfo) {
49+
public AmqpMessage(AMQContentHeader contentHeader, byte[] contentBody, Map<String, Object> extendInfo) {
4950
this.contentHeader = contentHeader;
5051
this.contentBody = contentBody;
5152
this.extendInfo = extendInfo;

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/common/ProtocolKey.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.eventmesh.common.protocol.amqp.common;
2121

22-
import java.util.Map;
23-
2422
public class ProtocolKey {
2523
public static final String ROUTING_KEY = "routingKey";
2624

@@ -32,7 +30,7 @@ public class ProtocolKey {
3230

3331
public static final String QUEUE_NAME = "queueName";
3432

35-
public static final String BASIC_PROPERTIES = "basicProperties";
33+
public static final String AMQ_CONTEND_HEADER = "contentHeader";
3634

3735
public static class ContentHeaderProperties {
3836
public static final String CLASS_ID = "classId";

Diff for: eventmesh-protocol-plugin/eventmesh-protocol-amqp/src/main/java/org/apache/eventmesh/protocol/amqp/resolver/AmqpProtocolResolver.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,24 @@
1717

1818
package org.apache.eventmesh.protocol.amqp.resolver;
1919

20-
import com.alibaba.fastjson.JSON;
21-
import com.rabbitmq.client.AMQP.BasicProperties;
22-
import io.cloudevents.CloudEvent;
23-
import io.cloudevents.core.v1.CloudEventBuilder;
24-
import org.apache.commons.collections4.MapUtils;
2520
import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
2621
import org.apache.eventmesh.common.protocol.amqp.common.ProtocolKey;
2722
import org.apache.eventmesh.protocol.amqp.AMQPProtocolConstant;
2823

24+
import org.apache.commons.collections4.MapUtils;
25+
2926
import java.net.URI;
3027
import java.util.HashMap;
3128
import java.util.Map;
3229
import java.util.Objects;
3330
import java.util.UUID;
3431

32+
import io.cloudevents.CloudEvent;
33+
import io.cloudevents.core.v1.CloudEventBuilder;
34+
35+
import com.alibaba.fastjson.JSON;
36+
import com.rabbitmq.client.impl.AMQContentHeader;
37+
3538
/**
3639
* Resolve AmqpMessage or CloudEvent
3740
*/
@@ -46,19 +49,19 @@ public static CloudEvent buildEvent(AmqpMessage amqpMessage) {
4649
String queueName = "";
4750
Map<String, Object> extendInfo = amqpMessage.getExtendInfo();
4851
if (MapUtils.isNotEmpty(extendInfo)) {
49-
// routingKey, exchange去掉,路由信息在这之前完成
52+
// removed routingKey, exchange, routing should be finished before
5053
// routingKey = extendInfo.get(ProtocolKey.ROUTING_KEY).toString();
5154
// exchange = extendInfo.get(ProtocolKey.EXCHANGE).toString();
5255
queueName = extendInfo.get(ProtocolKey.QUEUE_NAME).toString();
5356
}
54-
BasicProperties amqBasicProperties = amqpMessage.getContentHeader();
57+
AMQContentHeader amqContentHeader = amqpMessage.getContentHeader();
5558
byte[] contentBody = amqpMessage.getContentBody();
5659
cloudEventBuilder
5760
.withId(id)
5861
.withSource(source)
5962
.withType(AMQPProtocolConstant.PROTOCOL_NAME)
6063
.withSubject(queueName)
61-
.withExtension(ProtocolKey.BASIC_PROPERTIES, JSON.toJSONString(amqBasicProperties))
64+
.withExtension(ProtocolKey.AMQ_CONTEND_HEADER, JSON.toJSONString(amqContentHeader))
6265
.withData(contentBody);
6366
return cloudEventBuilder.build();
6467
}
@@ -70,8 +73,8 @@ public static AmqpMessage buildAmqpMessage(CloudEvent cloudEvent) {
7073
extendInfo.put(ProtocolKey.ROUTING_KEY, routingKey);
7174
extendInfo.put(ProtocolKey.EXCHANGE, exchange);
7275
byte[] contentBody = Objects.requireNonNull(cloudEvent.getData()).toBytes();
73-
BasicProperties amqBasicProperties = JSON.parseObject(Objects.requireNonNull(cloudEvent.getExtension(ProtocolKey.BASIC_PROPERTIES)).toString(), BasicProperties.class);
74-
return new AmqpMessage(amqBasicProperties, contentBody, extendInfo);
76+
AMQContentHeader amqContentHeader = JSON.parseObject(Objects.requireNonNull(cloudEvent.getExtension(ProtocolKey.AMQ_CONTEND_HEADER)).toString(), AMQContentHeader.class);
77+
return new AmqpMessage(amqContentHeader, contentBody, extendInfo);
7578
}
7679

7780
}

0 commit comments

Comments
 (0)