From ea0303bf8e90d966c3a7e73505997e68d5f7e740 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Thu, 26 Feb 2026 09:19:37 +0100 Subject: [PATCH] Made Exercise with Multi-Stream Marten projection to be async Multi-Stream projections should be registered as async, especially if they raise side effects. I was lazy before, now this PR fixes it. It adds registration as async and properly starting the Async Daemon in tests and waiting for projection data to be processed. --- .../README.md | 15 +++++++ .../ProjectionsTests.cs | 8 ++++ .../README.md | 13 +++--- .../ProjectionsTests.cs | 44 ++++++++++++++++++- .../README.md | 17 +++++++ .../ProjectionsTests.cs | 10 ++++- .../README.md | 8 ++++ .../ProjectionsTests.cs | 14 +++--- .../README.md | 9 ++++ 9 files changed, 124 insertions(+), 14 deletions(-) diff --git a/Workshops/IntroductionToEventSourcing/16-Projections.MultiStream.OutOfOrder/README.md b/Workshops/IntroductionToEventSourcing/16-Projections.MultiStream.OutOfOrder/README.md index 892dfb64..ad28b5be 100644 --- a/Workshops/IntroductionToEventSourcing/16-Projections.MultiStream.OutOfOrder/README.md +++ b/Workshops/IntroductionToEventSourcing/16-Projections.MultiStream.OutOfOrder/README.md @@ -6,6 +6,20 @@ Fix the projection from Exercise 15 to handle out-of-order events. Learn how to build resilient projections that work even when events arrive in any order. +## Scenario + +Events arrive from three different streams (payment, merchant, and fraud check), but they all reference the same `PaymentId`. Your projection must: + +1. Collect data from all three event types +2. Store them in a single `PaymentVerification` read model +3. Derive the payment verification status when all data is present + +Decision logic: +- Reject if merchant failed +- Reject if fraud score > 0.75 +- Reject if amount > 10000 AND fraud score > 0.5 +- Otherwise approve +- ## Context Events can arrive out of order (e.g., from different RabbitMQ queues or Kafka topics). The projection from Exercise 15 was built assuming ordered events — run the test to see it fail. @@ -17,3 +31,4 @@ For example, `FraudScoreCalculated` might fire before `PaymentRecorded`, meaning ## Decision Logic Only derive a final status when you have all three pieces of data (payment, merchant check, fraud check). Then apply the same rules as Exercise 15. + diff --git a/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/ProjectionsTests.cs b/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/ProjectionsTests.cs index 4963ac8d..58c15743 100644 --- a/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/ProjectionsTests.cs +++ b/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/ProjectionsTests.cs @@ -82,6 +82,11 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed() // 2. Register the projection here using: options.Projections.Add(ProjectionLifecycle.Inline); }); + // Let's start Async Daemon to process async projections in the background + // Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon + using var daemon = await documentStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await using var session = documentStore.LightweightSession(); // Payment 1: Approved — all checks pass @@ -110,6 +115,9 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed() await session.SaveChangesAsync(); + // Wait until Async Daemon processes all events + await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5)); + // Assert Payment 1: Approved var payment1 = await session.LoadAsync(payment1Id); payment1.Should().NotBeNull(); diff --git a/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/README.md b/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/README.md index 2848953e..17307ea1 100644 --- a/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/README.md +++ b/Workshops/IntroductionToEventSourcing/17-Projections.MultiStream.Marten/README.md @@ -14,16 +14,19 @@ Events arrive from three different streams (payment, merchant, and fraud check), 2. Store them in a single `PaymentVerification` read model 3. Derive the payment verification status when all data is present +Decision logic: +- Reject if merchant failed +- Reject if fraud score > 0.75 +- Reject if amount > 10000 AND fraud score > 0.5 +- Otherwise approve + ## Steps 1. Create a `PaymentVerificationProjection` class with `Handle` methods for each event type 2. Register your handlers using document store options `options.Projections.Add` as inline. -3. Implement decision logic in the `FraudScoreCalculated` handler (always last for completed payments): - - Reject if merchant failed - - Reject if fraud score > 0.75 - - Reject if amount > 10000 AND fraud score > 0.5 - - Otherwise approve +3. Implement decision logic in the `FraudScoreCalculated` handler (always last for completed payments) ## Reference - [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html) +- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon) diff --git a/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs b/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs index 3c7727a8..14caf57b 100644 --- a/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs +++ b/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs @@ -59,6 +59,10 @@ public class PaymentVerification public PaymentStatus Status { get; set; } } + +// TODO: This projection was built assuming ordered events. Run the test — it fails. +// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics). +// Fix it to handle out-of-order events and derive the verification decision. public class PaymentVerificationProjection: MultiStreamProjection { public PaymentVerificationProjection() @@ -138,11 +142,15 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc // TODO: This projection was built assuming ordered events. Run the test — it fails. // Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics). // Fix it to handle out-of-order events and derive the verification decision. - - options.Projections.Add(ProjectionLifecycle.Inline); + options.Projections.Add(ProjectionLifecycle.Async); options.Events.StreamIdentity = StreamIdentity.AsString; }); + // Let's start Async Daemon to process async projections in the background + // Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon + using var daemon = await documentStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await using var session = documentStore.LightweightSession(); // Payment 1: Approved — FraudScore arrives first @@ -171,29 +179,61 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc await session.SaveChangesAsync(); + await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5)); + // Assert Payment 1: Approved var payment1 = await session.LoadAsync(payment1Id); payment1.Should().NotBeNull(); + payment1.Id.Should().Be(payment1Id); + payment1.OrderId.Should().Be(order1Id); + payment1.Amount.Should().Be(100m); + payment1.MerchantLimitStatus.Should().Be(VerificationStatus.Passed); + payment1.FraudStatus.Should().Be(VerificationStatus.Passed); + payment1.FraudScore.Should().Be(0.1m); payment1.Status.Should().Be(PaymentStatus.Approved); // Assert Payment 2: Rejected var payment2 = await session.LoadAsync(payment2Id); payment2.Should().NotBeNull(); + payment2.Id.Should().Be(payment2Id); + payment2.OrderId.Should().Be(order2Id); + payment2.Amount.Should().Be(5000m); + payment2.MerchantLimitStatus.Should().Be(VerificationStatus.Failed); + payment2.FraudStatus.Should().Be(VerificationStatus.Passed); + payment2.FraudScore.Should().Be(0.2m); payment2.Status.Should().Be(PaymentStatus.Rejected); // Assert Payment 3: Rejected var payment3 = await session.LoadAsync(payment3Id); payment3.Should().NotBeNull(); + payment3.Id.Should().Be(payment3Id); + payment3.OrderId.Should().Be(order3Id); + payment3.Amount.Should().Be(200m); + payment3.MerchantLimitStatus.Should().Be(VerificationStatus.Passed); + payment3.FraudStatus.Should().Be(VerificationStatus.Failed); + payment3.FraudScore.Should().Be(0.95m); payment3.Status.Should().Be(PaymentStatus.Rejected); // Assert Payment 4: Rejected var payment4 = await session.LoadAsync(payment4Id); payment4.Should().NotBeNull(); + payment4.Id.Should().Be(payment4Id); + payment4.OrderId.Should().Be(order4Id); + payment4.Amount.Should().Be(15000m); + payment4.MerchantLimitStatus.Should().Be(VerificationStatus.Passed); + payment4.FraudStatus.Should().Be(VerificationStatus.Passed); + payment4.FraudScore.Should().Be(0.6m); payment4.Status.Should().Be(PaymentStatus.Rejected); // Assert Payment 5: Pending var payment5 = await session.LoadAsync(payment5Id); payment5.Should().NotBeNull(); + payment5.Id.Should().Be(payment5Id); + payment5.OrderId.Should().Be(order5Id); + payment5.Amount.Should().Be(50m); + payment5.MerchantLimitStatus.Should().Be(VerificationStatus.Passed); + payment5.FraudStatus.Should().Be(VerificationStatus.Pending); + payment5.FraudScore.Should().Be(0m); payment5.Status.Should().Be(PaymentStatus.Pending); // Assert Payment 1: Verification is emitted diff --git a/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/README.md b/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/README.md index da3dce0c..fb37e9b1 100644 --- a/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/README.md +++ b/Workshops/IntroductionToEventSourcing/18-Projections.MultiStream.OutOfOrder.Marten/README.md @@ -6,6 +6,20 @@ Fix the Marten projection from Exercise 17 to handle out-of-order events. Learn how to build resilient Marten projections that work even when events arrive in any order. +## Scenario + +Events arrive from three different streams (payment, merchant, and fraud check), but they all reference the same `PaymentId`. Your projection must: + +1. Collect data from all three event types +2. Store them in a single `PaymentVerification` read model +3. Derive the payment verification status when all data is present + +Decision logic: +- Reject if merchant failed +- Reject if fraud score > 0.75 +- Reject if amount > 10000 AND fraud score > 0.5 +- Otherwise approve + ## Context Same out-of-order context as Exercise 16: events can arrive in any order (e.g., from different RabbitMQ queues or Kafka topics). The projection from Exercise 17 assumes ordered events — run the test to see it fail. @@ -15,3 +29,6 @@ Same out-of-order context as Exercise 16: events can arrive in any order (e.g., ## Reference - [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html) +- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon) +- [`RaiseSideEffects` method from Marten projections](https://martendb.io/events/projections/side-effects.html#side-effects) + diff --git a/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/ProjectionsTests.cs b/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/ProjectionsTests.cs index ef82e0ce..1bbcf1e8 100644 --- a/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/ProjectionsTests.cs +++ b/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/ProjectionsTests.cs @@ -129,9 +129,14 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed() options.DatabaseSchemaName = options.Events.DatabaseSchemaName = "Exercise17MultiStreamMarten"; options.AutoCreateSchemaObjects = AutoCreate.All; - options.Projections.Add(ProjectionLifecycle.Inline); + options.Projections.Add(ProjectionLifecycle.Async); }); + // Let's start Async Daemon to process async projections in the background + // Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon + using var daemon = await documentStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await using var session = documentStore.LightweightSession(); // Payment 1: Approved — all checks pass @@ -160,6 +165,9 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed() await session.SaveChangesAsync(); + // Wait until Async Daemon processes all events + await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5)); + // Assert Payment 1: Approved var payment1 = await session.LoadAsync(payment1Id); payment1.Should().NotBeNull(); diff --git a/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/README.md b/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/README.md index 8d023ef8..232bc97b 100644 --- a/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/README.md +++ b/Workshops/IntroductionToEventSourcing/Solved/17-Projections.MultiStream.Marten/README.md @@ -14,6 +14,13 @@ Learn how to use Marten's built-in multi-stream projection support to simplify c 4. Put decision logic in the `FraudScoreCalculated` Apply method (same rules as Exercise 15) 5. Register the projection using `options.Projections.Add(ProjectionLifecycle.Inline)` +Decision logic: +- Reject if merchant failed +- Reject if fraud score > 0.75 +- Reject if amount > 10000 AND fraud score > 0.5 +- Otherwise approve + + ## Key Differences from Exercise 15 Instead of manually handling event routing and database operations, Marten: @@ -24,3 +31,4 @@ Instead of manually handling event routing and database operations, Marten: ## Reference - [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html) +- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon) diff --git a/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs b/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs index 3e45c034..1bd78a6f 100644 --- a/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs +++ b/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/ProjectionsTests.cs @@ -156,7 +156,6 @@ public class ProjectionsTests "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'"; [Fact] - [Trait("Category", "SkipCI")] public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucceed() { var payment1Id = $"payment:{Guid.CreateVersion7()}"; @@ -185,14 +184,15 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc options.DatabaseSchemaName = options.Events.DatabaseSchemaName = "Exercise18MultiStreamOutOfOrderMarten"; options.AutoCreateSchemaObjects = AutoCreate.All; - // TODO: This projection was built assuming ordered events. Run the test — it fails. - // Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics). - // Fix it to handle out-of-order events and derive the verification decision. - - options.Projections.Add(ProjectionLifecycle.Inline); + options.Projections.Add(ProjectionLifecycle.Async); options.Events.StreamIdentity = StreamIdentity.AsString; }); + // Let's start Async Daemon to process async projections in the background + // Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon + using var daemon = await documentStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await using var session = documentStore.LightweightSession(); // Payment 1: Approved — FraudScore arrives first @@ -221,6 +221,8 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc await session.SaveChangesAsync(); + await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5)); + // Assert Payment 1: Approved var payment1 = await session.LoadAsync(payment1Id); payment1.Should().NotBeNull(); diff --git a/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/README.md b/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/README.md index 3446828a..abac7800 100644 --- a/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/README.md +++ b/Workshops/IntroductionToEventSourcing/Solved/18-Projections.MultiStream.OutOfOrder.Marten/README.md @@ -18,6 +18,13 @@ Same out-of-order context as Exercise 16: events can arrive in any order (e.g., 4. **Track data quality**: Add a `DataQuality` enum and field to track completeness 5. **Use RaiseSideEffects**: Override `RaiseSideEffects` to publish `PaymentVerificationCompleted` using `slice.AppendEvent()` when a decision is made +Decision logic: +- Reject if merchant failed +- Reject if fraud score > 0.75 +- Reject if amount > 10000 AND fraud score > 0.5 +- Otherwise approve + + ## RaiseSideEffects Pattern ```csharp @@ -39,3 +46,5 @@ public override ValueTask RaiseSideEffects( - [Dealing with Race Conditions in Event-Driven Architecture](https://www.architecture-weekly.com/p/dealing-with-race-conditions-in-event) - [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html) +- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon) +