From 0497fe180b43e9894f3e456ce2fd7aec1289875b Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Mon, 7 Apr 2025 09:39:22 +0200 Subject: [PATCH 1/2] Bring back ProjectionFactory Also use IServiceScopeFactory instead of IServiceProvider in ProjectionProcessor to make it more clear that we are only creating scopes --- .../EventStoreOptionsBuilderExtensions.cs | 1 + .../ProjectionFactory.cs | 40 +++++++++++++++++++ .../Projections/ProjectionProcessor.cs | 17 ++++---- 3 files changed, 50 insertions(+), 8 deletions(-) create mode 100644 src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs diff --git a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs index ede23cd..a55fc06 100644 --- a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs +++ b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs @@ -28,6 +28,7 @@ public static EventStoreOptionsBuilder UseCQRS( builder.Services.AddSingleton(); builder.Services.AddSingleton(typeof(ProjectionMetadata<>), typeof(ProjectionMetadata<>)); + builder.Services.AddTransient(); builder.Services.TryAddSingleton(); diff --git a/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs b/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs new file mode 100644 index 0000000..6ddb384 --- /dev/null +++ b/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs @@ -0,0 +1,40 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace Atc.Cosmos.EventStore.Cqrs; + +/// +/// Responsible for creating instances. +/// +public interface IProjectionFactory +{ + /// + /// Creates a projection of type for the event stream + /// identified by . + /// + /// ID of the stream being projected. + /// Cancellation. + /// Type of projection to create. + /// The created projection. + public Task CreateAsync(EventStreamId streamId, CancellationToken cancellationToken) + where TProjection : IProjection; +} + +/// +/// The default projection factory which just creates projections by +/// getting them from the DI-container. +/// +internal sealed class DefaultProjectionFactory : IProjectionFactory +{ + private readonly IServiceProvider serviceProvider; + + public DefaultProjectionFactory(IServiceProvider serviceProvider) + { + this.serviceProvider = serviceProvider; + } + + public Task CreateAsync(EventStreamId streamId, CancellationToken cancellationToken) + where TProjection : IProjection + { + return Task.FromResult(serviceProvider.GetRequiredService()); + } +} \ No newline at end of file diff --git a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs index f36e500..cd47aea 100644 --- a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs +++ b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs @@ -10,18 +10,18 @@ internal class ProjectionProcessor : IProjectionProcessor filters; private readonly IProjectionTelemetry telemetry; private readonly ProjectionMetadata projectionMetadata; - private readonly IServiceProvider serviceProvider; + private readonly IServiceScopeFactory serviceScopeFactory; private readonly string projectionName; public ProjectionProcessor( IProjectionOptionsFactory optionsFactory, IProjectionTelemetry telemetry, ProjectionMetadata projectionMetadata, - IServiceProvider serviceProvider) + IServiceScopeFactory serviceScopeFactory) { this.telemetry = telemetry; this.projectionMetadata = projectionMetadata; - this.serviceProvider = serviceProvider; + this.serviceScopeFactory = serviceScopeFactory; filters = optionsFactory .GetOptions() .Filters; @@ -49,10 +49,6 @@ public async Task ProcessBatchAsync( foreach (var events in groupedEvents) { - await using var scope = serviceProvider.CreateAsyncScope(); - - var projection = scope.ServiceProvider.GetRequiredService(); - using var operation = batchTelemetry.StartProjection(events.Key); if (!projectionMetadata.CanConsumeOneOrMoreEvents(events)) @@ -62,11 +58,16 @@ public async Task ProcessBatchAsync( continue; } + var eventStreamId = EventStreamId.FromStreamId(events.Key); + await using var scope = serviceScopeFactory.CreateAsyncScope(); + var projectionFactory = scope.ServiceProvider.GetRequiredService(); + var projection = await projectionFactory.CreateAsync(eventStreamId, cancellationToken); + try { await projection .InitializeAsync( - events.Key, + eventStreamId, cancellationToken) .ConfigureAwait(false); From 43b3bd1a3df9ed320f728b7a8555046a2d14a0a2 Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Tue, 8 Apr 2025 10:15:41 +0200 Subject: [PATCH 2/2] Changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa5832a..91fbccf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 registered as transient rather than singleton. This allows `CommandHandler` implementations to use dependencies registered as scoped. +- Reintroduce `IProjectionFactory` in a slightly modified version to allow consumers to make additional "initialization" of projections. + ## [1.17.0] - 2025-03-21 ### Fixed