Skip to content

Commit c7efd97

Browse files
committed
UpdatedAggregate<T> for usage with handlers that touch multiple event streams of different aggregate types. Closes GH-2011
1 parent 1bb86ac commit c7efd97

File tree

5 files changed

+200
-4
lines changed

5 files changed

+200
-4
lines changed

docs/guide/http/marten.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,17 @@ public static (UpdatedAggregate, Events) ConfirmDifferent(ConfirmOrder command,
344344
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Marten/Orders.cs#L298-L312' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_returning_updated_aggregate_as_response_from_http_endpoint' title='Start of snippet'>anchor</a></sup>
345345
<!-- endSnippet -->
346346

347+
If you should happen to have a message handler or HTTP endpoint signature that uses multiple event streams,
348+
but you want the `UpdatedAggregate` to **only** apply to one of the streams, you can use the `UpdatedAggregate<T>`
349+
to tip off Wolverine about that like in this sample:
350+
351+
snippet: sample_MakePurchaseHandler
352+
353+
::: info
354+
Wolverine can't (yet) handle a signature with multiple event streams of the same aggregate type and
355+
`UpdatedAggregate`.
356+
:::
357+
347358
## Reading the Latest Version of an Aggregate
348359

349360
::: info
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
using IntegrationTests;
2+
using Marten;
3+
using Marten.Events;
4+
using Marten.Events.Projections;
5+
using Microsoft.Extensions.Hosting;
6+
using Shouldly;
7+
using Wolverine;
8+
using Wolverine.Marten;
9+
using Wolverine.Tracking;
10+
11+
namespace MartenTests.AggregateHandlerWorkflow;
12+
13+
public class mixed_aggregate_handler_with_multiple_streams
14+
{
15+
[Fact]
16+
public async Task get_the_correct_aggregate_back_out()
17+
{
18+
using var host = await Host.CreateDefaultBuilder()
19+
.UseWolverine(opts =>
20+
{
21+
MartenServiceCollectionExtensions.AddMarten(opts.Services, m =>
22+
{
23+
m.Connection(Servers.PostgresConnectionString);
24+
m.DatabaseSchemaName = "accounts";
25+
26+
m.Projections.Snapshot<XAccount>(SnapshotLifecycle.Inline);
27+
m.Projections.Snapshot<Inventory>(SnapshotLifecycle.Inline);
28+
}).IntegrateWithWolverine();
29+
}).StartAsync();
30+
31+
using var session = host.DocumentStore().LightweightSession();
32+
var inventoryId = session.Events.StartStream<Inventory>(new InventoryStarted("XFX", 100, 10)).Id;
33+
var accountId = session.Events.StartStream<XAccount>(new XAccountOpened(2000)).Id;
34+
await session.SaveChangesAsync();
35+
36+
var (tracked, account) = await host.InvokeMessageAndWaitAsync<XAccount>(new MakePurchase(accountId, inventoryId, 30));
37+
account.Balance.ShouldBe(1700);
38+
39+
}
40+
}
41+
42+
public record XAccountOpened(double Balance);
43+
44+
public record ItemPurchased(Guid InventoryId, int Number, double UnitPrice);
45+
46+
public class XAccount
47+
{
48+
public Guid Id { get; set; }
49+
public double Balance { get; set; }
50+
51+
public XAccount()
52+
{
53+
}
54+
55+
public static XAccount Create(XAccountOpened opened) => new XAccount { Balance = opened.Balance };
56+
57+
public void Apply(ItemPurchased purchased)
58+
{
59+
Balance -= (purchased.Number * purchased.UnitPrice);
60+
}
61+
}
62+
63+
public record InventoryStarted(string Name, int Quantity, double UnitPrice);
64+
65+
public record Drawdown(int Quantity);
66+
67+
public class Inventory
68+
{
69+
public Guid Id { get; set; }
70+
public string Name { get; set; }
71+
public int Quantity { get; set; }
72+
public double UnitPrice { get; set; }
73+
74+
public static Inventory Create(InventoryStarted started) => new Inventory
75+
{
76+
Name = started.Name,
77+
Quantity = started.Quantity,
78+
UnitPrice = started.UnitPrice
79+
};
80+
81+
public void Apply(Drawdown down) => Quantity -= down.Quantity;
82+
}
83+
84+
public record MakePurchase(Guid XAccountId, Guid InventoryId, int Number);
85+
86+
#region sample_MakePurchaseHandler
87+
88+
public static class MakePurchaseHandler
89+
{
90+
// See how we used the generic version
91+
// of UpdatedAggregate to tell Wolverine we
92+
// want *only* the XAccount as the response
93+
// from this handler
94+
public static UpdatedAggregate<XAccount> Handle(
95+
MakePurchase command,
96+
97+
[WriteAggregate] IEventStream<XAccount> account,
98+
99+
[WriteAggregate] IEventStream<Inventory> inventory)
100+
{
101+
if (command.Number > inventory.Aggregate.Quantity ||
102+
(command.Number * inventory.Aggregate.UnitPrice) > account.Aggregate.Balance)
103+
{
104+
// Do Nothing!
105+
return new UpdatedAggregate<XAccount>();
106+
}
107+
108+
account.AppendOne(new ItemPurchased(command.InventoryId, command.Number, inventory.Aggregate.UnitPrice));
109+
inventory.AppendOne(new Drawdown(command.Number));
110+
111+
return new UpdatedAggregate<XAccount>();
112+
}
113+
}
114+
115+
#endregion

src/Persistence/Wolverine.Marten/AggregateHandling.cs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,23 @@ public Variable Apply(IChain chain, IServiceContainer container)
7171

7272
public void Store(IChain chain)
7373
{
74-
chain.Tags[nameof(AggregateHandling)] = this;
74+
if (chain.Tags.TryGetValue(nameof(AggregateHandling), out var raw))
75+
{
76+
if (raw is AggregateHandling handling)
77+
{
78+
if (ReferenceEquals(handling, this)) return;
79+
80+
chain.Tags[nameof(AggregateHandling)] = new List<AggregateHandling> { handling, this };
81+
}
82+
else if (raw is List<AggregateHandling> list)
83+
{
84+
list.Add(this);
85+
}
86+
}
87+
else
88+
{
89+
chain.Tags[nameof(AggregateHandling)] = this;
90+
}
7591
}
7692

7793
public static bool TryLoad(IChain chain, out AggregateHandling handling)
@@ -88,6 +104,27 @@ public static bool TryLoad(IChain chain, out AggregateHandling handling)
88104
handling = default;
89105
return false;
90106
}
107+
108+
public static bool TryLoad<T>(IChain chain, out AggregateHandling handling)
109+
{
110+
if (chain.Tags.TryGetValue(nameof(AggregateHandling), out var raw))
111+
{
112+
if (raw is AggregateHandling h && h.AggregateType == typeof(T))
113+
{
114+
handling = h;
115+
return true;
116+
}
117+
118+
if (raw is List<AggregateHandling> list)
119+
{
120+
handling = list.FirstOrDefault(x => x.AggregateType == typeof(T));
121+
return handling != null;
122+
}
123+
}
124+
125+
handling = default;
126+
return false;
127+
}
91128

