Skip to content

Commit 014da18

Browse files
authored
Merge pull request #11 from Shuttle/async
Async
2 parents e524d96 + 81e4dbb commit 014da18

67 files changed

Lines changed: 1758 additions & 509 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Shuttle.Recall.Tests/EventHandler.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
namespace Shuttle.Recall.Tests
1+
using System.Threading.Tasks;
2+
3+
namespace Shuttle.Recall.Tests
24
{
35
public class EventHandler :
46
IEventHandler<EventA>,
5-
IEventHandler<EventB>
7+
IEventHandler<EventB>,
8+
IAsyncEventHandler<EventA>,
9+
IAsyncEventHandler<EventB>
610
{
711
private static readonly object Lock = new();
812
public int Entry { get; private set; }
@@ -29,6 +33,20 @@ private void Apply(int entry)
2933
Entry = entry;
3034
}
3135
}
36+
37+
public async Task ProcessEventAsync(IEventHandlerContext<EventA> context)
38+
{
39+
Apply(context.Event.Entry);
40+
41+
await Task.CompletedTask;
42+
}
43+
44+
public async Task ProcessEventAsync(IEventHandlerContext<EventB> context)
45+
{
46+
Apply(context.Event.Entry);
47+
48+
await Task.CompletedTask;
49+
}
3250
}
3351

3452
public class EventA

Shuttle.Recall.Tests/EventProcessorFixture.cs

