Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/FSharp.AWS.DynamoDB/Picklers/CollectionPicklers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type MapPickler<'Value>(vp: Pickler<'Value>) =
override _.PicklerType = PicklerType.Value
override _.DefaultValue = Map.empty
override _.Pickle map =
if isNull map then
if obj.ReferenceEquals(map, null) then
AttributeValue(NULL = true) |> Some
elif map.Count = 0 then
None
Expand Down
36 changes: 20 additions & 16 deletions src/FSharp.AWS.DynamoDB/Picklers/PrimitivePicklers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ type BoolPickler() =

override _.DefaultValue = false
override _.Pickle b = AttributeValue(BOOL = b) |> Some
override _.UnPickle a = if a.IsBOOLSet then a.BOOL.GetValueOrDefault false else invalidCast a
override _.UnPickle a =
if a.IsBOOLSet then
a.BOOL.GetValueOrDefault false
else
invalidCast a

override _.Parse s = Boolean.Parse s
override _.UnParse s = string s
Expand All @@ -34,15 +38,15 @@ type StringPickler() =

override _.DefaultValue = null
override _.Pickle s =
if isNull s then
if s = null then
AttributeValue(NULL = true)
else
AttributeValue(s)
AttributeValue s
|> Some

override _.UnPickle a =
if a.IsNULL then null
elif not <| isNull a.S then a.S
elif a.IsSSet then a.S
else invalidCast a

override _.Parse s = s
Expand All @@ -56,7 +60,7 @@ type CharPickler() =

override _.DefaultValue = char 0
override _.Pickle c = AttributeValue(string c) |> Some
override _.UnPickle a = if not <| isNull a.S then Char.Parse(a.S) else invalidCast a
override _.UnPickle a = if a.IsSSet then Char.Parse(a.S) else invalidCast a

override _.Parse s = Char.Parse s
override _.UnParse c = string c
Expand All @@ -79,7 +83,7 @@ let inline mkNumericalPickler< ^N

member _.DefaultValue = Unchecked.defaultof< ^N>
member _.Pickle num = AttributeValue(N = toString num) |> Some
member _.UnPickle a = if not <| isNull a.N then parseNum a.N else invalidCast a
member _.UnPickle a = if a.IsNSet then parseNum a.N else invalidCast a

member x.PickleCoerced o =
let n =
Expand All @@ -102,7 +106,7 @@ type DoublePickler() =

override _.DefaultValue = Unchecked.defaultof<double>
override _.Pickle num = AttributeValue(N = unparse num) |> Some
override _.UnPickle a = if not <| isNull a.N then parse a.N else invalidCast a
override _.UnPickle a = if a.IsNSet then parse a.N else invalidCast a

override x.PickleCoerced o =
let n =
Expand All @@ -122,13 +126,13 @@ type ByteArrayPickler() =

override _.DefaultValue = [||]
override _.Pickle bs =
if isNull bs then Some <| AttributeValue(NULL = true)
if bs = null then Some <| AttributeValue(NULL = true)
elif bs.Length = 0 then None
else Some <| AttributeValue(B = new MemoryStream(bs))

override _.UnPickle a =
if a.IsNULL then null
elif not <| isNull a.B then a.B.ToArray()
elif a.IsBSet then a.B.ToArray()
else invalidCast a


Expand All @@ -139,13 +143,13 @@ type MemoryStreamPickler() =

override _.DefaultValue = null
override _.Pickle m =
if isNull m then Some <| AttributeValue(NULL = true)
if m = null then Some <| AttributeValue(NULL = true)
elif m.Length = 0L then None
else Some <| AttributeValue(B = m)

override _.UnPickle a =
if a.IsNULL then null
elif notNull a.B then a.B
elif a.IsBSet then a.B
else invalidCast a

type GuidPickler() =
Expand All @@ -156,7 +160,7 @@ type GuidPickler() =

override _.DefaultValue = Guid.Empty
override _.Pickle g = AttributeValue(string g) |> Some
override _.UnPickle a = if not <| isNull a.S then Guid.Parse a.S else invalidCast a
override _.UnPickle a = if a.IsSSet then Guid.Parse a.S else invalidCast a

override _.Parse s = Guid.Parse s
override _.UnParse g = string g
Expand All @@ -176,7 +180,7 @@ type DateTimeOffsetPickler() =
override _.UnParse d = unparse d

override _.Pickle d = AttributeValue(unparse d) |> Some
override _.UnPickle a = if not <| isNull a.S then parse a.S else invalidCast a
override _.UnPickle a = if a.IsSSet then parse a.S else invalidCast a


