Skip to content

Commit cc77373

Browse files
committed
[DEVEX-227] Made Aggregate return messages and separated AggregateStore to a dedicated file
1 parent 081d91f commit cc77373

File tree

3 files changed

+159
-57
lines changed

3 files changed

+159
-57
lines changed
Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1+
using Kurrent.Client.Core.Serialization;
12
using Kurrent.Client.Streams.GettingState;
23

34
namespace Kurrent.Client.Streams.DecisionMaking;
45

56
public interface IAggregate<TEvent>: IState<TEvent>{
6-
TEvent[] DequeueUncommittedEvents();
7+
Message[] DequeueUncommittedMessages();
78
}
89

910
public class Aggregate : Aggregate<object>;
1011

11-
public abstract class Aggregate<TEvent>: IAggregate<TEvent>
12-
{
13-
readonly Queue<TEvent> _uncommittedEvents = new();
12+
public abstract class Aggregate<TEvent>: IAggregate<TEvent> where TEvent : notnull {
13+
readonly Queue<Message> _uncommittedEvents = new();
1414

1515
public virtual void Apply(TEvent @event) { }
1616

17-
TEvent[] IAggregate<TEvent>.DequeueUncommittedEvents()
17+
Message[] IAggregate<TEvent>.DequeueUncommittedMessages()
1818
{
1919
var dequeuedEvents = _uncommittedEvents.ToArray();
2020

@@ -25,6 +25,11 @@ TEvent[] IAggregate<TEvent>.DequeueUncommittedEvents()
2525

2626
protected void Enqueue(TEvent @event) {
2727
Apply(@event);
28-
_uncommittedEvents.Enqueue(@event);
28+
_uncommittedEvents.Enqueue(Message.From(@event));
29+
}
30+
31+
protected void Enqueue(Message message) {
32+
Apply((TEvent)message.Data);
33+
_uncommittedEvents.Enqueue(message);
2934
}
3035
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
using EventStore.Client;
2+
using Kurrent.Client.Core.Serialization;
3+
using Kurrent.Client.Streams.GettingState;
4+
5+
namespace Kurrent.Client.Streams.DecisionMaking;
6+
7+
public interface IAggregateStore<TAggregate, TEvent>
8+
where TAggregate : IAggregate<TEvent> {
9+
Task<StateAtPointInTime<TAggregate>> Get(
10+
string streamName,
11+
CancellationToken ct = default
12+
);
13+
14+
Task<IWriteResult> AddAsync(
15+
string streamName,
16+
TAggregate aggregate,
17+
AppendToStreamOptions? appendToStreamOptions,
18+
CancellationToken ct = default
19+
);
20+
21+
Task<IWriteResult> UpdateAsync(
22+
string streamName,
23+
TAggregate aggregate,
24+
AppendToStreamOptions? appendToStreamOptions,
25+
CancellationToken ct = default
26+
);
27+
28+
Task<IWriteResult> HandleAsync(
29+
string streamName,
30+
Func<TAggregate, CancellationToken, ValueTask> handle,
31+
DecideOptions<TAggregate>? decideOption,
32+
CancellationToken ct = default
33+
);
34+
}
35+
36+
public static class AggregateStoreExtensions {
37+
public static Task<IWriteResult> AddAsync<TAggregate, TEvent>(
38+
IAggregateStore<TAggregate, TEvent> aggregateStore,
39+
string streamName,
40+
TAggregate aggregate,
41+
CancellationToken ct = default
42+
) where TAggregate : IAggregate<TEvent> =>
43+
aggregateStore.AddAsync(
44+
streamName,
45+
aggregate,
46+
new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream },
47+
ct
48+
);
49+
50+
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
51+
IAggregateStore<TAggregate, TEvent> aggregateStore,
52+
string streamName,
53+
Func<TAggregate, CancellationToken, ValueTask> handle,
54+
CancellationToken ct = default
55+
) where TAggregate : IAggregate<TEvent> =>
56+
aggregateStore.HandleAsync(
57+
streamName,
58+
handle,
59+
new DecideOptions<TAggregate>(),
60+
ct
61+
);
62+
63+
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
64+
IAggregateStore<TAggregate, TEvent> aggregateStore,
65+
string streamName,
66+
Action<TAggregate> handle,
67+
CancellationToken ct = default
68+
) where TAggregate : IAggregate<TEvent> =>
69+
aggregateStore.HandleAsync(
70+
streamName,
71+
(state, _) => {
72+
handle(state);
73+
return new ValueTask();
74+
},
75+
new DecideOptions<TAggregate>(),
76+
ct
77+
);
78+
79+
public static Task<IWriteResult> HandleAsync<TAggregate, TEvent>(
80+
IAggregateStore<TAggregate, TEvent> aggregateStore,
81+
string streamName,
82+
Action<TAggregate> handle,
83+
DecideOptions<TAggregate>? decideOption,
84+
CancellationToken ct = default
85+
) where TAggregate : IAggregate<TEvent> =>
86+
aggregateStore.HandleAsync(
87+
streamName,
88+
(state, _) => {
89+
handle(state);
90+
return new ValueTask();
91+
},
92+
decideOption,
93+
ct
94+
);
95+
}
96+
97+
public class AggregateStore<TAggregate, TEvent>(KurrentClient client, AggregateStoreOptions<TAggregate> options)
98+
: IAggregateStore<TAggregate, TEvent>
99+
where TAggregate : IAggregate<TEvent>
100+
where TEvent : notnull {
101+
public Task<StateAtPointInTime<TAggregate>> Get(string streamName, CancellationToken ct = default) =>
102+
client.GetStateAsync(streamName, options.StateBuilder, ct);
103+
104+
public Task<IWriteResult> AddAsync(
105+
string streamName,
106+
TAggregate aggregate,
107+
AppendToStreamOptions? appendToStreamOptions,
108+
CancellationToken ct = default
109+
) {
110+
appendToStreamOptions ??= new AppendToStreamOptions();
111+
112+
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
113+
appendToStreamOptions.ExpectedStreamState = StreamState.NoStream;
114+
115+
return client.AppendToStreamAsync(streamName, aggregate.DequeueUncommittedMessages(), appendToStreamOptions, ct);
116+
}
117+
118+
public Task<IWriteResult> UpdateAsync(
119+
string streamName,
120+
TAggregate aggregate,
121+
AppendToStreamOptions? appendToStreamOptions,
122+
CancellationToken ct = default
123+
) {
124+
appendToStreamOptions ??= new AppendToStreamOptions();
125+
126+
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
127+
appendToStreamOptions.ExpectedStreamState = StreamState.StreamExists;
128+
129+
return client.AppendToStreamAsync(streamName, aggregate.DequeueUncommittedMessages(), appendToStreamOptions, ct);
130+
}
131+
132+
public Task<IWriteResult> HandleAsync(
133+
string streamName,
134+
Func<TAggregate, CancellationToken, ValueTask> handle,
135+
DecideOptions<TAggregate>? decideOption,
136+
CancellationToken ct = default
137+
) =>
138+
client.DecideAsync(
139+
streamName,
140+
async (state, token) => {
141+
await handle(state, token);
142+
return state.DequeueUncommittedMessages();
143+
},
144+
options.StateBuilder,
145+
decideOption ?? options.DecideOptions,
146+
ct
147+
);
148+
}

