Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/guide/http/marten.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,17 @@ public static (UpdatedAggregate, Events) ConfirmDifferent(ConfirmOrder command,
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Marten/Orders.cs#L298-L312' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_returning_updated_aggregate_as_response_from_http_endpoint' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

If you should happen to have a message handler or HTTP endpoint signature that uses multiple event streams,
but you want the `UpdatedAggregate` to **only** apply to one of the streams, you can use the `UpdatedAggregate<T>`
to tip off Wolverine about that like in this sample:

snippet: sample_MakePurchaseHandler

::: info
Wolverine can't (yet) handle a signature with multiple event streams of the same aggregate type and
`UpdatedAggregate`.
:::

## Reading the Latest Version of an Aggregate

::: info
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using IntegrationTests;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests.AggregateHandlerWorkflow;

public class mixed_aggregate_handler_with_multiple_streams
{
[Fact]
public async Task get_the_correct_aggregate_back_out()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
MartenServiceCollectionExtensions.AddMarten(opts.Services, m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "accounts";

m.Projections.Snapshot<XAccount>(SnapshotLifecycle.Inline);
m.Projections.Snapshot<Inventory>(SnapshotLifecycle.Inline);
}).IntegrateWithWolverine();
}).StartAsync();

using var session = host.DocumentStore().LightweightSession();
var inventoryId = session.Events.StartStream<Inventory>(new InventoryStarted("XFX", 100, 10)).Id;
var accountId = session.Events.StartStream<XAccount>(new XAccountOpened(2000)).Id;
await session.SaveChangesAsync();

var (tracked, account) = await host.InvokeMessageAndWaitAsync<XAccount>(new MakePurchase(accountId, inventoryId, 30));
account.Balance.ShouldBe(1700);

}
}

public record XAccountOpened(double Balance);

public record ItemPurchased(Guid InventoryId, int Number, double UnitPrice);

public class XAccount
{
public Guid Id { get; set; }
public double Balance { get; set; }

public XAccount()
{
}

public static XAccount Create(XAccountOpened opened) => new XAccount { Balance = opened.Balance };

public void Apply(ItemPurchased purchased)
{
Balance -= (purchased.Number * purchased.UnitPrice);
}
}

public record InventoryStarted(string Name, int Quantity, double UnitPrice);

public record Drawdown(int Quantity);

public class Inventory
{
public Guid Id { get; set; }
public string Name { get; set; }
public int Quantity { get; set; }
public double UnitPrice { get; set; }

public static Inventory Create(InventoryStarted started) => new Inventory
{
Name = started.Name,
Quantity = started.Quantity,
UnitPrice = started.UnitPrice
};

public void Apply(Drawdown down) => Quantity -= down.Quantity;
}

public record MakePurchase(Guid XAccountId, Guid InventoryId, int Number);

#region sample_MakePurchaseHandler

public static class MakePurchaseHandler
{
// See how we used the generic version
// of UpdatedAggregate to tell Wolverine we
// want *only* the XAccount as the response
// from this handler
public static UpdatedAggregate<XAccount> Handle(
MakePurchase command,

[WriteAggregate] IEventStream<XAccount> account,

[WriteAggregate] IEventStream<Inventory> inventory)
{
if (command.Number > inventory.Aggregate.Quantity ||
(command.Number * inventory.Aggregate.UnitPrice) > account.Aggregate.Balance)
{
// Do Nothing!
return new UpdatedAggregate<XAccount>();
}

account.AppendOne(new ItemPurchased(command.InventoryId, command.Number, inventory.Aggregate.UnitPrice));
inventory.AppendOne(new Drawdown(command.Number));

return new UpdatedAggregate<XAccount>();
}
}

#endregion
39 changes: 38 additions & 1 deletion src/Persistence/Wolverine.Marten/AggregateHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,23 @@ public Variable Apply(IChain chain, IServiceContainer container)

public void Store(IChain chain)
{
chain.Tags[nameof(AggregateHandling)] = this;
if (chain.Tags.TryGetValue(nameof(AggregateHandling), out var raw))
{
if (raw is AggregateHandling handling)
{
if (ReferenceEquals(handling, this)) return;

chain.Tags[nameof(AggregateHandling)] = new List<AggregateHandling> { handling, this };
}
else if (raw is List<AggregateHandling> list)
{
list.Add(this);
}
}
else
{
chain.Tags[nameof(AggregateHandling)] = this;
}
}

public static bool TryLoad(IChain chain, out AggregateHandling handling)
Expand All @@ -88,6 +104,27 @@ public static bool TryLoad(IChain chain, out AggregateHandling handling)
handling = default;
return false;
}

public static bool TryLoad<T>(IChain chain, out AggregateHandling handling)
{
if (chain.Tags.TryGetValue(nameof(AggregateHandling), out var raw))
{
if (raw is AggregateHandling h && h.AggregateType == typeof(T))
{
handling = h;
return true;
}

if (raw is List<AggregateHandling> list)
{
handling = list.FirstOrDefault(x => x.AggregateType == typeof(T));
return handling != null;
}
}

handling = default;
return false;
}

