Skip to content

Commit d1fb8e1

Browse files
committed
feat: eqx top
1 parent 1b43f54 commit d1fb8e1

File tree

1 file changed

+144
-26
lines changed

1 file changed

+144
-26
lines changed

tools/Equinox.Tool/Program.fs

Lines changed: 144 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Parameters =
2525
| [<CliPrefix(CliPrefix.None); Last>] InitSql of ParseResults<InitSqlParameters>
2626
| [<CliPrefix(CliPrefix.None); Last>] Stats of ParseResults<StatsParameters>
2727
| [<CliPrefix(CliPrefix.None); Last>] Query of ParseResults<QueryParameters>
28+
| [<CliPrefix(CliPrefix.None); Last>] Top of ParseResults<TopParameters>
2829
interface IArgParserTemplate with
2930
member a.Usage = a |> function
3031
| Verbose -> "Include low level logging regarding specific test runs."
@@ -37,6 +38,7 @@ type Parameters =
3738
| InitSql _ -> "Initialize Database Schema (supports `mssql`/`mysql`/`postgres` SqlStreamStore stores)."
3839
| Stats _ -> "inspect store to determine numbers of streams/documents/events and/or config (supports `cosmos` and `dynamo` stores)."
3940
| Query _ -> "Load/Summarise streams based on Cosmos SQL Queries (supports `cosmos` only)."
41+
| Top _ -> "Scan to determine top categories and streams (supports `cosmos` only)."
4042
| Dump _ -> "Load and show events in a specified stream (supports all stores)."
4143
and [<NoComparison; NoEquality; RequireSubcommand>] InitParameters =
4244
| [<AltCommandLine "-ru"; Unique>] Rus of int
@@ -141,25 +143,61 @@ and [<NoComparison; NoEquality; RequireSubcommand>] QueryParameters =
141143
/// Output mode selectable for the `query` subcommand. QueryArguments falls back to `Raw` when a File argument is
/// present and to `Default` otherwise; what each mode does is decided by consumers not visible in this excerpt.
and [<RequireQualifiedAccess>] Mode = Default | SnapOnly | SnapWithStream | ReadOnly | ReadWithStream | Raw
142144
/// Stream filter derived from the StreamName / CategoryName / CategoryLike command line arguments:
/// a single stream name, an exact category name, a category LIKE pattern, or `Unfiltered` when none was supplied.
and [<RequireQualifiedAccess>] Criteria = SingleStream of string | CatName of string | CatLike of string | Unfiltered
143145
and QueryArguments(p: ParseResults<QueryParameters>) =
    /// Requested output mode; when not specified, `Raw` if a File was supplied, otherwise `Default`.
    member val Mode =
        let fallback = if p.Contains QueryParameters.File then Mode.Raw else Mode.Default
        p.GetResult(QueryParameters.Mode, fallback)
    /// True when the Pretty switch was supplied.
    member val Pretty = p.Contains QueryParameters.Pretty
    /// True when the Console switch was supplied.
    member val TeeConsole = p.Contains QueryParameters.Console
    /// Stream filter built from the (mutually exclusive) StreamName / CategoryName / CategoryLike arguments.
    /// Conflicting combinations are reported through `p.Raise`.
    member val Criteria =
        let stream = p.TryGetResult QueryParameters.StreamName
        let catName = p.TryGetResult QueryParameters.CategoryName
        let catLike = p.TryGetResult QueryParameters.CategoryLike
        match stream, catName, catLike with
        | None, None, None -> Criteria.Unfiltered
        | Some name, None, None -> Criteria.SingleStream name
        | None, Some name, None -> Criteria.CatName name
        | None, None, Some pattern -> Criteria.CatLike pattern
        | None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
        // Only StreamName combined with at least one category argument can reach this arm
        | Some _, _, _ -> p.Raise "StreamName and CategoryLike/CategoryName mutually exclusive"
    /// Optional File argument.
    member val Filepath = p.TryGetResult QueryParameters.File
    /// Optional UnfoldName argument.
    member val UnfoldName = p.TryGetResult QueryParameters.UnfoldName
    /// Optional UnfoldCriteria argument.
    member val UnfoldCriteria = p.TryGetResult QueryParameters.UnfoldCriteria
    /// Arguments of the mandatory `cosmos` subcommand.
    member val CosmosArgs =
        let cosmos = p.GetResult QueryParameters.Cosmos
        Store.Cosmos.Arguments cosmos
    /// Builds the store configuration from CosmosArgs and returns its Container; fails if the result is not a Cosmos configuration.
    member this.Connect() =
        let storeConfig = Store.Cosmos.config Log.Logger (None, true) this.CosmosArgs
        match storeConfig with
        | Store.Config.Cosmos (cc, _, _) -> cc.Container
        | _ -> failwith "Query requires Cosmos"
