Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/Extensions/Wolverine.RoutingSlip.Tests/ActivityTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Collections.Concurrent;

namespace Wolverine.RoutingSlip.Tests;

internal static class ActivityTracker
{
private static readonly ConcurrentDictionary<Guid, List<ActivityExecutionRecord>> Executions = new();
private static readonly ConcurrentDictionary<Guid, List<ActivityCompensationRecord>> Compensations = new();

public static void RecordExecution(Guid trackingNumber, string activityName, Uri destination)
{
var records = Executions.GetOrAdd(trackingNumber, _ => []);
lock (records)
{
var attempt = records.Count(r => r.ActivityName == activityName) + 1;
records.Add(new ActivityExecutionRecord(activityName, destination, attempt));
}
}

public static void RecordCompensation(Guid trackingNumber, string activityName, Uri destination)
{
var records = Compensations.GetOrAdd(trackingNumber, _ => []);
lock (records)
{
records.Add(new ActivityCompensationRecord(activityName, destination));
}
}

public static IReadOnlyList<ActivityExecutionRecord> GetExecutions(Guid trackingNumber)
{
return Executions.TryGetValue(trackingNumber, out var records)
? records.ToList()
: Array.Empty<ActivityExecutionRecord>();
}

public static IReadOnlyList<ActivityCompensationRecord> GetCompensations(Guid trackingNumber)
{
return Compensations.TryGetValue(trackingNumber, out var records)
? records.ToList()
: Array.Empty<ActivityCompensationRecord>();
}

public static void Reset()
{
Executions.Clear();
Compensations.Clear();
}
}

internal record ActivityExecutionRecord(string ActivityName, Uri Destination, int Attempt);

internal record ActivityCompensationRecord(string ActivityName, Uri Destination);
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageReference Include="Shouldly" Version="4.3.0" />
<PackageReference Include="xunit" Version="2.9.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" PrivateAssets="All" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Wolverine.RoutingSlip\Wolverine.RoutingSlip.csproj"/>
</ItemGroup>

<ItemGroup>
<Content Include="$(SolutionDir)xunit.runner.json" CopyToOutputDirectory="PreserveNewest"/>
</ItemGroup>

</Project>
277 changes: 277 additions & 0 deletions src/Extensions/Wolverine.RoutingSlip.Tests/end_to_end.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
using System.Collections.Concurrent;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Shouldly;
using Wolverine.Tracking;
using Wolverine.Transports.Tcp;
using Wolverine.Util;
using Xunit;

namespace Wolverine.RoutingSlip.Tests;