type TimeSpanPickler() =
Expand All @@ -190,7 +194,7 @@ type TimeSpanPickler() =
override _.DefaultValue = TimeSpan.Zero
override _.Pickle t = AttributeValue(N = string t.Ticks) |> Some
override _.UnPickle a =
if not <| isNull a.N then
if a.IsNSet then
TimeSpan.FromTicks(int64 a.N)
else
invalidCast a
Expand All @@ -204,7 +208,7 @@ type EnumerationPickler<'E, 'U when 'E: enum<'U> and 'E: struct and 'E :> ValueT
override _.DefaultValue = Unchecked.defaultof<'E>
override _.Pickle e = AttributeValue(S = e.ToString()) |> Some
override _.UnPickle a =
if notNull a.S then
if a.IsSSet then
Enum.Parse(typeof<'E>, a.S) :?> 'E
else
invalidCast a
Expand Down Expand Up @@ -252,7 +256,7 @@ type StringRepresentationPickler<'T>(ep: StringRepresentablePickler<'T>) =
override _.PicklerType = ep.PicklerType
override _.DefaultValue = ep.DefaultValue
override _.Pickle t = AttributeValue(S = ep.UnParse t) |> Some
override _.UnPickle a = if notNull a.S then ep.Parse a.S else invalidCast a
override _.UnPickle a = if a.IsSSet then ep.Parse a.S else invalidCast a

let mkStringRepresentationPickler (resolver: IPicklerResolver) (prop: PropertyInfo) =
TypeShape.Create(prop.PropertyType).Accept
Expand Down
4 changes: 2 additions & 2 deletions src/FSharp.AWS.DynamoDB/RecordKeySchema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type PrimaryKeyStructure with
static member ToAttributeValues(keyStructure: PrimaryKeyStructure, key: TableKey) =
let dict = Dictionary<string, AttributeValue>()
let extractKey name (pickler: Pickler) (value: obj) =
if isNull value then
if obj.ReferenceEquals(value, null) then
invalidArg name "Key value was not specified."
let av = pickler.PickleUntyped value |> Option.get
dict.Add(name, av)
Expand Down Expand Up @@ -323,7 +323,7 @@ type RecordTableInfo with
static member IndexKeyToAttributeValues(indexKeySchema: TableKeySchema, recordInfo: RecordTableInfo, key: IndexKey) =
let dict = PrimaryKeyStructure.ToAttributeValues(recordInfo.PrimaryKeyStructure, key.PrimaryKey)
let extractKey (ks: KeyAttributeSchema) (value: obj) =
if isNull value then
if obj.ReferenceEquals(value, null) then
invalidArg ks.AttributeName "Key value was not specified."
let meta = recordInfo.Properties |> Array.find (fun p -> p.Name = ks.AttributeName)
let av = meta.Pickler.PickleUntyped value |> Option.get
Expand Down
106 changes: 66 additions & 40 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,27 @@ type TableContext<'TRecord>
let! ct = Async.CancellationToken
let! response = client.BatchGetItemAsync(request, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r -> r BatchGetItems (List.ofSeq response.ConsumedCapacity) response.Responses[tableName].Count)
|> Option.iter (fun r ->
let cc =
if response.ConsumedCapacity = null then
[]
else
List.ofSeq response.ConsumedCapacity
r
BatchGetItems
cc
(if response.Responses = null then
0
else
response.Responses[tableName].Count))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "BatchGetItem request returned error %O" response.HttpStatusCode

return response.Responses[tableName]
return
if response.Responses = null then
ResizeArray()
else
response.Responses[tableName]
}

let queryPaginatedAsync
Expand Down Expand Up @@ -374,7 +390,9 @@ type TableContext<'TRecord>
emitMetrics ()
failwithf "Query request returned error %O" response.HttpStatusCode

downloaded.AddRange response.Items
if response.Items <> null then
downloaded.AddRange response.Items

if response.LastEvaluatedKey <> null && response.LastEvaluatedKey.Count > 0 then
lastEvaluatedKey <- Some response.LastEvaluatedKey
if limit.IsDownloadIncomplete downloaded.Count then
Expand Down Expand Up @@ -456,7 +474,9 @@ type TableContext<'TRecord>
emitMetrics ()
failwithf "Scan request returned error %O" response.HttpStatusCode

downloaded.AddRange response.Items
if response.Items <> null then
downloaded.AddRange response.Items

consumedCapacity.Add response.ConsumedCapacity
if response.LastEvaluatedKey <> null && response.LastEvaluatedKey.Count > 0 then
lastEvaluatedKey <- Some response.LastEvaluatedKey
Expand Down Expand Up @@ -578,15 +598,22 @@ type TableContext<'TRecord>
let! ct = Async.CancellationToken
let! response = client.BatchWriteItemAsync(pbr, ct) |> Async.AwaitTaskCorrect
let unprocessed =
match response.UnprocessedItems.TryGetValue tableName with
| true, reqs ->
reqs
|> Seq.choose (fun r -> r.PutRequest |> Option.ofObj)
|> Seq.map _.Item
|> Seq.toArray
| false, _ -> [||]
[| if response.UnprocessedItems <> null then
match response.UnprocessedItems.TryGetValue tableName with
| true, reqs ->
for r in reqs do
if r.PutRequest <> null then
yield r.PutRequest.Item
| false, _ -> () |]
maybeReport
|> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (items.Length - unprocessed.Length))
|> Option.iter (fun r ->
let cc =
if response.ConsumedCapacity = null then
[]
else
Seq.toList response.ConsumedCapacity
r BatchWriteItems cc (items.Length - unprocessed.Length))