Lines changed: 124 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Threading;
4+
using System.Threading.Tasks;
35
using Microsoft.Extensions.DependencyInjection;
46
using Microsoft.Extensions.Options;
57
using Moq;
@@ -17,6 +19,30 @@ public class EventProcessorFixture
1719
[Test]
1820
public void Should_be_able_process_sequence_number_tail()
1921
{
22+
Should_be_able_process_sequence_number_tail_async(true).GetAwaiter().GetResult();
23+
}
24+
25+
[Test]
26+
public async Task Should_be_able_process_sequence_number_tail_async()
27+
{
28+
await Should_be_able_process_sequence_number_tail_async(false);
29+
}
30+
31+
private async Task Should_be_able_process_sequence_number_tail_async(bool sync)
32+
{
33+
var eventStoreOptions = Options.Create(new EventStoreOptions
34+
{
35+
Asynchronous = !sync,
36+
ProjectionEventFetchCount = 100,
37+
SequenceNumberTailThreadWorkerInterval = TimeSpan.FromMilliseconds(100),
38+
ProjectionThreadCount = 1
39+
});
40+
41+
var projectionRepository = new Mock<IProjectionRepository>();
42+
43+
projectionRepository.Setup(m => m.Find("projection-1")).Returns(new Projection(eventStoreOptions.Value, "projection-1", 200));
44+
projectionRepository.Setup(m => m.FindAsync("projection-1")).Returns(Task.FromResult(new Projection(eventStoreOptions.Value, "projection-1", 200)));
45+
2046
var pipelineFactory = new Mock<IPipelineFactory>();
2147

2248
pipelineFactory.Setup(m => m.GetPipeline<EventProcessingPipeline>()).Returns(
@@ -32,19 +58,12 @@ public void Should_be_able_process_sequence_number_tail()
3258
new EventProcessorStartupPipeline(new Mock<IStartupEventProcessingObserver>().Object)
3359
);
3460

35-
var projectionRepository = new Mock<IProjectionRepository>();
36-
37-
projectionRepository.Setup(m => m.Find("projection-1")).Returns(new Projection("projection-1", 200));
38-
39-
var eventStoreOptions = Options.Create(new EventStoreOptions
40-
{
41-
ProjectionEventFetchCount = 100,
42-
SequenceNumberTailThreadWorkerInterval = TimeSpan.FromMilliseconds(100),
43-
ProjectionThreadCount = 1
44-
});
61+
pipelineFactory.Setup(m => m.GetPipeline<AddProjectionPipeline>()).Returns(
62+
new AddProjectionPipeline(new AddProjectionObserver(eventStoreOptions, projectionRepository.Object))
63+
);
4564

46-
var processor = new EventProcessor(eventStoreOptions, pipelineFactory.Object, projectionRepository.Object);
47-
var projection = processor.AddProjection("projection-1");
65+
var processor = new EventProcessor(eventStoreOptions, pipelineFactory.Object);
66+
var projection = sync ? processor.AddProjection("projection-1") : await processor.AddProjectionAsync("projection-1");
4867
var projectionAggregation = processor.GetProjectionAggregation(projection.AggregationId);
4968

5069
for (var i = 50; i < 101; i++)
@@ -60,7 +79,14 @@ public void Should_be_able_process_sequence_number_tail()
6079

6180
try
6281
{
63-
processor.Start();
82+
if (sync)
83+
{
84+
processor.Start();
85+
}
86+
else
87+
{
88+
await processor.StartAsync();
89+
}
6490

6591
var timeout = DateTime.Now.AddSeconds(60);
6692

@@ -81,8 +107,19 @@ public void Should_be_able_process_sequence_number_tail()
81107
[Test]
82108
public void Should_be_able_to_get_round_robin_projections()
83109
{
84-
var processor = new EventProcessor(Options.Create(new EventStoreOptions()),
85-
new Mock<IPipelineFactory>().Object, new Mock<IProjectionRepository>().Object);
110+
var eventStoreOptions = Options.Create(new EventStoreOptions());
111+
var projectionRepository = new Mock<IProjectionRepository>();
112+
113+
projectionRepository.Setup(m => m.Find("projection-1")).Returns(new Projection(eventStoreOptions.Value, "projection-1", 0));
114+
projectionRepository.Setup(m => m.Find("projection-2")).Returns(new Projection(eventStoreOptions.Value, "projection-2", 0));
115+
116+
var pipelineFactory = new Mock<IPipelineFactory>();
117+
118+
pipelineFactory.Setup(m => m.GetPipeline<AddProjectionPipeline>()).Returns(
119+
new AddProjectionPipeline(new AddProjectionObserver(eventStoreOptions, projectionRepository.Object))
120+
);
121+
122+
var processor = new EventProcessor(Options.Create(new EventStoreOptions()), pipelineFactory.Object);
86123

87124
processor.AddProjection("projection-1");
88125
processor.AddProjection("projection-2");
@@ -107,8 +144,7 @@ public void Should_be_able_to_get_round_robin_projections()
107144
Assert.That(processor.GetProjection(), Is.Not.Null);
108145
Assert.That(processor.GetProjection(), Is.Null);
109146

110-
Assert.That(() => processor.ReleaseProjection(new Projection("fail", 1)),
111-
Throws.TypeOf<InvalidOperationException>());
147+
Assert.That(() => processor.ReleaseProjection(new Projection(new EventStoreOptions(), "fail", 1)), Throws.TypeOf<InvalidOperationException>());
112148
}
113149

114150
[Test]
@@ -121,13 +157,18 @@ public void Should_be_able_to_place_projection_into_clustered_aggregation()
121157

122158
var projectionRepository = new Mock<IProjectionRepository>();
123159

124-
projectionRepository.Setup(m => m.Find("projection-1")).Returns(new Projection("projection-1", 1));
125-
projectionRepository.Setup(m => m.Find("projection-2")).Returns(new Projection("projection-2", 300));
126-
projectionRepository.Setup(m => m.Find("projection-3")).Returns(new Projection("projection-3", 301));
127-
projectionRepository.Setup(m => m.Find("projection-4")).Returns(new Projection("projection-4", 600));
160+
projectionRepository.Setup(m => m.Find("projection-1")).Returns(new Projection(eventStoreOptions.Value, "projection-1", 1));
161+
projectionRepository.Setup(m => m.Find("projection-2")).Returns(new Projection(eventStoreOptions.Value, "projection-2", 300));
162+
projectionRepository.Setup(m => m.Find("projection-3")).Returns(new Projection(eventStoreOptions.Value, "projection-3", 301));
163+
projectionRepository.Setup(m => m.Find("projection-4")).Returns(new Projection(eventStoreOptions.Value, "projection-4", 600));
164+
165+
var pipelineFactory = new Mock<IPipelineFactory>();
166+
167+
pipelineFactory.Setup(m => m.GetPipeline<AddProjectionPipeline>()).Returns(
168+
new AddProjectionPipeline(new AddProjectionObserver(eventStoreOptions, projectionRepository.Object))
169+
);
128170

129-
var processor = new EventProcessor(eventStoreOptions, new Mock<IPipelineFactory>().Object,
130-
projectionRepository.Object);
171+
var processor = new EventProcessor(Options.Create(new EventStoreOptions()), pipelineFactory.Object);
131172

132173
var projection1 = processor.AddProjection("projection-1");
133174
var projection2 = processor.AddProjection("projection-2");
@@ -140,52 +181,53 @@ public void Should_be_able_to_place_projection_into_clustered_aggregation()
140181
}
141182

142183
[Test]
143-
public void Should_be_able_to_process_projections_timeously()
184+
public void Should_be_able_to_process_projections_with_optimal_performance()
185+
{
186+
Should_be_able_to_process_projections_with_optimal_performance_async(true).GetAwaiter().GetResult();
187+
}
188+
189+
[Test]
190+
public async Task Should_be_able_to_process_projections_with_optimal_performance_async()
144191
{
145-
const int projectionEventCount = 10000;
192+
await Should_be_able_to_process_projections_with_optimal_performance_async(false);
193+
}
194+
195+
private async Task Should_be_able_to_process_projections_with_optimal_performance_async(bool sync)
196+
{
197+
const int projectionEventCount = 4000;
146198
const string projectionName = "projection-1";
199+
147200
var id = Guid.NewGuid();
148-
149201
var serializer = new DefaultSerializer();
150-
151-
var eventStoreOptions = Options.Create(new EventStoreOptions
202+
var services = new ServiceCollection();
203+
var projectionEventProvider = new Mock<IProjectionEventProvider>();
204+
var entry = 1;
205+
var eventStoreOptions = new EventStoreOptions()
152206
{
207+
Asynchronous = !sync,
153208
ProjectionEventFetchCount = 100,
154209
SequenceNumberTailThreadWorkerInterval = TimeSpan.FromMilliseconds(100),
155210
ProjectionThreadCount = 1
156-
});
157-
158-
var services = new ServiceCollection();
159-
160-
var projectionEventProvider = new Mock<IProjectionEventProvider>();
211+
};
161212

162-
var entry = 1;
163-
164-
projectionEventProvider.Setup(m => m.Get(It.IsAny<Projection>())).Returns(() =>
213+
ProjectionEvent GetProjectionEvent()
165214
{
166215
if (entry > projectionEventCount)
167216
{
168217
return new ProjectionEvent(entry);
169218
}
170219

171220
var primitiveEvent = entry % 2 == 0
172-
? new PrimitiveEvent
173-
{
174-
EventType = typeof(EventA).FullName
175-
}
176-
: new PrimitiveEvent
177-
{
178-
EventType = typeof(EventB).FullName
179-
};
221+
? new PrimitiveEvent { EventType = typeof(EventA).FullName }
222+
: new PrimitiveEvent { EventType = typeof(EventB).FullName };
180223

181224
var eventEnvelope = new EventEnvelope
182225
{
183226
EventType = primitiveEvent.EventType,
184227
AssemblyQualifiedName = entry % 2 == 0
185228
? typeof(EventA).AssemblyQualifiedName
186229
: typeof(EventB).AssemblyQualifiedName,
187-
Event = serializer
188-
.Serialize(entry % 2 == 0 ? new EventA { Entry = entry } : new EventB { Entry = entry })
230+
Event = serializer.Serialize(entry % 2 == 0 ? new EventA { Entry = entry } : new EventB { Entry = entry })
189231
.ToBytes(),
190232
EventId = Guid.NewGuid(),
191233
Version = entry,
@@ -202,41 +244,65 @@ public void Should_be_able_to_process_projections_timeously()
202244
entry++;
203245

204246
return new ProjectionEvent(primitiveEvent);
205-
});
247+
}
248+
249+
projectionEventProvider.Setup(m => m.Get(It.IsAny<Projection>())).Returns(GetProjectionEvent);
250+
projectionEventProvider.Setup(m => m.GetAsync(It.IsAny<Projection>())).Returns(() => Task.FromResult(GetProjectionEvent()));
206251

207252
var projectionRepository = new Mock<IProjectionRepository>();
208253

209-
projectionRepository.Setup(m => m.Find(projectionName)).Returns(new Projection(projectionName, 0));
254+
projectionRepository.Setup(m => m.Find(projectionName)).Returns(new Projection(eventStoreOptions, projectionName, 0));
255+
projectionRepository.Setup(m => m.FindAsync(projectionName)).Returns(Task.FromResult(new Projection(eventStoreOptions, projectionName, 0)));
210256

211257
services.AddSingleton(projectionRepository.Object);
212258
services.AddSingleton(projectionEventProvider.Object);
213259

214-
services.AddEventStore();
260+
services.AddEventStore(builder =>
261+
{
262+
builder.Options = eventStoreOptions;
263+
});
215264

216265
var serviceProvider = services.BuildServiceProvider();
217266

218267
var processor = serviceProvider.GetRequiredService<IEventProcessor>();
219-
220-
var projection = processor.AddProjection(projectionName);
268+
var projection = sync ? processor.AddProjection(projectionName) : await processor.AddProjectionAsync(projectionName);
221269
var projectionAggregation = processor.GetProjectionAggregation(projection.AggregationId);
222-
223270
var eventHandler = new EventHandler();
224271

225-
projection.AddEventHandler(eventHandler);
272+
if (sync)
273+
{
274+
projection.AddEventHandler(eventHandler);
275+
}
276+
else
277+
{
278+
await projection.AddEventHandlerAsync(eventHandler).ConfigureAwait(false);
279+
}
226280

227281
try
228282
{
229-
processor.Start();
283+
if (sync)
284+
{
285+
processor.Start();
286+
}
287+
else
288+
{
289+
await processor.StartAsync().ConfigureAwait(false);
290+
}
291+
292+
var sw = new Stopwatch();
230293

231-
var now = DateTime.Now;
232-
var timeout = now.AddSeconds(5);
294+
sw.Start();
233295

234-
while (eventHandler.Entry < projectionEventCount && DateTime.Now < timeout)
296+
while (eventHandler.Entry < projectionEventCount && sw.ElapsedMilliseconds < 5000)
235297
{
236298
Thread.Sleep(100);
237299
}
238300

239-
Assert.That((DateTime.Now - now).TotalMilliseconds, Is.LessThan(2000));
301+
sw.Stop();
302+
303+
Console.WriteLine($@"[event-handler] : entry = {eventHandler.Entry} / elapsed ms = {sw.ElapsedMilliseconds}");
304+
305+
Assert.That(sw.ElapsedMilliseconds, Is.LessThan(2000));
240306
Assert.That(projectionAggregation.IsEmpty, Is.True);
241307
Assert.That(eventHandler.Entry, Is.EqualTo(projectionEventCount));
242308
}

Shuttle.Recall.Tests/ProjectionAggregationFixture.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,20 @@ namespace Shuttle.Recall.Tests
88
[TestFixture]
99
public class ProjectionAggregationFixture : IEventHandler<object>
1010
{
11-
public bool Active => true;
12-
1311
public void ProcessEvent(IEventHandlerContext<object> context)
1412
{
1513
}
1614

1715
[Test]
1816
public void Should_be_able_to_trim_sequence_number_tail()
1917
{
20-
var aggregation = new ProjectionAggregation(100);
18+
var aggregation = new ProjectionAggregation(100, CancellationToken.None);
2119

2220
var projection1 =
23-
new Projection("projection-1", 10)
21+
new Projection(new EventStoreOptions(), "projection-1", 10)
2422
.AddEventHandler(this);
2523
var projection2 =
26-
new Projection("projection-2", 15)
24+
new Projection(new EventStoreOptions(), "projection-2", 15)
2725
.AddEventHandler(this);
2826

2927
aggregation.Add(projection1);

Shuttle.Recall.Tests/ProjectionEventProviderFixture.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ public class ProjectionEventProviderFixture : IEventHandler<TestEvent>
1717
[Test]
1818
public void Should_be_able_to_use_provider()
1919
{
20-
var projection = new Projection("projection", 15);
20+
var projection = new Projection(new EventStoreOptions(), "projection", 15);
2121

2222
projection.AddEventHandler(this);
2323

2424
var eventProcessor = new Mock<IEventProcessor>();
25-
var projectionAggregation = new ProjectionAggregation(100);
25+
var projectionAggregation = new ProjectionAggregation(100, CancellationToken.None);
2626

2727
projectionAggregation.Add(projection);
2828

Shuttle.Recall.Tests/Shuttle.Recall.Tests.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66

77
<ItemGroup>
88
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="7.0.0" />
9-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
10-
<PackageReference Include="Moq" Version="4.18.2" />
11-
<PackageReference Include="NUnit" Version="3.13.3" />
12-
<PackageReference Include="NUnit3TestAdapter" Version="4.3.1" />
9+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
10+
<PackageReference Include="Moq" Version="4.20.70" />
11+
<PackageReference Include="NUnit" Version="3.14.0" />
12+
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
1313
</ItemGroup>
1414

1515
<ItemGroup>

0 commit comments

Comments
 (0)