Skip to content

Commit fe1886a

Browse files
committed
Add metrics wiring
1 parent d776499 commit fe1886a

File tree

2 files changed

+40
-29
lines changed

2 files changed

+40
-29
lines changed

src/Equinox.CosmosStore/CosmosStore.fs

+1-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ module Log =
210210
| Delete of Measurement
211211
/// Trimmed the Tip
212212
| Trim of Measurement
213-
/// Queried via the Index
213+
/// Queried via the Index; count=-1 -> aggregate operation
214214
| Index of Measurement
215215
let [<return: Struct>] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption =
216216
let mutable p = Unchecked.defaultof<_>

src/Equinox.CosmosStore/CosmosStoreLinq.fs

+39-28
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace Equinox.CosmosStore.Linq
22

3-
open Equinox.Core.Infrastructure
3+
open Equinox.Core
4+
open Equinox.CosmosStore.Core // Log, JsonCompressedBase64Converter
45
open FSharp.Control // taskSeq
56
open Serilog
67
open System
@@ -76,8 +77,8 @@ module Internal =
7677
let m = response.Diagnostics.GetQueryMetrics().CumulativeMetrics
7778
yield struct (response.Diagnostics.GetClientElapsedTime(), response.RequestCharge, response.Resource,
7879
int m.RetrievedDocumentCount, int m.RetrievedDocumentSize, int m.OutputDocumentSize) }
79-
let toAsyncEnum<'T> (desc: string) (iterator: FeedIterator<'T>) = taskSeq {
80-
let sw = System.Diagnostics.Stopwatch.StartNew()
80+
let [<EditorBrowsable(EditorBrowsableState.Never)>] toAsyncEnum<'T> log (container: Container) cat (iterator: FeedIterator<'T>) = taskSeq {
81+
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
8182
use _ = iterator
8283
let mutable responses, items, totalRtt, totalRu, totalRdc, totalRds, totalOds = 0, 0, TimeSpan.Zero, 0., 0, 0, 0
8384
try for rtt, rc, response, rdc, rds, ods in enum_ iterator do
@@ -90,44 +91,53 @@ module Internal =
9091
for item in response do
9192
items <- items + 1
9293
yield item
93-
finally Log.Information("CosmosStoreQuery.enum {desc} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms",
94-
desc, items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, sw.ElapsedMilliseconds) }
94+
finally
95+
let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp())
96+
let log = let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr
97+
interval = interval; bytes = totalOds; count = items; ru = totalRu } in log |> Log.event evt
98+
log.Information("EqxCosmos {action:l} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms",
99+
"Index", items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, interval.ElapsedMilliseconds) }
95100
/// Runs a query that renders 'T, Hydrating the results as 'P (can be the same types but e.g. you might want to map an object to a JsonElement etc)
96-
let enum<'T, 'P> desc (container: Container) (query: IQueryable<'T>): TaskSeq<'P> =
101+
let enum<'T, 'P> (log: ILogger) (container: Container) cat (query: IQueryable<'T>): TaskSeq<'P> =
97102
let queryDefinition = query.ToQueryDefinition()
98-
if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.query {desc} {query}", desc, queryDefinition.QueryText)
99-
container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> desc
103+
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText)
104+
container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> log container cat
100105
module AggregateOp =
101106
/// Runs one of the typical Cosmos SDK extensions, e.g. CountAsync, logging the costs
102-
let exec (desc: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task {
103-
if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.count {desc} {query}", desc, query.ToQueryDefinition().QueryText)
104-
let sw = System.Diagnostics.Stopwatch.StartNew()
107+
let [<EditorBrowsable(EditorBrowsableState.Never)>] exec (log: ILogger) (container: Container) (op: string) (cat: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task {
108+
let startTicks = System.Diagnostics.Stopwatch.GetTimestamp()
105109
let! (rsp: Response<'R>) = run query
106110
let res = rsp.Resource
107111
let summary = render res
108112
let m = rsp.Diagnostics.GetQueryMetrics().CumulativeMetrics
109-
Log.Information("CosmosStoreQuery.count {desc} {count} ({rdc}i {rds:f2}>{ods:f2} MiB) {rc} RU {latency} ms",
110-
desc, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB m.OutputDocumentSize, rsp.RequestCharge, sw.ElapsedMilliseconds)
113+
let interval = StopwatchInterval(startTicks, System.Diagnostics.Stopwatch.GetTimestamp())
114+
let totalOds, totalRu = m.OutputDocumentSize, rsp.RequestCharge
115+
let log = let evt = Log.Metric.Index { database = container.Database.Id; container = container.Id; stream = cat + FsCodec.StreamName.Category.SeparatorStr
116+
interval = interval; bytes = int totalOds; count = -1; ru = totalRu } in log |> Log.event evt
117+
log.Information("EqxCosmos {action:l} {cat} {count} ({rdc}i {rds:f2}>{ods:f2} MiB) {rc} RU {latency} ms",
118+
op, cat, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB totalOds, totalRu, interval.ElapsedMilliseconds)
111119
return res }
112120
/// Runs query.CountAsync, with instrumentation equivalent to what query provides
113-
let countAsync desc (query: IQueryable<'T>) ct =
114-
exec desc query (_.CountAsync(ct)) id<int>
121+
let countAsync (log: ILogger) container cat (query: IQueryable<'T>) ct =
122+
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText)
123+
exec log container "count" cat query (_.CountAsync(ct)) id<int>
115124
module Scalar =
116125
/// Generates a TOP 1 SQL query
117126
let top1 (query: IQueryable<'T>) =
118127
query.Take(1)
119128
/// Handles a query that's expected to yield 0 or 1 result item
120-
let tryHeadAsync<'T, 'R> desc (container: Container) (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> =
129+
let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> =
121130
let queryDefinition = (top1 query).ToQueryDefinition()
122-
if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then Log.Debug("CosmosStoreQuery.tryScalar {desc} {query}", desc, queryDefinition.QueryText)
123-
container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum desc |> TaskSeq.tryHead
124-
type Projection<'T, 'M>(query, description, container, enum: IQueryable<'T> -> TaskSeq<'M>) =
125-
static member Create<'P>(q, d, c, hydrate: 'P -> 'M) = Projection<'T, 'M>(q, d, c, Query.enum<'T, 'P> d c >> TaskSeq.map hydrate)
131+
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.tryScalar {cat} {query}", queryDefinition.QueryText)
132+
container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum log container cat |> TaskSeq.tryHead
133+
type Projection<'T, 'M>(query, category, container, enum: IQueryable<'T> -> TaskSeq<'M>, count: IQueryable<'T> -> CancellationToken -> Task<int>) =
134+
static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M) =
135+
Projection<'T, 'M>(q, cat, c, Query.enum<'T, 'P> log c cat >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat)
126136
member _.Enum: TaskSeq<'M> = query |> enum
127137
member x.EnumPage(skip, take): TaskSeq<'M> = query |> Query.offsetLimit (skip, take) |> enum
128-
member _.CountAsync: CancellationToken -> Task<int> = query |> AggregateOp.countAsync description
138+
member _.CountAsync: CancellationToken -> Task<int> = query |> count
129139
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Query: IQueryable<'T> = query
130-
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Description: string = description
140+
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Category: string = category
131141
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Container: Container = container
132142

133143
// We want to generate a projection statement of the shape: VALUE {"sn": root["p"], "snap": root["u"][0].["d"]}
@@ -138,7 +148,7 @@ module Internal =
138148
// This hack is based on https://stackoverflow.com/a/73506241/11635
139149
type SnAndSnap<'I>() =
140150
member val sn: FsCodec.StreamName = Unchecked.defaultof<_> with get, set
141-
[<System.Text.Json.Serialization.JsonConverter(typeof<Equinox.CosmosStore.Core.JsonCompressedBase64Converter>)>]
151+
[<System.Text.Json.Serialization.JsonConverter(typeof<JsonCompressedBase64Converter>)>]
142152
member val snap: 'I = Unchecked.defaultof<_> with get, set
143153
static member CreateItemQueryLambda<'T>(snExpression: Expression -> MemberExpression, snapExpression: Expression<Func<'T, 'I>>) =
144154
let param = Expression.Parameter(typeof<'T>, "x")
@@ -178,8 +188,8 @@ module Index =
178188
container.GetItemLinqQueryable<Item<'I>>().Where(fun d -> d.p.StartsWith(prefix) && d.u[0].c = caseName)
179189

180190
/// Returns the StreamName (from the `p` field) for a 0/1 item query; only the TOP 1 item is returned
181-
let tryGetStreamNameAsync description container (query: IQueryable<Item<'I>>) ct =
182-
Internal.Scalar.tryHeadAsync<string, FsCodec.StreamName> description container (query.Select(fun x -> x.p)) ct
191+
let tryGetStreamNameAsync log cat container (query: IQueryable<Item<'I>>) ct =
192+
Internal.Scalar.tryHeadAsync<string, FsCodec.StreamName> log cat container (query.Select(fun x -> x.p)) ct
183193

184194
/// Query the items, returning the Stream name and the Snapshot as a JsonElement (Decompressed if applicable)
185195
let projectStreamNameAndSnapshot<'I> snapExpression: Expression<Func<Item<'I>, SnAndSnap<'I>>> =
@@ -197,8 +207,9 @@ type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) =
197207

198208
/// Enables querying based on uncompressed Indexed values stored as secondary unfolds alongside the snapshot
199209
[<NoComparison; NoEquality>]
200-
type IndexContext<'I>(container, categoryName, caseName) =
210+
type IndexContext<'I>(container, categoryName, caseName, log) =
201211

212+
member val Log = defaultArg log Log.Logger
202213
member val Description = $"{categoryName}/{caseName}" with get, set
203214
member val Container = container
204215

@@ -218,7 +229,7 @@ type IndexContext<'I>(container, categoryName, caseName) =
218229

219230
/// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
220231
member x.TryGetStreamNameWhereAsync(criteria: Expressions.Expression<Func<Index.Item<'I>, bool>>, ct) =
221-
Index.tryGetStreamNameAsync x.Description container (x.ByCategory().Where criteria) ct
232+
Index.tryGetStreamNameAsync x.Log container categoryName (x.ByCategory().Where criteria) ct
222233

223234
/// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
224235
member x.TryGetStreamNameWhere(criteria: Expressions.Expression<Func<Index.Item<'I>, bool>>): Async<FsCodec.StreamName option> =
@@ -227,4 +238,4 @@ type IndexContext<'I>(container, categoryName, caseName) =
227238
/// Query the items, grabbing the Stream name and the Snapshot; The StreamName and the (Decompressed if applicable) Snapshot are passed to `hydrate`
228239
member x.QueryStreamNameAndSnapshot(query: IQueryable<Index.Item<'I>>, selectBody: Expression<Func<Index.Item<'I>, 'I>>,
229240
hydrate: SnAndSnap<System.Text.Json.JsonElement> -> 'M) =
230-
Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), x.Description, container, hydrate) |> Query
241+
Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), categoryName, container, x.Log, hydrate) |> Query

0 commit comments

Comments
 (0)