|
| 1 | +package lambda |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + |
| 6 | + "github.com/aws/aws-lambda-go/events" |
| 7 | + "github.com/aws/aws-lambda-go/lambda" |
| 8 | + dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" |
| 9 | + "github.com/nguyengg/go-aws-commons/metrics" |
| 10 | +) |
| 11 | + |
| 12 | +// StartDynamodbEventHandler starts the Lambda loop for handling DynamoDB stream events in a batch. |
| 13 | +// |
| 14 | +// See https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-batchfailurereporting.html. |
| 15 | +// |
| 16 | +// The handler works on one record at a time and sequentially. If the handler returns a non-nil error, the wrapper will |
| 17 | +// automatically add an events.DynamoDBBatchItemFailure so that only failed records get retried later. |
| 18 | +func StartDynamodbEventHandler(handler func(context.Context, events.DynamoDBEventRecord) error, options ...lambda.Option) { |
| 19 | + StartHandlerFunc(func(ctx context.Context, req events.DynamoDBEvent) (events.DynamoDBEventResponse, error) { |
| 20 | + m := metrics.Get(ctx) |
| 21 | + |
| 22 | + res := events.DynamoDBEventResponse{ |
| 23 | + BatchItemFailures: make([]events.DynamoDBBatchItemFailure, 0), |
| 24 | + } |
| 25 | + |
| 26 | + m.AddCounter("recordCount", int64(len(req.Records)), "failureCount") |
| 27 | + |
| 28 | + for _, record := range req.Records { |
| 29 | + if err := handler(ctx, record); err != nil { |
| 30 | + m.AddCounter("failureCount", 1) |
| 31 | + res.BatchItemFailures = append(res.BatchItemFailures, events.DynamoDBBatchItemFailure{ItemIdentifier: record.Change.SequenceNumber}) |
| 32 | + } |
| 33 | + } |
| 34 | + |
| 35 | + // very important that nil error is returned here. |
| 36 | + return res, nil |
| 37 | + }, options...) |
| 38 | +} |
| 39 | + |
| 40 | +// StreamToDynamoDBAttributeValue converts a DynamoDB Stream event attribute value (from |
| 41 | +// https://pkg.go.dev/github.com/aws/aws-lambda-go/events) to an equivalent DynamoDB attribute value (from |
| 42 | +// https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/dynamodb/types). |
| 43 | +// |
| 44 | +// See StreamToDynamoDBItem for usage. |
| 45 | +func StreamToDynamoDBAttributeValue(av events.DynamoDBAttributeValue) dynamodbtypes.AttributeValue { |
| 46 | + // TODO as an exercise, remove recursion. |
| 47 | + |
| 48 | + switch av.DataType() { |
| 49 | + case events.DataTypeBinary: |
| 50 | + return &dynamodbtypes.AttributeValueMemberB{Value: av.Binary()} |
| 51 | + case events.DataTypeBoolean: |
| 52 | + return &dynamodbtypes.AttributeValueMemberBOOL{Value: av.Boolean()} |
| 53 | + case events.DataTypeBinarySet: |
| 54 | + return &dynamodbtypes.AttributeValueMemberBS{Value: av.BinarySet()} |
| 55 | + case events.DataTypeList: |
| 56 | + l := av.List() |
| 57 | + value := make([]dynamodbtypes.AttributeValue, len(l)) |
| 58 | + for i, v := range l { |
| 59 | + value[i] = StreamToDynamoDBAttributeValue(v) |
| 60 | + } |
| 61 | + return &dynamodbtypes.AttributeValueMemberL{Value: value} |
| 62 | + case events.DataTypeMap: |
| 63 | + value := make(map[string]dynamodbtypes.AttributeValue) |
| 64 | + for k, v := range av.Map() { |
| 65 | + value[k] = StreamToDynamoDBAttributeValue(v) |
| 66 | + } |
| 67 | + return &dynamodbtypes.AttributeValueMemberM{Value: value} |
| 68 | + case events.DataTypeNumber: |
| 69 | + return &dynamodbtypes.AttributeValueMemberN{Value: av.Number()} |
| 70 | + case events.DataTypeNumberSet: |
| 71 | + return &dynamodbtypes.AttributeValueMemberNS{Value: av.NumberSet()} |
| 72 | + case events.DataTypeNull: |
| 73 | + return &dynamodbtypes.AttributeValueMemberNULL{Value: av.IsNull()} |
| 74 | + case events.DataTypeString: |
| 75 | + return &dynamodbtypes.AttributeValueMemberS{Value: av.String()} |
| 76 | + case events.DataTypeStringSet: |
| 77 | + return &dynamodbtypes.AttributeValueMemberSS{Value: av.StringSet()} |
| 78 | + default: |
| 79 | + // should panic? |
| 80 | + return nil |
| 81 | + } |
| 82 | +} |
| 83 | + |
| 84 | +// StreamToDynamoDBItem uses StreamToDynamoDBAttributeValue to convert an item from a DynamoDB Stream event to an item |
| 85 | +// in DynamoDB. |
| 86 | +// |
| 87 | +// Useful if you're implementing a DynamoDB Stream event handler, and you need to convert the old and/or new image to |
| 88 | +// the tagged struct by way of https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue: |
| 89 | +// |
| 90 | +// item := &MyStruct{} |
| 91 | +// err := attributevalue.UnmarshalMap(StreamToDynamoDBItem(record.Change.NewImage), item) |
| 92 | +func StreamToDynamoDBItem(in map[string]events.DynamoDBAttributeValue) map[string]dynamodbtypes.AttributeValue { |
| 93 | + out := make(map[string]dynamodbtypes.AttributeValue) |
| 94 | + for k, v := range in { |
| 95 | + out[k] = StreamToDynamoDBAttributeValue(v) |
| 96 | + } |
| 97 | + return out |
| 98 | +} |
0 commit comments