if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "BatchWriteItem put request returned error %O" response.HttpStatusCode

Expand Down Expand Up @@ -686,7 +713,8 @@ type TableContext<'TRecord>
/// <param name="key">Key to be checked.</param>
member _.ContainsKeyAsync(key: TableKey) : Async<bool> = async {
let kav = template.ToAttributeValues(key)
let request = GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity, ExpressionAttributeNames = Dictionary())
let request =
GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity, ExpressionAttributeNames = Dictionary())
request.ExpressionAttributeNames.Add("#HKEY", template.PrimaryKey.HashKey.AttributeName)
request.ProjectionExpression <- "#HKEY"
let! ct = Async.CancellationToken
Expand Down Expand Up @@ -841,15 +869,21 @@ type TableContext<'TRecord>
let! ct = Async.CancellationToken
let! response = client.BatchWriteItemAsync(request, ct) |> Async.AwaitTaskCorrect
let unprocessed =
match response.UnprocessedItems.TryGetValue tableName with
| true, reqs ->
reqs
|> Seq.choose (fun r -> r.DeleteRequest |> Option.ofObj)
|> Seq.map _.Key
|> Seq.toArray
| false, _ -> [||]
[| if response.UnprocessedItems <> null then
match response.UnprocessedItems.TryGetValue tableName with
| true, reqs ->
for req in reqs do
if req.DeleteRequest <> null then
yield req.DeleteRequest.Key
| false, _ -> () |]
maybeReport
|> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (keys.Length - unprocessed.Length))
|> Option.iter (fun r ->
let cc =
if response.ConsumedCapacity = null then
[]
else
Seq.toList response.ConsumedCapacity
r BatchWriteItems cc (keys.Length - unprocessed.Length))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "BatchWriteItem deletion request returned error %O" response.HttpStatusCode

Expand Down Expand Up @@ -1334,8 +1368,7 @@ type TableContext<'TRecord>
t.VerifyTableAsync()

/// <summary>Creates a new `Transaction`, using the DynamoDB client and metricsCollector configured for this `TableContext`</summary>
member _.CreateTransaction() =
Transaction(client, ?metricsCollector = metricsCollector)
member _.CreateTransaction() = Transaction(client, ?metricsCollector = metricsCollector)

/// <summary>
/// Represents a transactional set of operations to be applied atomically to an arbitrary number of DynamoDB tables.
Expand Down Expand Up @@ -1363,12 +1396,7 @@ and Transaction(client: IAmazonDynamoDB, ?metricsCollector: RequestMetrics -> un
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="item">Item to be put.</param>
/// <param name="precondition">Optional precondition expression.</param>
member _.Put<'TRecord>
(
tableContext: TableContext<'TRecord>,
item: 'TRecord,
?precondition: ConditionExpression<'TRecord>
) =
member _.Put<'TRecord>(tableContext: TableContext<'TRecord>, item: 'TRecord, ?precondition: ConditionExpression<'TRecord>) =
let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item)
match precondition with
| None -> ()
Expand Down Expand Up @@ -1412,7 +1440,7 @@ and Transaction(client: IAmazonDynamoDB, ?metricsCollector: RequestMetrics -> un
let writer = AttributeWriter()
req.UpdateExpression <- updater.UpdateOps.Write writer
precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)

req.ExpressionAttributeNames <- writer.Names
req.ExpressionAttributeValues <- writer.Values
transactionItems.Add(TransactWriteItem(Update = req))
Expand All @@ -1423,12 +1451,7 @@ and Transaction(client: IAmazonDynamoDB, ?metricsCollector: RequestMetrics -> un
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="key">Key of item to delete.</param>
/// <param name="precondition">Optional precondition expression.</param>
member _.Delete
(
tableContext: TableContext<'TRecord>,
key: TableKey,
?precondition: ConditionExpression<'TRecord>
) =
member _.Delete(tableContext: TableContext<'TRecord>, key: TableKey, ?precondition: ConditionExpression<'TRecord>) =
let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
match precondition with
| None -> ()
Expand All @@ -1455,10 +1478,13 @@ and Transaction(client: IAmazonDynamoDB, ?metricsCollector: RequestMetrics -> un
let! response = client.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r ->
response.ConsumedCapacity
|> Seq.groupBy _.TableName
|> Seq.iter (fun (tableName, consumedCapacity) ->
r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems)))
let cc =
if response.ConsumedCapacity = null then
[]
else
Seq.toList response.ConsumedCapacity
for tableName, consumedCapacity in cc |> Seq.groupBy _.TableName do
r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
}
Expand Down
Loading
Loading