Skip to content

Commit 0c04724

Browse files
authored
bug: Defer Exception causes ASB to Attempt a Reject then Complete with the same Lock Token (BrighterCommand#3619)
This fixes BrighterCommand#3585 for Brighter v10 by allowing the signaling of weather the channel has already acknowledged the Message back to the pump
1 parent 65aeeac commit 0c04724

18 files changed

Lines changed: 66 additions & 66 deletions

File tree

src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageConsumer.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,17 +120,19 @@ await client.DeleteMessageAsync(new DeleteMessageRequest(_channelUrl, receiptHan
120120
/// Sync over async
121121
/// </summary>
122122
/// <param name="message">The message.</param>
123-
public void Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));
123+
/// <returns>True if the message has been removed from the channel, false otherwise</returns>
124+
public bool Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));
124125

125126
/// <summary>
126127
/// Rejects the specified message.
127128
/// </summary>
128129
/// <param name="message">The message.</param>
129130
/// <param name="cancellationToken">Cancel the reject operation</param>
130-
public async Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
131+
/// <returns>True if the message has been removed from the channel, false otherwise</returns>
132+
public async Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
131133
{
132134
if (!message.Header.Bag.TryGetValue("ReceiptHandle", out object? value))
133-
return;
135+
return false;
134136

135137
var receiptHandle = value.ToString();
136138

@@ -157,6 +159,8 @@ await client.ChangeMessageVisibilityAsync(
157159
Log.ErrorRejectingMessage(s_logger, exception, message.Id, receiptHandle, _queueName);
158160
throw;
159161
}
162+
163+
return true;
160164
}
161165

162166
/// <summary>

src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,16 @@ public async ValueTask DisposeAsync()
230230
/// Sync over Async
231231
/// </summary>
232232
/// <param name="message">The message.</param>
233-
public void Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));
233+
/// <returns>True if the message has been removed from the channel, false otherwise</returns>
234+
public bool Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));
234235

235236
/// <summary>
236237
/// Rejects the specified message.
237238
/// </summary>
238239
/// <param name="message">The message.</param>
239240
/// <param name="cancellationToken">Cancel the rejection</param>
240-
public async Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
241+
/// <returns>True if the message has been removed from the channel, false otherwise</returns>
242+
public async Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
241243
{
242244
try
243245
{
@@ -260,6 +262,8 @@ public async ValueTask DisposeAsync()
260262
Logger.LogError(ex, "Error Dead Lettering message with id {Id}", message.Id);
261263
throw;
262264
}
265+
266+
return true;
263267
}
264268

265269
/// <summary>

src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -480,9 +480,11 @@ public Message[] Receive(TimeSpan? timeOut = null)
480480
/// This is just a commit of the offset to move past the record without processing it
481481
/// </remarks>
482482
/// <param name="message">The message.</param>
483-
public void Reject(Message message)
483+
/// <returns>True if the message has been removed from the channel, false otherwise</returns>
484+
public bool Reject(Message message)
484485
{
485486
Acknowledge(message);
487+
return true;
486488
}
487489

488490
/// <summary>
@@ -494,10 +496,10 @@ public void Reject(Message message)
494496
/// </remarks>
495497
/// <param name="message">The message.</param>
496498
/// <param name="cancellationToken">Cancels the reject; not used as non-blocking</param>
497-
public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
499+
public Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
498500
{
499501
Reject(message);
500-
return Task.CompletedTask;
502+
return Task.FromResult(true);
501503
}
502504

503505
/// <summary>
@@ -508,7 +510,7 @@ public void Reject(Message message)
508510
/// <returns>False as no requeue support on Kafka</returns>
509511
public bool Requeue(Message message, TimeSpan? delay = null)
510512
{
511-
return false;
513+
return true;
512514
}
513515

514516
/// <summary>
@@ -520,7 +522,7 @@ public bool Requeue(Message message, TimeSpan? delay = null)
520522
/// <returns>False as no requeue support on Kafka</returns>
521523
public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null, CancellationToken cancellationToken = default(CancellationToken))
522524
{
523-
return Task.FromResult(false);
525+
return Task.FromResult(true);
524526
}
525527

526528
private void CheckHasPartitions()

src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,19 +175,16 @@ public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken
175175
/// Not implemented Reject Method.
176176
/// </summary>
177177
/// <param name="message"></param>
178-
public void Reject(Message message)
179-
{
180-
}
178+
public bool Reject(Message message)
179+
=> false;
181180

182181
/// <summary>
183182
/// Not implemented Reject Method.
184183
/// </summary>
185184
/// <param name="message"></param>
186185
/// <param name="cancellationToken"></param>
187-
public Task RejectAsync(Message message, CancellationToken cancellationToken = default)
188-
{
189-
return Task.CompletedTask;
190-
}
186+
public Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default)
187+
=> Task.FromResult(false);
191188

192189

193190
/// <summary>

src/Paramore.Brighter.MessagingGateway.MsSql/MsSqlMessageConsumer.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,10 @@ public Message[] Receive(TimeSpan? timeOut = null)
101101
/// Not implemented for the MSSQL message consumer
102102
/// </remarks>
103103
/// <param name="message">The message.</param>
104-
public void Reject(Message message)
104+
public bool Reject(Message message)
105105
{
106106
Log.RejectingMessageNotImplemented(s_logger, message.Header.Topic, message.Id);
107+
return false;
107108
}
108109

109110
/// <summary>
@@ -114,11 +115,8 @@ public void Reject(Message message)
114115
/// </remarks>
115116
/// <param name="message">The message.</param>
116117
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to cancel the reject</param>
117-
public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
118-
{
119-
Reject(message);
120-
return Task.CompletedTask;
121-
}
118+
public Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
119+
=> Task.FromResult(Reject(message));
122120