165+
/// Command line arguments of the `top` subcommand: an optional stream filter (StreamName, CategoryName or
/// CategoryLike), a switch selecting stream-level rather than category-level stats, an output limit, and the
/// nested `cosmos` subcommand carrying the store connection parameters.
and [<NoComparison; NoEquality; RequireSubcommand>] TopParameters =
    | [<AltCommandLine "-sn"; Unique>]  StreamName of string
    | [<AltCommandLine "-cn"; Unique>]  CategoryName of string
    | [<AltCommandLine "-cl"; Unique>]  CategoryLike of string
    | [<AltCommandLine "-S"; Unique>]   Streams
    | [<MainCommand>]                   Limit of int
    | [<CliPrefix(CliPrefix.None)>]     Cosmos of ParseResults<Store.Cosmos.Parameters>
    interface IArgParserTemplate with
        member a.Usage = a |> function
            | StreamName _ ->   "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
            | CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
            // The closing parenthesis was missing from this help text
            | CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`)."
            | Streams ->        "Stream level stats"
            | Limit _ ->        "Number of items to limit output to"
            | Cosmos _ ->       "Parameters for CosmosDB."
180+
and TopArguments(p: ParseResults<TopParameters>) =
    /// Stream filter built from the (mutually exclusive) StreamName / CategoryName / CategoryLike arguments.
    /// Conflicting combinations are reported through `p.Raise`.
    member val Criteria =
        let stream = p.TryGetResult TopParameters.StreamName
        let catName = p.TryGetResult TopParameters.CategoryName
        let catLike = p.TryGetResult TopParameters.CategoryLike
        match stream, catName, catLike with
        | None, None, None -> Criteria.Unfiltered
        | Some name, None, None -> Criteria.SingleStream name
        | None, Some name, None -> Criteria.CatName name
        | None, None, Some pattern -> Criteria.CatLike pattern
        | None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
        // Only StreamName combined with at least one category argument can reach this arm
        | Some _, _, _ -> p.Raise "StreamName and CategoryLike/CategoryName mutually exclusive"
    /// Arguments of the mandatory `cosmos` subcommand.
    member val CosmosArgs =
        let cosmos = p.GetResult TopParameters.Cosmos
        Store.Cosmos.Arguments cosmos
    /// True when the Streams switch was supplied (stats are keyed per stream rather than per category).
    member val StreamLevel = p.Contains TopParameters.Streams
    /// Number of entries to output; 100 when no Limit was supplied.
    member val Count = p.GetResult(TopParameters.Limit, 100)
    /// Builds the store configuration from CosmosArgs and returns its Container; fails if the result is not a Cosmos configuration.
    member this.Connect() =
        let storeConfig = Store.Cosmos.config Log.Logger (None, true) this.CosmosArgs
        match storeConfig with
        | Store.Config.Cosmos (cc, _, _) -> cc.Container
        | _ -> failwith "Top requires Cosmos"
    /// Connects, then creates a query iterator over raw JSON elements for the supplied SQL text,
    /// with the page size taken from CosmosArgs.QueryMaxItems.
    member this.Execute(sql) =
        let container = this.Connect()
        let queryDef = Microsoft.Azure.Cosmos.QueryDefinition sql
        let options = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = this.CosmosArgs.QueryMaxItems)
        container.GetItemQueryIterator<System.Text.Json.JsonElement>(queryDef, requestOptions = options)
200+
163201
and [<NoComparison; NoEquality; RequireSubcommand>] DumpParameters =
164202
| [<AltCommandLine "-s"; MainCommand>] Stream of FsCodec.StreamName
165203
| [<AltCommandLine "-C"; Unique>] Correlation
@@ -379,18 +417,20 @@ module CosmosQuery =
379417
let maybeFileStream = a.Filepath |> Option.map (fun p ->
380418
Log.Information("Dumping {mode} content to {path}", a.Mode, System.IO.FileInfo(p).FullName)
381419
System.IO.File.Create p) // Silently truncate if it exists, makes sense for typical usage
382-
let storeConfig, qo = a.ConfigureStore(Log.Logger), Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItems)
383-
let container = match storeConfig with Store.Config.Cosmos (cc, _, _) -> cc.Container | _ -> failwith "Query requires Cosmos"
420+
let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItems)
421+
let container = a.Connect()
384422
let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
385423
let mutable accI, accE, accU, accRus, accBytesRead = 0L, 0L, 0L, 0., 0L
386-
try for rtt, rc, items, rdc, rds, ods in container.GetItemQueryIterator<System.Text.Json.JsonDocument>(queryDef a, requestOptions = qo) |> Query.enum__ do
387-
let newestAge = items |> Seq.choose _.Timestamp |> Seq.tryLast |> Option.map (fun ts -> ts - DateTime.UtcNow)
388-
let items = [| for x in items -> x.Cast<Equinox.CosmosStore.Core.Tip>() |]
424+
let it = container.GetItemQueryIterator<System.Text.Json.JsonDocument>(queryDef a, requestOptions = qo)
425+
try for rtt, rc, items, rdc, rds, ods in it |> Query.enum__ do
426+
let mutable newestTs = None
427+
let items = [| for x in items -> newestTs <- max newestTs x.Timestamp
428+
x.Cast<Equinox.CosmosStore.Core.Tip>() |]
389429
let inline arrayLen x = if isNull x then 0 else Array.length x
390430
pageStreams.Clear(); for x in items do if x.p <> null && pageStreams.Add x.p then accStreams.Add x.p |> ignore
391431
let pageI, pageE, pageU = items.Length, items |> Seq.sumBy (_.e >> arrayLen), items |> Seq.sumBy (_.u >> arrayLen)
392-
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2} MiB{rc,7:f2} RU{s,5:N1} s age {age:dddd\.hh\:mm\:ss}",
393-
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, Option.toNullable newestAge)
432+
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}",
433+
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, newestTs.Value - DateTime.UtcNow)
394434
maybeFileStream |> Option.iter (fun stream ->
395435
for x in items do
396436
serdes.SerializeToStream(x, stream)
@@ -403,9 +443,86 @@ module CosmosQuery =
403443
let fileSize = maybeFileStream |> Option.map _.Position |> Option.defaultValue 0
404444
maybeFileStream |> Option.iter _.Close() // Before we log so time includes flush time and no confusion
405445
let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
406-
let accCategories = accStreams |> Seq.map categoryName |> Seq.distinct |> Seq.length
407-
Log.Information("TOTALS {cats}c {streams:N0}s {count:N0}i {es:N0}e {us:N0}u R/W {rmib:N1}/{wmib:N1}MiB {ru:N2}RU {s:N1}s",
408-
accCategories, accStreams.Count, accI, accE, accU, miB accBytesRead, miB fileSize, accRus, sw.Elapsed.TotalSeconds) }
446+
let accCategories = System.Collections.Generic.HashSet(accStreams |> Seq.map categoryName).Count
447+
Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u R/W {rmib:N1}/{wmib:N1}MiB {ru:N2}RU {s:N1}s",
448+
accI, accCategories, accStreams.Count, accE, accU, miB accBytesRead, miB fileSize, accRus, sw.Elapsed.TotalSeconds) }
449+
450+
/// Implements the `top` subcommand: scans the documents matching the supplied criteria, accumulates per-category
/// (or per-stream) document/event/unfold counts and sizes, and logs the largest entries by total bytes.
module CosmosTop =

    open Equinox.CosmosStore.Linq.Internal
    open FSharp.Control

    /// Reads the document's `_ts` property as seconds since the Unix epoch and converts it to a DateTime.
    let cosmosTimeStamp (x: System.Text.Json.JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds

    /// Extracts the parts of a document needed for the stats.
    /// Yields ValueSome struct (stream name parsed from `p`, optional `e` element, optional `u` element) when the
    /// document has a string-valued `p` property; ValueNone otherwise.
    let tryParseEquinoxBatch (x: System.Text.Json.JsonElement) =
        let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
            let mutable p = Unchecked.defaultof<_>
            if x.TryGetProperty(id, &p) then ValueSome p else ValueNone
        match tryProp "p" with
        | ValueSome (je: System.Text.Json.JsonElement) when je.ValueKind = System.Text.Json.JsonValueKind.String ->
            ValueSome struct (je.GetString() |> FsCodec.StreamName.parse, tryProp "e", tryProp "u")
        | _ -> ValueNone

    /// Builds the `SELECT * FROM c WHERE ...` text, filtering on `c.p` according to the requested criteria.
    /// NOTE the criteria values are interpolated verbatim into the query text.
    let private composeSql (a: TopArguments) =
        let partitionKeyCriteria =
            match a.Criteria with
            | Criteria.SingleStream sn -> $"c.p = \"{sn}\""
            | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
            | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
            | Criteria.Unfiltered -> "1=1"
        $"SELECT * FROM c WHERE {partitionKeyCriteria}"

    /// Length of the JSON array when present; 0 when the element is absent.
    let arrayLen = function ValueNone -> 0 | ValueSome (x: System.Text.Json.JsonElement) -> x.GetArrayLength()

    /// Buffer reused by every `utf8Size` call; being shared, calls must not overlap.
    let scratch = new System.IO.MemoryStream()

    /// Serializes the element into `scratch` from position 0 and returns the resulting position, i.e. the bytes written.
    let utf8Size (x: System.Text.Json.JsonElement) =
        scratch.Position <- 0L
        System.Text.Json.JsonSerializer.Serialize(scratch, x)
        scratch.Position

    /// Accumulated statistics for one key (a category or stream name).
    /// Equality and hashing consider only `key` (ordinal), which lets a HashSet of Stat act as a keyed lookup.
    [<Struct; CustomEquality; NoComparison>]
    type Stat =
        { key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64 }
        /// Stats for a single document `d` with optional events array `e` and unfolds array `u`.
        static member Create(key, d: System.Text.Json.JsonElement, e: System.Text.Json.JsonElement voption, u: System.Text.Json.JsonElement voption) =
            let eb = match e with ValueSome x -> utf8Size x | ValueNone -> 0L
            let ub = match u with ValueSome x -> utf8Size x | ValueNone -> 0L
            { key = key; count = 1; events = arrayLen e; unfolds = arrayLen u
              bytes = utf8Size d; eBytes = eb; uBytes = ub }
        /// Sums the counters of `x` and `y`, keeping the key of `x`.
        member x.Merge y =
            { key = x.key; count = x.count + y.count; events = x.events + y.events; unfolds = x.unfolds + y.unfolds
              bytes = x.bytes + y.bytes; eBytes = x.eBytes + y.eBytes; uBytes = x.uBytes + y.uBytes }
        override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
        override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false

    /// Runs the scan: logs a summary line per page, then (in `finally`, so also after a failure) the totals and the
    /// `a.Count` largest entries ordered by total bytes.
    /// Fails on the first document that `tryParseEquinoxBatch` cannot interpret.
    let run (a: TopArguments) = task {
        let sw = System.Diagnostics.Stopwatch.StartNew()
        let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
        let mutable accI, accE, accU, accRus, accRds, accOds, accBytes = 0L, 0L, 0L, 0., 0L, 0L, 0L
        let s = System.Collections.Generic.HashSet()
        let categoryName = FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
        // Grouping key: the full stream name for stream-level stats, otherwise the category
        let g = if a.StreamLevel then FsCodec.StreamName.toString else categoryName
        try for rtt, rc, items, rdc, rds, ods in a.Execute(composeSql a) |> Query.enum__ do
                let mutable pageI, pageE, pageU, pageB, newestTs = 0, 0, 0, 0L, DateTime.MinValue
                for x in items do
                    newestTs <- max newestTs (cosmosTimeStamp x)
                    match tryParseEquinoxBatch x with
                    | ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
                    | ValueSome (sn, e, u) ->
                        if pageStreams.Add sn then accStreams.Add sn |> ignore
                        let stat = Stat.Create(g sn, x, e, u)
                        let mutable prior = Unchecked.defaultof<_>
                        // Stat equality is by key, so a hit means an entry for the same key exists: swap in the merged value
                        if s.TryGetValue(stat, &prior) then
                            s.Remove stat |> ignore
                            s.Add(prior.Merge stat) |> ignore
                        else s.Add stat |> ignore
                        pageI <- pageI + 1; pageE <- pageE + stat.events; pageU <- pageU + stat.unfolds; pageB <- pageB + stat.bytes
                // A page without items has no timestamp to report: log a null age instead of one derived from DateTime.MinValue
                let newestAge: Nullable<TimeSpan> = if pageI = 0 then Nullable() else Nullable(newestTs - DateTime.UtcNow)
                Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}",
                                rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, newestAge)
                pageStreams.Clear()
                accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
                accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB
        finally

            let accCats = System.Collections.Generic.HashSet(accStreams |> Seq.map categoryName).Count
            Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u {tmib:N1}MiB Read {rmib:N1}>{omib:N1} {ru:N2}RU {s:N1}s",
                            accI, accCats, accStreams.Count, accE, accU, miB accBytes, miB accRds, miB accOds, accRus, sw.Elapsed.TotalSeconds)

            for x in s |> Seq.sortByDescending _.bytes |> Seq.truncate a.Count do
                Log.Information("{key,-20}:{count,7}i {mib,6:N1}MiB E{events,7} {emib,7:N1} U{unfolds,7} {umib,6:N1}",
                                x.key, x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes) }
409526

410527
module DynamoInit =
411528

@@ -522,6 +639,7 @@ type Arguments(p: ParseResults<Parameters>) =
522639
| InitSql a -> do! SqlInit.databaseOrSchema Log.Logger a
523640
| Dump a -> do! Dump.run (Log.Logger, verboseConsole, maybeSeq) a
524641
| Query a -> do! CosmosQuery.run (QueryArguments a) |> Async.AwaitTaskCorrect
642+
| Top a -> do! CosmosTop.run (TopArguments a) |> Async.AwaitTaskCorrect
525643
| Stats a -> do! CosmosStats.run (Log.Logger, verboseConsole, maybeSeq) a
526644
| LoadTest a -> let n = p.GetResult(LogFile, fun () -> p.ProgramName + ".log")
527645
let reportFilename = System.IO.FileInfo(n).FullName

0 commit comments

Comments
 (0)