diff --git a/.config/dotnet-tools.json b/.config/dotnet-tools.json index 7d8f1f4..0b9d7a0 100644 --- a/.config/dotnet-tools.json +++ b/.config/dotnet-tools.json @@ -6,13 +6,22 @@ "version": "9.0.2", "commands": [ "paket" - ] + ], + "rollForward": false }, "fantomas": { "version": "6.2.3", "commands": [ "fantomas" - ] + ], + "rollForward": false + }, + "sleet": { + "version": "6.4.0", + "commands": [ + "sleet" + ], + "rollForward": false } } } \ No newline at end of file diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5f7f224..c1950fb 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,9 @@ +### 0.12.4-beta +* Added OpenTelemetry instrumentation + ### 0.12.3-beta * Removed erroneous `Dotnet.Reproduciblebuilds` dependency [#75](https://github.com/fsprojects/FSharp.AWS.DynamoDB/pull/75) + ### 0.12.2-beta * (breaking) Revised multi-table transaction API (thanks @bartelink) diff --git a/build.fsx b/build.fsx index 04c36b7..1bf4db9 100755 --- a/build.fsx +++ b/build.fsx @@ -46,6 +46,7 @@ let summary = "An F# wrapper over the standard Amazon.DynamoDB library" let gitOwner = "fsprojects" let gitName = "FSharp.AWS.DynamoDB" let gitHome = "https://github.com/" + gitOwner +let sleetSource = "codesplice" // -------------------------------------------------------------------------------------- // Build variables @@ -196,16 +197,25 @@ Target.create "Push" (fun _ -> Source = Some "https://api.nuget.org/v3/index.json" } DotNet.nugetPush (fun o -> o.WithPushParams pushParams) (sprintf "%s**.*.nupkg" nugetDir)) +Target.create "PushInternal" (fun _ -> + CreateProcess.fromRawCommand "dotnet" [ "sleet"; "push"; nugetDir; "--source"; sleetSource; "--force" ] + |> CreateProcess.ensureExitCodeWithMessage "Error while running 'sleet push'" + |> Proc.run + |> ignore) + // -------------------------------------------------------------------------------------- // Build order // -------------------------------------------------------------------------------------- Target.create "Default" DoNothing +Target.create "ReleaseInternal" DoNothing Target.create "Release" DoNothing "Clean" ==> "AssemblyInfo" ==> "Restore" ==> "Build" ==> "Test" ==> "Default" "Clean" ==> "AssemblyInfo" ==> "Restore" ==> "BuildRelease" ==> "Docs" +"Default" ==> "Pack" ==> "PushInternal" ==> "ReleaseInternal" + "Default" ==> "Pack" ==> "ReleaseGitHub" ==> "Push" ==> "Release" Target.runOrDefaultWithArguments "Default" diff --git a/paket.dependencies b/paket.dependencies index e67a48e..699bf9d 100644 --- a/paket.dependencies +++ b/paket.dependencies @@ -1,7 +1,7 @@ source https://api.nuget.org/v3/index.json storage: none -framework: net80, netstandard20, netstandard21 +framework: net90, netstandard20, netstandard21 # copy_local: true because this is a build-time dependency only nuget Dotnet.ReproducibleBuilds copy_local: true @@ -15,14 +15,14 @@ nuget Unquote ~> 6.1.0 nuget FSharp.Core >= 4.7.2 content: false, lowest_matching: true nuget AWSSDK.DynamoDBv2 ~> 3.7.5 - +nuget System.Diagnostics.DiagnosticSource >= 6.0.0 github eiriktsarpalis/TypeShape:10.0.0 src/TypeShape/TypeShape.fs group Test source https://api.nuget.org/v3/index.json - framework: net80 + framework: net90 - nuget FsCheck + nuget FsCheck ~> 2.16.6 nuget Microsoft.NET.Test.Sdk nuget xunit nuget xunit.runner.visualstudio \ No newline at end of file diff --git a/paket.lock b/paket.lock index 30e98b1..4f3a987 100644 --- a/paket.lock +++ b/paket.lock @@ -1,80 +1,62 @@ STORAGE: NONE -RESTRICTION: || (== net8.0) (== netstandard2.0) (== netstandard2.1) +RESTRICTION: || (== net9.0) (== netstandard2.0) (== netstandard2.1) NUGET remote: https://api.nuget.org/v3/index.json - AWSSDK.Core (3.7.300.11) - Microsoft.Bcl.AsyncInterfaces (>= 1.1) - restriction: || (&& (== net8.0) (< netcoreapp3.1)) (== netstandard2.0) (== netstandard2.1) - AWSSDK.DynamoDBv2 (3.7.300.11) - AWSSDK.Core (>= 3.7.300.11 < 4.0) - DotNet.ReproducibleBuilds (1.1.1) - copy_local: true - Microsoft.SourceLink.AzureRepos.Git (>= 1.1.1) - Microsoft.SourceLink.Bitbucket.Git (>= 1.1.1) - Microsoft.SourceLink.GitHub (>= 1.1.1) - Microsoft.SourceLink.GitLab (>= 1.1.1) + AWSSDK.Core (3.7.402.43) + Microsoft.Bcl.AsyncInterfaces (>= 1.1) - restriction: || (&& (== net9.0) (< netcoreapp3.1)) (== netstandard2.0) (== netstandard2.1) + AWSSDK.DynamoDBv2 (3.7.406.25) + AWSSDK.Core (>= 3.7.402.43 < 4.0) + DotNet.ReproducibleBuilds (1.2.25) - copy_local: true FSharp.Core (4.7.2) - Microsoft.Bcl.AsyncInterfaces (8.0) - restriction: || (&& (== net8.0) (< netcoreapp3.1)) (== netstandard2.0) (== netstandard2.1) - System.Threading.Tasks.Extensions (>= 4.5.4) - restriction: || (&& (== net8.0) (>= net462)) (&& (== net8.0) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) - Microsoft.Build.Tasks.Git (8.0) - copy_local: true - Microsoft.SourceLink.AzureRepos.Git (8.0) - copy_local: true - Microsoft.Build.Tasks.Git (>= 8.0) - Microsoft.SourceLink.Common (>= 8.0) - Microsoft.SourceLink.Bitbucket.Git (8.0) - copy_local: true - Microsoft.Build.Tasks.Git (>= 8.0) - Microsoft.SourceLink.Common (>= 8.0) - Microsoft.SourceLink.Common (8.0) - copy_local: true - Microsoft.SourceLink.GitHub (8.0) - copy_local: true - Microsoft.Build.Tasks.Git (>= 8.0) - Microsoft.SourceLink.Common (>= 8.0) - Microsoft.SourceLink.GitLab (8.0) - copy_local: true - Microsoft.Build.Tasks.Git (>= 8.0) - Microsoft.SourceLink.Common (>= 8.0) - System.Runtime.CompilerServices.Unsafe (6.0) - restriction: || (&& (== net8.0) (>= net461)) (&& (== net8.0) (>= net462)) (&& (== net8.0) (< netcoreapp2.1) (< netstandard2.1)) (&& (== net8.0) (< netstandard1.0)) (&& (== net8.0) (< netstandard2.0)) (&& (== net8.0) (>= wp8)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) - System.Threading.Tasks.Extensions (4.5.4) - restriction: || (&& (== net8.0) (>= net462)) (&& (== net8.0) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) - System.Runtime.CompilerServices.Unsafe (>= 4.5.3) - restriction: || (&& (== net8.0) (>= net461)) (&& (== net8.0) (< netcoreapp2.1)) (&& (== net8.0) (< netstandard1.0)) (&& (== net8.0) (< netstandard2.0)) (&& (== net8.0) (>= wp8)) (== netstandard2.0) (== netstandard2.1) + Microsoft.Bcl.AsyncInterfaces (9.0.4) - restriction: || (&& (== net9.0) (< netcoreapp3.1)) (== netstandard2.0) (== netstandard2.1) + System.Threading.Tasks.Extensions (>= 4.5.4) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) + System.Buffers (4.6.1) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netcoreapp2.1) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) + System.Diagnostics.DiagnosticSource (9.0.4) + System.Memory (>= 4.5.5) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< net8.0)) (== netstandard2.0) (== netstandard2.1) + System.Runtime.CompilerServices.Unsafe (>= 6.0) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< net8.0)) (== netstandard2.0) (== netstandard2.1) + System.Memory (4.6.3) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< net8.0)) (== netstandard2.0) (== netstandard2.1) + System.Buffers (>= 4.6.1) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netcoreapp2.1) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) + System.Numerics.Vectors (>= 4.6.1) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netcoreapp2.1) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) + System.Runtime.CompilerServices.Unsafe (>= 6.1.2) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netcoreapp2.1) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) + System.Numerics.Vectors (4.6.1) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netcoreapp2.1) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) + System.Runtime.CompilerServices.Unsafe (6.1.2) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< net8.0)) (&& (== net9.0) (< netcoreapp2.1) (< netstandard2.1)) (== netstandard2.0) (== netstandard2.1) + System.Threading.Tasks.Extensions (4.6.3) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) + System.Runtime.CompilerServices.Unsafe (>= 6.1.2) - restriction: || (&& (== net9.0) (>= net462)) (&& (== net9.0) (< netcoreapp2.1) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) Unquote (6.1) FSharp.Core (>= 4.7.2) GITHUB remote: eiriktsarpalis/TypeShape src/TypeShape/TypeShape.fs (6e7fe07c799de723de7e4b32d64a4fd6c1697c7f) GROUP Test -RESTRICTION: == net8.0 +RESTRICTION: == net9.0 NUGET remote: https://api.nuget.org/v3/index.json FsCheck (2.16.6) FSharp.Core (>= 4.2.3) - FSharp.Core (8.0.100) - Microsoft.CodeCoverage (17.8) - Microsoft.NET.Test.Sdk (17.8) - Microsoft.CodeCoverage (>= 17.8) - Microsoft.TestPlatform.TestHost (>= 17.8) - Microsoft.NETCore.Platforms (7.0.4) - Microsoft.TestPlatform.ObjectModel (17.8) - NuGet.Frameworks (>= 6.5) + FSharp.Core (9.0.202) + Microsoft.CodeCoverage (17.13) + Microsoft.NET.Test.Sdk (17.13) + Microsoft.CodeCoverage (>= 17.13) + Microsoft.TestPlatform.TestHost (>= 17.13) + Microsoft.TestPlatform.ObjectModel (17.13) System.Reflection.Metadata (>= 1.6) - Microsoft.TestPlatform.TestHost (17.8) - Microsoft.TestPlatform.ObjectModel (>= 17.8) + Microsoft.TestPlatform.TestHost (17.13) + Microsoft.TestPlatform.ObjectModel (>= 17.13) Newtonsoft.Json (>= 13.0.1) - NETStandard.Library (2.0.3) - Microsoft.NETCore.Platforms (>= 1.1) Newtonsoft.Json (13.0.3) - NuGet.Frameworks (6.8) - System.Collections.Immutable (8.0) - System.Reflection.Metadata (8.0) - System.Collections.Immutable (>= 8.0) - xunit (2.6.2) - xunit.analyzers (>= 1.6) - xunit.assert (>= 2.6.2) - xunit.core (2.6.2) + System.Reflection.Metadata (9.0.4) + xunit (2.9.3) + xunit.analyzers (>= 1.18) + xunit.assert (>= 2.9.3) + xunit.core (2.9.3) xunit.abstractions (2.0.3) - xunit.analyzers (1.6) - xunit.assert (2.6.2) - xunit.core (2.6.2) - xunit.extensibility.core (2.6.2) - xunit.extensibility.execution (2.6.2) - xunit.extensibility.core (2.6.2) - NETStandard.Library (>= 1.6.1) + xunit.analyzers (1.21) + xunit.assert (2.9.3) + xunit.core (2.9.3) + xunit.extensibility.core (2.9.3) + xunit.extensibility.execution (2.9.3) + xunit.extensibility.core (2.9.3) xunit.abstractions (>= 2.0.3) - xunit.extensibility.execution (2.6.2) - NETStandard.Library (>= 1.6.1) - xunit.extensibility.core (2.6.2) - xunit.runner.visualstudio (2.5.4) + xunit.extensibility.execution (2.9.3) + xunit.extensibility.core (2.9.3) + xunit.runner.visualstudio (3.0.2) diff --git a/src/FSharp.AWS.DynamoDB/Diagnostics.fs b/src/FSharp.AWS.DynamoDB/Diagnostics.fs new file mode 100644 index 0000000..a61e3f6 --- /dev/null +++ b/src/FSharp.AWS.DynamoDB/Diagnostics.fs @@ -0,0 +1,132 @@ +namespace FSharp.AWS.DynamoDB + +module internal Activity = + open System.Diagnostics + + let private activitySource = new ActivitySource "FSharp.AWS.DynamoDB" + + let hasListeners () : bool = activitySource.HasListeners() + + let private addTag (tag: string) (value: obj) (activity: Activity) = + if activity <> null then + activity.AddTag(tag, value) + else + activity + + /// Adds the hard-coded `db.system`, `rpc.system`, and `rpc.service` tags to the `Activity`. + let addStandardTags (activity: Activity) = + activity + |> addTag "db.system" "dynamodb" + |> addTag "rpc.system" "aws-api" + |> addTag "rpc.service" "DynamoDB" + + let addTableName (tableName: string) = addTag "aws.dynamodb.table_names" [| tableName |] + + let addTableNames (tableNames: string seq) = addTag "aws.dynamodb.table_names" (Array.ofSeq tableNames) + + let addOperation (operation: string) = addTag "rpc.method" operation + + let addConsistentRead (consistentRead: bool) = addTag "aws.dynamodb.consistent_read" consistentRead + + let addScanIndexForward (scanIndexForward: bool) = addTag "aws.dynamodb.scan_forward" scanIndexForward + + let addLimit (limit: int option) (activity: Activity) = + match limit with + | Some l -> addTag "aws.dynamodb.limit" l activity + | None -> activity + + let addIndexName (indexName: string option) (activity: Activity) = + match indexName with + | Some i -> addTag "aws.dynamodb.index_name" i activity + | None -> activity + + let addProjection (projection: string) = addTag "aws.dynamodb.projection" projection + + let addCount (count: int) = addTag "aws.dynamodb.count" count + + let addScannedCount (scannedCount: int) = addTag "aws.dynamodb.scanned_count" scannedCount + + /// Starts a new activity for a DynamoDB operation named "{operation} {tableName}" (eg "GetItem MyTable"). + /// Sets the standard tags for all table operations and returns the `Activity` for further customization. + let startTableActivity (tableName: string) (operation: string) : Activity = + activitySource.StartActivity(sprintf "%s %s" operation tableName, ActivityKind.Client) + |> addStandardTags + |> addTableName tableName + |> addOperation operation + + /// Starts a new activity for a multi-table DynamoDB operation (eg "BatchGetItem"). + /// Sets the standard tags for all table operations and returns the `Activity` for further customization. + let startMultipleTableActivity (tableNames: string seq) (operation: string) : Activity = + activitySource.StartActivity(operation, ActivityKind.Client) + |> addStandardTags + |> addTableNames tableNames + |> addOperation operation + + let addException (ex: exn) (activity: Activity) = + if activity <> null then + activity.AddException ex + else + activity + + let stop (activity: Activity) = + if activity <> null then + activity.Stop() + +module internal Meter = + open System.Diagnostics.Metrics + open System.Collections.Generic + + let private meter = new Meter "FSharp.AWS.DynamoDB" + + let private consumedReadCapacity = + meter.CreateHistogram( + "db.client.operation.consumed_read_capacity", + unit = "RCU", + description = "Consumed read capacity units (RCU) for the DynamoDB operation", + tags = [ KeyValuePair("db.system.name", "aws.dynamodb" :> obj) ] + ) + + let private consumedWriteCapacity = + meter.CreateHistogram( + "db.client.operation.consumed_write_capacity", + unit = "WCU", + description = "Consumed write capacity units (WCU) for the DynamoDB operation", + tags = [ KeyValuePair("db.system.name", "aws.dynamodb" :> obj) ] + ) + + let recordConsumedReadCapacity (tableName: string) (operation: string) (rcu: float) = + if rcu > 0.0 then + consumedReadCapacity.Record( + rcu, + KeyValuePair("db.collection.name", tableName :> obj), + KeyValuePair("db.operation.name", operation :> obj) + ) + + let recordConsumedWriteCapacity (tableName: string) (operation: string) (wcu: float) = + if wcu > 0.0 then + consumedWriteCapacity.Record( + wcu, + KeyValuePair("db.collection.name", tableName :> obj), + KeyValuePair("db.operation.name", operation :> obj) + ) + + let recordConsumedCapacity (tableName: string) (operation: string) (capacity : Amazon.DynamoDBv2.Model.ConsumedCapacity) = + if capacity <> null then + recordConsumedReadCapacity tableName operation capacity.ReadCapacityUnits + recordConsumedWriteCapacity tableName operation capacity.WriteCapacityUnits + +module internal Task = + open System.Threading.Tasks + open System.Diagnostics + + /// Adds an exception to the activity if the task is faulted + /// Handles unwrapping the inner exception if present + let addActivityException (activity: Activity) (task: Task<'a>) = + if task.IsFaulted then + let exn = + if task.Exception.InnerExceptions.Count = 1 then + task.Exception.InnerExceptions[0] + else + task.Exception + Activity.addException exn activity |> ignore + task diff --git a/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj b/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj index daebc2a..99f18c3 100644 --- a/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj +++ b/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj @@ -1,4 +1,3 @@ - netstandard2.0 @@ -35,10 +34,11 @@ + - + \ No newline at end of file diff --git a/src/FSharp.AWS.DynamoDB/Script.fsx b/src/FSharp.AWS.DynamoDB/Script.fsx index 83b9152..13aaf9d 100644 --- a/src/FSharp.AWS.DynamoDB/Script.fsx +++ b/src/FSharp.AWS.DynamoDB/Script.fsx @@ -1,7 +1,7 @@ #if USE_PUBLISHED_NUGET // If you don't want to do a local build first #r "nuget: FSharp.AWS.DynamoDB, *-*" // *-* to white-list the fact that all releases to date have been `-beta` sufficed #else -#I "../../tests/FSharp.AWS.DynamoDB.Tests/bin/Debug/net8.0/" +#I "../../tests/FSharp.AWS.DynamoDB.Tests/bin/Debug/net9.0/" #r "AWSSDK.Core.dll" #r "AWSSDK.DynamoDBv2.dll" #r "FSharp.AWS.DynamoDB.dll" diff --git a/src/FSharp.AWS.DynamoDB/TableContext.fs b/src/FSharp.AWS.DynamoDB/TableContext.fs index c442607..b7f88b7 100644 --- a/src/FSharp.AWS.DynamoDB/TableContext.fs +++ b/src/FSharp.AWS.DynamoDB/TableContext.fs @@ -86,9 +86,10 @@ module internal CreateTableRequest = customize |> Option.iter (fun c -> c req) req - let execute (client: IAmazonDynamoDB) request : Async = async { + let execute (client: IAmazonDynamoDB) (request: CreateTableRequest) : Async = async { let! ct = Async.CancellationToken - return! client.CreateTableAsync(request, ct) |> Async.AwaitTaskCorrect + use activity = Activity.startTableActivity request.TableName "CreateTable" + return! client.CreateTableAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect } module internal UpdateTableRequest = @@ -114,9 +115,10 @@ module internal UpdateTableRequest = apply tc sc request if customize request then Some request else None - let execute (client: IAmazonDynamoDB) request : Async = async { + let execute (client: IAmazonDynamoDB) (request: UpdateTableRequest) : Async = async { let! ct = Async.CancellationToken - let! response = client.UpdateTableAsync(request, ct) |> Async.AwaitTaskCorrect + use activity = Activity.startTableActivity request.TableName "UpdateTable" + let! response = client.UpdateTableAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect return response.TableDescription } @@ -124,7 +126,11 @@ module internal Provisioning = let tryDescribe (client: IAmazonDynamoDB, tableName: string) : Async = async { let! ct = Async.CancellationToken - let! td = client.DescribeTableAsync(tableName, ct) |> Async.AwaitTaskCorrect + use activity = Activity.startTableActivity tableName "DescribeTable" + let! td = + client.DescribeTableAsync(tableName, ct) + |> Task.addActivityException activity + |> Async.AwaitTaskCorrect return match td.Table with | t when t.TableStatus = TableStatus.ACTIVE -> Some t @@ -251,30 +257,32 @@ type TableContext<'TRecord> Operation = operation ConsumedCapacity = consumedCapacity ItemCount = itemCount } + let returnConsumedCapacity, maybeReport = match metricsCollector with | Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink) + | None when Activity.hasListeners () -> ReturnConsumedCapacity.TOTAL, None | None -> ReturnConsumedCapacity.NONE, None let tryGetItemAsync (key: TableKey) (consistentRead: bool option) (proj: ProjectionExpr.ProjectionExpr option) = async { + let consistentRead = defaultArg consistentRead false + use activity = Activity.startTableActivity tableName "GetItem" |> Activity.addConsistentRead consistentRead let kav = template.ToAttributeValues(key) - let request = GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity) + let request = GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity, ConsistentRead = consistentRead) match proj with | None -> () | Some proj -> let aw = AttributeWriter(request.ExpressionAttributeNames, null) request.ProjectionExpression <- proj.Write aw - - match consistentRead with - | None -> () - | Some c -> request.ConsistentRead <- c + activity |> Activity.addProjection request.ProjectionExpression |> ignore let! ct = Async.CancellationToken - let! response = client.GetItemAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = client.GetItemAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect + maybeReport |> Option.iter (fun r -> r GetItem [ response.ConsumedCapacity ] (if response.IsItemSet then 1 else 0)) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "GetItem request returned error %O" response.HttpStatusCode + + Meter.recordConsumedCapacity tableName "GetItem" response.ConsumedCapacity if response.IsItemSet then return Some response.Item @@ -289,8 +297,8 @@ type TableContext<'TRecord> } let batchGetItemsAsync (keys: seq) (consistentRead: bool option) (projExpr: ProjectionExpr.ProjectionExpr option) = async { - let consistentRead = defaultArg consistentRead false + use activity = Activity.startTableActivity tableName "BatchGetItem" |> Activity.addConsistentRead consistentRead let kna = KeysAndAttributes() kna.AttributesToGet.AddRange(template.Info.Properties |> Seq.map (fun p -> p.Name)) kna.Keys.AddRange(keys |> Seq.map template.ToAttributeValues) @@ -300,16 +308,21 @@ type TableContext<'TRecord> | Some projExpr -> let aw = AttributeWriter(kna.ExpressionAttributeNames, null) kna.ProjectionExpression <- projExpr.Write aw + activity |> Activity.addProjection kna.ProjectionExpression |> ignore let request = BatchGetItemRequest(ReturnConsumedCapacity = returnConsumedCapacity) request.RequestItems[tableName] <- kna let! ct = Async.CancellationToken - let! response = client.BatchGetItemAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = + client.BatchGetItemAsync(request, ct) + |> Task.addActivityException activity + |> Async.AwaitTaskCorrect + maybeReport |> Option.iter (fun r -> r BatchGetItems (List.ofSeq response.ConsumedCapacity) response.Responses[tableName].Count) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "BatchGetItem request returned error %O" response.HttpStatusCode + + response.ConsumedCapacity |> Seq.iter (Meter.recordConsumedCapacity tableName "BatchGetItem") return response.Responses[tableName] } @@ -343,6 +356,12 @@ type TableContext<'TRecord> let mutable lastEvaluatedKey: Dictionary option = None let rec aux last = async { + use activity = + Activity.startTableActivity tableName "Query" + |> Activity.addConsistentRead (defaultArg consistentRead false) + |> Activity.addScanIndexForward (defaultArg scanIndexForward true) + |> Activity.addIndexName keyCondition.IndexName + |> Activity.addLimit (limit.GetCount()) let request = QueryRequest(tableName, ReturnConsumedCapacity = returnConsumedCapacity) keyCondition.IndexName |> Option.iter (fun name -> request.IndexName <- name) let writer = AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues) @@ -354,7 +373,9 @@ type TableContext<'TRecord> match projectionExpr with | None -> () - | Some pe -> request.ProjectionExpression <- pe.Write writer + | Some pe -> + request.ProjectionExpression <- pe.Write writer + Activity.addProjection request.ProjectionExpression activity |> ignore limit.GetCount() |> Option.iter (fun l -> request.Limit <- l - downloaded.Count) consistentRead |> Option.iter (fun cr -> request.ConsistentRead <- cr) @@ -362,13 +383,13 @@ type TableContext<'TRecord> last |> Option.iter (fun l -> request.ExclusiveStartKey <- l) let! ct = Async.CancellationToken - let! response = client.QueryAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = client.QueryAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect consumedCapacity.Add response.ConsumedCapacity - if response.HttpStatusCode <> HttpStatusCode.OK then - emitMetrics () - failwithf "Query request returned error %O" response.HttpStatusCode + Meter.recordConsumedCapacity tableName "Query" response.ConsumedCapacity + Activity.stop activity // Explicitly stop activity as it is still in scope downloaded.AddRange response.Items + if response.LastEvaluatedKey.Count > 0 then lastEvaluatedKey <- Some response.LastEvaluatedKey if limit.IsDownloadIncomplete downloaded.Count then @@ -428,6 +449,11 @@ type TableContext<'TRecord> let emitMetrics () = maybeReport |> Option.iter (fun r -> r Scan (Seq.toList consumedCapacity) downloaded.Count) let mutable lastEvaluatedKey: Dictionary option = None let rec aux last = async { + use activity = + Activity.startTableActivity tableName "Scan" + |> Activity.addConsistentRead (defaultArg consistentRead false) + |> Activity.addLimit (limit.GetCount()) + let request = ScanRequest(tableName, ReturnConsumedCapacity = returnConsumedCapacity) let writer = AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues) match filterCondition with @@ -436,20 +462,25 @@ type TableContext<'TRecord> match projectionExpr with | None -> () - | Some pe -> request.ProjectionExpression <- pe.Write writer + | Some pe -> + request.ProjectionExpression <- pe.Write writer + Activity.addProjection request.ProjectionExpression activity |> ignore limit.GetCount() |> Option.iter (fun l -> request.Limit <- l - downloaded.Count) consistentRead |> Option.iter (fun cr -> request.ConsistentRead <- cr) last |> Option.iter (fun l -> request.ExclusiveStartKey <- l) let! ct = Async.CancellationToken - let! response = client.ScanAsync(request, ct) |> Async.AwaitTaskCorrect - if response.HttpStatusCode <> HttpStatusCode.OK then - emitMetrics () - failwithf "Scan request returned error %O" response.HttpStatusCode + let! response = client.ScanAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect downloaded.AddRange response.Items consumedCapacity.Add response.ConsumedCapacity + Meter.recordConsumedCapacity tableName "Scan" response.ConsumedCapacity + activity + |> Activity.addCount response.Count + |> Activity.addScannedCount response.ScannedCount + |> Activity.stop // Explicitly stop activity as it is still in scope + if response.LastEvaluatedKey.Count > 0 then lastEvaluatedKey <- Some response.LastEvaluatedKey if limit.IsDownloadIncomplete downloaded.Count then @@ -523,6 +554,7 @@ type TableContext<'TRecord> /// Precondition to satisfy where item already exists. Use Precondition.CheckFailed to identify Precondition Check failures. member _.PutItemAsync(item: 'TRecord, ?precondition: ConditionExpression<'TRecord>) : Async = async { let attrValues = template.ToAttributeValues(item) + use activity = Activity.startTableActivity tableName "PutItem" let request = PutItemRequest(tableName, attrValues, ReturnValues = ReturnValue.NONE, ReturnConsumedCapacity = returnConsumedCapacity) match precondition with @@ -533,10 +565,9 @@ type TableContext<'TRecord> | _ -> () let! ct = Async.CancellationToken - let! response = client.PutItemAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = client.PutItemAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect maybeReport |> Option.iter (fun r -> r PutItem [ response.ConsumedCapacity ] 1) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "PutItem request returned error %O" response.HttpStatusCode + Meter.recordConsumedCapacity tableName "PutItem" response.ConsumedCapacity return template.ExtractKey item } @@ -554,6 +585,7 @@ type TableContext<'TRecord> /// Any unprocessed items due to throttling. /// Items to be written. member _.BatchPutItemsAsync(items: seq<'TRecord>) : Async<'TRecord[]> = async { + use activity = Activity.startTableActivity tableName "BatchWriteItem" let mkWriteRequest (item: 'TRecord) = let attrValues = template.ToAttributeValues(item) let pr = PutRequest(attrValues) @@ -566,7 +598,7 @@ type TableContext<'TRecord> let pbr = BatchWriteItemRequest(ReturnConsumedCapacity = returnConsumedCapacity) pbr.RequestItems[tableName] <- writeRequests let! ct = Async.CancellationToken - let! response = client.BatchWriteItemAsync(pbr, ct) |> Async.AwaitTaskCorrect + let! response = client.BatchWriteItemAsync(pbr, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect let unprocessed = match response.UnprocessedItems.TryGetValue tableName with | true, reqs -> @@ -577,8 +609,7 @@ type TableContext<'TRecord> | false, _ -> [||] maybeReport |> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (items.Length - unprocessed.Length)) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "BatchWriteItem put request returned error %O" response.HttpStatusCode + response.ConsumedCapacity |> Seq.iter (Meter.recordConsumedCapacity tableName "BatchWriteItem") return unprocessed |> Array.map template.OfAttributeValues } @@ -597,7 +628,7 @@ type TableContext<'TRecord> ?returnLatest: bool ) : Async<'TRecord> = async { - + use activity = Activity.startTableActivity tableName "UpdateItem" let kav = template.ToAttributeValues(key) let request = UpdateItemRequest(Key = kav, TableName = tableName, ReturnConsumedCapacity = returnConsumedCapacity) request.ReturnValues <- @@ -616,10 +647,10 @@ type TableContext<'TRecord> | _ -> () let! ct = Async.CancellationToken - let! response = client.UpdateItemAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = client.UpdateItemAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect + maybeReport |> Option.iter (fun r -> r UpdateItem [ response.ConsumedCapacity ] 1) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "UpdateItem request returned error %O" response.HttpStatusCode + Meter.recordConsumedCapacity tableName "UpdateItem" response.ConsumedCapacity return template.OfAttributeValues response.Attributes } @@ -673,13 +704,19 @@ type TableContext<'TRecord> /// /// Key to be checked. member _.ContainsKeyAsync(key: TableKey) : Async = async { + use activity = + Activity.startTableActivity tableName "GetItem" + |> Activity.addConsistentRead false + |> Activity.addProjection "#HKEY" + let kav = template.ToAttributeValues(key) let request = GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity) request.ExpressionAttributeNames.Add("#HKEY", template.PrimaryKey.HashKey.AttributeName) request.ProjectionExpression <- "#HKEY" let! ct = Async.CancellationToken - let! response = client.GetItemAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = client.GetItemAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect maybeReport |> Option.iter (fun r -> r GetItem [ response.ConsumedCapacity ] 1) + Meter.recordConsumedCapacity tableName "GetItem" response.ConsumedCapacity return response.IsItemSet } @@ -776,6 +813,7 @@ type TableContext<'TRecord> /// Key of item to be deleted. /// Specifies a precondition expression that existing item should satisfy. Use Precondition.CheckFailed to identify Precondition Check failures. member _.DeleteItemAsync(key: TableKey, ?precondition: ConditionExpression<'TRecord>) : Async<'TRecord option> = async { + use activity = Activity.startTableActivity tableName "DeleteItem" let kav = template.ToAttributeValues key let request = DeleteItemRequest(tableName, kav, ReturnValues = ReturnValue.ALL_OLD, ReturnConsumedCapacity = returnConsumedCapacity) match precondition with @@ -786,10 +824,10 @@ type TableContext<'TRecord> | None -> () let! ct = Async.CancellationToken - let! response = client.DeleteItemAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = client.DeleteItemAsync(request, ct) |> Task.addActivityException activity |> Async.AwaitTaskCorrect + maybeReport |> Option.iter (fun r -> r DeleteItem [ response.ConsumedCapacity ] 1) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "DeleteItem request returned error %O" response.HttpStatusCode + Meter.recordConsumedCapacity tableName "DeleteItem" response.ConsumedCapacity if response.Attributes.Count = 0 then return None @@ -811,6 +849,7 @@ type TableContext<'TRecord> /// Any unprocessed keys due to throttling. /// Keys of items to be deleted. member _.BatchDeleteItemsAsync(keys: seq) = async { + use activity = Activity.startTableActivity tableName "BatchWriteItem" let mkDeleteRequest (key: TableKey) = let kav = template.ToAttributeValues(key) let pr = DeleteRequest(kav) @@ -824,7 +863,11 @@ type TableContext<'TRecord> request.RequestItems[tableName] <- deleteRequests let! ct = Async.CancellationToken - let! response = client.BatchWriteItemAsync(request, ct) |> Async.AwaitTaskCorrect + let! response = + client.BatchWriteItemAsync(request, ct) + |> Task.addActivityException activity + |> Async.AwaitTaskCorrect + let unprocessed = match response.UnprocessedItems.TryGetValue tableName with | true, reqs -> @@ -835,8 +878,7 @@ type TableContext<'TRecord> | false, _ -> [||] maybeReport |> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (keys.Length - unprocessed.Length)) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "BatchWriteItem deletion request returned error %O" response.HttpStatusCode + response.ConsumedCapacity |> Seq.iter (Meter.recordConsumedCapacity tableName "BatchWriteItem") return unprocessed |> Array.map template.ExtractKey } @@ -1319,8 +1361,7 @@ type TableContext<'TRecord> t.VerifyTableAsync() /// Creates a new `Transaction`, using the DynamoDB client and metricsCollector configured for this `TableContext` - member _.CreateTransaction() = - Transaction(client, ?metricsCollector = metricsCollector) + member _.CreateTransaction() = Transaction(client, ?metricsCollector = metricsCollector) /// /// Represents a transactional set of operations to be applied atomically to a arbitrary number of DynamoDB tables. @@ -1339,21 +1380,25 @@ and Transaction(client: IAmazonDynamoDB, ?metricsCollector: (RequestMetrics -> u let returnConsumedCapacity, maybeReport = match metricsCollector with - | Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink) + | Some sink -> ReturnConsumedCapacity.TOTAL, Some(reportMetrics sink) + | None when Activity.hasListeners () -> ReturnConsumedCapacity.TOTAL, None | None -> ReturnConsumedCapacity.NONE, None + let tableNameForItem (item: TransactWriteItem) = + match (item.Put, item.ConditionCheck, item.Delete, item.Update) with + | (null, null, null, null) -> invalidArg "item" "must contain a Put, ConditionCheck, Delete, or Update operation." + | (p, null, null, null) -> p.TableName + | (_, c, null, null) -> c.TableName + | (_, _, d, null) -> d.TableName + | (_, _, _, u) -> u.TableName + /// /// Adds a Put operation to the transaction. /// /// Table context to operate on. /// Item to be put. /// Optional precondition expression. - member _.Put<'TRecord> - ( - tableContext: TableContext<'TRecord>, - item: 'TRecord, - ?precondition: ConditionExpression<'TRecord> - ) = + member _.Put<'TRecord>(tableContext: TableContext<'TRecord>, item: 'TRecord, ?precondition: ConditionExpression<'TRecord>) = let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item) precondition |> Option.iter (fun cond -> @@ -1400,12 +1445,7 @@ and Transaction(client: IAmazonDynamoDB, ?metricsCollector: (RequestMetrics -> u /// Table context to operate on. /// Key of item to delete. /// Optional precondition expression. - member _.Delete - ( - tableContext: TableContext<'TRecord>, - key: TableKey, - ?precondition: ConditionExpression<'TRecord> - ) = + member _.Delete(tableContext: TableContext<'TRecord>, key: TableKey, ?precondition: ConditionExpression<'TRecord>) = let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key) precondition |> Option.iter (fun cond -> @@ -1423,18 +1463,23 @@ and Transaction(client: IAmazonDynamoDB, ?metricsCollector: (RequestMetrics -> u if transactionItems.Count = 0 || transactionItems.Count > 100 then raise <| System.ArgumentOutOfRangeException(nameof transactionItems, "must be between 1 and 100 items.") + use activity = Activity.startMultipleTableActivity (transactionItems |> Seq.map tableNameForItem) "TransactWriteItems" let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = transactionItems) clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x) let! ct = Async.CancellationToken - let! response = client.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect + let! response = + client.TransactWriteItemsAsync(req, ct) + |> Task.addActivityException activity + |> Async.AwaitTaskCorrect + maybeReport |> Option.iter (fun r -> response.ConsumedCapacity |> Seq.groupBy (fun x -> x.TableName) |> Seq.iter (fun (tableName, consumedCapacity) -> r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems))) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode + response.ConsumedCapacity + |> Seq.iter (fun c -> Meter.recordConsumedCapacity c.TableName "TransactWriteItems" c) } // Deprecated factory method, to be removed. Replaced with diff --git a/src/FSharp.AWS.DynamoDB/paket.references b/src/FSharp.AWS.DynamoDB/paket.references index 9b12cc9..af9c6cc 100644 --- a/src/FSharp.AWS.DynamoDB/paket.references +++ b/src/FSharp.AWS.DynamoDB/paket.references @@ -2,4 +2,5 @@ AWSSDK.DynamoDBv2 DotNet.ReproducibleBuilds FSharp.Core Unquote +System.Diagnostics.DiagnosticSource File: TypeShape.fs TypeShape diff --git a/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj b/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj index 3c973f3..35fca2b 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj +++ b/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj @@ -1,6 +1,6 @@  - net8.0 + net9.0 diff --git a/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs b/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs index 30a9af1..31c19d7 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs +++ b/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs @@ -16,6 +16,7 @@ open Amazon.Runtime [] module Utils = + open System.Diagnostics let guid () = Guid.NewGuid().ToString("N") @@ -56,12 +57,18 @@ module Utils = let client = getDynamoDBAccount () let tableName = getRandomTableName () + let activities = new Collections.Concurrent.ConcurrentBag() member _.Client = client member _.TableName = tableName member _.CreateEmpty<'TRecord>() = let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L) + let listener = new ActivityListener() + listener.ShouldListenTo <- fun s -> s.Name = "FSharp.AWS.DynamoDB" + listener.Sample <- fun _ -> ActivitySamplingResult.AllData + listener.ActivityStopped <- fun a -> activities.Add(a) + ActivitySource.AddActivityListener(listener) Scripting.TableContext.Initialize<'TRecord>(client, tableName, Throughput.Provisioned throughput) interface IAsyncLifetime with