From 3985b369eea176562aff82420b0ce5ce2a457ebc Mon Sep 17 00:00:00 2001 From: ATCBot <83698916+ATCBot@users.noreply.github.com> Date: Fri, 21 Mar 2025 23:38:41 +0100 Subject: [PATCH 01/13] Update main with changes in stable after v1.17.0 release (#63) * Release of new minor version v1.16 (#61) * Set version to '1.16-preview' * Make sure that the projections are processed in a scoped application lifecycle * clean up scope * update changelog * The projection is not optional * Set version to '1.16' --------- Co-authored-by: Niels Lunn * Release of new minor version v1.17 (#62) * Set version to '1.16-preview' * Make sure that the projections are processed in a scoped application lifecycle * clean up scope * update changelog * The projection is not optional * Set version to '1.16' * Set version to '1.17-preview' * ci: fix dotnet pack unable to find build output * Set version to '1.17' --------- Co-authored-by: Niels Lunn Co-authored-by: Christian Helle * Updated CHANGELOG.md for 1.17.0 release --------- Co-authored-by: Niels Lunn Co-authored-by: Christian Helle --- CHANGELOG.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2375561..e23177b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [1.17.0] - 2025-03-21 ### Fixed @@ -180,7 +180,9 @@ services.AddEventStore(builder => - EventStore `ConnectionString` option has been made obsolete, please use `UseCredentials` or `UseCosmosEmulator` instead. -[Unreleased]: https://github.com/atc-net/atc-cosmos-eventstore/compare/v1.15.4...HEAD +[Unreleased]: https://github.com/atc-net/atc-cosmos-eventstore/compare/v1.17.0...HEAD + +[1.17.0]: https://github.com/atc-net/atc-cosmos-eventstore/compare/v1.15.4...v1.17.0 [1.15.4]: https://github.com/atc-net/atc-cosmos-eventstore/compare/v1.14.11...v1.15.4 From 6b91434ba981f6dbe818c9cff59439d13b191960 Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Fri, 4 Apr 2025 11:04:20 +0200 Subject: [PATCH 02/13] Allow CommandHandlers to have scoped dependencies (#64) * Register ICommandProcessorFactory, ICommandHandlerFactory and CommandProcessor as transient to allow to CommandHandler to use scoped dependencies. * Update changelog --------- Co-authored-by: Andreas Gehrke --- CHANGELOG.md | 8 ++++++++ .../EventStoreOptionsBuilderExtensions.cs | 6 +++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e23177b..fa5832a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed + +- Changed `ICommandProcessorFactory`, `ICommandHandlerFactory` and `CommandProcessor<>` to be + registered as transient rather than singleton. This allows `CommandHandler` implementations to use + dependencies registered as scoped. + ## [1.17.0] - 2025-03-21 ### Fixed diff --git a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs index 2e8490f..ede23cd 100644 --- a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs +++ b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs @@ -20,9 +20,9 @@ public static EventStoreOptionsBuilder UseCQRS( builder.Services.AddSingleton(typeof(IStateProjector<>), typeof(StateProjector<>)); builder.Services.AddSingleton(typeof(IStateWriter<>), typeof(StateWriter<>)); - builder.Services.AddSingleton(typeof(ICommandProcessor<>), typeof(CommandProcessor<>)); - builder.Services.AddSingleton(); - builder.Services.AddSingleton(); + builder.Services.AddTransient(typeof(ICommandProcessor<>), typeof(CommandProcessor<>)); + builder.Services.AddTransient(); + builder.Services.AddTransient(); builder.Services.AddSingleton(); builder.Services.AddSingleton(); From 8b7d651bddb02a9a7379b7266bff0b9e2274e2e3 Mon Sep 17 00:00:00 2001 From: ATCBot Date: Fri, 4 Apr 2025 09:08:51 +0000 Subject: [PATCH 03/13] Set version to '1.18' --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index c15a008..78d3670 100644 --- a/version.json +++ b/version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/main/src/NerdBank.GitVersioning/version.schema.json", - "version": "1.18-preview", + "version": "1.18", "assemblyVersion": { "precision": "revision" }, From 29e26610ee23012cfa4fea3ed366bf7e1a2cc362 Mon Sep 17 00:00:00 2001 From: ATCBot Date: Fri, 4 Apr 2025 09:08:51 +0000 Subject: [PATCH 04/13] Set version to '1.19-preview' --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index c15a008..b811a0a 100644 --- a/version.json +++ b/version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/main/src/NerdBank.GitVersioning/version.schema.json", - "version": "1.18-preview", + "version": "1.19-preview", "assemblyVersion": { "precision": "revision" }, From a6ae2ac6ba80c38af945b4888ce27236df269657 Mon Sep 17 00:00:00 2001 From: ATCBot Date: Fri, 4 Apr 2025 10:06:37 +0000 Subject: [PATCH 05/13] Set version to '1.19' --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index b811a0a..7be4704 100644 --- a/version.json +++ b/version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/main/src/NerdBank.GitVersioning/version.schema.json", - "version": "1.19-preview", + "version": "1.19", "assemblyVersion": { "precision": "revision" }, From ea48e56be4c3a7c8c3f68abadce88dbeabab5a43 Mon Sep 17 00:00:00 2001 From: ATCBot Date: Fri, 4 Apr 2025 10:06:37 +0000 Subject: [PATCH 06/13] Set version to '1.20-preview' --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index b811a0a..dbc49f9 100644 --- a/version.json +++ b/version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/main/src/NerdBank.GitVersioning/version.schema.json", - "version": "1.19-preview", + "version": "1.20-preview", "assemblyVersion": { "precision": "revision" }, From 8f77780076893fe8896604ebfa486d47e10a4841 Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Mon, 7 Apr 2025 09:39:22 +0200 Subject: [PATCH 07/13] Bring back ProjectionFactory Also use IServiceScopeFactory instead of IServiceProvider in ProjectionProcessor to make it more clear that we are only creating scopes --- .../EventStoreOptionsBuilderExtensions.cs | 1 + .../ProjectionFactory.cs | 40 +++++++++++++++++++ .../Projections/ProjectionProcessor.cs | 17 ++++---- 3 files changed, 50 insertions(+), 8 deletions(-) create mode 100644 src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs diff --git a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs index ede23cd..a55fc06 100644 --- a/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs +++ b/src/Atc.Cosmos.EventStore.Cqrs/DependencyInjection/EventStoreOptionsBuilderExtensions.cs @@ -28,6 +28,7 @@ public static EventStoreOptionsBuilder UseCQRS( builder.Services.AddSingleton(); builder.Services.AddSingleton(typeof(ProjectionMetadata<>), typeof(ProjectionMetadata<>)); + builder.Services.AddTransient(); builder.Services.TryAddSingleton(); diff --git a/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs b/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs new file mode 100644 index 0000000..6ddb384 --- /dev/null +++ b/src/Atc.Cosmos.EventStore.Cqrs/ProjectionFactory.cs @@ -0,0 +1,40 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace Atc.Cosmos.EventStore.Cqrs; + +/// +/// Responsible for creating instances. +/// +public interface IProjectionFactory +{ + /// + /// Creates a projection of type for the event stream + /// identified by . + /// + /// ID of the stream being projected. + /// Cancellation. + /// Type of projection to create. + /// The created projection. + public Task CreateAsync(EventStreamId streamId, CancellationToken cancellationToken) + where TProjection : IProjection; +} + +/// +/// The default projection factory which just creates projections by +/// getting them from the DI-container. +/// +internal sealed class DefaultProjectionFactory : IProjectionFactory +{ + private readonly IServiceProvider serviceProvider; + + public DefaultProjectionFactory(IServiceProvider serviceProvider) + { + this.serviceProvider = serviceProvider; + } + + public Task CreateAsync(EventStreamId streamId, CancellationToken cancellationToken) + where TProjection : IProjection + { + return Task.FromResult(serviceProvider.GetRequiredService()); + } +} \ No newline at end of file diff --git a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs index f36e500..cd47aea 100644 --- a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs +++ b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionProcessor.cs @@ -10,18 +10,18 @@ internal class ProjectionProcessor : IProjectionProcessor filters; private readonly IProjectionTelemetry telemetry; private readonly ProjectionMetadata projectionMetadata; - private readonly IServiceProvider serviceProvider; + private readonly IServiceScopeFactory serviceScopeFactory; private readonly string projectionName; public ProjectionProcessor( IProjectionOptionsFactory optionsFactory, IProjectionTelemetry telemetry, ProjectionMetadata projectionMetadata, - IServiceProvider serviceProvider) + IServiceScopeFactory serviceScopeFactory) { this.telemetry = telemetry; this.projectionMetadata = projectionMetadata; - this.serviceProvider = serviceProvider; + this.serviceScopeFactory = serviceScopeFactory; filters = optionsFactory .GetOptions() .Filters; @@ -49,10 +49,6 @@ public async Task ProcessBatchAsync( foreach (var events in groupedEvents) { - await using var scope = serviceProvider.CreateAsyncScope(); - - var projection = scope.ServiceProvider.GetRequiredService(); - using var operation = batchTelemetry.StartProjection(events.Key); if (!projectionMetadata.CanConsumeOneOrMoreEvents(events)) @@ -62,11 +58,16 @@ public async Task ProcessBatchAsync( continue; } + var eventStreamId = EventStreamId.FromStreamId(events.Key); + await using var scope = serviceScopeFactory.CreateAsyncScope(); + var projectionFactory = scope.ServiceProvider.GetRequiredService(); + var projection = await projectionFactory.CreateAsync(eventStreamId, cancellationToken); + try { await projection .InitializeAsync( - events.Key, + eventStreamId, cancellationToken) .ConfigureAwait(false); From d6d4def78e828e558a6c090cbdb1696c83736dd3 Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Tue, 8 Apr 2025 10:15:41 +0200 Subject: [PATCH 08/13] Changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa5832a..91fbccf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 registered as transient rather than singleton. This allows `CommandHandler` implementations to use dependencies registered as scoped. +- Reintroduce `IProjectionFactory` in a slightly modified version to allow consumers to make additional "initialization" of projections. + ## [1.17.0] - 2025-03-21 ### Fixed From 18678f773b866f5b827c05b4b6352f3fb16a6bbf Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Thu, 3 Apr 2025 10:11:35 +0200 Subject: [PATCH 09/13] Functional tests of Command execution and projections --- .../Atc.Cosmos.EventStore.Cqrs.Tests.csproj | 5 + .../Functional/CommandHandlerTests.cs | 85 ++++++ .../Functional/CqrsTestHost.cs | 73 +++++ .../Functional/FakeDatabase.cs | 18 ++ .../Functional/InMemoryEventStore.cs | 251 ++++++++++++++++++ .../Functional/ProjectionTests.cs | 35 +++ .../Functional/Time.cs | 91 +++++++ test/Directory.Build.props | 3 +- 8 files changed, 560 insertions(+), 1 deletion(-) create mode 100644 test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CommandHandlerTests.cs create mode 100644 test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CqrsTestHost.cs create mode 100644 test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/FakeDatabase.cs create mode 100644 test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/InMemoryEventStore.cs create mode 100644 test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/ProjectionTests.cs create mode 100644 test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/Time.cs diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj index e77b84c..2dfa7c6 100644 --- a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj +++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Atc.Cosmos.EventStore.Cqrs.Tests.csproj @@ -3,11 +3,13 @@ net9.0 false + + @@ -18,6 +20,9 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + + diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CommandHandlerTests.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CommandHandlerTests.cs new file mode 100644 index 0000000..358ddbb --- /dev/null +++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CommandHandlerTests.cs @@ -0,0 +1,85 @@ +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional; + +#nullable enable + +[Trait("Category", "Functional")] +public class CommandHandlerTests : IAsyncLifetime +{ + private CqrsTestHost host = null!; + + [Fact] + public async Task Result_from_CommandHandler_must_be_returned() + { + var tick = DateTime.UtcNow.Ticks; + var commandResult = await host.Services.GetRequiredService() + .Create() + .ExecuteAsync(new MakeTimeTickCommand(tick), CancellationToken.None); + + Assert.NotNull(commandResult); + Assert.NotNull(commandResult.Response); + Assert.Equal(tick, commandResult.Response); + } + + [Fact] + public async Task CommandHandler_can_consume_existing_events() + { + // Produce some events by making time tick + await MakeTimeTick(host); + await MakeTimeTick(host); + await MakeTimeTick(host); + + // Query events + var result = await host.Services.GetRequiredService() + .Create() + .ExecuteAsync(new QueryTimeTickCommand(), CancellationToken.None); + + var events = Assert.IsType>(result.Response); + Assert.Equal(3, events.Count); + + static async Task MakeTimeTick(CqrsTestHost host) + { + var tick = DateTime.UtcNow.Ticks; + var commandProcessorFactory = host.Services.GetRequiredService(); + _ = await commandProcessorFactory + .Create() + .ExecuteAsync(new MakeTimeTickCommand(tick), CancellationToken.None); + } + } + + [Fact] + public async Task CommandHandler_that_consumes_events_works_when_no_existing_events_are_present() + { + // Query events - none exists + var result = await host.Services.GetRequiredService() + .Create() + .ExecuteAsync(new QueryTimeTickCommand(), CancellationToken.None); + + var events = Assert.IsType>(result.Response); + Assert.Empty(events); + } + + [Fact] + public async Task Command_that_uses_RequiredVersion_Exists_must_result_in_NotFound_when_no_existing_events_are_present() + { + // Query events - must fail as non exists + var result = await host.Services.GetRequiredService() + .Create() + .ExecuteAsync(new QueryExistingTimeTickCommand(), CancellationToken.None); + + Assert.Equal(ResultType.NotFound, result.Result); + } + + public async Task InitializeAsync() + { + host = new CqrsTestHost(); + await host.StartAsync(); + } + + public async Task DisposeAsync() + { + await host.DisposeAsync(); + } +} \ No newline at end of file diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CqrsTestHost.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CqrsTestHost.cs new file mode 100644 index 0000000..c3b9131 --- /dev/null +++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/CqrsTestHost.cs @@ -0,0 +1,73 @@ +using Atc.Cosmos.EventStore.Streams; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.TestHost; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional; + +#nullable enable + +public class CqrsTestHost : IAsyncDisposable +{ + private readonly WebApplication host; + + public CqrsTestHost() + { + host = CreateHostBuilder().Build(); + } + + public IServiceProvider Services => host.Services; + + public async Task StartAsync() + { + await host.StartAsync(); + } + + public async Task StopAsync() + { + await host.StopAsync(); + } + + public async ValueTask DisposeAsync() + { + await host.StopAsync(); + await host.DisposeAsync(); + } + + private static WebApplicationBuilder CreateHostBuilder() + { + // Build a host with Atc EventStore + var webApplicationBuilder = WebApplication.CreateEmptyBuilder(new WebApplicationOptions() + { + ApplicationName = "Atc.Cosmos.EventStore.Cqrs.Tests", + }); + + webApplicationBuilder.WebHost.UseTestServer(); + + // Configure EventStore + webApplicationBuilder.Services.AddEventStore(eventStoreBuilder => + { + eventStoreBuilder.UseEvents(c => c.FromAssembly()); + eventStoreBuilder.UseCQRS(c => + { + c.AddCommandsFromAssembly(); + c.AddProjectionJob("TimeProjection"); + }); + }); + + // Use InMemoryEventStoreClient which actually works + webApplicationBuilder.Services.Replace( + ServiceDescriptor.Singleton()); + + // Remove unused registrations + webApplicationBuilder.Services.RemoveAll(); + webApplicationBuilder.Services.RemoveAll(); + webApplicationBuilder.Services.RemoveAll(); + webApplicationBuilder.Services.RemoveAll(); + + webApplicationBuilder.Services.AddSingleton(); + + return webApplicationBuilder; + } +} \ No newline at end of file diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/FakeDatabase.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/FakeDatabase.cs new file mode 100644 index 0000000..c4c5de3 --- /dev/null +++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/FakeDatabase.cs @@ -0,0 +1,18 @@ +#nullable enable +namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional; + +internal class FakeDatabase +{ + private readonly Dictionary storage = new(); + + public void Save(string key, object value) + { + storage[key] = value; + } + + public object? Load(string key) + { + storage.TryGetValue(key, out var value); + return value; + } +} \ No newline at end of file diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/InMemoryEventStore.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/InMemoryEventStore.cs new file mode 100644 index 0000000..010b3ef --- /dev/null +++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/InMemoryEventStore.cs @@ -0,0 +1,251 @@ +using System.Runtime.CompilerServices; +using Atc.Cosmos.EventStore.Cosmos; +using Atc.Cosmos.EventStore.Streams; +using Microsoft.Azure.Cosmos; +using StreamState = Atc.Cosmos.EventStore.StreamState; + +namespace Atc.Cosmos.EventStore.Cqrs.Tests; + +#nullable enable + +/// +/// A working in-memory implementation of Atc.Cosmos.EventStore.IEventStoreClient. +/// +public sealed class InMemoryEventStoreClient(IStreamReadValidator readValidator) : IEventStoreClient +{ + private readonly Dictionary streams = new(); + private readonly List subscriptions = new(); + + /// + public async Task WriteToStreamAsync( + StreamId streamId, + IReadOnlyCollection events, + StreamVersion? version = null, + StreamWriteOptions? options = null, + CancellationToken cancellationToken = default) + { + // Get stream + if (!streams.TryGetValue(streamId, out var stream)) + { + stream = new InMemoryStream(streamId); + streams.Add(streamId, stream); + } + + // Add events to stream + var writtenEvents = stream.AddEvents(events); + + // Invoke subscriptions + // Note that this way of invoking the subscriptions (used by projections jobs) means that projections runs as part of + // this method (WriteToStreamAsync). A nice benefit from this implementation is that we can assert most side-effects from a command + // execution, eg projections being written, directly after command has been executed. + // When using a real Cosmos DB projections runs some time *after* writing of events to Cosmos. + foreach (var sub in subscriptions) + { + try + { + await sub.EventsHandler(writtenEvents, CancellationToken.None); + } + catch (Exception ex) + { + await sub.ExceptionHandler("leasetoken", ex); + } + } + + var metadata = stream.Metadata; + return new StreamResponse(streamId, metadata.Version, metadata.Timestamp, metadata.State); + } + + /// + public async IAsyncEnumerable ReadFromStreamAsync( + StreamId streamId, + StreamVersion? fromVersion = null, + StreamReadFilter? filter = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + if (filter is not null) + { + throw new NotImplementedException("Use of StreamReadFilter is not implemented"); + } + + if (!streams.TryGetValue(streamId, out var stream)) + { + stream = new InMemoryStream(streamId); + } + + // Validate like the real implementation + readValidator.Validate(stream.Metadata, fromVersion ?? StreamVersion.Any); + + // StreamVersion is modelled as EventStreamVersion in CQS package + // StreamEmpty = 0 + // Exists = -1 + // Any = 9223372036854775807L + if (fromVersion.HasValue && !fromVersion.Value.Equals(StreamVersion.Any) && + fromVersion.Value > StreamVersion.ToStreamVersion(0)) + { + throw new NotImplementedException( + $"Use of StreamVersion other than EventStreamVersion.Any, EventStreamVersion.Exists, EventStreamVersion.StreamEmpty is not implemented. Version {fromVersion.Value.Value} was provided"); + } + + await Task.Yield(); // Force async + foreach (var @event in stream.Events) + { + yield return @event; + } + } + + /// + public Task GetStreamInfoAsync( + StreamId streamId, + CancellationToken cancellationToken = default) + { + if (streams.TryGetValue(streamId, out var stream)) + { + return Task.FromResult(stream.Metadata); + } + + return Task.FromResult(new InMemoryStreamMetadata( + "ETag", + StreamState.New, + streamId, + DateTimeOffset.UtcNow, + StreamVersion.ToStreamVersion(0))); + } + + /// + public IStreamSubscription SubscribeToStreams( + ConsumerGroup consumerGroup, + ProcessEventsHandler eventsHandler, + ProcessExceptionHandler exceptionHandler) + { + var subscription = new InMemoryStreamSubscription(eventsHandler, exceptionHandler); + this.subscriptions.Add(subscription); + return subscription; + } + + /// + public Task DeleteSubscriptionAsync(ConsumerGroup consumerGroup, CancellationToken cancellationToken = default) => + Task.CompletedTask; + + /// + public IAsyncEnumerable QueryStreamsAsync( + string? filter = null, + DateTimeOffset? createdAfter = null, + CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + /// + public Task SetStreamCheckpointAsync( + string name, + StreamId streamId, + StreamVersion version, + object? state = null, + CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + /// + public Task?> GetStreamCheckpointAsync( + string name, + StreamId streamId, + CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + /// + public Task GetStreamCheckpointAsync( + string name, + StreamId streamId, + CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + /// + public Task DeleteStreamAsync(StreamId streamId, CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + private sealed record InMemoryStreamSubscription( + ProcessEventsHandler EventsHandler, + ProcessExceptionHandler ExceptionHandler) + : IStreamSubscription + { + public Task StartAsync() => Task.CompletedTask; + + public Task StopAsync() => Task.CompletedTask; + } + + private sealed class InMemoryStream(StreamId streamId) + { + private int currentVersion; + + public List Events { get; } = new(); + + public InMemoryStreamMetadata Metadata => new InMemoryStreamMetadata( + "ETag", + Events.Count == 0 ? StreamState.New : StreamState.Active, + streamId, + DateTimeOffset.UtcNow, + StreamVersion.ToStreamVersion(currentVersion)); + + public IReadOnlyList AddEvents(IEnumerable events) + { + var wrappedEvents = events.Select(e => new InMemoryEvent + { + Data = e, + Metadata = new InMemoryEventMetadata + { + Name = e.GetType().Name, + StreamId = streamId, + Version = ++currentVersion, + Timestamp = DateTimeOffset.UtcNow, + }, + }).ToList(); + + Events.AddRange(wrappedEvents); + return wrappedEvents; + } + } + + private sealed class InMemoryEvent : IEvent + { + required public object Data { get; init; } + + required public IEventMetadata Metadata { get; init; } + } + + private sealed class InMemoryEventMetadata : IEventMetadata + { + required public string Name { get; init; } + + public string? CorrelationId { get; init; } + + public string? CausationId { get; init; } + + required public StreamId StreamId { get; init; } + + public DateTimeOffset Timestamp { get; init; } + + required public StreamVersion Version { get; init; } + } + + private sealed record InMemoryStreamMetadata( + string ETag, + StreamState State, + StreamId StreamId, + DateTimeOffset Timestamp, + StreamVersion Version) : IStreamMetadata; +} + +/// +/// No op implementation of . +/// +public sealed class NoOpEventStoreInitializer : IEventStoreInitializer +{ + /// + public Task CreateEventStoreAsync(ThroughputProperties throughputProperties, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + /// + public void CreateEventStore(ThroughputProperties throughputProperties) + { + // No op + } +} \ No newline at end of file diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/ProjectionTests.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/ProjectionTests.cs new file mode 100644 index 0000000..32a4dc3 --- /dev/null +++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/ProjectionTests.cs @@ -0,0 +1,35 @@ +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional; + +[Trait("Category", "Functional")] +public class ProjectionTests : IAsyncLifetime +{ + private CqrsTestHost host = null!; + + [Fact] + public async Task Projection_is_triggered_when_command_is_executed() + { + var tick = DateTime.UtcNow.Ticks; + _ = await host.Services.GetRequiredService() + .Create() + .ExecuteAsync(new MakeTimeTickCommand(tick), CancellationToken.None); + + // Assert that projection was triggered and saved our tick to database + var database = host.Services.GetRequiredService(); + var storedTick = database.Load("TimeProjection"); + Assert.Equal(tick, storedTick); + } + + public async Task InitializeAsync() + { + host = new CqrsTestHost(); + await host.StartAsync(); + } + + public async Task DisposeAsync() + { + await host.DisposeAsync(); + } +} \ No newline at end of file diff --git a/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/Time.cs b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/Time.cs new file mode 100644 index 0000000..c920807 --- /dev/null +++ b/test/Atc.Cosmos.EventStore.Cqrs.Tests/Functional/Time.cs @@ -0,0 +1,91 @@ +namespace Atc.Cosmos.EventStore.Cqrs.Tests.Functional; + +[ProjectionFilter("*")] +internal class TimeProjection(FakeDatabase fakeDatabase) : IConsumeEvent, IProjection +{ + private long lastTick = 0; + + public void Consume(TimeTickedEvent evt, EventMetadata metadata) + { + lastTick = evt.TickValue; + } + + public Task InitializeAsync(EventStreamId id, CancellationToken cancellationToken) + { + // Here we could read latest "snapshot/view" value from DB + return Task.CompletedTask; + } + + public Task CompleteAsync(CancellationToken cancellationToken) + { + // Persist "snapshot/view" value to DB + fakeDatabase.Save("TimeProjection", lastTick); + + return Task.CompletedTask; + } + + public Task FailedAsync(Exception exception, CancellationToken cancellationToken) + { + return Task.FromResult(ProjectionAction.Continue); + } +} + +/// +/// Queries all stored TimeTickedEvent. +/// +internal record QueryTimeTickCommand() : CommandBase(new EventStreamId("time")); + +/// +/// Queries all stored TimeTickedEvent - at least one event must exist. +/// +internal record QueryExistingTimeTickCommand() + : CommandBase(new EventStreamId("time"), RequiredVersion: EventStreamVersion.Exists); + +internal class QueryTimeTickHandler : + ICommandHandler, + ICommandHandler, + IConsumeEvent +{ + private readonly List<(TimeTickedEvent Evt, EventMetadata Metadata)> events = new(); + + public void Consume(TimeTickedEvent evt, EventMetadata metadata) + { + events.Add((evt, metadata)); + } + + public ValueTask ExecuteAsync( + QueryTimeTickCommand command, + ICommandContext context, + CancellationToken cancellationToken) + { + context.ResponseObject = events; + return default; + } + + public ValueTask ExecuteAsync( + QueryExistingTimeTickCommand command, + ICommandContext context, + CancellationToken cancellationToken) + { + context.ResponseObject = events; + return default; + } +} + +internal record MakeTimeTickCommand(long Tick) : CommandBase(new EventStreamId("time")); + +internal class MakeTimeTickCommandHandler : ICommandHandler +{ + public ValueTask ExecuteAsync( + MakeTimeTickCommand command, + ICommandContext context, + CancellationToken cancellationToken) + { + // Add event with Tick value from command + context.AddEvent(new TimeTickedEvent(command.Tick)); + context.ResponseObject = command.Tick; + return default; + } +} + +internal sealed record TimeTickedEvent(long TickValue); \ No newline at end of file diff --git a/test/Directory.Build.props b/test/Directory.Build.props index 7355aa1..7166da7 100644 --- a/test/Directory.Build.props +++ b/test/Directory.Build.props @@ -8,6 +8,7 @@ annotations + latest @@ -18,5 +19,5 @@ - + \ No newline at end of file From a177714f82288ac8fbc2b25bdbafa6a4a2ecdd84 Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Fri, 4 Apr 2025 12:39:55 +0200 Subject: [PATCH 10/13] Upgrade SonarAnalyzer.CSharp as the current version reports bogus issues --- Directory.Build.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Directory.Build.props b/Directory.Build.props index 15612a6..3989b38 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -42,7 +42,7 @@ - + \ No newline at end of file From 40c3692c5db4cf07b8cdc40934115f9ca4401244 Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Fri, 4 Apr 2025 12:47:47 +0200 Subject: [PATCH 11/13] Fix Warning S6612 : Use the lambda parameter instead of capturing the argument 'name' --- src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs b/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs index 4f6376a..d32414a 100644 --- a/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs +++ b/src/Atc.Cosmos.EventStore/InMemory/InMemoryStore.cs @@ -84,8 +84,9 @@ public Task WriteAsync( .GetOrAdd(streamId, new ConcurrentDictionary()) .AddOrUpdate( name, - key => new CheckpointDocument(name, streamId, streamVersion, dateTimeProvider.GetDateTime(), state), - (key, doc) => new CheckpointDocument(name, streamId, streamVersion, dateTimeProvider.GetDateTime(), state)); + static (key, arg) => new CheckpointDocument(key, arg.streamId, arg.streamVersion, arg.currentTime, arg.state), + static (key, doc, arg) => new CheckpointDocument(key, arg.streamId, arg.streamVersion, arg.currentTime, arg), + (streamId, streamVersion, state, currentTime: dateTimeProvider.GetDateTime())); return Task.CompletedTask; } From 3d0d7ea7e6c19210020a753771e0089303966829 Mon Sep 17 00:00:00 2001 From: Andreas Gehrke Date: Fri, 4 Apr 2025 12:49:21 +0200 Subject: [PATCH 12/13] Fix Warning S6608 : Indexing at Count-1 should be used instead of the "Enumerable" extension method "Last" --- .../Projections/ProjectionFilter.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs index fd1e907..236db7c 100644 --- a/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs +++ b/src/Atc.Cosmos.EventStore.Cqrs/Projections/ProjectionFilter.cs @@ -15,11 +15,7 @@ public ProjectionFilter(string filter) ? CreateEvaluateAll() : CreateEvaluation(p)) .ToArray(); - endsOnAcceptAll = filter - .Split( - new[] { EventStreamId.PartSeperator }, - StringSplitOptions.RemoveEmptyEntries) - .Last() == "**"; + endsOnAcceptAll = filter.EndsWith("**"); } public bool Evaluate(StreamId streamId) From 5bd63c1859e46926bf86b15af69ea7c518ca57e1 Mon Sep 17 00:00:00 2001 From: ATCBot Date: Tue, 8 Apr 2025 10:32:10 +0000 Subject: [PATCH 13/13] Set version to '1.20' --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index dbc49f9..addd78a 100644 --- a/version.json +++ b/version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/main/src/NerdBank.GitVersioning/version.schema.json", - "version": "1.20-preview", + "version": "1.20", "assemblyVersion": { "precision": "revision" },