92129
internal static (MemberInfo, MemberInfo?) DetermineAggregateIdAndVersion(Type aggregateType, Type commandType,
93130
IServiceContainer container)

src/Persistence/Wolverine.Marten/UpdatedAggregate.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,36 @@ public static void ConfigureResponse(IChain chain)
3838
}
3939
}
4040

41+
/// <summary>
42+
/// Use this as a response from a message handler
43+
/// or HTTP endpoint using the aggregate handler workflow
44+
/// to response with the updated version of the aggregate being
45+
/// altered *after* any new events have been applied
46+
/// </summary>
47+
/// <typeparam name="T">The aggregate type. Use this version of UpdatedAggregate if you need to help Wolverine "know" which of multiple event streams should be the "updated aggregate"</typeparam>
48+
public class UpdatedAggregate<T> : IResponseAware
49+
{
50+
public static void ConfigureResponse(IChain chain)
51+
{
52+
if (AggregateHandling.TryLoad<T>(chain, out var handling))
53+
{
54+
var idType = handling.AggregateId.VariableType;
55+
56+
// TODO -- with https://github.com/JasperFx/wolverine/issues/1167, this might need to try to create value
57+
// type first
58+
var openType = idType == typeof(Guid) ? typeof(FetchLatestByGuid<>) : typeof(FetchLatestByString<>);
59+
var frame = openType.CloseAndBuildAs<MethodCall>(handling.AggregateId, handling.AggregateType);
60+
61+
chain.UseForResponse(frame);
62+
}
63+
else
64+
{
65+
throw new InvalidOperationException($"UpdatedAggregate cannot be used because Chain {chain} is not marked as an aggregate handler. Are you missing an [AggregateHandler] or [Aggregate] attribute on the handler?");
66+
}
67+
68+
}
69+
}
70+
4171
internal class FetchLatestByGuid<T> : MethodCall where T : class
4272
{
4373
public FetchLatestByGuid(Variable id) : base(typeof(IEventStoreOperations), ReflectionHelper.GetMethod<IEventStoreOperations>(x => x.FetchLatest<T>(Guid.Empty, CancellationToken.None)))

src/Wolverine/Runtime/Handlers/HandlerChain.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -528,18 +528,21 @@ protected void applyCustomizations(GenerationRules rules, IServiceContainer cont
528528
if (!_hasConfiguredFrames)
529529
{
530530
_hasConfiguredFrames = true;
531-
532-
applyAttributesAndConfigureMethods(rules, container);
533-
531+
534532
foreach (var attribute in MessageType
535533
.GetCustomAttributes(typeof(ModifyHandlerChainAttribute))
536534
.OfType<ModifyHandlerChainAttribute>()) attribute.Modify(this, rules);
537535

538536
foreach (var attribute in MessageType.GetCustomAttributes(typeof(ModifyChainAttribute))
539537
.OfType<ModifyChainAttribute>()) attribute.Modify(this, rules, container);
540538

539+
// THIS has to go before the baseline attributes and configure
541540
foreach (var handlerCall in HandlerCalls())
542541
WolverineParameterAttribute.TryApply(handlerCall, container, rules, this);
542+
543+
applyAttributesAndConfigureMethods(rules, container);
544+
545+
543546
}
544547

545548
ApplyImpliedMiddlewareFromHandlers(rules);

0 commit comments

Comments
 (0)