123121
/// <summary>
124122
/// Requeues the specified message.

src/Paramore.Brighter.MessagingGateway.Postgres/PostgresMessageConsumer.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ public async Task AcknowledgeAsync(Message message, CancellationToken cancellati
5959
}
6060

6161
/// <inheritdoc />
62-
public async Task RejectAsync(Message message, CancellationToken cancellationToken = default)
62+
public async Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default)
6363
{
6464
if (!message.Header.Bag.TryGetValue("ReceiptHandle", out var receiptHandle))
6565
{
66-
return;
66+
return false;
6767
}
6868

6969
try
@@ -73,6 +73,7 @@ public async Task RejectAsync(Message message, CancellationToken cancellationTok
7373
await using var command = connection.CreateCommand();
7474
command.CommandText = $"DELETE FROM \"{SchemaName}\".\"{TableName}\" WHERE \"id\" = $1";
7575
command.Parameters.Add(new NpgsqlParameter { Value = receiptHandle });
76+
return true;
7677
}
7778
catch (Exception exception)
7879
{
@@ -220,11 +221,11 @@ public void Acknowledge(Message message)
220221
}
221222

222223
/// <inheritdoc />
223-
public void Reject(Message message)
224+
public bool Reject(Message message)
224225
{
225226
if (!message.Header.Bag.TryGetValue("ReceiptHandle", out var receiptHandle))
226227
{
227-
return;
228+
return false;
228229
}
229230

230231
try
@@ -235,7 +236,8 @@ public void Reject(Message message)
235236
using var command = connection.CreateCommand();
236237
command.CommandText = $"DELETE FROM \"{SchemaName}\".\"{TableName}\" WHERE \"id\" = $1";
237238
command.Parameters.Add(new NpgsqlParameter { Value = receiptHandle });
238-
command.ExecuteNonQuery();
239+
command.ExecuteNonQuery();
240+
return true;
239241
}
240242
catch (Exception exception)
241243
{

src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessageConsumer.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,9 +330,9 @@ public async Task<bool> RequeueAsync(Message message, TimeSpan? timeout = null,
330330
/// Rejects the specified message.
331331
/// </summary>
332332
/// <param name="message">The message.</param>
333-
public void Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));
333+
public bool Reject(Message message) => BrighterAsyncContext.Run(async () => await RejectAsync(message));
334334

335-
public async Task RejectAsync(Message message, CancellationToken cancellationToken = default)
335+
public async Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default)
336336
{
337337
try
338338
{
@@ -343,6 +343,7 @@ public async Task RejectAsync(Message message, CancellationToken cancellationTok
343343
Log.NoAckMessage(s_logger, message.Id, message.DeliveryTag);
344344
//if we have a DLQ, this will force over to the DLQ
345345
await Channel.BasicRejectAsync(message.DeliveryTag, false, cancellationToken);
346+
return true;
346347
}
347348
catch (Exception exception)
348349
{

src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessageConsumer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,15 @@ public bool Requeue(Message message, TimeSpan? timeout = null)
233233
/// Rejects the specified message.
234234
/// </summary>
235235
/// <param name="message">The message.</param>
236-
public void Reject(Message message)
236+
public bool Reject(Message message)
237237
{
238238
try
239239
{
240240
EnsureBroker(_queueName);
241241
Log.NoAckMessage(s_logger, message.Id, message.DeliveryTag);
242242
//if we have a DLQ, this will force over to the DLQ
243243
Channel!.BasicReject(message.DeliveryTag, false);
244+
return true;
244245
}
245246
catch (Exception exception)
246247
{

src/Paramore.Brighter.MessagingGateway.Redis/RedisMessageConsumer.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,22 +258,19 @@ public Message[] Receive(TimeSpan? timeOut = null)
258258
/// This a 'do nothing operation' as we have already popped
259259
/// </summary>
260260
/// <param name="message">The message to reject</param>
261-
public void Reject(Message message)
261+
public bool Reject(Message message)
262262
{
263263
_inflight.Remove(message.Id);
264+
return true;
264265
}
265266

266267
/// <summary>
267268
/// This a 'do nothing operation' as we have already popped
268269
/// </summary>
269270
/// <param name="message">The message to reject</param>
270271
/// <param name="cancellationToken">The cancellation token</param>
271-
public async Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
272-
{
273-
Reject(message);
274-
await Task.CompletedTask;
275-
}
276-
272+
public Task<bool> RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))
273+
=> Task.FromResult(Reject(message));
277274

278275
/// <summary>
279276
/// Requeues the specified message.

src/Paramore.Brighter.ServiceActivator/Proactor.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,12 +339,12 @@ private RequestContext InitRequestContext(Activity? span, Message message)
339339
return context;
340340
}
341341

342-
private async Task RejectMessage(Message message)
342+
private async Task<bool> RejectMessage(Message message)
343343
{
344344
Log.RejectingMessage(s_logger, message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
345345
IncrementUnacceptableMessageLimit();
346346

347-
await Channel.RejectAsync(message);
347+
return await Channel.RejectAsync(message);
348348
}
349349

350350
private async Task<bool> RequeueMessage(Message message)
@@ -361,8 +361,7 @@ private async Task<bool> RequeueMessage(Message message)
361361
? string.Empty
362362
: $" (original message id {originalMessageId})", Channel.Name, Channel.RoutingKey, Thread.CurrentThread.ManagedThreadId);
363363

364-
await RejectMessage(message);
365-
return false;
364+
return await RejectMessage(message);
366365
}
367366
}
368367

0 commit comments

Comments
 (0)