Skip to content

Commit 0993e11

Browse files
authored
MemoryStore: Serialize Committed events as v2 #268 (#269)
1 parent 729146e commit 0993e11

File tree

2 files changed

+31
-16
lines changed

2 files changed

+31
-16
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
4545
### Removed
4646
### Fixed
4747

48+
- `MemoryStore`: Serialize `Committed` events to guarantee consumption in event `Index` order re [#265](https://github.com/jet/equinox/issues/265) [#269](https://github.com/jet/equinox/pull/269)
4849
- `Cosmos`: Fix defaulting for `compressUnfolds` in C# [#261](https://github.com/jet/equinox/pull/261)
4950

5051
<a name="2.3.0"></a>

src/Equinox.MemoryStore/MemoryStore.fs

+30-16
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ open Equinox
77
open Equinox.Core
88
open System.Runtime.InteropServices
99

10-
/// Equivalent to GetEventStore's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
10+
/// Equivalent to EventStoreDB's in purpose; signals a conflict has been detected and reprocessing of the decision will be necessary
1111
exception private WrongVersionException of streamName: string * expected: int * value: obj
1212

1313
/// Internal result used to reflect the outcome of syncing with the entry in the inner ConcurrentDictionary
@@ -20,10 +20,21 @@ type ConcurrentArraySyncResult<'t> = Written of 't | Conflict of 't
2020

2121
/// Maintains a dictionary of ITimelineEvent<'Format>[] per stream-name, allowing one to vary the encoding used to match that of a given concrete store, or optimize test run performance
2222
type VolatileStore<'Format>() =
23-
let streams = System.Collections.Concurrent.ConcurrentDictionary<string,FsCodec.ITimelineEvent<'Format>[]>()
23+
24+
let streams = System.Collections.Concurrent.ConcurrentDictionary<string, FsCodec.ITimelineEvent<'Format>[]>()
25+
26+
// Where TrySync attempts overlap on the same stream, there's a race to raise the Committed event for each 'commit' resulting from a successful Sync
27+
// If we don't serialize the publishing of the events, its possible for handlers to observe the Events out of order
2428
let committed = Event<_>()
29+
// Here we neuter that effect - the BatchingGate can end up with commits submitted out of order, but we serialize the raising of the events per stream
30+
let publishBatches (commits : (FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[])[]) = async {
31+
for streamName, events in commits |> Seq.groupBy fst do
32+
committed.Trigger(streamName, events |> Seq.collect snd |> Seq.sortBy (fun x -> x.Index) |> Seq.toArray) }
33+
let publishCommit = AsyncBatchingGate(publishBatches, System.TimeSpan.FromMilliseconds 2.)
2534

2635
[<CLIEvent>]
36+
/// Notifies of a batch of events being committed to a given Stream. Guarantees no out of order and/or overlapping raising of the event<br/>
37+
/// NOTE in some cases, two or more overlapping commits can be coalesced into a single <c>Committed</c> event
2738
member __.Committed : IEvent<FsCodec.StreamName * FsCodec.ITimelineEvent<'Format>[]> = committed.Publish
2839

2940
/// Loads state from a given stream
@@ -33,20 +44,22 @@ type VolatileStore<'Format>() =
3344
member __.TrySync
3445
( streamName, trySyncValue : FsCodec.ITimelineEvent<'Format>[] -> ConcurrentDictionarySyncResult<FsCodec.ITimelineEvent<'Format>[]>,
3546
events: FsCodec.ITimelineEvent<'Format>[])
36-
: ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]> =
47+
: Async<ConcurrentArraySyncResult<FsCodec.ITimelineEvent<'Format>[]>> = async {
3748
let seedStream _streamName = events
3849
let updateValue streamName (currentValue : FsCodec.ITimelineEvent<'Format>[]) =
3950
match trySyncValue currentValue with
4051
| ConcurrentDictionarySyncResult.Conflict expectedVersion -> raise <| WrongVersionException (streamName, expectedVersion, box currentValue)
4152
| ConcurrentDictionarySyncResult.Written value -> value
42-
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue) |> Written
43-
committed.Trigger((FsCodec.StreamName.parse streamName, events)) // raise here, once, as updateValue can conceptually be invoked multiple times
44-
res
45-
with WrongVersionException(_, _, conflictingValue) -> unbox conflictingValue |> Conflict
53+
try let res = streams.AddOrUpdate(streamName, seedStream, updateValue)
54+
// we publish the event here, once, as `updateValue` can be invoked multiple times
55+
do! publishCommit.Execute((FsCodec.StreamName.parse streamName, events))
56+
return Written res
57+
with WrongVersionException(_, _, conflictingValue) ->
58+
return Conflict (unbox conflictingValue) }
4659

