Skip to content

Commit 1e6f8ab

Browse files
lukebakkenbordingGsantomaggio
authored
Add dedicated exception for basic.return messages. (#1832)
* Add dedicated exception for `basic.return` messages. Fixes #1831 This PR adds the `PublishReturnException` class that includes the originating exchange and routing key for a `basic.return` message. It should be backwards-compatible in the API. * Add reply code and reply text to the new `PublishBasicException` exception type. * Fix bug in the code that did not set certain `PublishReturnException` data * Add exception messages * my 2cents. Add more conditions to the test Signed-off-by: Gabriele Santomaggio <[email protected]> * formatting Signed-off-by: Gabriele Santomaggio <[email protected]> --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Brandon Ording <[email protected]> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent 9ecad93 commit 1e6f8ab

File tree

5 files changed

+117
-7
lines changed

5 files changed

+117
-7
lines changed

projects/RabbitMQ.Client/Exceptions/PublishException.cs

+67-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ public class PublishException : RabbitMQClientException
4242
private bool _isReturn = false;
4343
private ulong _publishSequenceNumber = ulong.MinValue;
4444

45-
public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
45+
public PublishException(ulong publishSequenceNumber, bool isReturn) : this(publishSequenceNumber, isReturn, "Message rejected by broker.")
46+
{
47+
}
48+
49+
public PublishException(ulong publishSequenceNumber, bool isReturn, string message) : base(message)
4650
{
4751
if (publishSequenceNumber == ulong.MinValue)
4852
{
@@ -63,4 +67,66 @@ public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
6367
/// </summary>
6468
public ulong PublishSequenceNumber => _publishSequenceNumber;
6569
}
70+
71+
/// <summary>
72+
/// Class for exceptions related to publisher confirmations
73+
/// or the <c>mandatory</c> flag, when <c>basic.return</c> is
74+
/// sent from the broker.
75+
/// </summary>
76+
public class PublishReturnException : PublishException
77+
{
78+
private readonly string _exchange;
79+
private readonly string _routingKey;
80+
private readonly ushort _replyCode;
81+
private readonly string _replyText;
82+
83+
public PublishReturnException(ulong publishSequenceNumber, string message,
84+
string? exchange = null, string? routingKey = null,
85+
ushort? replyCode = null, string? replyText = null)
86+
: base(publishSequenceNumber, true, message)
87+
{
88+
_exchange = exchange ?? string.Empty;
89+
_routingKey = routingKey ?? string.Empty;
90+
_replyCode = replyCode ?? 0;
91+
_replyText = replyText ?? string.Empty;
92+
}
93+
94+
/// <summary>
95+
/// Get the exchange associated with this <c>basic.return</c>
96+
/// </summary>
97+
public string Exchange => _exchange;
98+
99+
/// <summary>
100+
/// Get the routing key associated with this <c>basic.return</c>
101+
/// </summary>
102+
public string RoutingKey => _routingKey;
103+
104+
/// <summary>
105+
/// Get the reply code associated with this <c>basic.return</c>
106+
/// </summary>
107+
public ushort ReplyCode => _replyCode;
108+
109+
/// <summary>
110+
/// Get the reply text associated with this <c>basic.return</c>
111+
/// </summary>
112+
public string ReplyText => _replyText;
113+
}
114+
115+
internal static class PublishExceptionFactory
116+
{
117+
internal static PublishException Create(bool isReturn,
118+
ulong deliveryTag, string? exchange = null, string? routingKey = null,
119+
ushort? replyCode = null, string? replyText = null)
120+
{
121+
if (isReturn)
122+
{
123+
string message = $"{replyCode} {replyText} Exchange: {exchange} Routing Key: {routingKey}";
124+
return new PublishReturnException(deliveryTag, message, exchange, routingKey, replyCode, replyText);
125+
}
126+
else
127+
{
128+
return new PublishException(deliveryTag, isReturn);
129+
}
130+
}
131+
}
66132
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

+12-4
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,9 @@ private void HandleAck(ulong deliveryTag, bool multiple)
200200
}
201201

202202
[MethodImpl(MethodImplOptions.AggressiveInlining)]
203-
private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
203+
private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn,
204+
string? exchange = null, string? routingKey = null,
205+
ushort? replyCode = null, string? replyText = null)
204206
{
205207
if (ShouldHandleAckOrNack(deliveryTag))
206208
{
@@ -210,7 +212,9 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
210212
{
211213
if (pair.Key <= deliveryTag)
212214
{
213-
pair.Value.SetException(new PublishException(pair.Key, isReturn));
215+
PublishException ex = PublishExceptionFactory.Create(isReturn, pair.Key,
216+
exchange, routingKey, replyCode, replyText);
217+
pair.Value.SetException(ex);
214218
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
215219
}
216220
}
@@ -219,7 +223,9 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
219223
{
220224
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
221225
{
222-
tcs.SetException(new PublishException(deliveryTag, isReturn));
226+
PublishException ex = PublishExceptionFactory.Create(isReturn, deliveryTag,
227+
exchange, routingKey, replyCode, replyText);
228+
tcs.SetException(ex);
223229
}
224230
}
225231
}
@@ -249,7 +255,9 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
249255
}
250256
}
251257

