Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand All @@ -18,14 +18,14 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.Core" Version="4.0.4" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0" />
<PackageReference Include="Equinox.SqlStreamStore" Version="4.0.0" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.2" />
<PackageReference Include="Propulsion" Version="3.0.0-rc.11" />
<PackageReference Include="Equinox.Core" Version="4.1.1" />
<PackageReference Include="Equinox.CosmosStore" Version="4.1.0" />
<PackageReference Include="Equinox.DynamoStore" Version="4.1.0" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.1.0" />
<PackageReference Include="Equinox.SqlStreamStore" Version="4.1.0" />
<PackageReference Include="Equinox.MemoryStore" Version="4.1.0" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.1.0" />
<PackageReference Include="Propulsion" Version="3.0.0" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Events =
module Reactions =

let [<return: Struct>] (|For|_|) = catId.TryDecode
let dec = Streams.Codec.dec<Events.Event>
let dec = Streams.Codec.gen<Events.Event>
let config = catId.StreamName, dec
let [<return: Struct>] (|Decode|_|) = function
| struct (For id, _) & Streams.Decode dec events -> ValueSome struct (id, events)
Expand All @@ -42,7 +42,7 @@ module Fold =
type Status = Pending | Confirmed
type State =
{ clientId : ClientId option
status : Status; items : Item array
status : Status; items : Item[]
confirmedAt : System.DateTimeOffset option
confirmedOriginEpoch : ConfirmedEpochId option }
let initial = { clientId = None; status = Status.Pending; items = Array.empty; confirmedAt = None; confirmedOriginEpoch = None }
Expand Down
8 changes: 4 additions & 4 deletions Sample/ECommerce.Equinox/ECommerce.Domain/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ let createDecider category = Equinox.Decider.forStream Metrics.log category

module Memory =

let create name codec initial fold store : Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial)
let create name codec initial fold store: Equinox.Category<_, _, _> =
Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Encoder.Uncompressed codec, fold, initial)

module Codec =

Expand All @@ -26,7 +26,7 @@ module Cosmos =

let private createCached name codec initial fold accessStrategy (context, cache) =
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
Equinox.CosmosStore.CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy)
Equinox.CosmosStore.CosmosStoreCategory(context, name, FsCodec.SystemTextJson.Encoder.Compressed codec, fold, initial, accessStrategy, cacheStrategy)

let createUnoptimized name codec initial fold (context, cache) =
let accessStrategy = Equinox.CosmosStore.AccessStrategy.Unoptimized
Expand All @@ -44,7 +44,7 @@ module Dynamo =

let private createCached name codec initial fold accessStrategy (context, cache) =
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, cacheStrategy)
Equinox.DynamoStore.DynamoStoreCategory(context, name, FsCodec.Encoder.Compressed codec, fold, initial, accessStrategy, cacheStrategy)

let createUnoptimized name codec initial fold (context, cache) =
let accessStrategy = Equinox.DynamoStore.AccessStrategy.Unoptimized
Expand Down
12 changes: 7 additions & 5 deletions Sample/ECommerce.Equinox/ECommerce.Domain/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@ open Serilog

module Codec =

let dec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Propulsion.Sinks.Codec<'E> =
FsCodec.SystemTextJson.Codec.Create<'E>() // options = Options.Default

let private renderBody (x: Propulsion.Sinks.EventBody) = System.Text.Encoding.UTF8.GetString(x.Span)
let gen<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Propulsion.Sinks.Codec<'E> =
FsCodec.SystemTextJson.Codec.Create<'E>() |> FsCodec.Encoder.Uncompressed // options = Options.Default

// Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`)
let internal tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event =
match codec.Decode event with
| ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug ->
Log.ForContext("eventData", renderBody event.Data)
Log.ForContext("eventData", FsCodec.Encoding.GetStringUtf8 event.Data)
.Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName)
ValueNone
| x -> x

let next span = Propulsion.Sinks.Events.next span
let truncate max (span: Propulsion.Sinks.Event[]) =
let span = Array.truncate max span
span, next span
let (|Decode|) codec struct (stream, events: Propulsion.Sinks.Event[]): 'E[] =
events |> Propulsion.Internal.Array.chooseV (Codec.tryDecode codec stream)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<WarningLevel>5</WarningLevel>
</PropertyGroup>

Expand Down
17 changes: 0 additions & 17 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type StringBuilder with
open System
open System.Net
open System.Net.Http
open System.Runtime.Serialization

/// Operations on System.Net.HttpRequestMessage
module HttpReq =
Expand Down Expand Up @@ -113,21 +112,6 @@ type InvalidHttpResponseException =
sb.Appendfn "RequestBody=%s" (getBodyString e.RequestBody)
sb.Appendfn "ResponseBody=%s" (getBodyString e.ResponseBody))

