Skip to content

Commit 7225a68

Browse files
committed
- observer interfaces
1 parent 2a86f8a commit 7225a68

12 files changed

Lines changed: 190 additions & 74 deletions

Shuttle.Recall.sln

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
33
# Visual Studio 15
4-
VisualStudioVersion = 15.0.27130.2020
4+
VisualStudioVersion = 15.0.27130.2024
55
MinimumVisualStudioVersion = 10.0.40219.1
6-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shuttle.Recall", "Shuttle.Recall\Shuttle.Recall.csproj", "{2FFBDEC5-EC6E-4EE3-9046-F757C8044BBA}"
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shuttle.Recall", "Shuttle.Recall\Shuttle.Recall.csproj", "{2FFBDEC5-EC6E-4EE3-9046-F757C8044BBA}"
77
EndProject
8-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shuttle.Recall.Tests", "Shuttle.Recall.Tests\Shuttle.Recall.Tests.csproj", "{4C228AFF-FF77-4AED-8E78-7A3ACEC06317}"
8+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Shuttle.Recall.Tests", "Shuttle.Recall.Tests\Shuttle.Recall.Tests.csproj", "{4C228AFF-FF77-4AED-8E78-7A3ACEC06317}"
9+
EndProject
10+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{75DCAA58-73EE-4108-98FD-490E7980DF8D}"
11+
ProjectSection(SolutionItems) = preProject
12+
LICENSE = LICENSE
13+
README.md = README.md
14+
EndProjectSection
915
EndProject
1016
Global
1117
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
3+
namespace Shuttle.Recall
4+
{
5+
public class DuplicateKeyException : Exception
6+
{
7+
public DuplicateKeyException(Guid id, string key)
8+
:base(string.Format(Resources.DuplicateKeyException, id, key))
9+
{
10+
}
11+
12+
public DuplicateKeyException(string message) : base(message)
13+
{
14+
}
15+
16+
public DuplicateKeyException(string message, Exception innerException) : base(message, innerException)
17+
{
18+
}
19+
}
20+
}

Shuttle.Recall/EventProcessing/ProjectionProcessor.cs

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
1-
using Shuttle.Core.Contract;
1+
using System;
2+
using System.Collections.Generic;
3+
using Shuttle.Core.Container;
4+
using Shuttle.Core.Contract;
25
using Shuttle.Core.Pipelines;
6+
using Shuttle.Core.PipelineTransaction;
7+
using Shuttle.Core.Reflection;
8+
using Shuttle.Core.Serialization;
39
using Shuttle.Core.Threading;
10+
using Shuttle.Core.Transactions;
411

512
namespace Shuttle.Recall
613
{
@@ -46,5 +53,83 @@ public void Execute(IThreadState state)
4653

4754
_pipelineFactory.ReleasePipeline(pipeline);
4855
}
56+
57+
public static void Register(IComponentRegistry registry, IEventStoreConfiguration configuration)
58+
{
59+
Guard.AgainstNull(registry, nameof(registry));
60+
Guard.AgainstNull(configuration, nameof(configuration));
61+
62+
registry.AttemptRegisterInstance(configuration);
63+
64+
registry.RegistryBoostrap();
65+
66+
registry.AttemptRegister<IEventMethodInvokerConfiguration, EventMethodInvokerConfiguration>();
67+
registry.AttemptRegister<IEventMethodInvoker, DefaultEventMethodInvoker>();
68+
registry.AttemptRegister<ISerializer, DefaultSerializer>();
69+
registry.AttemptRegister<IProjectionSequenceNumberTracker, ProjectionSequenceNumberTracker>();
70+
registry.AttemptRegister<IPrimitiveEventQueue, PrimitiveEventQueue>();
71+
registry.AttemptRegister<IConcurrenyExceptionSpecification, DefaultConcurrenyExceptionSpecification>();
72+
73+
registry.AttemptRegister<TransactionScopeObserver, TransactionScopeObserver>();
74+
75+
if (!registry.IsRegistered<ITransactionScopeFactory>())
76+
{
77+
var transactionScopeConfiguration =
78+
configuration.TransactionScope ?? new TransactionScopeConfiguration();
79+
80+
registry.AttemptRegisterInstance<ITransactionScopeFactory>(
81+
new DefaultTransactionScopeFactory(transactionScopeConfiguration.Enabled,
82+
transactionScopeConfiguration.IsolationLevel,
83+
TimeSpan.FromSeconds(transactionScopeConfiguration.TimeoutSeconds)));
84+
}
85+
86+
registry.AttemptRegister<IPipelineFactory, DefaultPipelineFactory>();
87+
88+
var reflectionService = new ReflectionService();
89+
90+
foreach (var type in reflectionService.GetTypesAssignableTo<IPipeline>(typeof(EventProcessor).Assembly))
91+
{
92+
if (type.IsInterface || type.IsAbstract || registry.IsRegistered(type))
93+
{
94+
continue;
95+
}
96+
97+
registry.Register(type, type, Lifestyle.Transient);
98+
}
99+
100+
var observers = new List<Type>();
101+
102+
foreach (var type in reflectionService.GetTypesAssignableTo<IPipelineObserver>(typeof(EventProcessor).Assembly))
103+
{
104+
if (type.IsInterface || type.IsAbstract)
105+
{
106+
continue;
107+
}
108+
109+
var interfaceType = type.InterfaceMatching($"I{type.Name}");
110+
111+
if (interfaceType != null)
112+
{
113+
if (registry.IsRegistered(type))
114+
{
115+
continue;
116+
}
117+
118+
registry.Register(interfaceType, type, Lifestyle.Singleton);
119+
}
120+
else
121+
{
122+
throw new ApplicationException(string.Format(Resources.ObserverInterfaceMissingException, type.Name));
123+
}
124+
125+
observers.Add(type);
126+
}
127+
128+
registry.RegisterCollection(typeof(IPipelineObserver), observers, Lifestyle.Singleton);
129+
130+
registry.AttemptRegister<IEventStore, EventStore>();
131+
registry.AttemptRegister<IEventProcessor, EventProcessor>();
132+
}
133+
49134
}
50135
}

