Skip to content

Commit 12286b0

Browse files
update basicPublish method in AmqpChannel
1 parent fcdd853 commit 12286b0

File tree

3 files changed

+216
-78
lines changed

3 files changed

+216
-78
lines changed

eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/amqp/AmqpMessage.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import java.util.Map;
2626

27+
import com.rabbitmq.client.ContentHeader;
28+
2729
import com.rabbitmq.client.AMQP.BasicProperties;
2830

2931
import lombok.Data;
@@ -33,7 +35,7 @@
3335
*/
3436
@Data
3537
public class AmqpMessage implements ProtocolTransportObject {
36-
private BasicProperties contentHeader;
38+
private ContentHeader contentHeader;
3739

3840
private byte[] contentBody;
3941

@@ -43,16 +45,12 @@ public AmqpMessage() {
4345
this(null, null, null);
4446
}
4547

46-
public AmqpMessage(BasicProperties contentHeader, byte[] contentBody, Map<String, Object> extendInfo) {
48+
public AmqpMessage(ContentHeader contentHeader, byte[] contentBody, Map<String, Object> extendInfo) {
4749
this.contentHeader = contentHeader;
4850
this.contentBody = contentBody;
4951
this.extendInfo = extendInfo;
5052
}
5153

52-
public long getSize() {
53-
return contentBody == null ? 0 : contentHeader == null ? contentBody.length : contentBody.length + contentHeader.getBodySize();
54-
}
55-
5654
public String getExchange() {
5755
return extendInfo.getOrDefault(ProtocolKey.EXCHANGE, "").toString();
5856
}

eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshAmqpServer.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717

1818
package org.apache.eventmesh.runtime.boot;
1919

20-
import io.netty.bootstrap.ServerBootstrap;
21-
import io.netty.buffer.PooledByteBufAllocator;
22-
import io.netty.channel.*;
23-
import io.netty.channel.socket.SocketChannel;
24-
import io.netty.channel.socket.nio.NioServerSocketChannel;
2520
import org.apache.eventmesh.runtime.configuration.EventMeshAmqpConfiguration;
21+
import org.apache.eventmesh.runtime.core.protocol.amqp.exchange.RouteComponent;
2622
import org.apache.eventmesh.runtime.core.protocol.amqp.metadata.MetaStore;
23+
import org.apache.eventmesh.runtime.core.protocol.amqp.producer.AmqpProducerManager;
2724
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeDecoder;
2825
import org.apache.eventmesh.runtime.core.protocol.amqp.remoting.codec.AmqpCodeEncoder;
2926
import org.apache.eventmesh.runtime.core.protocol.amqp.service.ExchangeService;
@@ -32,6 +29,12 @@
3229
import org.apache.eventmesh.runtime.core.protocol.amqp.service.QueueServiceImpl;
3330
import org.apache.eventmesh.runtime.registry.Registry;
3431

32+
import io.netty.bootstrap.ServerBootstrap;
33+
import io.netty.buffer.PooledByteBufAllocator;
34+
import io.netty.channel.*;
35+
import io.netty.channel.socket.SocketChannel;
36+
import io.netty.channel.socket.nio.NioServerSocketChannel;
37+
3538
public class EventMeshAmqpServer extends AbstractRemotingServer {
3639

3740
private EventMeshServer eventMeshServer;
@@ -46,12 +49,18 @@ public class EventMeshAmqpServer extends AbstractRemotingServer {
4649

4750
private MetaStore metaStore;
4851

52+
private RouteComponent routeComponent;
53+
54+
private AmqpProducerManager producerManager;
55+
4956
public EventMeshAmqpServer(EventMeshServer eventMeshServer,
50-
EventMeshAmqpConfiguration eventMeshAmqpConfiguration, Registry registry) {
57+
EventMeshAmqpConfiguration eventMeshAmqpConfiguration, Registry registry, RouteComponent routeComponent, AmqpProducerManager producerManager) {
5158
super();
5259
this.eventMeshServer = eventMeshServer;
5360
this.eventMeshAmqpConfiguration = eventMeshAmqpConfiguration;
5461
this.registry = registry;
62+
this.routeComponent = routeComponent;
63+
this.producerManager = producerManager;
5564
this.metaStore=new MetaStore(this);
5665
this.exchangeService=new ExchangeServiceImpl(this,metaStore);
5766
this.queueService=new QueueServiceImpl(this,metaStore);
@@ -131,6 +140,18 @@ public QueueService getQueueService() {
131140
return queueService;
132141
}
133142

143+
public MetaStore getMetaStore() {
144+
return metaStore;
145+
}
146+
147+
public RouteComponent getRouteComponent() {
148+
return routeComponent;
149+
}
150+
151+
public AmqpProducerManager getProducerManager() {
152+
return producerManager;
153+
}
154+
134155
/**
135156
* A channel initializer that initialize channels for amqp protocol.
136157
*/

0 commit comments

Comments
 (0)