Skip to content

Commit fcdd853

Browse files
authored
Merge pull request #2214 from jackyluo-learning/protocol-amqp-shadow7
[ISSUE #2213] Implementation of message sender in amqp close #2213
2 parents 3081419 + 00762b0 commit fcdd853

File tree

5 files changed

+308
-27
lines changed

5 files changed

+308
-27
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919

2020
package org.apache.eventmesh.common.protocol.amqp;
2121

22-
import java.util.Map;
22+
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
23+
import org.apache.eventmesh.common.protocol.amqp.common.ProtocolKey;
2324

24-
import lombok.Data;
25+
import java.util.Map;
2526

26-
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
2727
import com.rabbitmq.client.AMQP.BasicProperties;
2828

29+
import lombok.Data;
30+
2931
/**
3032
* message body of Amqp, including content header and content body
3133
*/
@@ -50,4 +52,12 @@ public AmqpMessage(BasicProperties contentHeader, byte[] contentBody, Map<String
5052
public long getSize() {
5153
return contentBody == null ? 0 : contentHeader == null ? contentBody.length : contentBody.length + contentHeader.getBodySize();
5254
}
55+
56+
public String getExchange() {
57+
return extendInfo.getOrDefault(ProtocolKey.EXCHANGE, "").toString();
58+
}
59+
60+
public String getRoutingKey() {
61+
return extendInfo.getOrDefault(ProtocolKey.ROUTING_KEY, "").toString();
62+
}
5363
}

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java

+9-16
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@
88
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
99
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
1010
import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
11-
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
1211

12+
import java.io.IOException;
1313
import java.util.ArrayList;
1414
import java.util.List;
1515
import java.util.Objects;
1616
import java.util.concurrent.ConcurrentHashMap;
1717

18+
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021

2122
import io.cloudevents.CloudEvent;
22-
import io.netty.channel.ChannelFutureListener;
2323

2424
public class AmqpConsumerImpl implements AmqpConsumer {
2525

@@ -70,20 +70,13 @@ public void pushMessage(PushMessageContext pushMessageContext) {
7070
addUnAckMsg(deliveryTag, pushMessageContext);
7171
}
7272

73-
// TODO: 2022/10/20 convert AmqpMessage to AMQData
74-
AMQData amqData = this.amqpChannel.getConnection().getAmqpInOutputConverter().convertOutput(amqpMessage);
75-
76-
this.amqpChannel.getConnection().getCtx().writeAndFlush(amqData).addListener(
77-
(ChannelFutureListener) future -> {
78-
if (!future.isSuccess()) {
79-
logger.error("push message fail, amqData: {}", amqData);
80-
// TODO: 2022/10/21 retry strategy
81-
} else {
82-
logger.info("push message success, amqData: {}", amqData);
83-
// TODO: 2022/10/21 push success strategy
84-
}
85-
}
86-
);
73+
try {
74+
amqpChannel.getConnection().getAmqpOutputConverter().writeDeliver(amqpMessage, this.amqpChannel.getChannelId(),
75+
false, deliveryTag, consumerTag);
76+
} catch (IOException e) {
77+
logger.error("sendMessages IOException", e);
78+
amqpChannel.closeChannel(ErrorCodes.INTERNAL_ERROR, "system error");
79+
}
8780
}
8881

8982
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
3+
* the License. You may obtain a copy of the License at
4+
* <p>
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
* <p>
7+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
8+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
9+
* specific language governing permissions and limitations under the License.
10+
*/
11+
12+
package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
13+
14+
15+
import org.apache.eventmesh.common.protocol.amqp.AmqpMessage;
16+
import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpConnection;
17+
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQData;
18+
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.AMQPFrame;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
24+
import io.netty.buffer.ByteBuf;
25+
26+
import com.rabbitmq.client.AMQP;
27+
import com.rabbitmq.client.impl.Frame;
28+
import com.rabbitmq.client.impl.Method;
29+
30+
import lombok.extern.log4j.Log4j2;
31+
32+
/**
33+
* Used to process command output.
34+
*/
35+
@Log4j2
36+
public class AmqpMessageSender {
37+
38+
private final AmqpConnection connection;
39+
40+
//
41+
public AmqpMessageSender(AmqpConnection connection) {
42+
this.connection = connection;
43+
}
44+
45+
public long writeDeliver(final AmqpMessage message, int channelId,
46+
boolean isRedelivered, long deliveryTag,
47+
String consumerTag) throws IOException {
48+
AMQP.Basic.Deliver deliver = connection.getCommandFactory().createBasicDeliverBody(consumerTag, deliveryTag, isRedelivered,
49+
message.getExchange(), message.getRoutingKey());
50+
return writeMessage(message, (Method) deliver, channelId);
51+
52+
}
53+
54+
public long writeGetOk(final AmqpMessage message, int channelId,
55+
boolean isRedelivered, long deliveryTag, int messageCount) throws IOException {
56+
57+
AMQP.Basic.GetOk getOk = connection.getCommandFactory().createBasicGetOkBody(deliveryTag, isRedelivered,
58+
message.getExchange(), message.getRoutingKey(), messageCount);
59+
return writeMessage(message, (Method) getOk, channelId);
60+
}
61+
62+
private long writeMessage(final AmqpMessage message, final Method method, int channelId) throws IOException {
63+
CompositeMessageBlock messageFrame = new CompositeMessageBlock();
64+
byte[] body = message.getContentBody();
65+
int bodyLength = body.length;
66+
Frame headerFrame = message.getContentHeader().toFrame(channelId, bodyLength);
67+
68+
int frameMax = connection.getMaxFrameSize();
69+
boolean cappedFrameMax = frameMax > 0;
70+
int bodyPayloadMax = cappedFrameMax ? frameMax - AMQPFrame.NON_BODY_SIZE : bodyLength;
71+
72+
if (cappedFrameMax && headerFrame.size() > frameMax) {
73+
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
74+
throw new IllegalArgumentException(msg);
75+
}
76+
messageFrame.setMethod(AMQPFrame.get(method.toFrame(channelId)));
77+
messageFrame.setHeader(AMQPFrame.get(headerFrame));
78+
79+
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
80+
int remaining = body.length - offset;
81+
82+
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
83+
: bodyPayloadMax;
84+
Frame frame = Frame.fromBodyFragment(channelId, body,
85+
offset, fragmentLength);
86+
messageFrame.addContent(AMQPFrame.get(frame));
87+
}
88+
connection.writeFrame(messageFrame);
89+
return body.length;
90+
}
91+
92+
public void writeReturn(final AmqpMessage message, int channelId, int replyCode,
93+
String replyText) throws IOException {
94+
95+
AMQP.Basic.Return returnBody = connection.getCommandFactory().createBasicReturnBody(replyCode, replyText, message.getExchange(), message.getRoutingKey());
96+
97+
writeMessage(message, (Method) returnBody, channelId);
98+
}
99+
100+
101+
public class CompositeMessageBlock implements AMQData {
102+
103+
AMQPFrame method;
104+
105+
AMQPFrame header;
106+
107+
List<AMQPFrame> contents;
108+
109+
@Override
110+
public void encode(ByteBuf buf) {
111+
method.encode(buf);
112+
header.encode(buf);
113+
contents.forEach(content -> {
114+
content.encode(buf);
115+
});
116+
}
117+
118+
public CompositeMessageBlock() {
119+
}
120+
121+
public CompositeMessageBlock(AMQPFrame method, AMQPFrame header,
122+
List<AMQPFrame> contents) {
123+
this.method = method;
124+
this.header = header;
125+
this.contents = contents;
126+
}
127+
128+
public AMQPFrame getMethod() {
129+
return method;
130+
}
131+
132+
public void setMethod(AMQPFrame method) {
133+
this.method = method;
134+
}
135+
136+
public AMQPFrame getHeader() {
137+
return header;
138+
}
139+
140+
public void setHeader(AMQPFrame header) {
141+
this.header = header;
142+
}
143+
144+
public List<AMQPFrame> getContents() {
145+
return contents;
146+
}
147+
148+
public void setContents(List<AMQPFrame> contents) {
149+
this.contents = contents;
150+
}
151+
152+
public void addContent(AMQPFrame content) {
153+
if (this.contents == null) {
154+
this.contents = new ArrayList<>(2);
155+
}
156+
contents.add(content);
157+
}
158+
159+
160+
}
161+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
3+
* the License. You may obtain a copy of the License at
4+
* <p>
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
* <p>
7+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
8+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
9+
* specific language governing permissions and limitations under the License.
10+
*/
11+
12+
package org.apache.eventmesh.runtime.core.protocol.amqp.consumer;
13+
14+
import static com.google.common.base.Preconditions.checkNotNull;
15+
16+
import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
17+
18+
import java.util.Collection;
19+
import java.util.Collections;
20+
import java.util.HashMap;
21+
import java.util.HashSet;
22+
import java.util.Map;
23+
import java.util.Set;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
26+
import com.google.common.annotations.VisibleForTesting;
27+
28+
/**
29+
* unack message map.
30+
*/
31+
public class UnacknowledgedMessageMap {
32+
33+
/**
34+
* unAck positionInfo.
35+
*/
36+
public static final class MessageConsumerAssociation {
37+
private final String messageId;
38+
private final AmqpConsumer consumer;
39+
private final int size;
40+
41+
private MessageConsumerAssociation(String messageId, AmqpConsumer consumer, int size) {
42+
this.messageId = messageId;
43+
this.consumer = consumer;
44+
this.size = size;
45+
}
46+
47+
public String getMessageId() {
48+
return messageId;
49+
}
50+
51+
public AmqpConsumer getConsumer() {
52+
return consumer;
53+
}
54+
55+
public int getSize() {
56+
return size;
57+
}
58+
}
59+
60+
private final Map<Long, MessageConsumerAssociation> map = new ConcurrentHashMap<>();
61+
private final AmqpChannel channel;
62+
63+
public UnacknowledgedMessageMap(AmqpChannel channel) {
64+
this.channel = channel;
65+
}
66+
67+
public Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple) {
68+
if (multiple) {
69+
Map<Long, MessageConsumerAssociation> acks = new HashMap<>();
70+
map.entrySet().stream().forEach(entry -> {
71+
if (entry.getKey() <= deliveryTag) {
72+
acks.put(entry.getKey(), entry.getValue());
73+
}
74+
});
75+
remove(acks.keySet());
76+
return acks.values();
77+
} else {
78+
MessageConsumerAssociation association = remove(deliveryTag);
79+
if (association != null) {
80+
return Collections.singleton(association);
81+
}
82+
}
83+
return Collections.emptySet();
84+
}
85+
86+
public Collection<MessageConsumerAssociation> acknowledgeAll() {
87+
Set<MessageConsumerAssociation> associations = new HashSet<>();
88+
associations.addAll(map.values());
89+
map.clear();
90+
return associations;
91+
}
92+
93+
public void add(long deliveryTag, String messageId, AmqpConsumer consumer, int size) {
94+
checkNotNull(messageId);
95+
checkNotNull(consumer);
96+
map.put(deliveryTag, new MessageConsumerAssociation(messageId, consumer, size));
97+
}
98+
99+
public void remove(Collection<Long> deliveryTag) {
100+
deliveryTag.stream().forEach(tag -> {
101+
map.remove(tag);
102+
});
103+
}
104+
105+
public MessageConsumerAssociation remove(long deliveryTag) {
106+
MessageConsumerAssociation entry = map.remove(deliveryTag);
107+
return entry;
108+
}
109+
110+
public int size() {
111+
return map.size();
112+
}
113+
114+
@VisibleForTesting
115+
public Map<Long, MessageConsumerAssociation> getMap() {
116+
return map;
117+
}
118+
}

0 commit comments

Comments
 (0)