Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ Fix the projection from Exercise 15 to handle out-of-order events.

Learn how to build resilient projections that work even when events arrive in any order.

## Scenario

Events arrive from three different streams (payment, merchant, and fraud check), but they all reference the same `PaymentId`. Your projection must:

1. Collect data from all three event types
2. Store them in a single `PaymentVerification` read model
3. Derive the payment verification status when all data is present

Decision logic:
- Reject if merchant failed
- Reject if fraud score > 0.75
- Reject if amount > 10000 AND fraud score > 0.5
- Otherwise approve
-
## Context

Events can arrive out of order (e.g., from different RabbitMQ queues or Kafka topics). The projection from Exercise 15 was built assuming ordered events — run the test to see it fail.
Expand All @@ -17,3 +31,4 @@ For example, `FraudScoreCalculated` might fire before `PaymentRecorded`, meaning
## Decision Logic

Only derive a final status when you have all three pieces of data (payment, merchant check, fraud check). Then apply the same rules as Exercise 15.

Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()
// 2. Register the projection here using: options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
});

// Let's start Async Daemon to process async projections in the background
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
using var daemon = await documentStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await using var session = documentStore.LightweightSession();

// Payment 1: Approved — all checks pass
Expand Down Expand Up @@ -110,6 +115,9 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()

await session.SaveChangesAsync();

// Wait until Async Daemon processes all events
await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));

// Assert Payment 1: Approved
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
payment1.Should().NotBeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ Events arrive from three different streams (payment, merchant, and fraud check),
2. Store them in a single `PaymentVerification` read model
3. Derive the payment verification status when all data is present

Decision logic:
- Reject if merchant failed
- Reject if fraud score > 0.75
- Reject if amount > 10000 AND fraud score > 0.5
- Otherwise approve

## Steps

1. Create a `PaymentVerificationProjection` class with `Handle` methods for each event type
2. Register your handlers using document store options `options.Projections.Add` as inline.
3. Implement decision logic in the `FraudScoreCalculated` handler (always last for completed payments):
- Reject if merchant failed
- Reject if fraud score > 0.75
- Reject if amount > 10000 AND fraud score > 0.5
- Otherwise approve
3. Implement decision logic in the `FraudScoreCalculated` handler (always last for completed payments)

## Reference

- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public class PaymentVerification
public PaymentStatus Status { get; set; }
}


// TODO: This projection was built assuming ordered events. Run the test — it fails.
// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics).
// Fix it to handle out-of-order events and derive the verification decision.
public class PaymentVerificationProjection: MultiStreamProjection<PaymentVerification, string>
{
public PaymentVerificationProjection()
Expand Down Expand Up @@ -138,11 +142,15 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc
// TODO: This projection was built assuming ordered events. Run the test — it fails.
// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics).
// Fix it to handle out-of-order events and derive the verification decision.

options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Async);
options.Events.StreamIdentity = StreamIdentity.AsString;
});

// Let's start Async Daemon to process async projections in the background
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
using var daemon = await documentStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await using var session = documentStore.LightweightSession();

// Payment 1: Approved — FraudScore arrives first
Expand Down Expand Up @@ -171,29 +179,61 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc

await session.SaveChangesAsync();

await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));

// Assert Payment 1: Approved
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
payment1.Should().NotBeNull();
payment1.Id.Should().Be(payment1Id);
payment1.OrderId.Should().Be(order1Id);
payment1.Amount.Should().Be(100m);
payment1.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
payment1.FraudStatus.Should().Be(VerificationStatus.Passed);
payment1.FraudScore.Should().Be(0.1m);
payment1.Status.Should().Be(PaymentStatus.Approved);

// Assert Payment 2: Rejected
var payment2 = await session.LoadAsync<PaymentVerification>(payment2Id);
payment2.Should().NotBeNull();
payment2.Id.Should().Be(payment2Id);
payment2.OrderId.Should().Be(order2Id);
payment2.Amount.Should().Be(5000m);
payment2.MerchantLimitStatus.Should().Be(VerificationStatus.Failed);
payment2.FraudStatus.Should().Be(VerificationStatus.Passed);
payment2.FraudScore.Should().Be(0.2m);
payment2.Status.Should().Be(PaymentStatus.Rejected);

// Assert Payment 3: Rejected
var payment3 = await session.LoadAsync<PaymentVerification>(payment3Id);
payment3.Should().NotBeNull();
payment3.Id.Should().Be(payment3Id);
payment3.OrderId.Should().Be(order3Id);
payment3.Amount.Should().Be(200m);
payment3.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
payment3.FraudStatus.Should().Be(VerificationStatus.Failed);
payment3.FraudScore.Should().Be(0.95m);
payment3.Status.Should().Be(PaymentStatus.Rejected);

// Assert Payment 4: Rejected
var payment4 = await session.LoadAsync<PaymentVerification>(payment4Id);
payment4.Should().NotBeNull();
payment4.Id.Should().Be(payment4Id);
payment4.OrderId.Should().Be(order4Id);
payment4.Amount.Should().Be(15000m);
payment4.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
payment4.FraudStatus.Should().Be(VerificationStatus.Passed);
payment4.FraudScore.Should().Be(0.6m);
payment4.Status.Should().Be(PaymentStatus.Rejected);

