Skip to content

Commit d584d55

Browse files
authored
Use ApplicationData by Default instead of AMQPValue for the message creation (#116)
* Change the message interface from object to byte array. Change the Message body section from AMQPValue to Data. It is better for cross-protocol use cases --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4e3d7d7 commit d584d55

16 files changed

+164
-38
lines changed

.ci/ubuntu/cluster/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function run_docker_compose
1919
docker compose --file "$script_dir/docker-compose.yml" $@
2020
}
2121

22-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
22+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
2323

2424
if [[ ! -v GITHUB_ACTIONS ]]
2525
then

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
1010

1111

12-
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
12+
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"
1313

1414

1515

.ci/windows/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "27.2",
3-
"rabbitmq": "4.1.0-beta.4"
3+
"rabbitmq": "4.1.0"
44
}

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,5 @@ docs/temp/
129129

130130
# ci logs
131131
.ci/ubuntu/log/*
132+
.ci/ubuntu/cluster/log/*
132133

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
88
<PackageVersion Include="OpenTelemetry" Version="1.10.0" />
99
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
10+
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
1011
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
1112
<!-- HAClient -->
1213
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />

RabbitMQ.AMQP.Client/IMessage.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,10 @@ public interface IMessage
9696
public object Annotation(string key);
9797
public IMessage Annotation(string key, object value);
9898

99-
public object Body();
99+
public byte[] Body();
100+
101+
public string BodyAsString();
102+
100103
public IMessage Body(object body);
101104

102105
IMessageAddressBuilder ToAddress();

RabbitMQ.AMQP.Client/IRpcServer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public interface IRpcServer : ILifeCycle
7878

7979
public interface IContext
8080
{
81-
IMessage Message(object body);
81+
IMessage Message(byte[] body);
82+
IMessage Message(string body);
8283
}
8384
}
8485
}

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,24 @@ public AmqpMessage()
2424
NativeMessage = new Message();
2525
}
2626

27-
public AmqpMessage(object body)
27+
/// <summary>
28+
/// Create a message with a body of type byte[] and BodySection of type Data.
29+
/// </summary>
30+
/// <param name="body"></param>
31+
public AmqpMessage(byte[] body)
2832
{
29-
NativeMessage = new Message(body);
33+
NativeMessage = new Message();
34+
NativeMessage.BodySection = new Data { Binary = body };
35+
}
36+
37+
/// <summary>
38+
/// Create a message with a body of type string and BodySection of type Data.
39+
/// The string is converted to a byte[] using UTF8 encoding.
40+
/// </summary>
41+
public AmqpMessage(string body)
42+
{
43+
NativeMessage = new Message();
44+
NativeMessage.BodySection = new Data() { Binary = System.Text.Encoding.UTF8.GetBytes(body) };
3045
}
3146

3247
public AmqpMessage(Message nativeMessage)
@@ -245,35 +260,40 @@ public object Annotation(string key)
245260
return NativeMessage.MessageAnnotations[new Symbol(key)];
246261
}
247262

248-
public object Body()
263+
public byte[] Body()
249264
{
250-
return NativeMessage.Body;
265+
return (byte[])NativeMessage.Body;
266+
}
267+
268+
public string BodyAsString()
269+
{
270+
if (NativeMessage.BodySection is Data data)
271+
{
272+
return System.Text.Encoding.UTF8.GetString(data.Binary);
273+
}
274+
else
275+
{
276+
throw new InvalidOperationException("Body is not an Application Data");
277+
}
278+
251279
}
252280

253281
public IMessage Body(object body)
254282
{
255283
RestrictedDescribed bodySection;
256284
if (body is byte[] byteArray)
257285
{
258-
bodySection = new Data
259-
{
260-
Binary = byteArray
261-
};
286+
bodySection = new Data { Binary = byteArray };
262287
}
263288
else if (body is IList list)
264289
{
265-
bodySection = new AmqpSequence
266-
{
267-
List = list
268-
};
290+
bodySection = new AmqpSequence { List = list };
269291
}
270292
else
271293
{
272-
bodySection = new AmqpValue
273-
{
274-
Value = body
275-
};
294+
bodySection = new AmqpValue { Value = body };
276295
}
296+
277297
NativeMessage.BodySection = bodySection;
278298
return this;
279299
}

RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () =>
166166

167167
private class RpcServerContext : IRpcServer.IContext
168168
{
169-
public IMessage Message(object body) => new AmqpMessage(body);
169+
public IMessage Message(byte[] body) => new AmqpMessage(body);
170+
public IMessage Message(string body) => new AmqpMessage(body);
170171
}
171172

172173
public override async Task CloseAsync()

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,9 @@ RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime() -> System.DateTime
245245
RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
246246
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object!
247247
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
248-
RabbitMQ.AMQP.Client.IMessage.Body() -> object!
248+
RabbitMQ.AMQP.Client.IMessage.Body() -> byte[]!
249249
RabbitMQ.AMQP.Client.IMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage!
250+
RabbitMQ.AMQP.Client.IMessage.BodyAsString() -> string!
250251
RabbitMQ.AMQP.Client.IMessage.ContentEncoding() -> string!
251252
RabbitMQ.AMQP.Client.IMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage!
252253
RabbitMQ.AMQP.Client.IMessage.ContentType() -> string!
@@ -391,11 +392,13 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime
391392
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
392393
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void
393394
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void
394-
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void
395+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(byte[]! body) -> void
396+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(string! body) -> void
395397
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object!
396398
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
397-
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object!
399+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> byte[]!
398400
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage!
401+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.BodyAsString() -> string!
399402
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding() -> string!
400403
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage!
401404
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentType() -> string!
@@ -674,7 +677,8 @@ RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func<RabbitMQ
674677
RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
675678
RabbitMQ.AMQP.Client.IRpcServer
676679
RabbitMQ.AMQP.Client.IRpcServer.IContext
677-
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage!
680+
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(byte[]! body) -> RabbitMQ.AMQP.Client.IMessage!
681+
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(string! body) -> RabbitMQ.AMQP.Client.IMessage!
678682
RabbitMQ.AMQP.Client.IRpcServerBuilder
679683
RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRpcServer!>!
680684
RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!>? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!

Tests/Amqp091/FromToAmqp091Tests.cs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// This source code is dual-licensed under the Apache License, version 2.0,
2+
// and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
5+
using System.Threading.Tasks;
6+
using RabbitMQ.AMQP.Client;
7+
using RabbitMQ.AMQP.Client.Impl;
8+
using RabbitMQ.Client;
9+
using RabbitMQ.Client.Events;
10+
using Xunit;
11+
using Xunit.Abstractions;
12+
13+
namespace Tests.Amqp091
14+
{
15+
public class FromToAmqp091Tests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
16+
{
17+
[Fact]
18+
public async Task ToAmqp091()
19+
{
20+
Assert.NotNull(_connection);
21+
Assert.NotNull(_management);
22+
23+
IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.QUORUM);
24+
await queueSpec.DeclareAsync();
25+
26+
var publisher = await _connection.PublisherBuilder().BuildAsync();
27+
const string body = "{Text:as,Seq:1,Max:7000}";
28+
IMessage amqpMessage = new AmqpMessage(body).ToAddress().Queue(_queueName).Build();
29+
for (int i = 0; i < 1; i++)
30+
{
31+
PublishResult result = await publisher.PublishAsync(message: amqpMessage).ConfigureAwait(true);
32+
Assert.NotNull(result);
33+
Assert.Equal(OutcomeState.Accepted, result.Outcome.State);
34+
}
35+
36+
var factory = new ConnectionFactory();
37+
var connection = factory.CreateConnection();
38+
var channel = connection.CreateModel();
39+
var consumer091 = new EventingBasicConsumer(channel);
40+
var tcs091 = new TaskCompletionSource<BasicDeliverEventArgs>();
41+
consumer091.Received += (sender, ea) =>
42+
{
43+
tcs091.SetResult(ea);
44+
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
45+
};
46+
channel.BasicConsume(_queueName, false, "consumerTag", consumer091);
47+
var receivedMessage091 = await tcs091.Task;
48+
Assert.NotNull(receivedMessage091);
49+
Assert.Equal(_queueName, receivedMessage091.RoutingKey);
50+
Assert.Equal("consumerTag", receivedMessage091.ConsumerTag);
51+
Assert.Equal("{Text:as,Seq:1,Max:7000}",
52+
System.Text.Encoding.UTF8.GetString(receivedMessage091.Body.ToArray()));
53+
channel.Close();
54+
connection.Close();
55+
}
56+
57+
[Fact]
58+
public async Task FromAmqp091()
59+
{
60+
Assert.NotNull(_connection);
61+
Assert.NotNull(_management);
62+
63+
IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.QUORUM);
64+
await queueSpec.DeclareAsync();
65+
66+
// publish a message with AMQP 0-9-1
67+
var factory = new ConnectionFactory();
68+
var connection = factory.CreateConnection();
69+
var channel = connection.CreateModel();
70+
channel.BasicPublish(
71+
exchange: "",
72+
routingKey: _queueName,
73+
basicProperties: null,
74+
body: System.Text.Encoding.UTF8.GetBytes("{Text:as,Seq:1,Max:7000}"));
75+
76+
TaskCompletionSource<IMessage> tcs = new();
77+
IConsumer consumer = await _connection.ConsumerBuilder()
78+
.Queue(_queueName)
79+
.MessageHandler((context, message) =>
80+
{
81+
tcs.SetResult(message);
82+
context.Accept();
83+
return Task.CompletedTask;
84+
}).BuildAndStartAsync();
85+
86+
var receivedMessage = await tcs.Task;
87+
Assert.NotNull(receivedMessage);
88+
Assert.Equal("{Text:as,Seq:1,Max:7000}", receivedMessage.BodyAsString());
89+
channel.Close();
90+
connection.Close();
91+
await consumer.CloseAsync();
92+
}
93+
}
94+
}

Tests/Consumer/BasicConsumerTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public async Task SimpleConsumeMessage()
3838

3939
await WhenTcsCompletes(tcs);
4040
IMessage receivedMessage = await tcs.Task;
41-
Assert.Equal("message_0", receivedMessage.Body());
41+
Assert.Equal("message_0", receivedMessage.BodyAsString());
4242

4343
await consumer.CloseAsync();
4444
consumer.Dispose();
@@ -69,7 +69,7 @@ public async Task ConsumerReQueueMessage()
6969
{
7070
try
7171
{
72-
Assert.Equal("message_0", message.Body());
72+
Assert.Equal("message_0", message.BodyAsString());
7373
Interlocked.Increment(ref consumed);
7474
switch (consumed)
7575
{
@@ -167,7 +167,7 @@ Task MessageHandler(IContext cxt, IMessage msg)
167167
{
168168
if (i % 2 == 0)
169169
{
170-
Assert.Equal($"message_{i}", receivedMessagesFromTask[i].Body());
170+
Assert.Equal($"message_{i}", receivedMessagesFromTask[i].BodyAsString());
171171
}
172172
}
173173
}

Tests/MessagesTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public void ValidateMessage()
2222
Assert.Equal("CorrelationId_2123", message.CorrelationId());
2323
Assert.Equal("ReplyTo_5123", message.ReplyTo());
2424
Assert.Equal("Subject_9123", message.Subject());
25-
Assert.Equal("my_body", message.Body());
25+
Assert.Equal("my_body", message.BodyAsString());
2626
}
2727

2828
[Fact]

Tests/Rpc/RecoveryRPCTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
6767
{
6868
IMessage response = await rpcClient.PublishAsync(request);
6969
messagesConfirmed++;
70-
Assert.Equal("pong", response.Body());
70+
Assert.Equal("pong", response.BodyAsString());
7171
}
7272
catch (AmqpNotOpenException)
7373
{

Tests/Rpc/RpcServerTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
5757

5858
await p.PublishAsync(new AmqpMessage("test"));
5959
IMessage m = await WhenTcsCompletes(tcs);
60-
Assert.Equal("pong", m.Body());
60+
Assert.Equal("pong", m.BodyAsString());
6161
await rpcServer.CloseAsync();
6262
}
6363

@@ -143,7 +143,7 @@ Task MessageHandler(IContext context, IMessage message)
143143
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
144144

145145
IMessage m = await WhenTcsCompletes(tcs);
146-
Assert.Equal("pong", m.Body());
146+
Assert.Equal("pong", m.BodyAsString());
147147

148148
await rpcServer.CloseAsync();
149149
await consumer.CloseAsync();
@@ -173,7 +173,7 @@ public async Task RpcServerClientPingPongWithDefault()
173173
IMessage message = new AmqpMessage("ping");
174174

175175
IMessage response = await rpcClient.PublishAsync(message);
176-
Assert.Equal("pong", response.Body());
176+
Assert.Equal("pong", response.BodyAsString());
177177
await rpcClient.CloseAsync();
178178
await rpcServer.CloseAsync();
179179
}
@@ -209,7 +209,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
209209
IMessage message = new AmqpMessage("ping");
210210

211211
IMessage response = await rpcClient.PublishAsync(message);
212-
Assert.Equal("pong", response.Body());
212+
Assert.Equal("pong", response.BodyAsString());
213213
Assert.Equal(_correlationId, response.CorrelationId());
214214
await rpcClient.CloseAsync();
215215
await rpcServer.CloseAsync();
@@ -265,7 +265,7 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
265265
while (i < 30)
266266
{
267267
IMessage response = await rpcClient.PublishAsync(message);
268-
Assert.Equal("pong", response.Body());
268+
Assert.Equal("pong", response.BodyAsString());
269269
// the server replies with the correlation id in the application properties
270270
Assert.Equal($"{_correlationId}_{i}", response.Property("correlationId"));
271271
Assert.Equal($"{_correlationId}_{i}", response.Properties()["correlationId"]);
@@ -324,7 +324,7 @@ Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
324324
{
325325
IMessage message = new AmqpMessage("ping").Property("id", i1);
326326
IMessage response = await rpcClient.PublishAsync(message);
327-
Assert.Equal("pong", response.Body());
327+
Assert.Equal("pong", response.BodyAsString());
328328
}));
329329
}
330330

@@ -376,7 +376,7 @@ static async Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage req
376376

377377
IMessage msg = new AmqpMessage("ping").Property("wait", 1);
378378
IMessage reply = await rpcClient.PublishAsync(msg);
379-
Assert.Equal("pong", reply.Body());
379+
Assert.Equal("pong", reply.BodyAsString());
380380

381381
await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
382382
new AmqpMessage("ping").Property("wait", 700)));

Tests/Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
<PackageReference Include="Microsoft.Extensions.Diagnostics.Testing" />
3030
<PackageReference Include="Microsoft.Extensions.Diagnostics" />
3131
<PackageReference Include="Microsoft.NET.Test.Sdk" />
32+
<PackageReference Include="RabbitMQ.Client" />
3233
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
3334
<PackageReference Include="System.Text.Json" />
3435
<PackageReference Include="xunit" />

0 commit comments

Comments
 (0)