Skip to content

Commit 643d998

Browse files
authored
Set message durable as default (#124)
* set message durable as default * related to rabbitmq/rabbitmq-server#13918 * The user can always decide to set durable=false btw --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent a8ddb3e commit 643d998

File tree

4 files changed

+85
-9
lines changed

4 files changed

+85
-9
lines changed

RabbitMQ.AMQP.Client/IMessage.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ public interface IMessage
102102

103103
public IMessage Body(object body);
104104

105+
public IMessage Durable(bool durable);
106+
107+
public bool Durable();
108+
105109
IMessageAddressBuilder ToAddress();
106110
}
107111
}

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,16 @@ public class AmqpMessage : IMessage
1919
{
2020
public Message NativeMessage { get; }
2121

22-
public AmqpMessage()
23-
{
24-
NativeMessage = new Message();
25-
}
26-
2722
/// <summary>
2823
/// Create a message with a body of type byte[] and BodySection of type Data.
24+
/// Durable is set to true by default.
2925
/// </summary>
3026
/// <param name="body"></param>
3127
public AmqpMessage(byte[] body)
3228
{
3329
NativeMessage = new Message();
3430
NativeMessage.BodySection = new Data { Binary = body };
31+
Durable(true);
3532
}
3633

3734
/// <summary>
@@ -42,6 +39,7 @@ public AmqpMessage(string body)
4239
{
4340
NativeMessage = new Message();
4441
NativeMessage.BodySection = new Data() { Binary = System.Text.Encoding.UTF8.GetBytes(body) };
42+
Durable(true);
4543
}
4644

4745
public AmqpMessage(Message nativeMessage)
@@ -275,7 +273,6 @@ public string BodyAsString()
275273
{
276274
throw new InvalidOperationException("Body is not an Application Data");
277275
}
278-
279276
}
280277

281278
public IMessage Body(object body)
@@ -298,6 +295,18 @@ public IMessage Body(object body)
298295
return this;
299296
}
300297

298+
public IMessage Durable(bool durable)
299+
{
300+
EnsureHeader();
301+
NativeMessage.Header.Durable = durable;
302+
return this;
303+
}
304+
305+
public bool Durable()
306+
{
307+
return NativeMessage.Header.Durable;
308+
}
309+
301310
public IMessageAddressBuilder ToAddress()
302311
{
303312
return new MessageAddressBuilder(this);
@@ -329,6 +338,11 @@ private void EnsureAnnotations()
329338
NativeMessage.MessageAnnotations ??= new MessageAnnotations();
330339
}
331340

341+
private void EnsureHeader()
342+
{
343+
NativeMessage.Header ??= new Header();
344+
}
345+
332346
private void ThrowIfApplicationPropertiesNotSet()
333347
{
334348
if (NativeMessage.ApplicationProperties == null)

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ RabbitMQ.AMQP.Client.IMessage.CorrelationId() -> object!
256256
RabbitMQ.AMQP.Client.IMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
257257
RabbitMQ.AMQP.Client.IMessage.CreationTime() -> System.DateTime
258258
RabbitMQ.AMQP.Client.IMessage.CreationTime(System.DateTime creationTime) -> RabbitMQ.AMQP.Client.IMessage!
259+
RabbitMQ.AMQP.Client.IMessage.Durable() -> bool
260+
RabbitMQ.AMQP.Client.IMessage.Durable(bool durable) -> RabbitMQ.AMQP.Client.IMessage!
259261
RabbitMQ.AMQP.Client.IMessage.GroupId() -> string!
260262
RabbitMQ.AMQP.Client.IMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage!
261263
RabbitMQ.AMQP.Client.IMessage.GroupSequence() -> uint
@@ -390,7 +392,6 @@ RabbitMQ.AMQP.Client.Impl.AmqpManagement.Queue(string! name) -> RabbitMQ.AMQP.Cl
390392
RabbitMQ.AMQP.Client.Impl.AmqpMessage
391393
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime
392394
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
393-
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void
394395
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void
395396
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(byte[]! body) -> void
396397
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(string! body) -> void
@@ -407,6 +408,8 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId() -> object!
407408
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CorrelationId(object! id) -> RabbitMQ.AMQP.Client.IMessage!
408409
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CreationTime() -> System.DateTime
409410
RabbitMQ.AMQP.Client.Impl.AmqpMessage.CreationTime(System.DateTime creationTime) -> RabbitMQ.AMQP.Client.IMessage!
411+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Durable() -> bool
412+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Durable(bool durable) -> RabbitMQ.AMQP.Client.IMessage!
410413
RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId() -> string!
411414
RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupId(string! groupId) -> RabbitMQ.AMQP.Client.IMessage!
412415
RabbitMQ.AMQP.Client.Impl.AmqpMessage.GroupSequence() -> uint

Tests/Publisher/PublisherTests.cs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

55
using System;
6+
using System.Collections.Generic;
67
using System.Linq;
78
using System.Text;
89
using System.Threading.Tasks;
@@ -22,8 +23,8 @@ public async Task ValidateBuilderRaiseExceptionIfQueueOrExchangeAreNotSetCorrect
2223
Assert.NotNull(_management);
2324

2425
await Assert.ThrowsAsync<InvalidAddressException>(() =>
25-
_connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together").
26-
Exchange("queue_and_exchange_cant_set_together").BuildAsync());
26+
_connection.PublisherBuilder().Queue("queue_and_exchange_cant_set_together")
27+
.Exchange("queue_and_exchange_cant_set_together").BuildAsync());
2728

