Skip to content

Commit a0e4bb2

Browse files
committed
Sorting, C+C, D+M, unsorted
1 parent d1fb8e1 commit a0e4bb2

File tree

2 files changed

+101
-55
lines changed

2 files changed

+101
-55
lines changed

src/Equinox.CosmosStore/CosmosStoreSerialization.fs

+9-4
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ module private Deflate =
1212
compressor.Flush() // Could `Close`, but not required
1313
output.ToArray()
1414

15-
let inflate (compressedBytes: byte[]) =
15+
let inflateTo output (compressedBytes: byte[]) =
1616
let input = new MemoryStream(compressedBytes)
1717
let decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
18-
let output = new MemoryStream()
1918
decompressor.CopyTo(output)
19+
let inflate compressedBytes =
20+
let output = new MemoryStream()
21+
compressedBytes |> inflateTo output
2022
output.ToArray()
2123

2224
module JsonElement =
@@ -26,10 +28,13 @@ module JsonElement =
2628

2729
// Avoid introduction of HTML escaping for things like quotes etc (Options.Default uses Options.Create(), which defaults to unsafeRelaxedJsonEscaping=true)
2830
let private optionsNoEscaping = JsonSerializerOptions(Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping)
29-
let private toUtf8Bytes (value : JsonElement) = JsonSerializer.SerializeToUtf8Bytes(value, options = optionsNoEscaping)
30-
let deflate (value : JsonElement) : JsonElement =
31+
let private toUtf8Bytes (value: JsonElement) = JsonSerializer.SerializeToUtf8Bytes(value, options = optionsNoEscaping)
32+
let deflate (value: JsonElement): JsonElement =
3133
if value.ValueKind = JsonValueKind.Null then value
3234
else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement
35+
let tryInflateTo ms (x: JsonElement) =
36+
if x.ValueKind <> JsonValueKind.String then false
37+
else x.GetBytesFromBase64() |> Deflate.inflateTo ms; true
3338

3439
type CosmosJsonSerializer(options : JsonSerializerOptions) =
3540
inherit Microsoft.Azure.Cosmos.CosmosSerializer()

tools/Equinox.Tool/Program.fs

