Skip to content

Set message durable as default #124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RabbitMQ.AMQP.Client/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public interface IMessage

public IMessage Body(object body);

public IMessage Durable(bool durable);

public bool Durable();

IMessageAddressBuilder ToAddress();
}
}
26 changes: 20 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@ public class AmqpMessage : IMessage
{
public Message NativeMessage { get; }

public AmqpMessage()
{
NativeMessage = new Message();
}

/// <summary>
/// Create a message with a body of type byte[] and BodySection of type Data.
/// Durable is set to true by default.
/// </summary>
/// <param name="body"></param>
public AmqpMessage(byte[] body)
{
NativeMessage = new Message();
NativeMessage.BodySection = new Data { Binary = body };
Durable(true);
}

/// <summary>
Expand All @@ -42,6 +39,7 @@ public AmqpMessage(string body)
{
NativeMessage = new Message();
NativeMessage.BodySection = new Data() { Binary = System.Text.Encoding.UTF8.GetBytes(body) };
Durable(true);
}

public AmqpMessage(Message nativeMessage)
Expand Down Expand Up @@ -275,7 +273,6 @@ public string BodyAsString()
{
throw new InvalidOperationException("Body is not an Application Data");
}

}

public IMessage Body(object body)
Expand All @@ -298,6 +295,18 @@ public IMessage Body(object body)
return this;
}

public IMessage Durable(bool durable)
{
EnsureHeader();
NativeMessage.Header.Durable = durable;
return this;
}

public bool Durable()
{
return NativeMessage.Header.Durable;
}

public IMessageAddressBuilder ToAddress()
{
return new MessageAddressBuilder(this);
Expand Down Expand Up @@ -329,6 +338,11 @@ private void EnsureAnnotations()
NativeMessage.MessageAnnotations ??= new MessageAnnotations();
}

private void EnsureHeader()
{
NativeMessage.Header ??= new Header();
}

private void ThrowIfApplicationPropertiesNotSet()
{
if (NativeMessage.ApplicationProperties == null)
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object!
RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.CreationTime() -> System.DateTime
RabbitMQ.AMQP.Client.IMessage.CreationTime(System.DateTime creationTime) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.Durable() -> bool
RabbitMQ.AMQP.Client.IMessage.Durable(bool durable) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.GroupId() -> string!
RabbitMQ.AMQP.Client.IMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.IMessage.GroupSequence() -> uint
Expand Down Expand Up @@ -390,7 +392,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(string! name) -> RabbitMQ.AMQP.Cl
RabbitMQ.AMQP.Client.Impl.AmqpMessage
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(byte[]! body) -> void
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(string! body) -> void
Expand All @@ -407,6 +408,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CreationTime() -> System.DateTime
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CreationTime(System.DateTime creationTime) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Durable() -> bool
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Durable(bool durable) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId() -> string!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupSequence() -> uint
Expand Down
59 changes: 57 additions & 2 deletions Tests/Publisher/PublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -22,8 +23,8 @@ public async Task ValidateBuilderRaiseExceptionIfQueueOrExchangeAreNotSetCorrect
Assert.NotNull(_management);

await Assert.ThrowsAsync<InvalidAddressException>(() =>
_connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together").
Exchange("queue_and_exchange_cant_set_together").BuildAsync());
_connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together")
.Exchange("queue_and_exchange_cant_set_together").BuildAsync());

await _connection.CloseAsync();
Assert.Empty(_connection.Publishers);
Expand Down Expand Up @@ -192,6 +193,7 @@ public async Task PublisherSendingShouldThrowWhenExchangeHasBeenDeleted()
publishOutcome = nextPublishResult.Outcome;
break;
}

await Task.Delay(TimeSpan.FromMilliseconds(100));
}

Expand Down Expand Up @@ -243,6 +245,7 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted()
publishOutcome = nextPublishResult.Outcome;
break;
}

await Task.Delay(TimeSpan.FromMilliseconds(100));
}

Expand All @@ -256,4 +259,56 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted()
await publisher.CloseAsync();
publisher.Dispose();
}

[Theory]
[InlineData(QueueType.QUORUM)]
[InlineData(QueueType.CLASSIC)]
public async Task MessageShouldBeDurableByDefault(QueueType queueType)
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

IQueueSpecification queueSpec = _management.Queue(_queueName).Type(queueType);
await queueSpec.DeclareAsync();

IPublisher publisher = await _connection.PublisherBuilder().Queue(queueSpec).BuildAsync();
List<IMessage> messages = new();
TaskCompletionSource<List<IMessage>> tcs = new();
IConsumer consumer = await _connection.ConsumerBuilder()
.Queue(queueSpec)
.MessageHandler((context, message) =>
{
messages.Add(message);
context.Accept();
if (messages.Count == 2)
{
tcs.SetResult(messages);
}

return Task.CompletedTask;
}).BuildAndStartAsync();

// the first message should be durable by default
AmqpMessage durable = new("Hello wold!");
PublishResult pr = await publisher.PublishAsync(durable);
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
Assert.True(durable.Durable());

// the second message should be not durable set by the user

AmqpMessage notDurable = new("Hello wold!");
notDurable.Durable(false);
PublishResult pr2 = await publisher.PublishAsync(notDurable);
Assert.Equal(OutcomeState.Accepted, pr2.Outcome.State);
Assert.False(notDurable.Durable());
var r = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10));
Assert.True(r[0].Durable());
Assert.False(r[1].Durable());

await consumer.CloseAsync();
await publisher.CloseAsync();
await queueSpec.DeleteAsync();

Assert.Empty(_connection.Publishers);
}
}