Skip to content

Commit 8af80f2

Browse files
committed
Hardened the handled marking on non-transactional idempotency. Closes GH-2036
1 parent 0710979 commit 8af80f2

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Wolverine.Runtime;
2+
using Wolverine.Runtime.Agents;
3+
4+
namespace Wolverine.Persistence.Durability;
5+
6+
public class PersistHandled(Envelope Handled) : IAgentCommand
7+
{
8+
public async Task<AgentCommands> ExecuteAsync(IWolverineRuntime runtime, CancellationToken cancellationToken)
9+
{
10+
await runtime.Storage.Inbox.StoreIncomingAsync(Handled);
11+
12+
return AgentCommands.Empty;
13+
}
14+
}

src/Wolverine/Runtime/MessageContext.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,18 @@ public async Task AssertEagerIdempotencyAsync(CancellationToken cancellation)
102102

103103
public async Task PersistHandledAsync()
104104
{
105-
// TODO -- add some retries here!!!!
106105
var handled = Envelope.ForPersistedHandled(Envelope, DateTimeOffset.UtcNow, Runtime.Options.Durability);
107-
await Runtime.Storage.Inbox.StoreIncomingAsync(handled);
106+
try
107+
{
108+
await Runtime.Storage.Inbox.StoreIncomingAsync(handled);
109+
}
110+
catch (Exception e)
111+
{
112+
Runtime.Logger.LogError(e, "Error trying to mark message {Id} as handled. Retrying later.", handled.Id);
113+
114+
// Retry this off to the side...
115+
await new MessageBus(Runtime).PublishAsync(new PersistHandled(handled));
116+
}
108117
}
109118

110119
public async Task FlushOutgoingMessagesAsync()

0 commit comments

Comments
 (0)