2829
await _connection.CloseAsync();
2930
Assert.Empty(_connection.Publishers);
@@ -192,6 +193,7 @@ public async Task PublisherSendingShouldThrowWhenExchangeHasBeenDeleted()
192193
publishOutcome = nextPublishResult.Outcome;
193194
break;
194195
}
196+
195197
await Task.Delay(TimeSpan.FromMilliseconds(100));
196198
}
197199

@@ -243,6 +245,7 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted()
243245
publishOutcome = nextPublishResult.Outcome;
244246
break;
245247
}
248+
246249
await Task.Delay(TimeSpan.FromMilliseconds(100));
247250
}
248251

@@ -256,4 +259,56 @@ public async Task PublisherSendingShouldThrowWhenQueueHasBeenDeleted()
256259
await publisher.CloseAsync();
257260
publisher.Dispose();
258261
}
262+
263+
[Theory]
264+
[InlineData(QueueType.QUORUM)]
265+
[InlineData(QueueType.CLASSIC)]
266+
public async Task MessageShouldBeDurableByDefault(QueueType queueType)
267+
{
268+
Assert.NotNull(_connection);
269+
Assert.NotNull(_management);
270+
271+
IQueueSpecification queueSpec = _management.Queue(_queueName).Type(queueType);
272+
await queueSpec.DeclareAsync();
273+
274+
IPublisher publisher = await _connection.PublisherBuilder().Queue(queueSpec).BuildAsync();
275+
List<IMessage> messages = new();
276+
TaskCompletionSource<List<IMessage>> tcs = new();
277+
IConsumer consumer = await _connection.ConsumerBuilder()
278+
.Queue(queueSpec)
279+
.MessageHandler((context, message) =>
280+
{
281+
messages.Add(message);
282+
context.Accept();
283+
if (messages.Count == 2)
284+
{
285+
tcs.SetResult(messages);
286+
}
287+
288+
return Task.CompletedTask;
289+
}).BuildAndStartAsync();
290+
291+
// the first message should be durable by default
292+
AmqpMessage durable = new("Hello wold!");
293+
PublishResult pr = await publisher.PublishAsync(durable);
294+
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
295+
Assert.True(durable.Durable());
296+
297+
// the second message should be not durable set by the user
298+
299+
AmqpMessage notDurable = new("Hello wold!");
300+
notDurable.Durable(false);
301+
PublishResult pr2 = await publisher.PublishAsync(notDurable);
302+
Assert.Equal(OutcomeState.Accepted, pr2.Outcome.State);
303+
Assert.False(notDurable.Durable());
304+
var r = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10));
305+
Assert.True(r[0].Durable());
306+
Assert.False(r[1].Durable());
307+
308+
await consumer.CloseAsync();
309+
await publisher.CloseAsync();
310+
await queueSpec.DeleteAsync();
311+
312+
Assert.Empty(_connection.Publishers);
313+
}
259314
}

0 commit comments

Comments
 (0)