internal static (MemberInfo, MemberInfo?) DetermineAggregateIdAndVersion(Type aggregateType, Type commandType,
IServiceContainer container)
Expand Down
30 changes: 30 additions & 0 deletions src/Persistence/Wolverine.Marten/UpdatedAggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,36 @@ public static void ConfigureResponse(IChain chain)
}
}

/// <summary>
/// Use this as a response from a message handler
/// or HTTP endpoint using the aggregate handler workflow
/// to response with the updated version of the aggregate being
/// altered *after* any new events have been applied
/// </summary>
/// <typeparam name="T">The aggregate type. Use this version of UpdatedAggregate if you need to help Wolverine "know" which of multiple event streams should be the "updated aggregate"</typeparam>
public class UpdatedAggregate<T> : IResponseAware
{
public static void ConfigureResponse(IChain chain)
{
if (AggregateHandling.TryLoad<T>(chain, out var handling))
{
var idType = handling.AggregateId.VariableType;

// TODO -- with https://github.com/JasperFx/wolverine/issues/1167, this might need to try to create value
// type first
var openType = idType == typeof(Guid) ? typeof(FetchLatestByGuid<>) : typeof(FetchLatestByString<>);
var frame = openType.CloseAndBuildAs<MethodCall>(handling.AggregateId, handling.AggregateType);

chain.UseForResponse(frame);
}
else
{
throw new InvalidOperationException($"UpdatedAggregate cannot be used because Chain {chain} is not marked as an aggregate handler. Are you missing an [AggregateHandler] or [Aggregate] attribute on the handler?");
}

}
}

internal class FetchLatestByGuid<T> : MethodCall where T : class
{
public FetchLatestByGuid(Variable id) : base(typeof(IEventStoreOperations), ReflectionHelper.GetMethod<IEventStoreOperations>(x => x.FetchLatest<T>(Guid.Empty, CancellationToken.None)))
Expand Down
46 changes: 46 additions & 0 deletions src/Testing/CoreTests/Bugs/Bug_2004_separated_handler_stuff.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Diagnostics;
using JasperFx.CodeGeneration;
using Microsoft.Extensions.Hosting;
using Wolverine.Tracking;
using Xunit;

namespace CoreTests.Bugs;

public class Bug_2004_separated_handler_stuff
{
[Fact]
public async Task multiple_handler_file_overwrite()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.CodeGeneration.TypeLoadMode = TypeLoadMode.Auto;
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
})
.StartAsync();

await host.SendMessageAndWaitAsync(new SayStuff0());
}
}

public record SayStuff0();

public record SayStuff1(string Text);
public record SayStuff2(string Text);

public class BSayStuffHandler
{

public (SayStuff1, SayStuff2) Handle(SayStuff0 _)
{
return (new SayStuff1("Hello"), new SayStuff2("World"));
}
public void Handle(SayStuff1 stuff) => Debug.WriteLine(stuff.Text);
public void Handle(SayStuff2 stuff) => Debug.WriteLine(stuff.Text);
}

public class ASayStuffHandler
{
public void Handle(SayStuff1 stuff) => Debug.WriteLine(stuff.Text);
public void Handle(SayStuff2 stuff) => Debug.WriteLine(stuff.Text);
}
11 changes: 7 additions & 4 deletions src/Wolverine/Runtime/Handlers/HandlerChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public override void ApplyParameterMatching(MethodCall call)
/// <summary>
/// Wolverine's string identification for this message type
/// </summary>
public string TypeName { get; }
public string TypeName { get; internal set; }

internal MessageHandler? Handler { get; private set; }

Expand Down Expand Up @@ -528,18 +528,21 @@ protected void applyCustomizations(GenerationRules rules, IServiceContainer cont
if (!_hasConfiguredFrames)
{
_hasConfiguredFrames = true;

applyAttributesAndConfigureMethods(rules, container);


foreach (var attribute in MessageType
.GetCustomAttributes(typeof(ModifyHandlerChainAttribute))
.OfType<ModifyHandlerChainAttribute>()) attribute.Modify(this, rules);

foreach (var attribute in MessageType.GetCustomAttributes(typeof(ModifyChainAttribute))
.OfType<ModifyChainAttribute>()) attribute.Modify(this, rules, container);

// THIS has to go before the baseline attributes and configure
foreach (var handlerCall in HandlerCalls())
WolverineParameterAttribute.TryApply(handlerCall, container, rules, this);

applyAttributesAndConfigureMethods(rules, container);


}

ApplyImpliedMiddlewareFromHandlers(rules);
Expand Down
15 changes: 15 additions & 0 deletions src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,21 @@ IEnumerable<HandlerChain> explodeChains(HandlerChain chain)
}

var allChains = Chains.SelectMany(explodeChains).ToArray();

// This lovely thing was brought to you by https://github.com/JasperFx/wolverine/issues/2004
var duplicateTypeNames = allChains
.GroupBy(x => x.TypeName)
.Where(x => x.Count() > 1)
.ToArray();

foreach (var @group in duplicateTypeNames)
{
foreach (var chain in @group)
{
chain.TypeName =
$"{chain.MessageType.ToSuffixedTypeName("")}_{chain.HandlerCalls().First().HandlerType.ToSuffixedTypeName("Handler")}";
}
}

foreach (var policy in handlerPolicies(options)) policy.Apply(allChains, Rules, container);

Expand Down
Loading