interface ISerializable with
member e.GetObjectData(si : SerializationInfo, sc : StreamingContext) =
let add name (value:obj) = si.AddValue(name, value)
base.GetObjectData(si, sc) ; add "userMessage" e.userMessage ;
add "requestUri" e.RequestUri ; add "requestMethod" e.requestMethod ; add "requestBody" e.RequestBody
add "statusCode" e.StatusCode ; add "reasonPhrase" e.ReasonPhrase ; add "responseBody" e.ResponseBody

new (si : SerializationInfo, sc : StreamingContext) =
let get name = si.GetValue(name, typeof<'a>) :?> 'a
{
inherit Exception(si, sc) ; userMessage = get "userMessage" ;
RequestUri = get "requestUri" ; requestMethod = get "requestMethod" ; RequestBody = get "requestBody" ;
StatusCode = get "statusCode" ; ReasonPhrase = get "reasonPhrase" ; ResponseBody = get "responseBody"
}

static member Create(userMessage : string, response : HttpResponseMessage, ?innerException : exn) = async {
let request = response.RequestMessage
let! responseBodyC = response.Content.ReadAsStringDiapered() |> Async.StartChild
Expand Down Expand Up @@ -171,7 +155,6 @@ type HttpResponseMessage with

module HttpRes =

// let codec = ECommerce.Domain.Config.EventCodec.forUnion<Event>
let serdes = FsCodec.SystemTextJson.Serdes(FsCodec.SystemTextJson.Options.Create())

/// Deserialize body using default Json.Net profile - throw with content details if StatusCode is unexpected or decoding fails
Expand Down
11 changes: 6 additions & 5 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Ingester.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ module PipelineEvent =
Unchecked.defaultof<_>,
context = item)
let (|ItemsForFc|_|) = function
| FsCodec.StreamName.Split (_, FsCodec.StreamId.Parse 2 [|_ ; FcId.Parse fc|]), (s : Propulsion.Sinks.Event[]) ->
| FsCodec.StreamName.Split (_, FsCodec.StreamId.Parse 2 [| _ ; FcId.Parse fc |]), (s: Propulsion.Sinks.Event[]) ->
Some (fc, s |> Seq.map (fun e -> Unchecked.unbox<Item> e.Context))
| _ -> None

let handle maxDop stream span: Async<Propulsion.Sinks.StreamResult * Outcome> = async {
let handle maxDop stream span: Async<Outcome * int64> = async {
let span, nextIndex = Streams.truncate 1000 span
match stream, span with
| PipelineEvent.ItemsForFc (_fc, items) ->
// Take chunks of max 1000 in order to make handler latency be less 'lumpy'
Expand All @@ -50,14 +51,14 @@ let handle maxDop stream span: Async<Propulsion.Sinks.StreamResult * Outcome> =
do! Async.Sleep(TimeSpan.FromSeconds 1.)
return if i % 3 = 1 then Some 42 else None
})
let! results = Async.Parallel(maybeAccept, maxDegreeOfParallelism=maxDop)
let! results = Async.Parallel(maybeAccept, maxDegreeOfParallelism = maxDop)
let ready = results |> Array.choose id
let maybeAdd = ready |> Seq.mapi (fun i _x -> async {
do! Async.Sleep(TimeSpan.FromSeconds 1.)
return if i % 2 = 1 then Some 42 else None
})
let! added = Async.Parallel(maybeAdd, maxDegreeOfParallelism=maxDop)
let outcome = { added = Seq.length added; notReady = results.Length - ready.Length; dups = results.Length - ticketIds.Length }
return Propulsion.Sinks.StreamResult.PartiallyProcessed ticketIds.Length, outcome
| x -> return failwithf "Unexpected stream %O" x
return outcome, nextIndex
| x -> return failwithf $"Unexpected stream {x}"
}
2 changes: 1 addition & 1 deletion Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ let run args = async {
return! [| Async.AwaitKeyboardInterruptAsTaskCanceledException()
source.AwaitWithStopOnCancellation()
sink.AwaitWithStopOnCancellation()
|] |> Async.Parallel |> Async.Ignore<unit array> }
|] |> Async.Parallel |> Async.Ignore<unit[]> }