src/Kurrent.Client/Streams/DecisionMaking/StateStore.cs

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using EventStore.Client;
2-
using Kurrent.Client.Core.Serialization;
32
using Kurrent.Client.Streams.GettingState;
43

54
namespace Kurrent.Client.Streams.DecisionMaking;
@@ -105,7 +104,6 @@ public Task<IWriteResult> Handle(
105104
);
106105
}
107106

108-
109107
public class AggregateStoreOptions<TState> where TState : notnull {
110108
#if NET48
111109
public IStateBuilder<TState> StateBuilder { get; set; } = null!;
@@ -115,52 +113,3 @@ public class AggregateStoreOptions<TState> where TState : notnull {
115113

116114
public DecideOptions<TState>? DecideOptions { get; set; }
117115
}
118-
119-
public class AggregateStore<TState, TEvent>(KurrentClient client, AggregateStoreOptions<TState> options)
120-
where TState : IAggregate<TEvent> where TEvent : notnull {
121-
public Task<StateAtPointInTime<TState>> Get(string streamName, CancellationToken ct = default) =>
122-
client.GetStateAsync(streamName, options.StateBuilder, ct);
123-
124-
public Task<IWriteResult> AddAsync(string streamName, TState state, CancellationToken ct = default) =>
125-
AddAsync(streamName, state, new AppendToStreamOptions { ExpectedStreamState = StreamState.NoStream }, ct);
126-
127-
public Task<IWriteResult> AddAsync(
128-
string streamName,
129-
TState state,
130-
AppendToStreamOptions appendToStreamOptions,
131-
CancellationToken ct = default
132-
) {
133-
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
134-
appendToStreamOptions.ExpectedStreamState = StreamState.NoStream;
135-
136-
return client.AppendToStreamAsync(streamName, [Message.From(state)], appendToStreamOptions, ct);
137-
}
138-
139-
public Task<IWriteResult> UpdateAsync(
140-
string streamName,
141-
TState state,
142-
AppendToStreamOptions appendToStreamOptions,
143-
CancellationToken ct = default
144-
) {
145-
if (appendToStreamOptions.ExpectedStreamState == null && appendToStreamOptions.ExpectedStreamRevision == null)
146-
appendToStreamOptions.ExpectedStreamState = StreamState.StreamExists;
147-
148-
return client.AppendToStreamAsync(streamName, [Message.From(state)], appendToStreamOptions, ct);
149-
}
150-
151-
public Task<IWriteResult> HandleAsync(
152-
string streamName,
153-
Func<TState, CancellationToken, ValueTask> handle,
154-
CancellationToken ct = default
155-
) =>
156-
client.DecideAsync(
157-
streamName,
158-
async (state, token) => {
159-
await handle(state, token);
160-
return state.DequeueUncommittedEvents().Select(e => Message.From(e)).ToArray();
161-
},
162-
options.StateBuilder,
163-
options.DecideOptions,
164-
ct
165-
);
166-
}

0 commit comments

Comments
 (0)