Skip to content

Commit 277dbef

Browse files
fixed AmqpMessage
1 parent 849cebc commit 277dbef

File tree

2 files changed

+14
-13
lines changed
  • eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/common
  • eventmesh-protocol-plugin/eventmesh-protocol-amqp/src/main/java/org/apache/eventmesh/protocol/amqp/resolver

2 files changed

+14
-13
lines changed

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/common/ProtocolKey.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.eventmesh.common.protocol.amqp.common;
2121

22-
import java.util.Map;
23-
2422
public class ProtocolKey {
2523
public static final String ROUTING_KEY = "routingKey";
2624

@@ -32,7 +30,7 @@ public class ProtocolKey {
3230

3331
public static final String QUEUE_NAME = "queueName";
3432

35-
public static final String BASIC_PROPERTIES = "basicProperties";
33+
public static final String AMQ_CONTEND_HEADER = "contentHeader";
3634

3735
public static class ContentHeaderProperties {
3836
public static final String CLASS_ID = "classId";

Diff for: eventmesh-protocol-plugin/eventmesh-protocol-amqp/src/main/java/org/apache/eventmesh/protocol/amqp/resolver/AmqpProtocolResolver.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,24 @@
1717

1818
package org.apache.eventmesh.protocol.amqp.resolver;
1919

20-
import com.alibaba.fastjson.JSON;
21-
import com.rabbitmq.client.AMQP.BasicProperties;
22-
import io.cloudevents.CloudEvent;
23-
import io.cloudevents.core.v1.CloudEventBuilder;
24-
import org.apache.commons.collections4.MapUtils;
2520
import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
2621
import org.apache.eventmesh.common.protocol.amqp.common.ProtocolKey;
2722
import org.apache.eventmesh.protocol.amqp.AMQPProtocolConstant;
2823

24+
import org.apache.commons.collections4.MapUtils;
25+
2926
import java.net.URI;
3027
import java.util.HashMap;
3128
import java.util.Map;
3229
import java.util.Objects;
3330
import java.util.UUID;
3431

32+
import io.cloudevents.CloudEvent;
33+
import io.cloudevents.core.v1.CloudEventBuilder;
34+
35+
import com.alibaba.fastjson.JSON;
36+
import com.rabbitmq.client.impl.AMQContentHeader;
37+
3538
/**
3639
* Resolve AmqpMessage or CloudEvent
3740
*/
@@ -46,19 +49,19 @@ public static CloudEvent buildEvent(AmqpMessage amqpMessage) {
4649
String queueName = "";
4750
Map<String, Object> extendInfo = amqpMessage.getExtendInfo();
4851
if (MapUtils.isNotEmpty(extendInfo)) {
49-
// routingKey, exchange去掉,路由信息在这之前完成
52+
// removed routingKey, exchange, routing should be finished before
5053
// routingKey = extendInfo.get(ProtocolKey.ROUTING_KEY).toString();
5154
// exchange = extendInfo.get(ProtocolKey.EXCHANGE).toString();
5255
queueName = extendInfo.get(ProtocolKey.QUEUE_NAME).toString();
5356
}
54-
BasicProperties amqBasicProperties = amqpMessage.getContentHeader();
57+
AMQContentHeader amqContentHeader = amqpMessage.getContentHeader();
5558
byte[] contentBody = amqpMessage.getContentBody();
5659
cloudEventBuilder
5760
.withId(id)
5861
.withSource(source)
5962
.withType(AMQPProtocolConstant.PROTOCOL_NAME)
6063
.withSubject(queueName)
61-
.withExtension(ProtocolKey.BASIC_PROPERTIES, JSON.toJSONString(amqBasicProperties))
64+
.withExtension(ProtocolKey.AMQ_CONTEND_HEADER, JSON.toJSONString(amqContentHeader))
6265
.withData(contentBody);
6366
return cloudEventBuilder.build();
6467
}
@@ -70,8 +73,8 @@ public static AmqpMessage buildAmqpMessage(CloudEvent cloudEvent) {
7073
extendInfo.put(ProtocolKey.ROUTING_KEY, routingKey);
7174
extendInfo.put(ProtocolKey.EXCHANGE, exchange);
7275
byte[] contentBody = Objects.requireNonNull(cloudEvent.getData()).toBytes();
73-
BasicProperties amqBasicProperties = JSON.parseObject(Objects.requireNonNull(cloudEvent.getExtension(ProtocolKey.BASIC_PROPERTIES)).toString(), BasicProperties.class);
74-
return new AmqpMessage(amqBasicProperties, contentBody, extendInfo);
76+
AMQContentHeader amqContentHeader = JSON.parseObject(Objects.requireNonNull(cloudEvent.getExtension(ProtocolKey.AMQ_CONTEND_HEADER)).toString(), AMQContentHeader.class);
77+
return new AmqpMessage(amqContentHeader, contentBody, extendInfo);
7578
}
7679

7780
}

0 commit comments

Comments
 (0)