Skip to content

Commit 6584e7c

Browse files
authored
Merge pull request #2517 from jackyluo-learning/protocol-amqp-shadow1
[ISSUE #2516] AmqpConsumer bugfix
2 parents 0663f6c + 06d049c commit 6584e7c

File tree

4 files changed

+26
-31
lines changed

4 files changed

+26
-31
lines changed

Diff for: eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,12 @@
1919

2020
package org.apache.eventmesh.common.protocol.amqp;
2121

22-
import com.rabbitmq.client.impl.AMQContentHeader;
2322
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
2423
import org.apache.eventmesh.common.protocol.amqp.common.ProtocolKey;
2524

2625
import java.util.Map;
2726

28-
import com.rabbitmq.client.ContentHeader;
29-
30-
import com.rabbitmq.client.AMQP.BasicProperties;
27+
import com.rabbitmq.client.impl.AMQContentHeader;
3128

3229
import lombok.Data;
3330

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/AmqpConsumerImpl.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
99
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
1010
import org.apache.eventmesh.runtime.core.protocol.amqp.processor.AmqpChannel;
11+
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
1112

1213
import java.io.IOException;
1314
import java.util.ArrayList;
1415
import java.util.List;
1516
import java.util.Objects;
1617
import java.util.concurrent.ConcurrentHashMap;
1718

18-
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.protocol.ErrorCodes;
1919
import org.slf4j.Logger;
2020
import org.slf4j.LoggerFactory;
2121

@@ -46,7 +46,7 @@ public class AmqpConsumerImpl implements AmqpConsumer {
4646
/**
4747
* amqpChannel that current consumer used
4848
*/
49-
private AmqpChannel amqpChannel;
49+
private AmqpChannel channel;
5050

5151
/**
5252
* a map that store all un ack message which has been pushed to client
@@ -65,18 +65,20 @@ public void pushMessage(PushMessageContext pushMessageContext) {
6565
// TODO: 2022/10/20 exception handle
6666
throw new RuntimeException(e);
6767
}
68-
long deliveryTag = this.amqpChannel.getNextDeliveryTag();
68+
long deliveryTag = this.channel.getNextDeliveryTag();
6969
if (!autoAck) {
7070
addUnAckMsg(deliveryTag, pushMessageContext);
7171
}
7272

7373
try {
74-
amqpChannel.getConnection().getAmqpOutputConverter().writeDeliver(amqpMessage, this.amqpChannel.getChannelId(),
74+
channel.getConnection().getAmqpOutputConverter().writeDeliver(amqpMessage, channel.getChannelId(),
7575
false, deliveryTag, consumerTag);
7676
} catch (IOException e) {
7777
logger.error("sendMessages IOException", e);
78-
amqpChannel.closeChannel(ErrorCodes.INTERNAL_ERROR, "system error");
78+
channel.closeChannel(ErrorCodes.INTERNAL_ERROR, "system error");
7979
}
80+
81+
8082
}
8183

8284
@Override
@@ -126,10 +128,10 @@ public void notifyConsumer() {
126128
}
127129

128130
/**
129-
* 添加unAck msg
131+
* add unacked msg
130132
*/
131133
private void addUnAckMsg(Long deliveryTag, PushMessageContext pushMessageContext) {
132-
this.amqpChannel.getUnackMessageMap().put(deliveryTag, pushMessageContext);
134+
this.channel.getUnacknowledgedMessageMap().add(deliveryTag, pushMessageContext.getMessageId(), this, 1);
133135
this.unAckMap.put(pushMessageContext.getMessageId(), pushMessageContext);
134136
}
135137
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/consumer/QueueConsumerMapping.java

+10-13
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,29 @@ public class QueueConsumerMapping {
1818

1919
public void registerConsumer(String virtualHost, String queue, AmqpConsumer consumer) throws AmqpNotFoundException {
2020

21-
ConcurrentHashMap<String, Set<AmqpConsumer>> infoMap = queueConsumerMapping.computeIfAbsent(virtualHost, m -> new ConcurrentHashMap<>());
22-
if (infoMap == null) {
23-
logger.error("virtualHost not found {}", virtualHost);
24-
throw new AmqpNotFoundException("vhost not found");
25-
}
21+
ConcurrentHashMap<String, Set<AmqpConsumer>> infoMap = getInfoMap(virtualHost);
2622
Set<AmqpConsumer> consumers = infoMap.computeIfAbsent(queue, m -> new HashSet<>());
2723
consumers.add(consumer);
2824
}
2925

3026
public void removeConsumer(String virtualHost, String queue, AmqpConsumer consumer) throws AmqpNotFoundException {
3127

32-
ConcurrentHashMap<String, Set<AmqpConsumer>> infoMap = queueConsumerMapping.computeIfAbsent(virtualHost, m -> new ConcurrentHashMap<>());
33-
if (infoMap == null) {
34-
logger.error("virtualHost not found {}", virtualHost);
35-
throw new AmqpNotFoundException("vhost not found");
36-
}
28+
ConcurrentHashMap<String, Set<AmqpConsumer>> infoMap = getInfoMap(virtualHost);
3729
Set<AmqpConsumer> consumers = infoMap.computeIfAbsent(queue, m -> new HashSet<>());
3830
consumers.remove(consumer);
3931
}
4032

41-
public Set<AmqpConsumer> getConsumers(String virtualHost, String queue) {
33+
private ConcurrentHashMap<String, Set<AmqpConsumer>> getInfoMap(String virtualHost) throws AmqpNotFoundException {
4234
ConcurrentHashMap<String, Set<AmqpConsumer>> infoMap = queueConsumerMapping.computeIfAbsent(virtualHost, m -> new ConcurrentHashMap<>());
43-
if (infoMap == null) {
44-
return null;
35+
if (infoMap.isEmpty()) {
36+
logger.error("virtualHost not found {}", virtualHost);
37+
throw new AmqpNotFoundException("vhost not found");
4538
}
39+
return infoMap;
40+
}
4641

42+
public Set<AmqpConsumer> getConsumers(String virtualHost, String queue) throws AmqpNotFoundException {
43+
ConcurrentHashMap<String, Set<AmqpConsumer>> infoMap = getInfoMap(virtualHost);
4744
return infoMap.get(queue);
4845

4946
}

Diff for: eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/amqp/downstreamstrategy/RandomDispatchStrategy.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,12 @@
22

33
import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.AmqpConsumer;
44
import org.apache.eventmesh.runtime.core.protocol.amqp.consumer.QueueConsumerMapping;
5-
6-
import org.apache.commons.collections4.MapUtils;
7-
import org.apache.commons.lang3.StringUtils;
5+
import org.apache.eventmesh.runtime.core.protocol.amqp.exception.AmqpNotFoundException;
86

97
import java.util.ArrayList;
108
import java.util.Collections;
119
import java.util.List;
1210
import java.util.Set;
13-
import java.util.concurrent.ConcurrentHashMap;
1411

1512
/**
1613
* randomly select channel
@@ -19,9 +16,11 @@ public class RandomDispatchStrategy implements DownstreamDispatchStrategy {
1916
@Override
2017
public AmqpConsumer select(String queue, QueueConsumerMapping queueConsumerMapping) {
2118

22-
Set<AmqpConsumer> consumers = queueConsumerMapping.getConsumers(null, queue);
23-
if (consumers == null || consumers.size() <= 0) {
24-
return null;
19+
Set<AmqpConsumer> consumers = null;
20+
try {
21+
consumers = queueConsumerMapping.getConsumers(null, queue);
22+
} catch (AmqpNotFoundException e) {
23+
throw new RuntimeException(e);
2524
}
2625

2726
List<AmqpConsumer> amqpConsumerList = new ArrayList<>(consumers);

0 commit comments

Comments
 (0)