Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System;
using System.Threading.Tasks;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Events;
using Marten;
using Marten.Exceptions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Runtime;


namespace MartenTests.Bugs;

public class Bug_2026_scheduled_messages_with_partitioning
{
//[Fact] -- don't run this, but this was used to fix GH-2026 w/ some manual testing
public async Task send_messages_with_delay()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Services.AddMarten(m =>
{
m.DatabaseSchemaName = "gh2026";
m.DisableNpgsqlLogging = true;
m.Connection(Servers.PostgresConnectionString);
}).IntegrateWithWolverine();

opts.Durability.EnableInboxPartitioning = true;
opts.Policies.LogMessageStarting(LogLevel.Information);
opts.Policies.MessageExecutionLogLevel(LogLevel.Information);
opts.Policies.MessageSuccessLogLevel(LogLevel.Information);

opts.Policies.UseDurableLocalQueues();
opts.Policies.AutoApplyTransactions();

// use a local partitioned queue for the ReproduceBug message type
opts.MessagePartitioning.UseInferredMessageGrouping()
.ByMessage<ReproduceBug>(x => x.Name)
.PublishToPartitionedLocalMessaging("repro", 8, topology =>
{
topology.Message<ReproduceBug>();
});
}).StartAsync();

await host.RebuildAllEnvelopeStorageAsync();

await host.SendAsync(new TriggerReproduceBug("Foo"));


await Task.Delay(2.Minutes());
}
}

public sealed record ReproduceBug(string Name);

public record TriggerReproduceBug(string Name);

public static class ReproduceBugHandler
{
public static void Handle(ReproduceBug command)
{
Console.WriteLine($"Reproducing bug for {command.Name}");
}

public static OutgoingMessages Handle(TriggerReproduceBug cmd)
{
var outgoingMessages = new OutgoingMessages();
outgoingMessages.Delay(new ReproduceBug(cmd.Name), 30.Seconds());
return outgoingMessages;
}
}
6 changes: 6 additions & 0 deletions src/Wolverine/Envelope.Internals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -366,4 +366,10 @@ internal DeliveryOptions ToDeliveryOptions()
SagaId = SagaId
};
}

internal void ClearAnyScheduling()
{
Status = EnvelopeStatus.Incoming;
ScheduledTime = null;
}
}
3 changes: 3 additions & 0 deletions src/Wolverine/HostBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using JasperFx.CodeGeneration;
using JasperFx.CodeGeneration.Commands;
using JasperFx.CodeGeneration.Model;
Expand Down
7 changes: 6 additions & 1 deletion src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,16 @@ private async Task flushScheduledMessagesAsync()
if (Storage is NullMessageStore)
{
foreach (var envelope in Scheduled)
{
Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope);
}
}
else
{
foreach (var envelope in Scheduled) await Storage.Inbox.RescheduleExistingEnvelopeForRetryAsync(envelope);
foreach (var envelope in Scheduled)
{
await Storage.Inbox.RescheduleExistingEnvelopeForRetryAsync(envelope);
}
}

Scheduled.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public PartitionedMessageReRouter(PartitionedMessageTopology topology, Type mess

public Task HandleAsync(MessageContext context, CancellationToken cancellation)
{
// Knock it out of being scheduled just in case
// From https://github.com/JasperFx/wolverine/issues/2026
context.Envelope.ClearAnyScheduling();

var endpoint = _topology.SelectSlot(context.Envelope);

return context
Expand Down
Loading