-
-
Notifications
You must be signed in to change notification settings - Fork 541
Expand file tree
/
Copy pathProjectionsTests.cs
More file actions
246 lines (209 loc) · 10 KB
/
ProjectionsTests.cs
File metadata and controls
246 lines (209 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
using FluentAssertions;
using JasperFx;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events.Projections;
using Xunit;
namespace IntroductionToEventSourcing.Projections.MultiStream.OutOfOrder.Marten;
// EVENTS
/// <summary>
/// Event describing a recorded payment.
/// </summary>
/// <param name="PaymentId">Identifier of the payment; the projection uses it as the read model key.</param>
/// <param name="OrderId">Identifier of the order referenced by the payment.</param>
/// <param name="Amount">Payment amount.</param>
public record PaymentRecorded(string PaymentId, string OrderId, decimal Amount);
/// <summary>
/// Event carrying the outcome of a merchant limits check for a payment.
/// </summary>
/// <param name="PaymentId">Identifier of the payment that was checked.</param>
/// <param name="MerchantId">Identifier of the merchant.</param>
/// <param name="IsWithinLimits"><c>true</c> when the check passed, <c>false</c> when it failed.</param>
public record MerchantLimitsChecked(string PaymentId, string MerchantId, bool IsWithinLimits);
/// <summary>
/// Event carrying the fraud score calculated for a payment.
/// </summary>
/// <param name="PaymentId">Identifier of the payment that was scored.</param>
/// <param name="Score">Calculated fraud score.</param>
/// <param name="IsAcceptable"><c>true</c> when the fraud check passed, <c>false</c> when it failed.</param>
public record FraudScoreCalculated(string PaymentId, decimal Score, bool IsAcceptable);
/// <summary>
/// Event announcing the final verification outcome of a payment.
/// </summary>
/// <param name="PaymentId">Identifier of the verified payment.</param>
/// <param name="IsApproved"><c>true</c> when the payment was approved.</param>
public record PaymentVerificationCompleted(string PaymentId, bool IsApproved);
// ENUMS
/// <summary>
/// Outcome of a single verification step (merchant limits check or fraud check).
/// </summary>
public enum VerificationStatus
{
    /// <summary>No result recorded yet; this is the default value.</summary>
    Pending = 0,

    /// <summary>The check passed.</summary>
    Passed = 1,

    /// <summary>The check failed.</summary>
    Failed = 2
}
/// <summary>
/// Overall verification decision for a payment.
/// </summary>
public enum PaymentStatus
{
    /// <summary>No decision has been made yet; this is the default value.</summary>
    Pending = 0,

    /// <summary>The payment was approved.</summary>
    Approved = 1,

    /// <summary>The payment was rejected.</summary>
    Rejected = 2
}
// READ MODEL
/// <summary>
/// Read model holding the verification state of a single payment, combined from
/// payment, merchant limits and fraud score events.
/// </summary>
public class PaymentVerification
{
    /// <summary>Document identifier; the projection assigns the payment id to it.</summary>
    public required string Id { get; set; }

    /// <summary>Identifier of the order, copied from <see cref="PaymentRecorded"/>.</summary>
    public required string OrderId { get; set; }

    /// <summary>Payment amount, copied from <see cref="PaymentRecorded"/>; zero until then.</summary>
    public decimal Amount { get; set; } = 0m;

    /// <summary>Result of the merchant limits check.</summary>
    public VerificationStatus MerchantLimitStatus { get; set; } = VerificationStatus.Pending;

    /// <summary>Result of the fraud check.</summary>
    public VerificationStatus FraudStatus { get; set; } = VerificationStatus.Pending;

    /// <summary>Fraud score, copied from <see cref="FraudScoreCalculated"/>; zero until then.</summary>
    public decimal FraudScore { get; set; } = 0m;

    /// <summary>Overall verification decision.</summary>
    public PaymentStatus Status { get; set; } = PaymentStatus.Pending;
}
// Builds one PaymentVerification document per payment out of events that are appended to
// three different streams (payment, merchant and fraud-check streams).
//
// Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics), so no
// handler may assume that another event has already been applied. Each handler therefore stores
// only the facts carried by its own event and then calls TryDecide, which derives the final
// status as soon as all three facts are known.
//
// TODO: The test also expects a PaymentVerificationCompleted event once the decision is made.
// Emitting it is not implemented here.
public class PaymentVerificationProjection: MultiStreamProjection<PaymentVerification, string>
{
    public PaymentVerificationProjection()
    {
        // Route every event to the document identified by its payment id.
        Identity<PaymentRecorded>(e => e.PaymentId);
        Identity<MerchantLimitsChecked>(e => e.PaymentId);
        Identity<FraudScoreCalculated>(e => e.PaymentId);
    }

    /// <summary>Stores the payment details and re-evaluates the decision.</summary>
    public void Apply(PaymentVerification item, PaymentRecorded @event)
    {
        item.Id = @event.PaymentId;
        item.OrderId = @event.OrderId;
        item.Amount = @event.Amount;

        TryDecide(item);
    }

    /// <summary>Stores the merchant limits check result and re-evaluates the decision.</summary>
    public void Apply(PaymentVerification item, MerchantLimitsChecked @event)
    {
        // Any event may be the first one seen for a payment, so every handler sets the id
        // instead of relying on PaymentRecorded having been applied already.
        item.Id = @event.PaymentId;
        item.MerchantLimitStatus = @event.IsWithinLimits
            ? VerificationStatus.Passed
            : VerificationStatus.Failed;

        TryDecide(item);
    }

    /// <summary>Stores the fraud check result and re-evaluates the decision.</summary>
    public void Apply(PaymentVerification item, FraudScoreCalculated @event)
    {
        item.Id = @event.PaymentId;
        item.FraudScore = @event.Score;
        item.FraudStatus = @event.IsAcceptable
            ? VerificationStatus.Passed
            : VerificationStatus.Failed;

        TryDecide(item);
    }

    /// <summary>
    /// Sets <see cref="PaymentVerification.Status"/> once the payment details, the merchant limits
    /// result and the fraud result are all present. Does nothing while any of them is missing or
    /// when a decision was already made.
    /// </summary>
    private static void TryDecide(PaymentVerification item)
    {
        // A decision that was already made is kept; later events must not flip it.
        if (item.Status != PaymentStatus.Pending)
        {
            return;
        }

        // OrderId is assigned only by the PaymentRecorded handler, so it doubles as the
        // "payment details have arrived" marker.
        // NOTE(review): assumes the document starts without an OrderId - confirm.
        var paymentRecorded = !string.IsNullOrEmpty(item.OrderId);
        var merchantChecked = item.MerchantLimitStatus != VerificationStatus.Pending;
        var fraudChecked = item.FraudStatus != VerificationStatus.Pending;

        if (!paymentRecorded || !merchantChecked || !fraudChecked)
        {
            return;
        }

        // Same rejection rule as before, but evaluated only on complete data: deciding on a
        // partial document would, for example, compare the amount limit against a zero Amount.
        var isRejected = item.MerchantLimitStatus == VerificationStatus.Failed
                         || item.FraudScore > 0.75m
                         || item is { Amount: > 10000m, FraudScore: > 0.5m };

        item.Status = isRejected ? PaymentStatus.Rejected : PaymentStatus.Approved;
    }
}
/// <summary>
/// Integration test that runs <see cref="PaymentVerificationProjection"/> as an async projection
/// against the database configured in <see cref="ConnectionString"/>.
/// </summary>
public class ProjectionsTests
{
    private const string ConnectionString =
        "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'";

    /// <summary>
    /// Appends events for five payments with the payment event always last, waits for the
    /// async daemon to catch up, then checks every read model and the emitted completion event.
    /// </summary>
    [Fact]
    [Trait("Category", "SkipCI")]
    public async Task MultiStreamProjection_WithOutOfOrderEventsAndMarten_ShouldSucceed()
    {
        // Produces a unique, prefixed identifier, e.g. "payment:<guid>".
        static string NewId(string prefix) => $"{prefix}:{Guid.CreateVersion7()}";

        var payment1Id = NewId("payment");
        var payment2Id = NewId("payment");
        var payment3Id = NewId("payment");
        var payment4Id = NewId("payment");
        var payment5Id = NewId("payment");

        var order1Id = NewId("order");
        var order2Id = NewId("order");
        var order3Id = NewId("order");
        var order4Id = NewId("order");
        var order5Id = NewId("order");

        var merchant1Id = NewId("merchant");
        var merchant2Id = NewId("merchant");

        var fraudCheck1Id = NewId("fraudcheck");
        var fraudCheck2Id = NewId("fraudcheck");
        var fraudCheck3Id = NewId("fraudcheck");
        var fraudCheck4Id = NewId("fraudcheck");

        await using var documentStore = DocumentStore.For(options =>
        {
            const string schemaName = "Exercise18MultiStreamOutOfOrderMarten";

            options.Connection(ConnectionString);
            options.Events.DatabaseSchemaName = schemaName;
            options.DatabaseSchemaName = schemaName;
            options.AutoCreateSchemaObjects = AutoCreate.All;

            // TODO: This projection was built assuming ordered events. Run the test — it fails.
            // Events can arrive out of order (e.g. from different RabbitMQ queues or Kafka topics).
            // Fix it to handle out-of-order events and derive the verification decision.
            options.Projections.Add<PaymentVerificationProjection>(ProjectionLifecycle.Async);

            options.Events.StreamIdentity = StreamIdentity.AsString;
        });

        // The projection is registered as async, so the Async Daemon has to run in the background.
        // Read more: https://martendb.io/events/projections/async-daemon.html#async-projections-daemon
        using var daemon = await documentStore.BuildProjectionDaemonAsync();
        await daemon.StartAllAsync();

        await using var session = documentStore.LightweightSession();
        var events = session.Events;

        // Payment 1: Approved — FraudScore arrives first
        events.Append(fraudCheck1Id, new FraudScoreCalculated(payment1Id, 0.1m, true));
        events.Append(merchant1Id, new MerchantLimitsChecked(payment1Id, merchant1Id, true));
        events.Append(payment1Id, new PaymentRecorded(payment1Id, order1Id, 100m));

        // Payment 2: Rejected — Merchant fails, arrives first
        events.Append(merchant2Id, new MerchantLimitsChecked(payment2Id, merchant2Id, false));
        events.Append(fraudCheck2Id, new FraudScoreCalculated(payment2Id, 0.2m, true));
        events.Append(payment2Id, new PaymentRecorded(payment2Id, order2Id, 5000m));

        // Payment 3: Rejected — high fraud score arrives first
        events.Append(fraudCheck3Id, new FraudScoreCalculated(payment3Id, 0.95m, false));
        events.Append(merchant1Id, new MerchantLimitsChecked(payment3Id, merchant1Id, true));
        events.Append(payment3Id, new PaymentRecorded(payment3Id, order3Id, 200m));

        // Payment 4: Rejected — fraud 0.6 looks OK until 15000 amount arrives last
        events.Append(fraudCheck4Id, new FraudScoreCalculated(payment4Id, 0.6m, true));
        events.Append(merchant1Id, new MerchantLimitsChecked(payment4Id, merchant1Id, true));
        events.Append(payment4Id, new PaymentRecorded(payment4Id, order4Id, 15000m));

        // Payment 5: Pending — no fraud check
        events.Append(merchant1Id, new MerchantLimitsChecked(payment5Id, merchant1Id, true));
        events.Append(payment5Id, new PaymentRecorded(payment5Id, order5Id, 50m));

        await session.SaveChangesAsync();
        await daemon.WaitForNonStaleData(TimeSpan.FromSeconds(5));

        // Loads the read model for the given payment and compares every property with the expectation.
        async Task AssertVerification(
            string paymentId,
            string orderId,
            decimal amount,
            VerificationStatus merchantLimitStatus,
            VerificationStatus fraudStatus,
            decimal fraudScore,
            PaymentStatus status)
        {
            var verification = await session.LoadAsync<PaymentVerification>(paymentId);

            verification.Should().NotBeNull();
            verification.Id.Should().Be(paymentId);
            verification.OrderId.Should().Be(orderId);
            verification.Amount.Should().Be(amount);
            verification.MerchantLimitStatus.Should().Be(merchantLimitStatus);
            verification.FraudStatus.Should().Be(fraudStatus);
            verification.FraudScore.Should().Be(fraudScore);
            verification.Status.Should().Be(status);
        }

        // Payment 1: Approved
        await AssertVerification(payment1Id, order1Id, 100m,
            VerificationStatus.Passed, VerificationStatus.Passed, 0.1m, PaymentStatus.Approved);

        // Payment 2: Rejected
        await AssertVerification(payment2Id, order2Id, 5000m,
            VerificationStatus.Failed, VerificationStatus.Passed, 0.2m, PaymentStatus.Rejected);

        // Payment 3: Rejected
        await AssertVerification(payment3Id, order3Id, 200m,
            VerificationStatus.Passed, VerificationStatus.Failed, 0.95m, PaymentStatus.Rejected);

        // Payment 4: Rejected
        await AssertVerification(payment4Id, order4Id, 15000m,
            VerificationStatus.Passed, VerificationStatus.Passed, 0.6m, PaymentStatus.Rejected);

        // Payment 5: Pending
        await AssertVerification(payment5Id, order5Id, 50m,
            VerificationStatus.Passed, VerificationStatus.Pending, 0m, PaymentStatus.Pending);

        // Payment 1: the verification-completed event is emitted
        var completed = await session.Events.QueryRawEventDataOnly<PaymentVerificationCompleted>()
            .SingleOrDefaultAsync(e => e.PaymentId == payment1Id);

        completed.Should().NotBeNull();
        completed.PaymentId.Should().Be(payment1Id);
        completed.IsApproved.Should().BeTrue();
    }
}