public class end_to_end : IAsyncLifetime
{
private IHost _pubHost;
private IHost _firstHost;
private IHost _secondHost;
private IHost _thirdHost;

private readonly int _firstHostPort = PortFinder.GetAvailablePort();
private readonly int _secondHostPort = PortFinder.GetAvailablePort();
private readonly int _thirdHostPort = PortFinder.GetAvailablePort();

[Fact]
public async Task end_to_end_message_using_routing_slip()
{
ResetActivityState();

// Arrange
var builder = new RoutingSlipBuilder();
builder.AddActivity("activity1", new Uri($"tcp://localhost:{_firstHostPort}"));
builder.AddActivity("activity2", new Uri($"tcp://localhost:{_secondHostPort}"));
builder.AddActivity("activity3", new Uri($"tcp://localhost:{_thirdHostPort}"));

// Act
var session = await _pubHost.TrackActivity()
.AlsoTrack(_firstHost)
.AlsoTrack(_secondHost)
.AlsoTrack(_thirdHost)
.ExecuteAndWaitAsync(ctx => ctx.ExecuteRoutingSlip(builder.Build()));

// Assert
var received = session.Received.MessagesOf<ExecutionContext>().ToList();
received.Count.ShouldBe(3);

var trackingNumber = received.Select(x => x.RoutingSlip.TrackingNumber).Distinct().Single();
var executions = ActivityTracker.GetExecutions(trackingNumber);

executions.Select(x => x.ActivityName).ShouldBe(new[] { "activity1", "activity2", "activity3" });
executions.Select(x => x.Destination.Port).ShouldBe(new[] { _firstHostPort, _secondHostPort, _thirdHostPort });
executions.Count(x => x.ActivityName == "activity2").ShouldBe(1);
ActivityTracker.GetCompensations(trackingNumber).ShouldBeEmpty();
}

[Fact]
public async Task end_to_end_message_using_routing_slip_with_compensation()
{
ResetActivityState();

// Arrange
var builder = new RoutingSlipBuilder();
builder.AddActivity("activity1", new Uri($"tcp://localhost:{_firstHostPort}"));
builder.AddActivity("activity2", new Uri($"tcp://localhost:{_secondHostPort}"));
builder.AddActivity("errorActivity3", new Uri($"tcp://localhost:{_thirdHostPort}"));

// Act
var session = await _pubHost.TrackActivity()
.IncludeExternalTransports()
.DoNotAssertOnExceptionsDetected()
.AlsoTrack(_firstHost)
.AlsoTrack(_secondHost)
.AlsoTrack(_thirdHost)
.ExecuteAndWaitAsync(ctx => ctx.ExecuteRoutingSlip(builder.Build()));

// Assert
var receivedExecutions = session.Received.MessagesOf<ExecutionContext>().ToList();
receivedExecutions.Count.ShouldBe(3);

var receivedCompensations = session.Received.MessagesOf<CompensationContext>().ToList();
receivedCompensations.Count.ShouldBe(2);

var trackingNumber = receivedExecutions.Select(x => x.RoutingSlip.TrackingNumber).Distinct().Single();
var compensationEvents = ActivityTracker.GetCompensations(trackingNumber);
compensationEvents.Select(x => x.ActivityName).OrderBy(x => x).ShouldBe(new[] { "activity1", "activity2" }.OrderBy(x => x).ToArray());
compensationEvents.Select(x => x.Destination.Port).OrderBy(x => x).ShouldBe(new[] {_secondHostPort, _firstHostPort}.OrderBy(x => x).ToArray());
ActivityTracker.GetExecutions(trackingNumber).Any(x => x.ActivityName == "errorActivity3").ShouldBeTrue();
}

[Fact]
public async Task retries_failed_activity_before_compensating_when_policy_overridden()
{
ResetActivityState();

var retryPolicy = new Action<RoutingSlipOptions>(options =>
{
options.OverridePolicy = policy => policy.RetryTimes(1);
});

ActivityHandlerBehavior.FailNextExecution("retryActivity2");

var firstPort = PortFinder.GetAvailablePort();
var secondPort = PortFinder.GetAvailablePort();
var thirdPort = PortFinder.GetAvailablePort();

var pubHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseRoutingSlip(retryPolicy)
.PublishMessage<ExecutionContext>().ToPort(firstPort);
}).StartAsync();

var firstHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseRoutingSlip(retryPolicy)
.ListenAtPort(firstPort);
}).StartAsync();

var secondHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseRoutingSlip(retryPolicy)
.ListenAtPort(secondPort);
}).StartAsync();

var thirdHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseRoutingSlip(retryPolicy)
.ListenAtPort(thirdPort);
}).StartAsync();

var hosts = new[] { pubHost, firstHost, secondHost, thirdHost };

var builder = new RoutingSlipBuilder();
builder.AddActivity("activity1", new Uri($"tcp://localhost:{firstPort}"));
builder.AddActivity("retryActivity2", new Uri($"tcp://localhost:{secondPort}"));
builder.AddActivity("activity3", new Uri($"tcp://localhost:{thirdPort}"));

try
{
var session = await pubHost.TrackActivity()
.IncludeExternalTransports()
.DoNotAssertOnExceptionsDetected()
.AlsoTrack(firstHost)
.AlsoTrack(secondHost)
.AlsoTrack(thirdHost)
.ExecuteAndWaitAsync(ctx => ctx.ExecuteRoutingSlip(builder.Build()));

var trackingNumber = session.Received.MessagesOf<ExecutionContext>()
.Select(x => x.RoutingSlip.TrackingNumber).Distinct().Single();

var executions = ActivityTracker.GetExecutions(trackingNumber);
executions.Count(x => x.ActivityName == "retryActivity2").ShouldBe(2);
executions.Where(x => x.ActivityName == "retryActivity2")
.Select(x => x.Attempt).ShouldBe([1, 2]);

ActivityTracker.GetCompensations(trackingNumber).ShouldBeEmpty();
}
finally
{
foreach (var host in hosts)
{
await host.StopAsync();
host.Dispose();
}
}
}