// Assert Payment 5: Pending
var payment5 = await session.LoadAsync<PaymentVerification>(payment5Id);
payment5.Should().NotBeNull();
payment5.Id.Should().Be(payment5Id);
payment5.OrderId.Should().Be(order5Id);
payment5.Amount.Should().Be(50m);
payment5.MerchantLimitStatus.Should().Be(VerificationStatus.Passed);
payment5.FraudStatus.Should().Be(VerificationStatus.Pending);
payment5.FraudScore.Should().Be(0m);
payment5.Status.Should().Be(PaymentStatus.Pending);

// Assert Payment 1: Verification is emitted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ Fix the Marten projection from Exercise 17 to handle out-of-order events.

Learn how to build resilient Marten projections that work even when events arrive in any order.

## Scenario

Events arrive from three different streams (payment, merchant, and fraud check), but they all reference the same `PaymentId`. Your projection must:

1. Collect data from all three event types
2. Store them in a single `PaymentVerification` read model
3. Derive the payment verification status when all data is present

Decision logic:
- Reject if merchant failed
- Reject if fraud score > 0.75
- Reject if amount > 10000 AND fraud score > 0.5
- Otherwise approve

## Context

Same out-of-order context as Exercise 16: events can arrive in any order (e.g., from different RabbitMQ queues or Kafka topics). The projection from Exercise 17 assumes ordered events — run the test to see it fail.
Expand All @@ -15,3 +29,6 @@ Same out-of-order context as Exercise 16: events can arrive in any order (e.g.,
## Reference

- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)
- [`RaiseSideEffects` method from Marten projections](https://martendb.io/events/projections/side-effects.html#side-effects)

Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,14 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()
options.DatabaseSchemaName = options.Events.DatabaseSchemaName = "Exercise17MultiStreamMarten";
options.AutoCreateSchemaObjects = AutoCreate.All;

options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Async);
});

// Let's start Async Daemon to process async projections in the background
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
using var daemon = await documentStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await using var session = documentStore.LightweightSession();

// Payment 1: Approved — all checks pass
Expand Down Expand Up @@ -160,6 +165,9 @@ public async Task MultiStreamProjection_WithMarten_ShouldSucceed()

await session.SaveChangesAsync();

// Wait until Async Daemon processes all events
await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));

// Assert Payment 1: Approved
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
payment1.Should().NotBeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ Learn how to use Marten's built-in multi-stream projection support to simplify c
4. Put decision logic in the `FraudScoreCalculated` Apply method (same rules as Exercise 15)
5. Register the projection using `options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline)`

Decision logic:
- Reject if merchant failed
- Reject if fraud score > 0.75
- Reject if amount > 10000 AND fraud score > 0.5
- Otherwise approve


## Key Differences from Exercise 15

Instead of manually handling event routing and database operations, Marten:
Expand All @@ -24,3 +31,4 @@ Instead of manually handling event routing and database operations, Marten:
## Reference

- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ public class ProjectionsTests
"PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'";

[Fact]
[Trait("Category", "SkipCI")]
public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucceed()
{
var payment1Id = $"payment:{Guid.CreateVersion7()}";
Expand Down Expand Up @@ -185,14 +184,15 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc
options.DatabaseSchemaName = options.Events.DatabaseSchemaName = "Exercise18MultiStreamOutOfOrderMarten";
options.AutoCreateSchemaObjects = AutoCreate.All;

// TODO: This projection was built assuming ordered events. Run the test — it fails.
// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics).
// Fix it to handle out-of-order events and derive the verification decision.

options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Inline);
options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Async);
options.Events.StreamIdentity = StreamIdentity.AsString;
});

// Let's start Async Daemon to process async projections in the background
// Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
using var daemon = await documentStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();

await using var session = documentStore.LightweightSession();

// Payment 1: Approved — FraudScore arrives first
Expand Down Expand Up @@ -221,6 +221,8 @@ public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucc

await session.SaveChangesAsync();

await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));

// Assert Payment 1: Approved
var payment1 = await session.LoadAsync<PaymentVerification>(payment1Id);
payment1.Should().NotBeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ Same out-of-order context as Exercise 16: events can arrive in any order (e.g.,
4. **Track data quality**: Add a `DataQuality` enum and field to track completeness
5. **Use RaiseSideEffects**: Override `RaiseSideEffects` to publish `PaymentVerificationCompleted` using `slice.AppendEvent()` when a decision is made

Decision logic:
- Reject if merchant failed
- Reject if fraud score > 0.75
- Reject if amount > 10000 AND fraud score > 0.5
- Otherwise approve


## RaiseSideEffects Pattern

```csharp
Expand All @@ -39,3 +46,5 @@ public override ValueTask RaiseSideEffects(

- [Dealing with Race Conditions in Event-Driven Architecture](https://www.architecture-weekly.com/p/dealing-with-race-conditions-in-event)
- [Marten Multi-Stream Projections](https://martendb.io/events/projections/multi-stream-projections.html)
- [Marten Async Daemon - Backround worker processing async projections](https://martendb.io/events/projections/async-daemon.html#async-projections-daemon)

Loading