4760
type Token = { streamVersion: int; streamName: string }
4861

49-
/// Internal implementation detail of MemoryStreamStore
62+
/// Internal implementation detail of MemoryStore
5063
module private Token =
5164

5265
let private streamTokenOfIndex streamName (streamVersion : int) : StreamToken =
@@ -64,36 +77,37 @@ module private Token =
6477

6578
/// Represents the state of a set of streams in a style consistent withe the concrete Store types - no constraints on memory consumption (but also no persistence!).
6679
type Category<'event, 'state, 'context, 'Format>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
67-
let (|Decode|) = Array.choose codec.TryDecode
6880
interface ICategory<'event, 'state, string, 'context> with
6981
member __.Load(_log, streamName, _opt) = async {
7082
match store.TryLoad streamName with
7183
| None -> return Token.ofEmpty streamName initial
72-
| Some (Decode events) -> return Token.ofEventArray streamName fold initial events }
84+
| Some events -> return Token.ofEventArray streamName fold initial (events |> Array.choose codec.TryDecode) }
7385
member __.TrySync(_log, Token.Unpack token, state, events : 'event list, context : 'context option) = async {
7486
let inline map i (e : FsCodec.IEventData<'Format>) =
7587
FsCodec.Core.TimelineEvent.Create(int64 i, e.EventType, e.Data, e.Meta, e.EventId, e.CorrelationId, e.CausationId, e.Timestamp)
76-
let encoded : FsCodec.ITimelineEvent<_>[] = events |> Seq.mapi (fun i e -> map (token.streamVersion+i+1) (codec.Encode(context,e))) |> Array.ofSeq
88+
let encoded = events |> Seq.mapi (fun i e -> map (token.streamVersion + i + 1) (codec.Encode(context, e))) |> Array.ofSeq
7789
let trySyncValue currentValue =
7890
if Array.length currentValue <> token.streamVersion + 1 then ConcurrentDictionarySyncResult.Conflict (token.streamVersion)
7991
else ConcurrentDictionarySyncResult.Written (Seq.append currentValue encoded |> Array.ofSeq)
80-
match store.TrySync(token.streamName, trySyncValue, encoded) with
92+
match! store.TrySync(token.streamName, trySyncValue, encoded) with
93+
| ConcurrentArraySyncResult.Written _ ->
94+
return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events
8195
| ConcurrentArraySyncResult.Conflict conflictingEvents ->
8296
let resync = async {
8397
let version = Token.tokenOfArray token.streamName conflictingEvents
8498
let successorEvents = conflictingEvents |> Seq.skip (token.streamVersion + 1) |> List.ofSeq
8599
return version, fold state (successorEvents |> Seq.choose codec.TryDecode) }
86-
return SyncResult.Conflict resync
87-
| ConcurrentArraySyncResult.Written _ -> return SyncResult.Written <| Token.ofEventArrayAndKnownState token.streamName fold state events }
100+
return SyncResult.Conflict resync }
88101

89102
type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, codec : FsCodec.IEventCodec<'event,'Format,'context>, fold, initial) =
90103
let category = Category<'event, 'state, 'context, 'Format>(store, codec, fold, initial)
91104
let resolveStream streamName context = Stream.create category streamName None context
105+
92106
member __.Resolve(streamName : FsCodec.StreamName, [<Optional; DefaultParameterValue null>] ?option, [<Optional; DefaultParameterValue null>] ?context : 'context) =
93107
match FsCodec.StreamName.toString streamName, option with
94-
| sn, (None|Some AllowStale) -> resolveStream sn context
108+
| sn, (None | Some AllowStale) -> resolveStream sn context
95109
| sn, Some AssumeEmpty -> Stream.ofMemento (Token.ofEmpty sn initial) (resolveStream sn context)
96110

97111
/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
98112
member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) =
99-
Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context)
113+
Stream.ofMemento (streamToken, state) (resolveStream stream.streamName context)

0 commit comments

Comments
 (0)