Shuttle.Recall/EventStore.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ public static void Register(IComponentRegistry registry, IEventStoreConfiguratio
128128
registry.AttemptRegister<IEventMethodInvokerConfiguration, EventMethodInvokerConfiguration>();
129129
registry.AttemptRegister<IEventMethodInvoker, DefaultEventMethodInvoker>();
130130
registry.AttemptRegister<ISerializer, DefaultSerializer>();
131-
registry.AttemptRegister<IProjectionSequenceNumberTracker, ProjectionSequenceNumberTracker>();
132-
registry.AttemptRegister<IPrimitiveEventQueue, PrimitiveEventQueue>();
133131
registry.AttemptRegister<IConcurrenyExceptionSpecification, DefaultConcurrenyExceptionSpecification>();
134132

135133
registry.AttemptRegister<TransactionScopeObserver, TransactionScopeObserver>();
@@ -149,9 +147,9 @@ public static void Register(IComponentRegistry registry, IEventStoreConfiguratio
149147

150148
var reflectionService = new ReflectionService();
151149

152-
foreach (var type in reflectionService.GetTypesAssignableTo(typeof(EventStore).Assembly))
150+
foreach (var type in reflectionService.GetTypesAssignableTo<IPipeline>(typeof(EventStore).Assembly))
153151
{
154-
if (type.IsInterface || registry.IsRegistered(type))
152+
if (type.IsInterface || type.IsAbstract || registry.IsRegistered(type))
155153
{
156154
continue;
157155
}

Shuttle.Recall/Pipeline/Pipelines/AssembleEventEnvelopePipeline.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
using System.Collections.Generic;
2-
using System.Linq;
3-
using Shuttle.Core.Contract;
1+
using Shuttle.Core.Contract;
42
using Shuttle.Core.Pipelines;
5-
using Shuttle.Core.Reflection;
63

74
namespace Shuttle.Recall
85
{
96
public class AssembleEventEnvelopePipeline : Pipeline
107
{
11-
public AssembleEventEnvelopePipeline(IEnumerable<IPipelineObserver> observers)
8+
public AssembleEventEnvelopePipeline(IAssembleEventEnvelopeObserver assembleEventEnvelopeObserver,
9+
ICompressEventObserver compressEventObserver, IEncryptEventObserver encryptEventObserver,
10+
ISerializeEventObserver serializeEventObserver)
1211
{
13-
Guard.AgainstNull(observers, nameof(observers));
14-
15-
var list = observers.ToList();
12+
Guard.AgainstNull(assembleEventEnvelopeObserver, nameof(assembleEventEnvelopeObserver));
13+
Guard.AgainstNull(compressEventObserver, nameof(compressEventObserver));
14+
Guard.AgainstNull(encryptEventObserver, nameof(encryptEventObserver));
15+
Guard.AgainstNull(serializeEventObserver, nameof(serializeEventObserver));
1616

1717
RegisterStage("Get")
1818
.WithEvent<OnAssembleEventEnvelope>()
@@ -26,10 +26,10 @@ public AssembleEventEnvelopePipeline(IEnumerable<IPipelineObserver> observers)
2626
.WithEvent<OnAssembleEventEnvelope>()
2727
.WithEvent<OnAfterAssembleEventEnvelope>();
2828

29-
RegisterObserver(list.Get<IAssembleEventEnvelopeObserver>());
30-
RegisterObserver(list.Get<ICompressEventObserver>());
31-
RegisterObserver(list.Get<IEncryptEventObserver>());
32-
RegisterObserver(list.Get<ISerializeEventObserver>());
29+
RegisterObserver(assembleEventEnvelopeObserver);
30+
RegisterObserver(compressEventObserver);
31+
RegisterObserver(encryptEventObserver);
32+
RegisterObserver(serializeEventObserver);
3333
}
3434

3535
public EventEnvelope Execute(DomainEvent domainEvent)
Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
1-
using System.Collections.Generic;
2-
using System.Linq;
3-
using Shuttle.Core.Contract;
1+
using Shuttle.Core.Contract;
42
using Shuttle.Core.Pipelines;
53
using Shuttle.Core.PipelineTransaction;
6-
using Shuttle.Core.Reflection;
74

85
namespace Shuttle.Recall
96
{
107
public class EventProcessingPipeline : Pipeline
118
{
12-
public EventProcessingPipeline(IEnumerable<IPipelineObserver> observers)
9+
public EventProcessingPipeline(IGetProjectionSequenceNumberObserver getProjectionSequenceNumberObserver,
10+
IProjectionPrimitiveEventObserver projectionPrimitiveEventObserver,
11+
IProjectionEventEnvelopeObserver projectionEventEnvelopeObserver,
12+
IProcessEventObserver processEventObserver, IAcknowledgeEventObserver acknowledgeEventObserver,
13+
ITransactionScopeObserver transactionScopeObserver)
1314
{
14-
Guard.AgainstNull(observers, nameof(observers));
15-
16-
var list = observers.ToList();
15+
Guard.AgainstNull(getProjectionSequenceNumberObserver, nameof(getProjectionSequenceNumberObserver));
16+
Guard.AgainstNull(projectionPrimitiveEventObserver, nameof(projectionPrimitiveEventObserver));
17+
Guard.AgainstNull(projectionEventEnvelopeObserver, nameof(projectionEventEnvelopeObserver));
18+
Guard.AgainstNull(processEventObserver, nameof(processEventObserver));
19+
Guard.AgainstNull(acknowledgeEventObserver, nameof(acknowledgeEventObserver));
20+
Guard.AgainstNull(transactionScopeObserver, nameof(transactionScopeObserver));
1721

1822
RegisterStage("Process")
1923
.WithEvent<OnStartTransactionScope>()
@@ -31,12 +35,12 @@ public EventProcessingPipeline(IEnumerable<IPipelineObserver> observers)
3135
.WithEvent<OnCompleteTransactionScope>()
3236
.WithEvent<OnDisposeTransactionScope>();
3337

34-
RegisterObserver(list.Get<IGetProjectionSequenceNumberObserver>());
35-
RegisterObserver(list.Get<IProjectionPrimitiveEventObserver>());
36-
RegisterObserver(list.Get<IProjectionEventEnvelopeObserver>());
37-
RegisterObserver(list.Get<IProcessEventObserver>());
38-
RegisterObserver(list.Get<IAcknowledgeEventObserver>());
39-
RegisterObserver(list.Get<ITransactionScopeObserver>());
38+
RegisterObserver(getProjectionSequenceNumberObserver);
39+
RegisterObserver(projectionPrimitiveEventObserver);
40+
RegisterObserver(projectionEventEnvelopeObserver);
41+
RegisterObserver(processEventObserver);
42+
RegisterObserver(acknowledgeEventObserver);
43+
RegisterObserver(transactionScopeObserver);
4044
}
4145
}
4246
}

Shuttle.Recall/Pipeline/Pipelines/GetEventEnvelopePipeline.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
using System.Collections.Generic;
2-
using System.Linq;
3-
using Shuttle.Core.Contract;
1+
using Shuttle.Core.Contract;
42
using Shuttle.Core.Pipelines;
5-
using Shuttle.Core.Reflection;
63

74
namespace Shuttle.Recall
85
{
96
public class GetEventEnvelopePipeline : Pipeline
107
{
11-
public GetEventEnvelopePipeline(IEnumerable<IPipelineObserver> observers)
8+
public GetEventEnvelopePipeline(IDeserializeEventEnvelopeObserver deserializeEventEnvelopeObserver,
9+
IDecompressEventObserver decompressEventObserver, IDecryptEventObserver decryptEventObserver,
10+
IDeserializeEventObserver deserializeEventObserver)
1211
{
13-
Guard.AgainstNull(observers, nameof(observers));
14-
15-
var list = observers.ToList();
12+
Guard.AgainstNull(deserializeEventEnvelopeObserver, nameof(deserializeEventEnvelopeObserver));
13+
Guard.AgainstNull(decompressEventObserver, nameof(decompressEventObserver));
14+
Guard.AgainstNull(decryptEventObserver, nameof(decryptEventObserver));
15+
Guard.AgainstNull(deserializeEventObserver, nameof(deserializeEventObserver));
1616

1717
RegisterStage("Get")
1818
.WithEvent<OnDeserializeEventEnvelope>()
@@ -24,10 +24,10 @@ public GetEventEnvelopePipeline(IEnumerable<IPipelineObserver> observers)
2424
.WithEvent<OnDeserializeEvent>()
2525
.WithEvent<OnAfterDeserializeEvent>();
2626

27-
RegisterObserver(list.Get<IDeserializeEventEnvelopeObserver>());
28-
RegisterObserver(list.Get<IDecompressEventObserver>());
29-
RegisterObserver(list.Get<IDecryptEventObserver>());
30-
RegisterObserver(list.Get<IDeserializeEventObserver>());
27+
RegisterObserver(deserializeEventEnvelopeObserver);
28+
RegisterObserver(decompressEventObserver);
29+
RegisterObserver(decryptEventObserver);
30+
RegisterObserver(deserializeEventObserver);
3131
}
3232

3333
public void Execute(PrimitiveEvent primitiveEvent)

Shuttle.Recall/Pipeline/Pipelines/GetEventStreamPipeline.cs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,25 @@
11
using System;
2-
using System.Collections.Generic;
3-
using System.Linq;
42
using Shuttle.Core.Contract;
53
using Shuttle.Core.Pipelines;
6-
using Shuttle.Core.Reflection;
74

85
namespace Shuttle.Recall
96
{
107
public class GetEventStreamPipeline : Pipeline
118
{
12-
public GetEventStreamPipeline(IEnumerable<IPipelineObserver> observers)
9+
public GetEventStreamPipeline(IGetStreamEventsObserver getStreamEventsObserver,
10+
IAssembleEventStreamObserver assembleEventStreamObserver)
1311
{
14-
Guard.AgainstNull(observers, nameof(observers));
15-
16-
var list = observers.ToList();
12+
Guard.AgainstNull(getStreamEventsObserver, nameof(getStreamEventsObserver));
13+
Guard.AgainstNull(assembleEventStreamObserver, nameof(assembleEventStreamObserver));
1714

1815
RegisterStage("Process")
1916
.WithEvent<OnGetStreamEvents>()
2017
.WithEvent<OnAfterGetStreamEvents>()
2118
.WithEvent<OnAssembleEventStream>()
2219
.WithEvent<OnAfterAssembleEventStream>();
2320

24-
RegisterObserver(list.Get<IGetStreamEventsObserver>());
25-
RegisterObserver(list.Get<IAssembleEventStreamObserver>());
21+
RegisterObserver(getStreamEventsObserver);
22+
RegisterObserver(assembleEventStreamObserver);
2623
}
2724

2825
public EventStream Execute(Guid id)

Shuttle.Recall/Pipeline/Pipelines/RemoveEventStreamPipeline.cs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,20 @@
11
using System;
2-
using System.Collections.Generic;
3-
using System.Linq;
42
using Shuttle.Core.Contract;
53
using Shuttle.Core.Pipelines;
6-
using Shuttle.Core.Reflection;
74

85
namespace Shuttle.Recall
96
{
107
public class RemoveEventStreamPipeline : Pipeline
118
{
12-
public RemoveEventStreamPipeline(IEnumerable<IPipelineObserver> observers)
9+
public RemoveEventStreamPipeline(IRemoveEventStreamObserver removeEventStreamObserver)
1310
{
14-
Guard.AgainstNull(observers, nameof(observers));
15-
16-
var list = observers.ToList();
11+
Guard.AgainstNull(removeEventStreamObserver, nameof(removeEventStreamObserver));
1712

1813
RegisterStage("Process")
1914
.WithEvent<OnRemoveEventStream>()
2015
.WithEvent<OnAfterRemoveEventStream>();
2116

22-
RegisterObserver(list.Get<IRemoveEventStreamObserver>());
17+
RegisterObserver(removeEventStreamObserver);
2318
}
2419

2520
public void Execute(Guid id)

0 commit comments

Comments
 (0)