252-
HandleNack(publishSequenceNumber, multiple: false, isReturn: true);
258+
HandleNack(publishSequenceNumber, multiple: false, isReturn: true,
259+
exchange: basicReturnEvent.Exchange, routingKey: basicReturnEvent.RoutingKey,
260+
replyCode: basicReturnEvent.ReplyCode, replyText: basicReturnEvent.ReplyText);
253261
}
254262
}
255263

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
RabbitMQ.Client.Exceptions.PublishException.PublishException(ulong publishSequenceNumber, bool isReturn, string! message) -> void
2+
RabbitMQ.Client.Exceptions.PublishReturnException
3+
RabbitMQ.Client.Exceptions.PublishReturnException.Exchange.get -> string!
4+
RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string! message, string? exchange = null, string? routingKey = null, ushort? replyCode = null, string? replyText = null) -> void
5+
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort
6+
RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string!
7+
RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string!

projects/Test/Integration/TestBasicPublishAsync.cs

+25-1
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Threading.Tasks;
3334
using RabbitMQ.Client;
35+
using RabbitMQ.Client.Exceptions;
3436
using Xunit;
3537
using Xunit.Abstractions;
3638

@@ -49,7 +51,6 @@ public async Task TestQueuePurgeAsync()
4951

5052
var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
5153

52-
5354
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true);
5455

5556
var publishTask = Task.Run(async () =>
@@ -65,5 +66,28 @@ public async Task TestQueuePurgeAsync()
6566
Assert.True(await publishSyncSource.Task);
6667
Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q));
6768
}
69+
70+
[Fact]
71+
public async Task TestBasicReturnAsync()
72+
{
73+
string routingKey = Guid.NewGuid().ToString();
74+
try
75+
{
76+
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: routingKey,
77+
mandatory: true, body: GetRandomBody());
78+
}
79+
catch (PublishReturnException prex)
80+
{
81+
Assert.True(prex.IsReturn);
82+
Assert.NotNull(prex.Exchange);
83+
Assert.Equal(string.Empty, prex.Exchange);
84+
Assert.NotNull(prex.RoutingKey);
85+
Assert.Equal(routingKey, prex.RoutingKey);
86+
Assert.NotEqual(0, prex.ReplyCode);
87+
Assert.NotNull(prex.ReplyText);
88+
Assert.Equal("NO_ROUTE", prex.ReplyText);
89+
90+
}
91+
}
6892
}
6993
}

projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,13 @@ await TestConcurrentOperationsAsync(async () =>
145145
}
146146
catch (PublishException ex)
147147
{
148-
if (ex.IsReturn)
148+
if (ex is PublishReturnException prex)
149149
{
150+
Assert.True(prex.IsReturn);
151+
Assert.NotNull(prex.Exchange);
152+
Assert.NotNull(prex.RoutingKey);
153+
Assert.NotEqual(0, prex.ReplyCode);
154+
Assert.NotNull(prex.ReplyText);
150155
Interlocked.Increment(ref totalReturnCount);
151156
}
152157
else

0 commit comments

Comments
 (0)