Skip to content

Commit 8f77780

Browse files
Andreas Gehrkechristianhelle
Andreas Gehrke
authored andcommitted
Bring back ProjectionFactory
Also use IServiceScopeFactory instead of IServiceProvider in ProjectionProcessor to make it more clear that we are only creating scopes
1 parent b9646af commit 8f77780

File tree

3 files changed

+50
-8
lines changed

3 files changed

+50
-8
lines changed

src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public static EventStoreOptionsBuilder UseCQRS(
2828
builder.Services.AddSingleton<IProjectionOptionsFactory, ProjectionOptionsFactory>();
2929

3030
builder.Services.AddSingleton(typeof(ProjectionMetadata<>), typeof(ProjectionMetadata<>));
31+
builder.Services.AddTransient<IProjectionFactory, DefaultProjectionFactory>();
3132

3233
builder.Services.TryAddSingleton<IProjectionTelemetry, ProjectionTelemetry>();
3334

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
3+
namespace Atc.Cosmos.EventStore.Cqrs;
4+
5+
/// <summary>
6+
/// Responsible for creating <see cref="IProjection"/> instances.
7+
/// </summary>
8+
public interface IProjectionFactory
9+
{
10+
/// <summary>
11+
/// Creates a projection of type <typeparamref name="TProjection"/> for the event stream
12+
/// identified by <paramref name="streamId"/>.
13+
/// </summary>
14+
/// <param name="streamId">ID of the stream being projected.</param>
15+
/// <param name="cancellationToken">Cancellation.</param>
16+
/// <typeparam name="TProjection">Type of projection to create.</typeparam>
17+
/// <returns>The created projection.</returns>
18+
public Task<IProjection> CreateAsync<TProjection>(EventStreamId streamId, CancellationToken cancellationToken)
19+
where TProjection : IProjection;
20+
}
21+
22+
/// <summary>
23+
/// The default projection factory which just creates projections by
24+
/// getting them from the DI-container.
25+
/// </summary>
26+
internal sealed class DefaultProjectionFactory : IProjectionFactory
27+
{
28+
private readonly IServiceProvider serviceProvider;
29+
30+
public DefaultProjectionFactory(IServiceProvider serviceProvider)
31+
{
32+
this.serviceProvider = serviceProvider;
33+
}
34+
35+
public Task<IProjection> CreateAsync<TProjection>(EventStreamId streamId, CancellationToken cancellationToken)
36+
where TProjection : IProjection
37+
{
38+
return Task.FromResult<IProjection>(serviceProvider.GetRequiredService<TProjection>());
39+
}
40+
}

src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs

+9-8
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ internal class ProjectionProcessor<TProjection> : IProjectionProcessor<TProjecti
1010
private readonly IReadOnlyCollection<ProjectionFilter> filters;
1111
private readonly IProjectionTelemetry telemetry;
1212
private readonly ProjectionMetadata<TProjection> projectionMetadata;
13-
private readonly IServiceProvider serviceProvider;
13+
private readonly IServiceScopeFactory serviceScopeFactory;
1414
private readonly string projectionName;
1515

1616
public ProjectionProcessor(
1717
IProjectionOptionsFactory optionsFactory,
1818
IProjectionTelemetry telemetry,
1919
ProjectionMetadata<TProjection> projectionMetadata,
20-
IServiceProvider serviceProvider)
20+
IServiceScopeFactory serviceScopeFactory)
2121
{
2222
this.telemetry = telemetry;
2323
this.projectionMetadata = projectionMetadata;
24-
this.serviceProvider = serviceProvider;
24+
this.serviceScopeFactory = serviceScopeFactory;
2525
filters = optionsFactory
2626
.GetOptions<TProjection>()
2727
.Filters;
@@ -49,10 +49,6 @@ public async Task<ProjectionAction> ProcessBatchAsync(
4949

5050
foreach (var events in groupedEvents)
5151
{
52-
await using var scope = serviceProvider.CreateAsyncScope();
53-
54-
var projection = scope.ServiceProvider.GetRequiredService<TProjection>();
55-
5652
using var operation = batchTelemetry.StartProjection(events.Key);
5753

5854
if (!projectionMetadata.CanConsumeOneOrMoreEvents(events))
@@ -62,11 +58,16 @@ public async Task<ProjectionAction> ProcessBatchAsync(
6258
continue;
6359
}
6460

61+
var eventStreamId = EventStreamId.FromStreamId(events.Key);
62+
await using var scope = serviceScopeFactory.CreateAsyncScope();
63+
var projectionFactory = scope.ServiceProvider.GetRequiredService<IProjectionFactory>();
64+
var projection = await projectionFactory.CreateAsync<TProjection>(eventStreamId, cancellationToken);
65+
6566
try
6667
{
6768
await projection
6869
.InitializeAsync(
69-
events.Key,
70+
eventStreamId,
7071
cancellationToken)
7172
.ConfigureAwait(false);
7273

0 commit comments

Comments
 (0)