diff --git a/CHANGELOG.md b/CHANGELOG.md
index 491f80b..7020818 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [1.17.0] - 2025-03-21
+### Changed
+
+- Changed `ICommandProcessorFactory`, `ICommandHandlerFactory` and `CommandProcessor<>` to be
+ registered as transient rather than singleton. This allows `CommandHandler` implementations to use
+ dependencies registered as scoped.
+
+- Reintroduce `IProjectionFactory` in a slightly modified version to allow consumers to make additional "initialization" of projections.
+
+## [1.17.0] - 2025-03-21
+
### Fixed
- Ensure that a projection can run in its own scope. This is important as we do not want state to leak from one projection to the other.
diff --git a/Directory.Build.props b/Directory.Build.props
index 15612a6..3989b38 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -42,7 +42,7 @@
-
+
\ No newline at end of file
diff --git a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs
index ede23cd..a55fc06 100644
--- a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs
+++ b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs
@@ -28,6 +28,7 @@ public static EventStoreOptionsBuilder UseCQRS(
builder.Services.AddSingleton();
builder.Services.AddSingleton(typeof(ProjectionMetadata<>), typeof(ProjectionMetadata<>));
+ builder.Services.AddTransient();
builder.Services.TryAddSingleton();
diff --git a/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs b/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs
new file mode 100644
index 0000000..6ddb384
--- /dev/null
+++ b/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs
@@ -0,0 +1,40 @@
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Atc.Cosmos.EventStore.Cqrs;
+
+///
+/// Responsible for creating instances.
+///
+public interface IProjectionFactory
+{
+ ///
+ /// Creates a projection of type for the event stream
+ /// identified by .
+ ///
+ /// ID of the stream being projected.
+ /// Cancellation.
+ /// Type of projection to create.
+ /// The created projection.
+ public Task CreateAsync(EventStreamId streamId, CancellationToken cancellationToken)
+ where TProjection : IProjection;
+}
+
+///
+/// The default projection factory which just creates projections by
+/// getting them from the DI-container.
+///
+internal sealed class DefaultProjectionFactory : IProjectionFactory
+{
+ private readonly IServiceProvider serviceProvider;
+
+ public DefaultProjectionFactory(IServiceProvider serviceProvider)
+ {
+ this.serviceProvider = serviceProvider;
+ }
+
+ public Task CreateAsync(EventStreamId streamId, CancellationToken cancellationToken)
+ where TProjection : IProjection
+ {
+ return Task.FromResult(serviceProvider.GetRequiredService());
+ }
+}
\ No newline at end of file
diff --git a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs
index fd1e907..236db7c 100644
--- a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs
+++ b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs
@@ -15,11 +15,7 @@ public ProjectionFilter(string filter)
? CreateEvaluateAll()
: CreateEvaluation(p))
.ToArray();
- endsOnAcceptAll = filter
- .Split(
- new[] { EventStreamId.PartSeperator },
- StringSplitOptions.RemoveEmptyEntries)
- .Last() == "**";
+ endsOnAcceptAll = filter.EndsWith("**");
}
public bool Evaluate(StreamId streamId)
diff --git a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs
index f36e500..cd47aea 100644
--- a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs
+++ b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs
@@ -10,18 +10,18 @@ internal class ProjectionProcessor : IProjectionProcessor filters;
private readonly IProjectionTelemetry telemetry;
private readonly ProjectionMetadata projectionMetadata;
- private readonly IServiceProvider serviceProvider;
+ private readonly IServiceScopeFactory serviceScopeFactory;
private readonly string projectionName;
public ProjectionProcessor(
IProjectionOptionsFactory optionsFactory,
IProjectionTelemetry telemetry,
ProjectionMetadata projectionMetadata,
- IServiceProvider serviceProvider)
+ IServiceScopeFactory serviceScopeFactory)
{
this.telemetry = telemetry;
this.projectionMetadata = projectionMetadata;
- this.serviceProvider = serviceProvider;
+ this.serviceScopeFactory = serviceScopeFactory;
filters = optionsFactory
.GetOptions()
.Filters;
@@ -49,10 +49,6 @@ public async Task ProcessBatchAsync(
foreach (var events in groupedEvents)
{
- await using var scope = serviceProvider.CreateAsyncScope();
-
- var projection = scope.ServiceProvider.GetRequiredService();
-
using var operation = batchTelemetry.StartProjection(events.Key);
if (!projectionMetadata.CanConsumeOneOrMoreEvents(events))
@@ -62,11 +58,16 @@ public async Task ProcessBatchAsync(
continue;
}
+ var eventStreamId = EventStreamId.FromStreamId(events.Key);
+ await using var scope = serviceScopeFactory.CreateAsyncScope();
+ var projectionFactory = scope.ServiceProvider.GetRequiredService();
+ var projection = await projectionFactory.CreateAsync(eventStreamId, cancellationToken);
+
try
{
await projection
.InitializeAsync(
- events.Key,
+ eventStreamId,
cancellationToken)
.ConfigureAwait(false);
diff --git a/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs b/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs
index 4f6376a..d32414a 100644
--- a/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs
+++ b/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs
@@ -84,8 +84,9 @@ public Task WriteAsync(
.GetOrAdd(streamId, new ConcurrentDictionary())
.AddOrUpdate(
name,
- key => new CheckpointDocument(name, streamId, streamVersion, dateTimeProvider.GetDateTime(), state),
- (key, doc) => new CheckpointDocument(name, streamId, streamVersion, dateTimeProvider.GetDateTime(), state));
+ static (key, arg) => new CheckpointDocument(key, arg.streamId, arg.streamVersion, arg.currentTime, arg.state),
+ static (key, doc, arg) => new CheckpointDocument(key, arg.streamId, arg.streamVersion, arg.currentTime, arg),
+ (streamId, streamVersion, state, currentTime: dateTimeProvider.GetDateTime()));
return Task.CompletedTask;
}
diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj
index e77b84c..2dfa7c6 100644
--- a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj
+++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj
@@ -3,11 +3,13 @@
net9.0
false
+
+
@@ -18,6 +20,9 @@
all
runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CommandHandlerTests.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CommandHandlerTests.cs
new file mode 100644
index 0000000..358ddbb
--- /dev/null
+++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CommandHandlerTests.cs
@@ -0,0 +1,85 @@
+using Microsoft.Extensions.DependencyInjection;
+using Xunit;
+
+namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional;
+
+#nullable enable
+
+[Trait("Category", "Functional")]
+public class CommandHandlerTests : IAsyncLifetime
+{
+ private CqrsTestHost host = null!;
+
+ [Fact]
+ public async Task Result_from_CommandHandler_must_be_returned()
+ {
+ var tick = DateTime.UtcNow.Ticks;
+ var commandResult = await host.Services.GetRequiredService()
+ .Create()
+ .ExecuteAsync(new MakeTimeTickCommand(tick), CancellationToken.None);
+
+ Assert.NotNull(commandResult);
+ Assert.NotNull(commandResult.Response);
+ Assert.Equal(tick, commandResult.Response);
+ }
+
+ [Fact]
+ public async Task CommandHandler_can_consume_existing_events()
+ {
+ // Produce some events by making time tick
+ await MakeTimeTick(host);
+ await MakeTimeTick(host);
+ await MakeTimeTick(host);
+
+ // Query events
+ var result = await host.Services.GetRequiredService()
+ .Create()
+ .ExecuteAsync(new QueryTimeTickCommand(), CancellationToken.None);
+
+ var events = Assert.IsType>(result.Response);
+ Assert.Equal(3, events.Count);
+
+ static async Task MakeTimeTick(CqrsTestHost host)
+ {
+ var tick = DateTime.UtcNow.Ticks;
+ var commandProcessorFactory = host.Services.GetRequiredService();
+ _ = await commandProcessorFactory
+ .Create()
+ .ExecuteAsync(new MakeTimeTickCommand(tick), CancellationToken.None);
+ }
+ }
+
+ [Fact]
+ public async Task CommandHandler_that_consumes_events_works_when_no_existing_events_are_present()
+ {
+ // Query events - none exists
+ var result = await host.Services.GetRequiredService()
+ .Create()
+ .ExecuteAsync(new QueryTimeTickCommand(), CancellationToken.None);
+
+ var events = Assert.IsType>(result.Response);
+ Assert.Empty(events);
+ }
+
+ [Fact]
+ public async Task Command_that_uses_RequiredVersion_Exists_must_result_in_NotFound_when_no_existing_events_are_present()
+ {
+ // Query events - must fail as non exists
+ var result = await host.Services.GetRequiredService()
+ .Create()
+ .ExecuteAsync(new QueryExistingTimeTickCommand(), CancellationToken.None);
+
+ Assert.Equal(ResultType.NotFound, result.Result);
+ }
+
+ public async Task InitializeAsync()
+ {
+ host = new CqrsTestHost();
+ await host.StartAsync();
+ }
+
+ public async Task DisposeAsync()
+ {
+ await host.DisposeAsync();
+ }
+}
\ No newline at end of file
diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CqrsTestHost.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CqrsTestHost.cs
new file mode 100644
index 0000000..c3b9131
--- /dev/null
+++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CqrsTestHost.cs
@@ -0,0 +1,73 @@
+using Atc.Cosmos.EventStore.Streams;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.AspNetCore.TestHost;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional;
+
+#nullable enable
+
+public class CqrsTestHost : IAsyncDisposable
+{
+ private readonly WebApplication host;
+
+ public CqrsTestHost()
+ {
+ host = CreateHostBuilder().Build();
+ }
+
+ public IServiceProvider Services => host.Services;
+
+ public async Task StartAsync()
+ {
+ await host.StartAsync();
+ }
+
+ public async Task StopAsync()
+ {
+ await host.StopAsync();
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await host.StopAsync();
+ await host.DisposeAsync();
+ }
+
+ private static WebApplicationBuilder CreateHostBuilder()
+ {
+ // Build a host with Atc EventStore
+ var webApplicationBuilder = WebApplication.CreateEmptyBuilder(new WebApplicationOptions()
+ {
+ ApplicationName = "Atc.Cosmos.EventStore.Cqrs.Tests",
+ });
+
+ webApplicationBuilder.WebHost.UseTestServer();
+
+ // Configure EventStore
+ webApplicationBuilder.Services.AddEventStore(eventStoreBuilder =>
+ {
+ eventStoreBuilder.UseEvents(c => c.FromAssembly());
+ eventStoreBuilder.UseCQRS(c =>
+ {
+ c.AddCommandsFromAssembly();
+ c.AddProjectionJob("TimeProjection");
+ });
+ });
+
+ // Use InMemoryEventStoreClient which actually works
+ webApplicationBuilder.Services.Replace(
+ ServiceDescriptor.Singleton());
+
+ // Remove unused registrations
+ webApplicationBuilder.Services.RemoveAll();
+ webApplicationBuilder.Services.RemoveAll();
+ webApplicationBuilder.Services.RemoveAll();
+ webApplicationBuilder.Services.RemoveAll();
+
+ webApplicationBuilder.Services.AddSingleton();
+
+ return webApplicationBuilder;
+ }
+}
\ No newline at end of file
diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/FakeDatabase.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/FakeDatabase.cs
new file mode 100644
index 0000000..c4c5de3
--- /dev/null
+++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/FakeDatabase.cs
@@ -0,0 +1,18 @@
+#nullable enable
+namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional;
+
+internal class FakeDatabase
+{
+ private readonly Dictionary storage = new();
+
+ public void Save(string key, object value)
+ {
+ storage[key] = value;
+ }
+
+ public object? Load(string key)
+ {
+ storage.TryGetValue(key, out var value);
+ return value;
+ }
+}
\ No newline at end of file
diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/InMemoryEventStore.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/InMemoryEventStore.cs
new file mode 100644
index 0000000..010b3ef
--- /dev/null
+++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/InMemoryEventStore.cs
@@ -0,0 +1,251 @@
+using System.Runtime.CompilerServices;
+using Atc.Cosmos.EventStore.Cosmos;
+using Atc.Cosmos.EventStore.Streams;
+using Microsoft.Azure.Cosmos;
+using StreamState = Atc.Cosmos.EventStore.StreamState;
+
+namespace Atc.Cosmos.EventStore.Cqrs.Tests;
+
+#nullable enable
+
+///
+/// A working in-memory implementation of Atc.Cosmos.EventStore.IEventStoreClient.
+///
+public sealed class InMemoryEventStoreClient(IStreamReadValidator readValidator) : IEventStoreClient
+{
+ private readonly Dictionary streams = new();
+ private readonly List subscriptions = new();
+
+ ///
+ public async Task WriteToStreamAsync(
+ StreamId streamId,
+ IReadOnlyCollection