#region Test setup

public async Task InitializeAsync()
{
_pubHost = await Host.CreateDefaultBuilder().UseWolverine( opts =>
{
opts.UseRoutingSlip()
.PublishMessage<ExecutionContext>().ToPort(_firstHostPort);
}).StartAsync();

_firstHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseRoutingSlip()
.ListenAtPort(_firstHostPort);
}).StartAsync();

_secondHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseRoutingSlip()
.ListenAtPort(_secondHostPort);
}).StartAsync();

_thirdHost = await Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.UseRoutingSlip()
.ListenAtPort(_thirdHostPort);
}).StartAsync();
}

public async Task DisposeAsync()
{
await _pubHost.StopAsync();
await _firstHost.StopAsync();
await _secondHost.StopAsync();
await _thirdHost.StopAsync();
}

private static void ResetActivityState()
{
ActivityTracker.Reset();
ActivityHandlerBehavior.Reset();
}

#endregion
}

public sealed class ActivityHandler(ILogger<ActivityHandler> logger) : IExecutionActivity, ICompensationActivity
{
public ValueTask HandleAsync(ExecutionContext context, CancellationToken ct)
{
if (context.CurrentActivity is { } activity)
{
ActivityTracker.RecordExecution(context.RoutingSlip.TrackingNumber, activity.Name, activity.DestinationUri);
}

logger.LogInformation("ExecutionContext on {Host} Tracking={Tracking}",
Environment.MachineName, context.RoutingSlip.TrackingNumber);

if (context.CurrentActivity?.Name == "errorActivity3" ||
ActivityHandlerBehavior.ShouldFail(context.CurrentActivity?.Name))
{
throw new Exception("Something went wrong");
}

return ValueTask.CompletedTask;
}

public ValueTask HandleAsync(CompensationContext context, CancellationToken ct)
{
ActivityTracker.RecordCompensation(context.RoutingSlip.TrackingNumber,
context.CurrentLog.ExecutionName, context.CurrentLog.DestinationUri);

logger.LogInformation("CompensationContext on {Host} racking={Tracking} " +
"ExecutionId={ExecutionId} ExecutionId={ExecutionName}",
Environment.MachineName, context.RoutingSlip.TrackingNumber,
context.ExecutionId, context.CurrentLog.ExecutionName);
return ValueTask.CompletedTask;
}
}

internal static class ActivityHandlerBehavior
{
private static readonly ConcurrentDictionary<string, int> Failures =
new(StringComparer.OrdinalIgnoreCase);

public static void FailNextExecution(string activityName, int attempts = 1)
{
Failures[activityName] = attempts;
}

public static bool ShouldFail(string? activityName)
{
if (string.IsNullOrEmpty(activityName))
{
return false;
}

while (true)
{
if (!Failures.TryGetValue(activityName, out var remaining) || remaining == 0)
{
return false;
}

if (Failures.TryUpdate(activityName, remaining - 1, remaining))
{
return true;
}
}
}

public static void Reset() => Failures.Clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Wolverine.Runtime;

namespace Wolverine.RoutingSlip.Abstractions;

/// <summary>
/// Executes the next activity in a routing slip
/// </summary>
public interface IActivityExecutor
{
/// <summary>
/// Execute the next activity in a routing slip
/// </summary>
/// <param name="context"></param>
/// <param name="slip"></param>
/// <param name="next"></param>
/// <returns></returns>
ValueTask ExecuteAsync(IMessageContext context, RoutingSlip slip, RoutingSlipExecution next);

/// <summary>
/// Compensate the next activity in a routing slip
/// </summary>
/// <param name="context"></param>
/// <param name="slip"></param>
/// <param name="next"></param>
/// <returns></returns>
ValueTask CompensateAsync(IMessageContext context, RoutingSlip slip, RoutingSlipExecutionLog next);
}
Loading