Skip to content

Commit dc92f61

Browse files
ATCBotNiels LunnchristianhelleagehrkeAndreas Gehrke
authored
Release of new minor version v1.20 (#69)
* Update main with changes in stable after v1.17.0 release (#63) * Release of new minor version v1.16 (#61) * Set version to '1.16-preview' * Make sure that the projections are processed in a scoped application lifecycle * clean up scope * update changelog * The projection is not optional * Set version to '1.16' --------- Co-authored-by: Niels Lunn <[email protected]> * Release of new minor version v1.17 (#62) * Set version to '1.16-preview' * Make sure that the projections are processed in a scoped application lifecycle * clean up scope * update changelog * The projection is not optional * Set version to '1.16' * Set version to '1.17-preview' * ci: fix dotnet pack unable to find build output * Set version to '1.17' --------- Co-authored-by: Niels Lunn <[email protected]> Co-authored-by: Christian Helle <[email protected]> * Updated CHANGELOG.md for 1.17.0 release --------- Co-authored-by: Niels Lunn <[email protected]> Co-authored-by: Christian Helle <[email protected]> * Allow CommandHandlers to have scoped dependencies (#64) * Register ICommandProcessorFactory, ICommandHandlerFactory and CommandProcessor as transient to allow to CommandHandler to use scoped dependencies. * Update changelog --------- Co-authored-by: Andreas Gehrke <[email protected]> * Set version to '1.18' * Set version to '1.19-preview' * Set version to '1.19' * Set version to '1.20-preview' * Bring back ProjectionFactory Also use IServiceScopeFactory instead of IServiceProvider in ProjectionProcessor to make it more clear that we are only creating scopes * Changelog * Functional tests of Command execution and projections * Upgrade SonarAnalyzer.CSharp as the current version reports bogus issues * Fix Warning S6612 : Use the lambda parameter instead of capturing the argument 'name' * Fix Warning S6608 : Indexing at Count-1 should be used instead of the "Enumerable" extension method "Last" * Set version to '1.20' --------- Co-authored-by: Niels Lunn <[email protected]> Co-authored-by: Christian Helle <[email protected]> Co-authored-by: Andreas Gehrke <[email protected]> Co-authored-by: Andreas Gehrke <[email protected]>
1 parent 55223dc commit dc92f61

16 files changed

+626
-18
lines changed

CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2323

2424
## [1.17.0] - 2025-03-21
2525

26+
### Changed
27+
28+
- Changed `ICommandProcessorFactory`, `ICommandHandlerFactory` and `CommandProcessor<>` to be
29+
registered as transient rather than singleton. This allows `CommandHandler` implementations to use
30+
dependencies registered as scoped.
31+
32+
- Reintroduce `IProjectionFactory` in a slightly modified version to allow consumers to make additional "initialization" of projections.
33+
34+
## [1.17.0] - 2025-03-21
35+
2636
### Fixed
2737

2838
- Ensure that a projection can run in its own scope. This is important as we do not want state to leak from one projection to the other.

Directory.Build.props

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
<ItemGroup Label="Code Analyzers">
4343
<PackageReference Include="SecurityCodeScan.VS2019" Version="5.6.7" PrivateAssets="all" />
4444
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.321" PrivateAssets="All" />
45-
<PackageReference Include="SonarAnalyzer.CSharp" Version="9.0.0.68202" PrivateAssets="all" />
45+
<PackageReference Include="SonarAnalyzer.CSharp" Version="10.7.0.110445" PrivateAssets="all" />
4646
</ItemGroup>
4747

4848
</Project>

src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public static EventStoreOptionsBuilder UseCQRS(
2828
builder.Services.AddSingleton<IProjectionOptionsFactory, ProjectionOptionsFactory>();
2929

3030
builder.Services.AddSingleton(typeof(ProjectionMetadata<>), typeof(ProjectionMetadata<>));
31+
builder.Services.AddTransient<IProjectionFactory, DefaultProjectionFactory>();
3132

3233
builder.Services.TryAddSingleton<IProjectionTelemetry, ProjectionTelemetry>();
3334

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
3+
namespace Atc.Cosmos.EventStore.Cqrs;
4+
5+
/// <summary>
6+
/// Responsible for creating <see cref="IProjection"/> instances.
7+
/// </summary>
8+
public interface IProjectionFactory
9+
{
10+
/// <summary>
11+
/// Creates a projection of type <typeparamref name="TProjection"/> for the event stream
12+
/// identified by <paramref name="streamId"/>.
13+
/// </summary>
14+
/// <param name="streamId">ID of the stream being projected.</param>
15+
/// <param name="cancellationToken">Cancellation.</param>
16+
/// <typeparam name="TProjection">Type of projection to create.</typeparam>
17+
/// <returns>The created projection.</returns>
18+
public Task<IProjection> CreateAsync<TProjection>(EventStreamId streamId, CancellationToken cancellationToken)
19+
where TProjection : IProjection;
20+
}
21+
22+
/// <summary>
23+
/// The default projection factory which just creates projections by
24+
/// getting them from the DI-container.
25+
/// </summary>
26+
internal sealed class DefaultProjectionFactory : IProjectionFactory
27+
{
28+
private readonly IServiceProvider serviceProvider;
29+
30+
public DefaultProjectionFactory(IServiceProvider serviceProvider)
31+
{
32+
this.serviceProvider = serviceProvider;
33+
}
34+
35+
public Task<IProjection> CreateAsync<TProjection>(EventStreamId streamId, CancellationToken cancellationToken)
36+
where TProjection : IProjection
37+
{
38+
return Task.FromResult<IProjection>(serviceProvider.GetRequiredService<TProjection>());
39+
}
40+
}

src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs

+1-5
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ public ProjectionFilter(string filter)
1515
? CreateEvaluateAll()
1616
: CreateEvaluation(p))
1717
.ToArray();
18-
endsOnAcceptAll = filter
19-
.Split(
20-
new[] { EventStreamId.PartSeperator },
21-
StringSplitOptions.RemoveEmptyEntries)
22-
.Last() == "**";
18+
endsOnAcceptAll = filter.EndsWith("**");
2319
}
2420

2521
public bool Evaluate(StreamId streamId)

src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs

+9-8
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ internal class ProjectionProcessor<TProjection> : IProjectionProcessor<TProjecti
1010
private readonly IReadOnlyCollection<ProjectionFilter> filters;
1111
private readonly IProjectionTelemetry telemetry;
1212
private readonly ProjectionMetadata<TProjection> projectionMetadata;
13-
private readonly IServiceProvider serviceProvider;
13+
private readonly IServiceScopeFactory serviceScopeFactory;
1414
private readonly string projectionName;
1515

1616
public ProjectionProcessor(
1717
IProjectionOptionsFactory optionsFactory,
1818
IProjectionTelemetry telemetry,
1919
ProjectionMetadata<TProjection> projectionMetadata,
20-
IServiceProvider serviceProvider)
20+
IServiceScopeFactory serviceScopeFactory)
2121
{
2222
this.telemetry = telemetry;
2323
this.projectionMetadata = projectionMetadata;
24-
this.serviceProvider = serviceProvider;
24+
this.serviceScopeFactory = serviceScopeFactory;
2525
filters = optionsFactory
2626
.GetOptions<TProjection>()
2727
.Filters;
@@ -49,10 +49,6 @@ public async Task<ProjectionAction> ProcessBatchAsync(
4949

5050
foreach (var events in groupedEvents)
5151
{
52-
await using var scope = serviceProvider.CreateAsyncScope();
53-
54-
var projection = scope.ServiceProvider.GetRequiredService<TProjection>();
55-
5652
using var operation = batchTelemetry.StartProjection(events.Key);
5753

5854
if (!projectionMetadata.CanConsumeOneOrMoreEvents(events))
@@ -62,11 +58,16 @@ public async Task<ProjectionAction> ProcessBatchAsync(
6258
continue;
6359
}
6460

61+
var eventStreamId = EventStreamId.FromStreamId(events.Key);
62+
await using var scope = serviceScopeFactory.CreateAsyncScope();
63+
var projectionFactory = scope.ServiceProvider.GetRequiredService<IProjectionFactory>();
64+
var projection = await projectionFactory.CreateAsync<TProjection>(eventStreamId, cancellationToken);
65+
6566
try
6667
{
6768
await projection
6869
.InitializeAsync(
69-
events.Key,
70+
eventStreamId,
7071
cancellationToken)
7172
.ConfigureAwait(false);
7273

src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ public Task WriteAsync(
8484
.GetOrAdd(streamId, new ConcurrentDictionary<string, CheckpointDocument>())
8585
.AddOrUpdate(
8686
name,
87-
key => new CheckpointDocument(name, streamId, streamVersion, dateTimeProvider.GetDateTime(), state),
88-
(key, doc) => new CheckpointDocument(name, streamId, streamVersion, dateTimeProvider.GetDateTime(), state));
87+
static (key, arg) => new CheckpointDocument(key, arg.streamId, arg.streamVersion, arg.currentTime, arg.state),
88+
static (key, doc, arg) => new CheckpointDocument(key, arg.streamId, arg.streamVersion, arg.currentTime, arg),
89+
(streamId, streamVersion, state, currentTime: dateTimeProvider.GetDateTime()));
8990

9091
return Task.CompletedTask;
9192
}

test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj

+5
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
<PropertyGroup>
44
<TargetFramework>net9.0</TargetFramework>
55
<IsPackable>false</IsPackable>
6+
67
</PropertyGroup>
78

89
<ItemGroup>
910
<PackageReference Include="Atc.Test" Version="1.1.4" />
1011
<PackageReference Include="FluentAssertions" Version="6.12.1" />
12+
<PackageReference Include="Microsoft.AspNetCore.TestHost" Version="9.0.3" />
1113
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
1214
<PackageReference Include="xunit" Version="2.9.2" />
1315
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
@@ -18,6 +20,9 @@
1820
<PrivateAssets>all</PrivateAssets>
1921
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
2022
</PackageReference>
23+
24+
<!-- Get the shared Microsoft.AspNetCore.App framework instead of referencing a bunch of MS.Ext.Hosting etc packages -->
25+
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
2126
</ItemGroup>
2227

2328
<ItemGroup>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
using Xunit;
3+
4+
namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional;
5+
6+
#nullable enable
7+
8+
[Trait("Category", "Functional")]
9+
public class CommandHandlerTests : IAsyncLifetime
10+
{
11+
private CqrsTestHost host = null!;
12+
13+
[Fact]
14+
public async Task Result_from_CommandHandler_must_be_returned()
15+
{
16+
var tick = DateTime.UtcNow.Ticks;
17+
var commandResult = await host.Services.GetRequiredService<ICommandProcessorFactory>()
18+
.Create<MakeTimeTickCommand>()
19+
.ExecuteAsync(new MakeTimeTickCommand(tick), CancellationToken.None);
20+
21+
Assert.NotNull(commandResult);
22+
Assert.NotNull(commandResult.Response);
23+
Assert.Equal(tick, commandResult.Response);
24+
}
25+
26+
[Fact]
27+
public async Task CommandHandler_can_consume_existing_events()
28+
{
29+
// Produce some events by making time tick
30+
await MakeTimeTick(host);
31+
await MakeTimeTick(host);
32+
await MakeTimeTick(host);
33+
34+
// Query events
35+
var result = await host.Services.GetRequiredService<ICommandProcessorFactory>()
36+
.Create<QueryTimeTickCommand>()
37+
.ExecuteAsync(new QueryTimeTickCommand(), CancellationToken.None);
38+
39+
var events = Assert.IsType<List<(TimeTickedEvent Evt, EventMetadata Metadata)>>(result.Response);
40+
Assert.Equal(3, events.Count);
41+
42+
static async Task MakeTimeTick(CqrsTestHost host)
43+
{
44+
var tick = DateTime.UtcNow.Ticks;
45+
var commandProcessorFactory = host.Services.GetRequiredService<ICommandProcessorFactory>();
46+
_ = await commandProcessorFactory
47+
.Create<MakeTimeTickCommand>()
48+
.ExecuteAsync(new MakeTimeTickCommand(tick), CancellationToken.None);
49+
}
50+
}
51+
52+
[Fact]
53+
public async Task CommandHandler_that_consumes_events_works_when_no_existing_events_are_present()
54+
{
55+
// Query events - none exists
56+
var result = await host.Services.GetRequiredService<ICommandProcessorFactory>()
57+
.Create<QueryTimeTickCommand>()
58+
.ExecuteAsync(new QueryTimeTickCommand(), CancellationToken.None);
59+
60+
var events = Assert.IsType<List<(TimeTickedEvent Evt, EventMetadata Metadata)>>(result.Response);
61+
Assert.Empty(events);
62+
}
63+
64+
[Fact]
65+
public async Task Command_that_uses_RequiredVersion_Exists_must_result_in_NotFound_when_no_existing_events_are_present()
66+
{
67+
// Query events - must fail as non exists
68+
var result = await host.Services.GetRequiredService<ICommandProcessorFactory>()
69+
.Create<QueryExistingTimeTickCommand>()
70+
.ExecuteAsync(new QueryExistingTimeTickCommand(), CancellationToken.None);
71+
72+
Assert.Equal(ResultType.NotFound, result.Result);
73+
}
74+
75+
public async Task InitializeAsync()
76+
{
77+
host = new CqrsTestHost();
78+
await host.StartAsync();
79+
}
80+
81+
public async Task DisposeAsync()
82+
{
83+
await host.DisposeAsync();
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
using Atc.Cosmos.EventStore.Streams;
2+
using Microsoft.AspNetCore.Builder;
3+
using Microsoft.AspNetCore.TestHost;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.DependencyInjection.Extensions;
6+
7+
namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional;
8+
9+
#nullable enable
10+
11+
public class CqrsTestHost : IAsyncDisposable
12+
{
13+
private readonly WebApplication host;
14+
15+
public CqrsTestHost()
16+
{
17+
host = CreateHostBuilder().Build();
18+
}
19+
20+
public IServiceProvider Services => host.Services;
21+
22+
public async Task StartAsync()
23+
{
24+
await host.StartAsync();
25+
}
26+
27+
public async Task StopAsync()
28+
{
29+
await host.StopAsync();
30+
}
31+
32+
public async ValueTask DisposeAsync()
33+
{
34+
await host.StopAsync();
35+
await host.DisposeAsync();
36+
}
37+
38+
private static WebApplicationBuilder CreateHostBuilder()
39+
{
40+
// Build a host with Atc EventStore
41+
var webApplicationBuilder = WebApplication.CreateEmptyBuilder(new WebApplicationOptions()
42+
{
43+
ApplicationName = "Atc.Cosmos.EventStore.Cqrs.Tests",
44+
});
45+
46+
webApplicationBuilder.WebHost.UseTestServer();
47+
48+
// Configure EventStore
49+
webApplicationBuilder.Services.AddEventStore(eventStoreBuilder =>
50+
{
51+
eventStoreBuilder.UseEvents(c => c.FromAssembly<CqrsTestHost>());
52+
eventStoreBuilder.UseCQRS(c =>
53+
{
54+
c.AddCommandsFromAssembly<CqrsTestHost>();
55+
c.AddProjectionJob<TimeProjection>("TimeProjection");
56+
});
57+
});
58+
59+
// Use InMemoryEventStoreClient which actually works
60+
webApplicationBuilder.Services.Replace(
61+
ServiceDescriptor.Singleton<IEventStoreClient, InMemoryEventStoreClient>());
62+
63+
// Remove unused registrations
64+
webApplicationBuilder.Services.RemoveAll<IStreamInfoReader>();
65+
webApplicationBuilder.Services.RemoveAll<IStreamMetadataReader>();
66+
webApplicationBuilder.Services.RemoveAll<IStreamReader>();
67+
webApplicationBuilder.Services.RemoveAll<IStreamWriter>();
68+
69+
webApplicationBuilder.Services.AddSingleton<FakeDatabase>();
70+
71+
return webApplicationBuilder;
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#nullable enable
2+
namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional;
3+
4+
internal class FakeDatabase
5+
{
6+
private readonly Dictionary<string, object> storage = new();
7+
8+
public void Save(string key, object value)
9+
{
10+
storage[key] = value;
11+
}
12+
13+
public object? Load(string key)
14+
{
15+
storage.TryGetValue(key, out var value);
16+
return value;
17+
}
18+
}

0 commit comments

Comments
 (0)