[<EntryPoint>]
let main argv =
Expand Down
5 changes: 1 addition & 4 deletions Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ module Cosmos =
| [<AltCommandLine "-s">] Connection of string
| [<AltCommandLine "-d">] Database of string
| [<AltCommandLine "-c">] Container of string
| [<AltCommandLine "-o">] Timeout of float
| [<AltCommandLine "-r">] Retries of int
| [<AltCommandLine "-rt">] RetriesWaitTime of float
interface IArgParserTemplate with
Expand All @@ -88,17 +87,15 @@ module Cosmos =
| Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)"
| Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)"
| Container _ -> "specify a container name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)"
| Timeout _ -> "specify operation timeout in seconds (default: 5)."
| Retries _ -> "specify operation retries (default: 1)."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)"
type Arguments(c : Configuration, p : ParseResults<Parameters>) =
let connection = p.GetResult(Connection, fun () -> c.CosmosConnection)
let discovery = Equinox.CosmosStore.Discovery.ConnectionString connection
let mode = p.TryGetResult ConnectionMode
let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds
let retries = p.GetResult(Retries, 1)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds
let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode)
let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, retries, maxRetryWaitTime, ?mode = mode)
let database = p.GetResult(Database, fun () -> c.CosmosDatabase)
let container = p.GetResult(Container, fun () -> c.CosmosContainer)
member val Verbose = p.Contains Verbose
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand All @@ -11,19 +11,18 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.4" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="4.0.0" />
<PackageReference Include="Equinox.DynamoStore.Prometheus" Version="4.0.0" />
<PackageReference Include="Equinox.SqlStreamStore.MsSql" Version="4.0.0" />
<PackageReference Include="Argu" Version="6.2.5" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="4.1.0" />
<PackageReference Include="Equinox.DynamoStore.Prometheus" Version="4.1.0" />
<PackageReference Include="Equinox.SqlStreamStore.MsSql" Version="4.1.0" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="3.0.0-rc.11" />
<PackageReference Include="Propulsion.DynamoStore" Version="3.0.0-rc.11" />
<PackageReference Include="Propulsion.EventStoreDb" Version="3.0.0-rc.11" />
<PackageReference Include="Propulsion.SqlStreamStore" Version="3.0.0-rc.11" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0-rc.11" />
<PackageReference Include="Propulsion.CosmosStore" Version="3.0.0" />
<PackageReference Include="Propulsion.DynamoStore" Version="3.0.0" />
<PackageReference Include="Propulsion.EventStoreDb" Version="3.0.0" />
<PackageReference Include="Propulsion.SqlStreamStore" Version="3.0.0" />
<PackageReference Include="Propulsion.MemoryStore" Version="3.0.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.1.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.1.1" />
<ProjectReference Include="../ECommerce.Domain/ECommerce.Domain.fsproj" />
<!-- <ProjectReference Include="../../../../propulsion/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj" />-->
<!-- <ProjectReference Include="../../../../propulsion/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj" />-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,11 @@ module Sinks =
let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj} {NewLine}{Exception}"
configuration.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)

[<System.Runtime.CompilerServices.Extension>]
type Logging() =

[<System.Runtime.CompilerServices.Extension>]
static member Configure(configuration : LoggerConfiguration, ?verbose) =
configuration
.Destructure.FSharpTypes()
.Enrich.FromLogContext()
|> fun c -> if verbose = Some true then c.MinimumLevel.Debug() else c

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<WarningLevel>5</WarningLevel>
</PropertyGroup>

Expand Down
2 changes: 1 addition & 1 deletion Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ let run (args : Args.Arguments) = async {
return! [| Async.AwaitKeyboardInterruptAsTaskCanceledException()
source.AwaitWithStopOnCancellation()
sink.AwaitWithStopOnCancellation()
|] |> Async.Parallel |> Async.Ignore<unit array> }
|] |> Async.Parallel |> Async.Ignore<unit[]> }

[<EntryPoint>]
let main argv =
Expand Down
14 changes: 7 additions & 7 deletions Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type Stats(log, statsInterval, stateInterval, verboseStore, ?logExternalStats) =
let reactionCategories = [| ShoppingCart.CategoryName |]

let handle
(cartSummary : ShoppingCartSummaryHandler.Service)
(confirmedCarts : ConfirmedHandler.Service)
stream span : Async<Propulsion.Sinks.StreamResult * Outcome> = async {
(cartSummary: ShoppingCartSummaryHandler.Service)
(confirmedCarts: ConfirmedHandler.Service)
stream span: Async<Outcome * int64> = async {
match struct (stream, span) with
| ShoppingCart.Reactions.Decode (cartId, events) ->
match events with
Expand All @@ -39,10 +39,10 @@ let handle
| _ -> ()
match events with
| ShoppingCart.Reactions.StateChanged ->
let! worked, version' = cartSummary.TryIngestSummary(cartId)
let outcome = if worked then Outcome.Ok (1, Array.length span - 1) else Outcome.Skipped span.Length
return Propulsion.Sinks.StreamResult.OverrideNextIndex version', outcome
| _ -> return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.NotApplicable span.Length
let! dirty, version' = cartSummary.TryIngestSummary(cartId)
let outcome = if dirty then Outcome.Ok (1, Array.length span - 1) else Outcome.Skipped span.Length
return outcome, version'
| _ -> return Outcome.NotApplicable span.Length, Streams.next span
| x -> return failwith $"Invalid event %A{x}" } // should be filtered by isReactionStream

module Config =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ type Service internal (source : ShoppingCart.Service, destination : ShoppingCart
member _.TryIngestSummary(cartId) : Async<bool * int64> = async {
match! source.SummarizeWithVersion(cartId) with
| Some view, version' ->
let! worked = destination.TryIngest(cartId, version', view)
return worked, version'
let! dirty = destination.TryIngest(cartId, version', view)
return dirty, version'
| None, version' -> return false, version' }

module Config =
Expand Down
Loading
Loading