+92-51
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,21 @@ and [<NoComparison; NoEquality; RequireSubcommand>] TopParameters =
167167
| [<AltCommandLine "-cn"; Unique>] CategoryName of string
168168
| [<AltCommandLine "-cl"; Unique>] CategoryLike of string
169169
| [<AltCommandLine "-S"; Unique>] Streams
170-
| [<MainCommand>] Limit of int
170+
| [<AltCommandLine "-T"; Unique>] TsOrder
171+
| [<AltCommandLine "-c">] Limit of int
172+
| [<MainCommand; AltCommandLine "-s"; Unique>] Sort of Order
171173
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Store.Cosmos.Parameters>
172174
interface IArgParserTemplate with
173175
member a.Usage = a |> function
174176
| StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
175177
| CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
176178
| CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`."
177179
| Streams -> "Stream level stats"
178-
| Limit _ -> "Number of items to limit output to"
180+
| TsOrder -> "Retrieve data in `_ts` ORDER (generally has significant RU impact). Default: Use continuation tokens"
181+
| Sort _ -> "Sort order for results"
182+
| Limit _ -> "Number of categories to limit output to (Streams limit is 10x the category limit). Default: 100"
179183
| Cosmos _ -> "Parameters for CosmosDB."
184+
and Order = Name | Items | Events | Unfolds | Size | EventSize | UnfoldSize | InflateSize | CorrCauseSize
180185
and TopArguments(p: ParseResults<TopParameters>) =
181186
member val Criteria =
182187
match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with
@@ -188,8 +193,11 @@ and TopArguments(p: ParseResults<TopParameters>) =
188193
| None, None, None -> Criteria.Unfiltered
189194
| None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
190195
member val CosmosArgs = p.GetResult TopParameters.Cosmos |> Store.Cosmos.Arguments
191-
member val StreamLevel = p.Contains TopParameters.Streams
192-
member val Count = p.GetResult(TopParameters.Limit, 100)
196+
member val StreamLevel = p.Contains Streams
197+
member val Count = p.GetResult(Limit, 100)
198+
member val TsOrder = p.Contains TsOrder
199+
member val Order = p.GetResult(Sort, Order.Size)
200+
member x.StreamCount = p.GetResult(Limit, x.Count * 10)
193201
member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
194202
| Store.Config.Cosmos (cc, _, _) -> cc.Container
195203
| _ -> failwith "Top requires Cosmos"
@@ -406,7 +414,7 @@ module CosmosQuery =
406414
| [||] -> "1=1"
407415
| [| x |] -> x |> exists
408416
| xs -> String.Join(" AND ", xs) |> exists
409-
$"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter} ORDER BY c.i"
417+
$"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter}"
410418
let private queryDef (a: QueryArguments) =
411419
let sql = composeSql a
412420
Log.Information("Querying {mode}: {q}", a.Mode, sql)
@@ -451,78 +459,111 @@ module CosmosTop =
451459

452460
open Equinox.CosmosStore.Linq.Internal
453461
open FSharp.Control
454-
455-
let cosmosTimeStamp (x: System.Text.Json.JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
456-
let tryParseEquinoxBatch (x: System.Text.Json.JsonElement) =
457-
let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
462+
open System.Text.Json
463+
module private Parser =
464+
let scratch = new System.IO.MemoryStream()
465+
let inline utf8Size (x: JsonElement) =
466+
scratch.Position <- 0L
467+
JsonSerializer.Serialize(scratch, x)
468+
scratch.Position
469+
let inline inflatedUtf8Size x =
470+
scratch.Position <- 0L
471+
if Equinox.CosmosStore.Core.JsonElement.tryInflateTo scratch x then scratch.Position
472+
else utf8Size x
473+
let inline tryProp (x: JsonElement) (id: string): ValueOption<JsonElement> =
458474
let mutable p = Unchecked.defaultof<_>
459475
if x.TryGetProperty(id, &p) then ValueSome p else ValueNone
460-
match tryProp "p" with
461-
| ValueSome (je: System.Text.Json.JsonElement) when je.ValueKind = System.Text.Json.JsonValueKind.String ->
462-
ValueSome struct (je.GetString() |> FsCodec.StreamName.parse, tryProp "e", tryProp "u")
463-
| _ -> ValueNone
464-
476+
// using the length as a decent proxy for UTF-8 length of corr/causation; if you have messy data in there, you'll have bigger problems to worry about
477+
let inline stringLen x = match x with ValueSome (x: JsonElement) when x.ValueKind <> JsonValueKind.Null -> x.GetString().Length | _ -> 0
478+
let _e = Unchecked.defaultof<Equinox.CosmosStore.Core.Event> // Or Unfold - both share field names
479+
let inline ciSize (x: JsonElement) =
480+
(struct (0, 0L), x.EnumerateArray())
481+
||> Seq.fold (fun struct (c, i) x ->
482+
let inline infSize x = match x with ValueSome x -> inflatedUtf8Size x | ValueNone -> 0
483+
struct (c + (tryProp x (nameof _e.correlationId) |> stringLen) + (tryProp x (nameof _e.causationId) |> stringLen),
484+
i + (tryProp x (nameof _e.d) |> infSize) + (tryProp x (nameof _e.m) |> infSize)))
485+
let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
486+
let inline tryEquinoxStreamName x =
487+
match tryProp x (nameof _t.p) with
488+
| ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String ->
489+
je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
490+
| _ -> ValueNone
491+
let private tryParseEventOrUnfold = function
492+
| ValueNone -> struct (0, 0L, struct (0, 0L))
493+
| ValueSome (x: JsonElement) -> x.GetArrayLength(), utf8Size x, ciSize x
494+
[<Struct; CustomEquality; NoComparison>]
495+
type Stat =
496+
{ key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64; cBytes: int64; iBytes: int64 }
497+
member x.Merge y =
498+
{ key = x.key; count = x.count + y.count; events = x.events + y.events; unfolds = x.unfolds + y.unfolds; bytes = x.bytes + y.bytes
499+
eBytes = x.eBytes + y.eBytes; uBytes = x.uBytes + y.uBytes; cBytes = x.cBytes + y.cBytes; iBytes = x.iBytes + y.iBytes }
500+
override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
501+
override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false
502+
static Create(key, x: JsonElement) =
503+
let struct (e, eb, struct (ec, ei)) = tryProp x (nameof _t.e) |> tryParseEventOrUnfold
504+
let struct (u, ub, struct (uc, ui)) = tryProp x (nameof _t.u) |> tryParseEventOrUnfold
505+
{ key = key; count = 1; events = e; unfolds = u
506+
bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 (ec + uc); iBytes = ei + ui }
507+
let [<Literal>] OrderByTs = " ORDER BY c._ts"
465508
let private composeSql (a: TopArguments) =
466509
let partitionKeyCriteria =
467510
match a.Criteria with
468511
| Criteria.SingleStream sn -> $"c.p = \"{sn}\""
469512
| Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
470513
| Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
471514
| Criteria.Unfiltered -> "1=1"
472-
$"SELECT * FROM c WHERE {partitionKeyCriteria}"
473-
let arrayLen = function ValueNone -> 0 | ValueSome (x: System.Text.Json.JsonElement) -> x.GetArrayLength()
474-
let scratch = new System.IO.MemoryStream()
475-
let utf8Size (x: System.Text.Json.JsonElement) =
476-
scratch.Position <- 0L
477-
System.Text.Json.JsonSerializer.Serialize(scratch, x)
478-
scratch.Position
479-
[<Struct; CustomEquality; NoComparison>]
480-
type Stat =
481-
{ key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64 }
482-
static member Create(key, d: System.Text.Json.JsonElement, e: System.Text.Json.JsonElement voption, u: System.Text.Json.JsonElement voption) =
483-
let eb = match e with ValueSome x -> utf8Size x | ValueNone -> 0
484-
let ub = match u with ValueSome x -> utf8Size x | ValueNone -> 0
485-
{ key = key; count = 1; events = arrayLen e; unfolds = arrayLen u
486-
bytes = utf8Size d; eBytes = eb; uBytes = ub }
487-
member x.Merge y =
488-
{ key = x.key; count = x.count + y.count; events = x.events + y.events; unfolds = x.unfolds + y.unfolds
489-
bytes = x.bytes + y.bytes; eBytes = x.eBytes + y.eBytes; uBytes = x.uBytes + y.uBytes }
490-
override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
491-
override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false
515+
$"SELECT * FROM c WHERE {partitionKeyCriteria}{if a.TsOrder then OrderByTs else null}"
516+
let inline cosmosTimeStamp (x: JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
492517
let run (a: TopArguments) = task {
493518
let sw = System.Diagnostics.Stopwatch.StartNew()
494519
let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
495520
let mutable accI, accE, accU, accRus, accRds, accOds, accBytes = 0L, 0L, 0L, 0., 0L, 0L, 0L
496521
let s = System.Collections.Generic.HashSet()
497-
let categoryName = FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
498-
let g = if a.StreamLevel then FsCodec.StreamName.toString else categoryName
522+
let categoryName = FsCodec.StreamName.Internal.trust >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
523+
let group = if a.StreamLevel then id else categoryName
499524
try for rtt, rc, items, rdc, rds, ods in a.Execute(composeSql a) |> Query.enum__ do
500-
let mutable pageI, pageE, pageU, pageB, newestTs = 0, 0, 0, 0L, DateTime.MinValue
525+
let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew()
501526
for x in items do
502527
newestTs <- max newestTs (cosmosTimeStamp x)
503-
match tryParseEquinoxBatch x with
528+
match Parser.tryEquinoxStreamName x with
504529
| ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
505-
| ValueSome (sn, e, u) ->
506-
if pageStreams.Add sn then accStreams.Add sn |> ignore
507-
let x = Stat.Create(g sn, x, e, u)
530+
| ValueSome sn ->
531+
if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
532+
let x = Parser.Stat.Create(group sn, x)
508533
let mutable v = Unchecked.defaultof<_>
509534
if s.TryGetValue(x, &v) then s.Remove x |> ignore; s.Add(v.Merge x) |> ignore
510535
else s.Add x |> ignore
511-
pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds; pageB <- pageB + x.bytes
512-
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}",
513-
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, newestTs - DateTime.UtcNow)
536+
pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds; pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
537+
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}",
538+
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, newestTs - DateTime.UtcNow)
514539
pageStreams.Clear()
515540
accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
516541
accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB
517542
finally
518543

519-
let accCats = System.Collections.Generic.HashSet(accStreams |> Seq.map categoryName).Count
520-
Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u {tmib:N1}MiB Read {rmib:N1}>{omib:N1} {ru:N2}RU {s:N1}s",
521-
accI, accCats, accStreams.Count, accE, accU, miB accBytes, miB accRds, miB accOds, accRus, sw.Elapsed.TotalSeconds)
522-
523-
for x in s |> Seq.sortByDescending _.bytes |> Seq.truncate a.Count do
524-
Log.Information("{key,-20}:{count,7}i {mib,6:N1}MiB E{events,7} {emib,7:N1} U{unfolds,7} {umib,6:N1}",
525-
x.key, x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes) }
544+
let accCats = (if a.StreamLevel then s |> Seq.map _.key else accStreams) |> Seq.map group |> System.Collections.Generic.HashSet |> _.Count
545+
let accStreams = if a.StreamLevel then s.Count else accStreams.Count
546+
let iBytes, cBytes = s |> Seq.sumBy _.iBytes, s |> Seq.sumBy _.cBytes
547+
let giB x = miB x / 1024.
548+
Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u read {rg:f1}GiB output {og:f1}GiB JSON {tg:f1}GiB D+M(inflated) {ig:f1}GiB C+C {cm:f2}MiB {ru:N2}RU {s:N1}s",
549+
accI, accCats, accStreams, accE, accU, giB accRds, giB accOds, giB accBytes, giB iBytes, miB cBytes, accRus, sw.Elapsed.TotalSeconds)
550+
let sort: Parser.Stat seq -> Parser.Stat seq = a.Order |> function
551+
| Order.Name -> Seq.sortBy _.key
552+
| Order.Size -> Seq.sortByDescending _.bytes
553+
| Order.Items -> Seq.sortByDescending _.count
554+
| Order.Events -> Seq.sortByDescending _.events
555+
| Order.Unfolds -> Seq.sortByDescending _.unfolds
556+
| Order.EventSize -> Seq.sortByDescending _.eBytes
557+
| Order.UnfoldSize -> Seq.sortByDescending _.uBytes
558+
| Order.InflateSize -> Seq.sortByDescending _.iBytes
559+
| Order.CorrCauseSize -> Seq.sortByDescending _.cBytes
560+
let render (x: Parser.Stat) =
561+
Log.Information("{count,7}i {tm,6:N2}MiB E{events,7} {em,7:N1} U{unfolds,7} {um,6:N1} D+M{dm,6:N1} C+C{cm,5:N1} {key}",
562+
x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key)
563+
if a.StreamLevel then
564+
let collapsed = s |> Seq.groupBy (_.key >> categoryName) |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce _.Merge) with key = cat })
565+
sort collapsed |> Seq.truncate a.Count |> Seq.iter render
566+
sort s |> Seq.truncate (if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render }
526567

527568
module DynamoInit =
528569

0 commit comments

Comments
 (0)