@@ -141,7 +141,13 @@ and [<NoComparison; NoEquality; RequireSubcommand>] QueryParameters =
141
141
| Console -> " Also emit the JSON to the console. Default: Gather statistics (but only write to a File if specified)"
142
142
| Cosmos _ -> " Parameters for CosmosDB."
143
143
and [<RequireQualifiedAccess>] Mode = Default | SnapOnly | SnapWithStream | ReadOnly | ReadWithStream | Raw
144
- and [<RequireQualifiedAccess>] Criteria = SingleStream of string | CatName of string | CatLike of string | Unfiltered
144
+ and [<RequireQualifiedAccess>] Criteria =
145
+ | SingleStream of string | CatName of string | CatLike of string | Unfiltered
146
+ member x.Sql = x |> function
147
+ | Criteria.SingleStream sn -> $" c.p = \" {sn}\" "
148
+ | Criteria.CatName n -> $" c.p LIKE \" {n}-%%\" "
149
+ | Criteria.CatLike pat -> $" c.p LIKE \" {pat}\" "
150
+ | Criteria.Unfiltered -> " 1=1"
145
151
and QueryArguments ( p : ParseResults < QueryParameters >) =
146
152
member val Mode = p.GetResult( QueryParameters.Mode, if p.Contains QueryParameters.File then Mode.Raw else Mode.Default)
147
153
member val Pretty = p.Contains QueryParameters.Pretty
@@ -380,25 +386,23 @@ module CosmosStats =
380
386
381
387
let prettySerdes = lazy FsCodec.SystemTextJson.Serdes( FsCodec.SystemTextJson.Options.Create( indent = true ))
382
388
389
+ type System.Text.Json.JsonElement with
390
+ member x.Timestamp = x.GetProperty( " _ts" ) .GetDouble() |> DateTime.UnixEpoch.AddSeconds
391
+ member x.TryProp ( name : string ) = let mutable p = Unchecked.defaultof<_> in if x.TryGetProperty( name, & p) then ValueSome p else ValueNone
392
+ module StreamName =
393
+ let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct ( cn , _sid ) -> cn
394
+
383
395
module CosmosQuery =
384
396
385
397
open Equinox.CosmosStore .Linq .Internal
386
398
open FSharp.Control
387
- type System.Text.Json.JsonDocument with
388
- member x.Cast < 'T >() = System.Text.Json.JsonSerializer.Deserialize< 'T>( x.RootElement)
389
- member x.Timestamp =
390
- let ok , p = x.RootElement.TryGetProperty( " _ts" )
391
- if ok then p.GetDouble() |> DateTime.UnixEpoch.AddSeconds |> Some else None
399
+
392
400
let private composeSql ( a : QueryArguments ) =
393
- let inline warnOnUnfiltered () =
401
+ match a.Criteria with
402
+ | Criteria.Unfiltered ->
394
403
let lel = if a.Mode = Mode.Raw then LogEventLevel.Debug elif a.Filepath = None then LogEventLevel.Warning else LogEventLevel.Information
395
404
Log.Write( lel, " No StreamName or CategoryName/CategoryLike specified - Unfold Criteria better be unambiguous" )
396
- let partitionKeyCriteria =
397
- match a.Criteria with
398
- | Criteria.SingleStream sn -> $" c.p = \" {sn}\" "
399
- | Criteria.CatName n -> $" c.p LIKE \" {n}-%%\" "
400
- | Criteria.CatLike pat -> $" c.p LIKE \" {pat}\" "
401
- | Criteria.Unfiltered -> warnOnUnfiltered (); " 1=1"
405
+ | _ -> ()
402
406
let selectedFields =
403
407
match a.Mode with
404
408
| Mode.Default -> " c._etag, c.p, c.u[0].d"
@@ -414,7 +418,7 @@ module CosmosQuery =
414
418
| [||] -> " 1=1"
415
419
| [| x |] -> x |> exists
416
420
| xs -> String.Join( " AND " , xs) |> exists
417
- $" SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria } AND {unfoldFilter}"
421
+ $" SELECT {selectedFields} FROM c WHERE {a.Criteria.Sql } AND {unfoldFilter}"
418
422
let private queryDef ( a : QueryArguments ) =
419
423
let sql = composeSql a
420
424
Log.Information( " Querying {mode}: {q}" , a.Mode, sql)
@@ -431,14 +435,14 @@ module CosmosQuery =
431
435
let mutable accI , accE , accU , accRus , accBytesRead = 0 L, 0 L, 0 L, 0. , 0 L
432
436
let it = container.GetItemQueryIterator< System.Text.Json.JsonDocument>( queryDef a, requestOptions = qo)
433
437
try for rtt, rc, items, rdc, rds, ods in it |> Query.enum__ do
434
- let mutable newestTs = None
435
- let items = [| for x in items -> newestTs <- max newestTs x.Timestamp
436
- x.Cast < Equinox.CosmosStore.Core.Tip>() |]
438
+ let mutable newestTs = DateTime.MinValue
439
+ let items = [| for x in items -> newestTs <- max newestTs x.RootElement. Timestamp
440
+ System.Text.Json.JsonSerializer.Deserialize < Equinox.CosmosStore.Core.Tip>( x.RootElement ) |]
437
441
let inline arrayLen x = if isNull x then 0 else Array.length x
438
442
pageStreams.Clear(); for x in items do if x.p <> null && pageStreams.Add x.p then accStreams.Add x.p |> ignore
439
443
let pageI , pageE , pageU = items.Length, items |> Seq.sumBy (_. e >> arrayLen), items |> Seq.sumBy (_. u >> arrayLen)
440
444
Log.Information( " Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\. hh\: mm\: ss}" ,
441
- rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, newestTs.Value - DateTime.UtcNow )
445
+ rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, DateTime.UtcNow - newestTs )
442
446
maybeFileStream |> Option.iter ( fun stream ->
443
447
for x in items do
444
448
serdes.SerializeToStream( x, stream)
@@ -450,8 +454,7 @@ module CosmosQuery =
450
454
finally
451
455
let fileSize = maybeFileStream |> Option.map _. Position |> Option.defaultValue 0
452
456
maybeFileStream |> Option.iter _. Close() // Before we log so time includes flush time and no confusion
453
- let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct ( cn , _sid ) -> cn
454
- let accCategories = System.Collections.Generic.HashSet( accStreams |> Seq.map categoryName). Count
457
+ let accCategories = System.Collections.Generic.HashSet( accStreams |> Seq.map StreamName.categoryName). Count
455
458
Log.Information( " TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u R/W {rmib:N1}/{wmib:N1}MiB {ru:N2}RU {s:N1}s" ,
456
459
accI, accCategories, accStreams.Count, accE, accU, miB accBytesRead, miB fileSize, accRus, sw.Elapsed.TotalSeconds) }
457
460
@@ -460,6 +463,7 @@ module CosmosTop =
460
463
open Equinox.CosmosStore .Linq .Internal
461
464
open FSharp.Control
462
465
open System.Text .Json
466
+
463
467
module private Parser =
464
468
let scratch = new System.IO.MemoryStream()
465
469
let inline utf8Size ( x : JsonElement ) =
@@ -470,21 +474,18 @@ module CosmosTop =
470
474
scratch.Position <- 0 L
471
475
if Equinox.CosmosStore.Core.JsonElement.tryInflateTo scratch x then scratch.Position
472
476
else utf8Size x
473
- let inline tryProp ( x : JsonElement ) ( id : string ): ValueOption < JsonElement > =
474
- let mutable p = Unchecked.defaultof<_>
475
- if x.TryGetProperty( id, & p) then ValueSome p else ValueNone
476
477
// using the length as a decent proxy for UTF-8 length of corr/causation; if you have messy data in there, you'll have bigger problems to worry about
477
478
let inline stringLen x = match x with ValueSome ( x: JsonElement) when x.ValueKind <> JsonValueKind.Null -> x.GetString() .Length | _ -> 0
478
479
let _e = Unchecked.defaultof< Equinox.CosmosStore.Core.Event> // Or Unfold - both share field names
479
480
let inline ciSize ( x : JsonElement ) =
480
481
( struct ( 0 , 0 L), x.EnumerateArray())
481
482
||> Seq.fold ( fun struct ( c , i ) x ->
482
483
let inline infSize x = match x with ValueSome x -> inflatedUtf8Size x | ValueNone -> 0
483
- struct ( c + ( tryProp x ( nameof _ e.correlationId) |> stringLen) + ( tryProp x ( nameof _ e.causationId) |> stringLen),
484
- i + ( tryProp x ( nameof _ e.d) |> infSize) + ( tryProp x ( nameof _ e.m) |> infSize)))
484
+ struct ( c + ( x.TryProp ( nameof _ e.correlationId) |> stringLen) + ( x.TryProp ( nameof _ e.causationId) |> stringLen),
485
+ i + ( x.TryProp ( nameof _ e.d) |> infSize) + ( x.TryProp ( nameof _ e.m) |> infSize)))
485
486
let _t = Unchecked.defaultof< Equinox.CosmosStore.Core.Tip>
486
- let inline tryEquinoxStreamName x =
487
- match tryProp x ( nameof _ t.p) with
487
+ let inline tryEquinoxStreamName ( x : JsonElement ) =
488
+ match x.TryProp ( nameof _ t.p) with
488
489
| ValueSome ( je: JsonElement) when je.ValueKind = JsonValueKind.String ->
489
490
je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
490
491
| _ -> ValueNone
@@ -500,42 +501,33 @@ module CosmosTop =
500
501
override x.GetHashCode () = StringComparer.Ordinal.GetHashCode x.key
501
502
override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals( x.key, y.key) | _ -> false
502
503
static Create( key, x: JsonElement) =
503
- let struct ( e , eb , struct ( ec , ei )) = tryProp x ( nameof _ t.e) |> tryParseEventOrUnfold
504
- let struct ( u , ub , struct ( uc , ui )) = tryProp x ( nameof _ t.u) |> tryParseEventOrUnfold
504
+ let struct ( e , eb , struct ( ec , ei )) = x.TryProp ( nameof _ t.e) |> tryParseEventOrUnfold
505
+ let struct ( u , ub , struct ( uc , ui )) = x.TryProp ( nameof _ t.u) |> tryParseEventOrUnfold
505
506
{ key = key; count = 1 ; events = e; unfolds = u
506
507
bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 ( ec + uc); iBytes = ei + ui }
507
508
let [<Literal>] OrderByTs = " ORDER BY c._ts"
508
- let private composeSql ( a : TopArguments ) =
509
- let partitionKeyCriteria =
510
- match a.Criteria with
511
- | Criteria.SingleStream sn -> $" c.p = \" {sn}\" "
512
- | Criteria.CatName n -> $" c.p LIKE \" {n}-%%\" "
513
- | Criteria.CatLike pat -> $" c.p LIKE \" {pat}\" "
514
- | Criteria.Unfiltered -> " 1=1"
515
- $" SELECT * FROM c WHERE {partitionKeyCriteria}{if a.TsOrder then OrderByTs else null}"
516
- let inline cosmosTimeStamp ( x : JsonElement ) = x.GetProperty( " _ts" ) .GetDouble() |> DateTime.UnixEpoch.AddSeconds
509
+ let private sql ( a : TopArguments ) = $" SELECT * FROM c WHERE {a.Criteria.Sql}{if a.TsOrder then OrderByTs else null}"
517
510
let run ( a : TopArguments ) = task {
518
511
let sw = System.Diagnostics.Stopwatch.StartNew()
519
512
let pageStreams , accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
520
513
let mutable accI , accE , accU , accRus , accRds , accOds , accBytes = 0 L, 0 L, 0 L, 0. , 0 L, 0 L, 0 L
521
514
let s = System.Collections.Generic.HashSet()
522
- let categoryName = FsCodec.StreamName.Internal.trust >> FsCodec.StreamName.split >> fun struct ( cn , _sid ) -> cn
523
- let group = if a.StreamLevel then id else categoryName
524
- try for rtt, rc, items, rdc, rds, ods in a.Execute( composeSql a) |> Query.enum__ do
515
+ let group = if a.StreamLevel then id else StreamName.categoryName
516
+ try for rtt, rc, items, rdc, rds, ods in a.Execute( sql a) |> Query.enum__ do
525
517
let mutable pageI , pageE , pageU , pageB , pageCc , pageDm , newestTs , sw = 0 , 0 , 0 , 0 L, 0 L, 0 L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew()
526
518
for x in items do
527
- newestTs <- max newestTs ( cosmosTimeStamp x )
519
+ newestTs <- max newestTs x.Timestamp
528
520
match Parser.tryEquinoxStreamName x with
529
521
| ValueNone -> failwith $" Could not parse document:\n {prettySerdes.Value.Serialize x}"
530
522
| ValueSome sn ->
531
523
if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
532
524
let x = Parser.Stat.Create( group sn, x)
533
525
let mutable v = Unchecked.defaultof<_>
534
- if s.TryGetValue( x, & v) then s.Remove x |> ignore; s.Add ( v.Merge x) |> ignore
535
- else s.Add x |> ignore
536
- pageI <- pageI + 1 ; pageE <- pageE + x.events ; pageU <- pageU + x.unfolds ; pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
537
- Log.Information( " Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}> {jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\. hh\: mm\: ss}" ,
538
- rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, newestTs - DateTime.UtcNow)
526
+ s.Add ( if s.TryGetValue( x, & v) then s.Remove x |> ignore; v.Merge x else x) |> ignore
527
+ pageI <- pageI + 1 ; pageE <- pageE + x.events ; pageU <- pageU + x.unfolds
528
+ pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
529
+ Log.Information( " Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}< {jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\. hh\: mm\: ss}" ,
530
+ rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs )
539
531
pageStreams.Clear()
540
532
accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
541
533
accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB
@@ -544,9 +536,10 @@ module CosmosTop =
544
536
let accCats = ( if a.StreamLevel then s |> Seq.map _. key else accStreams) |> Seq.map group |> System.Collections.Generic.HashSet |> _. Count
545
537
let accStreams = if a.StreamLevel then s.Count else accStreams.Count
546
538
let iBytes , cBytes = s |> Seq.sumBy _. iBytes, s |> Seq.sumBy _. cBytes
547
- let giB x = miB x / 1024.
539
+ let inline giB x = miB x / 1024.
548
540
Log.Information( " TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u read {rg:f1}GiB output {og:f1}GiB JSON {tg:f1}GiB D+M(inflated) {ig:f1}GiB C+C {cm:f2}MiB {ru:N2}RU {s:N1}s" ,
549
541
accI, accCats, accStreams, accE, accU, giB accRds, giB accOds, giB accBytes, giB iBytes, miB cBytes, accRus, sw.Elapsed.TotalSeconds)
542
+
550
543
let sort : Parser.Stat seq -> Parser.Stat seq = a.Order |> function
551
544
| Order.Name -> Seq.sortBy _. key
552
545
| Order.Size -> Seq.sortByDescending _. bytes
@@ -561,7 +554,7 @@ module CosmosTop =
561
554
Log.Information( " {count,7}i {tm,6:N2}MiB E{events,7} {em,7:N1} U{unfolds,7} {um,6:N1} D+M{dm,6:N1} C+C{cm,5:N1} {key}" ,
562
555
x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key)
563
556
if a.StreamLevel then
564
- let collapsed = s |> Seq.groupBy (_. key >> categoryName) |> Seq.map ( fun ( cat , xs ) -> { ( xs |> Seq.reduce _. Merge) with key = cat })
557
+ let collapsed = s |> Seq.groupBy (_. key >> StreamName. categoryName) |> Seq.map ( fun ( cat , xs ) -> { ( xs |> Seq.reduce _. Merge) with key = cat })
565
558
sort collapsed |> Seq.truncate a.Count |> Seq.iter render
566
559
sort s |> Seq.truncate ( if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render }
567
560
@@ -675,7 +668,7 @@ type Arguments(p: ParseResults<Parameters>) =
675
668
member _.CreateDomainLog () = createDomainLog verbose verboseConsole maybeSeq
676
669
member _.ExecuteSubCommand () = async {
677
670
match p.GetSubCommand() with
678
- | Init a -> ( CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None). Wait ()
671
+ | Init a -> do ! CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None |> Async.AwaitTaskCorrect
679
672
| InitAws a -> do ! DynamoInit.table Log.Logger a
680
673
| InitSql a -> do ! SqlInit.databaseOrSchema Log.Logger a
681
674
| Dump a -> do ! Dump.run ( Log.Logger, verboseConsole, maybeSeq) a
0 commit comments