Skip to content

Use Data Section by Default instead of AMQPValue for the message creation #116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/cluster/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}

readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"

if [[ ! -v GITHUB_ACTIONS ]]
then
Expand Down
2 changes: 1 addition & 1 deletion .ci/ubuntu/one-node/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ readonly script_dir
echo "[INFO] script_dir: '$script_dir'"


readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-beta.4-management-alpine}"
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.1.0-management-alpine}"



Expand Down
2 changes: 1 addition & 1 deletion .ci/windows/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "27.2",
"rabbitmq": "4.1.0-beta.4"
"rabbitmq": "4.1.0"
}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,5 @@ docs/temp/

# ci logs
.ci/ubuntu/log/*
.ci/ubuntu/cluster/log/*

1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
<PackageVersion Include="OpenTelemetry" Version="1.10.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
<!-- HAClient -->
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.AMQP.Client/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ public interface IMessage
public object Annotation(string key);
public IMessage Annotation(string key, object value);

public object Body();
public byte[] Body();

public string BodyAsString();

public IMessage Body(object body);

IMessageAddressBuilder ToAddress();
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.AMQP.Client/IRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public interface IRpcServer : ILifeCycle

public interface IContext
{
IMessage Message(object body);
IMessage Message(byte[] body);
IMessage Message(string body);
}
}
}
52 changes: 36 additions & 16 deletions RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,24 @@ public AmqpMessage()
NativeMessage = new Message();
}

public AmqpMessage(object body)
/// <summary>
/// Create a message with a body of type byte[] and BodySection of type Data.
/// </summary>
/// <param name="body"></param>
public AmqpMessage(byte[] body)
{
NativeMessage = new Message(body);
NativeMessage = new Message();
NativeMessage.BodySection = new Data { Binary = body };
}

/// <summary>
/// Create a message with a body of type string and BodySection of type Data.
/// The string is converted to a byte[] using UTF8 encoding.
/// </summary>
public AmqpMessage(string body)
{
NativeMessage = new Message();
NativeMessage.BodySection = new Data() { Binary = System.Text.Encoding.UTF8.GetBytes(body) };
}

public AmqpMessage(Message nativeMessage)
Expand Down Expand Up @@ -245,35 +260,40 @@ public object Annotation(string key)
return NativeMessage.MessageAnnotations[new Symbol(key)];
}

public object Body()
public byte[] Body()
{
return NativeMessage.Body;
return (byte[])NativeMessage.Body;
}

public string BodyAsString()
{
if (NativeMessage.BodySection is Data data)
{
return System.Text.Encoding.UTF8.GetString(data.Binary);
}
else
{
throw new InvalidOperationException("Body is not an Application Data");
}

}

public IMessage Body(object body)
{
RestrictedDescribed bodySection;
if (body is byte[] byteArray)
{
bodySection = new Data
{
Binary = byteArray
};
bodySection = new Data { Binary = byteArray };
}
else if (body is IList list)
{
bodySection = new AmqpSequence
{
List = list
};
bodySection = new AmqpSequence { List = list };
}
else
{
bodySection = new AmqpValue
{
Value = body
};
bodySection = new AmqpValue { Value = body };
}

NativeMessage.BodySection = bodySection;
return this;
}
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () =>

private class RpcServerContext : IRpcServer.IContext
{
public IMessage Message(object body) => new AmqpMessage(body);
public IMessage Message(byte[] body) => new AmqpMessage(body);
public IMessage Message(string body) => new AmqpMessage(body);
}

public override async Task CloseAsync()
Expand Down
12 changes: 8 additions & 4 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,9 @@ RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime() -> System.DateTime
RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object!
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.Body() -> object!
RabbitMQ.AMQP.Client.IMessage.Body() -> byte[]!
RabbitMQ.AMQP.Client.IMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.BodyAsString() -> string!
RabbitMQ.AMQP.Client.IMessage.ContentEncoding() -> string!
RabbitMQ.AMQP.Client.IMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.ContentType() -> string!
Expand Down Expand Up @@ -391,11 +392,13 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(byte[]! body) -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(string! body) -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> byte[]!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.BodyAsString() -> string!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding() -> string!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentType() -> string!
Expand Down Expand Up @@ -674,7 +677,8 @@ RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func<RabbitMQ
RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
RabbitMQ.AMQP.Client.IRpcServer
RabbitMQ.AMQP.Client.IRpcServer.IContext
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(byte[]! body) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(string! body) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IRpcServerBuilder
RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRpcServer!>!
RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!>? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
Expand Down
94 changes: 94 additions & 0 deletions Tests/Amqp091/FromToAmqp091Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// This source code is dual-licensed under the Apache License, version 2.0,
// and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System.Threading.Tasks;
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
using Xunit.Abstractions;

namespace Tests.Amqp091
{
public class FromToAmqp091Tests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
{
[Fact]
public async Task ToAmqp091()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.QUORUM);
await queueSpec.DeclareAsync();

var publisher = await _connection.PublisherBuilder().BuildAsync();
const string body = "{Text:as,Seq:1,Max:7000}";
IMessage amqpMessage = new AmqpMessage(body).ToAddress().Queue(_queueName).Build();
for (int i = 0; i < 1; i++)
{
PublishResult result = await publisher.PublishAsync(message: amqpMessage).ConfigureAwait(true);
Assert.NotNull(result);
Assert.Equal(OutcomeState.Accepted, result.Outcome.State);
}

var factory = new ConnectionFactory();
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var consumer091 = new EventingBasicConsumer(channel);
var tcs091 = new TaskCompletionSource<BasicDeliverEventArgs>();
consumer091.Received += (sender, ea) =>
{
tcs091.SetResult(ea);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(_queueName, false, "consumerTag", consumer091);
var receivedMessage091 = await tcs091.Task;
Assert.NotNull(receivedMessage091);
Assert.Equal(_queueName, receivedMessage091.RoutingKey);
Assert.Equal("consumerTag", receivedMessage091.ConsumerTag);
Assert.Equal("{Text:as,Seq:1,Max:7000}",
System.Text.Encoding.UTF8.GetString(receivedMessage091.Body.ToArray()));
channel.Close();
connection.Close();
}

[Fact]
public async Task FromAmqp091()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

IQueueSpecification queueSpec = _management.Queue().Name(_queueName).Type(QueueType.QUORUM);
await queueSpec.DeclareAsync();

// publish a message with AMQP 0-9-1
var factory = new ConnectionFactory();
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.BasicPublish(
exchange: "",
routingKey: _queueName,
basicProperties: null,
body: System.Text.Encoding.UTF8.GetBytes("{Text:as,Seq:1,Max:7000}"));

TaskCompletionSource<IMessage> tcs = new();
IConsumer consumer = await _connection.ConsumerBuilder()
.Queue(_queueName)
.MessageHandler((context, message) =>
{
tcs.SetResult(message);
context.Accept();
return Task.CompletedTask;
}).BuildAndStartAsync();

var receivedMessage = await tcs.Task;
Assert.NotNull(receivedMessage);
Assert.Equal("{Text:as,Seq:1,Max:7000}", receivedMessage.BodyAsString());
channel.Close();
connection.Close();
await consumer.CloseAsync();
}
}
}
6 changes: 3 additions & 3 deletions Tests/Consumer/BasicConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async Task SimpleConsumeMessage()

await WhenTcsCompletes(tcs);
IMessage receivedMessage = await tcs.Task;
Assert.Equal("message_0", receivedMessage.Body());
Assert.Equal("message_0", receivedMessage.BodyAsString());

await consumer.CloseAsync();
consumer.Dispose();
Expand Down Expand Up @@ -69,7 +69,7 @@ public async Task ConsumerReQueueMessage()
{
try
{
Assert.Equal("message_0", message.Body());
Assert.Equal("message_0", message.BodyAsString());
Interlocked.Increment(ref consumed);
switch (consumed)
{
Expand Down Expand Up @@ -167,7 +167,7 @@ Task MessageHandler(IContext cxt, IMessage msg)
{
if (i % 2 == 0)
{
Assert.Equal($"message_{i}", receivedMessagesFromTask[i].Body());
Assert.Equal($"message_{i}", receivedMessagesFromTask[i].BodyAsString());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/MessagesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void ValidateMessage()
Assert.Equal("CorrelationId_2123", message.CorrelationId());
Assert.Equal("ReplyTo_5123", message.ReplyTo());
Assert.Equal("Subject_9123", message.Subject());
Assert.Equal("my_body", message.Body());
Assert.Equal("my_body", message.BodyAsString());
}

[Fact]
Expand Down
2 changes: 1 addition & 1 deletion Tests/Rpc/RecoveryRPCTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
{
IMessage response = await rpcClient.PublishAsync(request);
messagesConfirmed++;
Assert.Equal("pong", response.Body());
Assert.Equal("pong", response.BodyAsString());
}
catch (AmqpNotOpenException)
{
Expand Down
14 changes: 7 additions & 7 deletions Tests/Rpc/RpcServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)

await p.PublishAsync(new AmqpMessage("test"));
IMessage m = await WhenTcsCompletes(tcs);
Assert.Equal("pong", m.Body());
Assert.Equal("pong", m.BodyAsString());
await rpcServer.CloseAsync();
}

Expand Down Expand Up @@ -143,7 +143,7 @@ Task MessageHandler(IContext context, IMessage message)
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);

IMessage m = await WhenTcsCompletes(tcs);
Assert.Equal("pong", m.Body());
Assert.Equal("pong", m.BodyAsString());

await rpcServer.CloseAsync();
await consumer.CloseAsync();
Expand Down Expand Up @@ -173,7 +173,7 @@ public async Task RpcServerClientPingPongWithDefault()
IMessage message = new AmqpMessage("ping");

IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
Assert.Equal("pong", response.BodyAsString());
await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
}
Expand Down Expand Up @@ -209,7 +209,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
IMessage message = new AmqpMessage("ping");

IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
Assert.Equal("pong", response.BodyAsString());
Assert.Equal(_correlationId, response.CorrelationId());
await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
Expand Down Expand Up @@ -265,7 +265,7 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
while (i < 30)
{
IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
Assert.Equal("pong", response.BodyAsString());
// the server replies with the correlation id in the application properties
Assert.Equal($"{_correlationId}_{i}", response.Property("correlationId"));
Assert.Equal($"{_correlationId}_{i}", response.Properties()["correlationId"]);
Expand Down Expand Up @@ -324,7 +324,7 @@ Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
IMessage message = new AmqpMessage("ping").Property("id", i1);
IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
Assert.Equal("pong", response.BodyAsString());
}));
}

Expand Down Expand Up @@ -376,7 +376,7 @@ static async Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage req

IMessage msg = new AmqpMessage("ping").Property("wait", 1);
IMessage reply = await rpcClient.PublishAsync(msg);
Assert.Equal("pong", reply.Body());
Assert.Equal("pong", reply.BodyAsString());

await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
new AmqpMessage("ping").Property("wait", 700)));
Expand Down
1 change: 1 addition & 0 deletions Tests/Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<PackageReference Include="Microsoft.Extensions.Diagnostics.Testing" />
<PackageReference Include="Microsoft.Extensions.Diagnostics" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="RabbitMQ.Client" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
<PackageReference Include="System.Text.Json" />
<PackageReference Include="xunit" />
Expand Down