Description
Describe the bug
I'm trying to debug a problem where my application crashes because of leaked goroutines. While trying to root-cause it, I've found some issues that seem to be related to the SDK:
I'm launching 500 goroutines, each doing a PutItem with a different PK. Only 200 to 350 of them succeed (depending on the run); the rest return:
operation error DynamoDB: PutItem, failed to get rate limit token, retry quota exceeded, 0 available, 5 requested
Interestingly, the DDB table uses on-demand capacity mode, and CloudWatch doesn't show any throttling errors.
Also, when running goleak.VerifyNone(t), the library detects many leaked goroutines, in various states, inside the HTTP libraries. I've looked into #1434, but the PutItem response doesn't seem to contain an io.ReadCloser, so I'm assuming the two issues aren't related.
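One possible source of such goroutines worth ruling out (an assumption, not something established in this issue) is connection pooling in net/http: each idle keep-alive connection kept by an http.Transport holds its own read and write goroutines until the connection is closed. The standard-library-only sketch below uses a local httptest server in place of DynamoDB; idleGoroutines is a hypothetical helper that compares goroutine counts before the requests, while a pooled connection sits idle, and after CloseIdleConnections and closing the server.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"runtime"
	"time"
)

// idleGoroutines is a hypothetical helper: it makes n sequential GET requests
// to a local test server and returns the goroutine count before the server
// starts, while the client still holds an idle keep-alive connection, and
// after both the idle connection and the server have been closed.
func idleGoroutines(n int) (before, withIdle, afterClose int) {
	before = runtime.NumGoroutine()

	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))

	client := &http.Client{Transport: &http.Transport{}}
	for i := 0; i < n; i++ {
		resp, err := client.Get(srv.URL)
		if err != nil {
			panic(err)
		}
		// Draining and closing the body returns the connection to the idle pool.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	// Counted while the server and the client's pooled connection are still alive.
	withIdle = runtime.NumGoroutine()

	client.CloseIdleConnections()
	srv.Close()
	// Connection goroutines exit asynchronously, so poll for a short while.
	deadline := time.Now().Add(2 * time.Second)
	for runtime.NumGoroutine() > before && time.Now().Before(deadline) {
		time.Sleep(10 * time.Millisecond)
	}
	afterClose = runtime.NumGoroutine()
	return before, withIdle, afterClose
}

func main() {
	before, withIdle, afterClose := idleGoroutines(3)
	fmt.Printf("before=%d withIdle=%d afterClose=%d\n", before, withIdle, afterClose)
}
```

The reproduction never closes the DynamoDB client's HTTP transport before goleak.VerifyNone runs, so pooled connections of this kind could account for some of the reported goroutines; whether they explain all of them is not established here.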
Expected Behavior
- I'm assuming rate-limit errors should be caused by service-side throttling (DynamoDB), but I can see no evidence in DynamoDB that this is the cause, so more clarity would be useful: which rate limit am I hitting?
- I would expect no goroutine leaks once all operations have finished. This doesn't seem to be the case.
Current Behavior
- Receiving the error
operation error DynamoDB: PutItem, failed to get rate limit token, retry quota exceeded, 0 available, 5 requested
without any throttling shown in the DynamoDB CloudWatch metrics
- Goroutine leaks
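The Expected Behavior section asks which rate limit is being hit. Reading the error text alone, one hypothesis (not confirmed in this issue) is that the quota lives on the client side, in the SDK's retryer, as a token bucket that every retry attempt draws from; a burst of retryable client-side failures could then empty it without DynamoDB ever throttling. The sketch below is a hypothetical model, not the SDK's implementation: retryQuota, simulateRetries, the capacity of 500 and the per-retry cost of 5 are assumptions (the cost is taken from "5 requested" in the message), and token refunds on successful attempts are omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// retryQuota is a hypothetical, simplified client-side retry token bucket.
// It is NOT the SDK's code; it only models the numbers in the error message.
type retryQuota struct {
	mu        sync.Mutex
	available int
}

// acquire takes cost tokens, or fails if the bucket cannot cover them.
func (q *retryQuota) acquire(cost int) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.available < cost {
		return fmt.Errorf("retry quota exceeded, %d available, %d requested", q.available, cost)
	}
	q.available -= cost
	return nil
}

// simulateRetries models `retries` concurrent requests whose first attempt
// failed with a retryable error; each asks the shared quota for `cost`
// tokens before retrying. It returns the refused retries and tokens left.
func simulateRetries(retries, capacity, cost int) (refused, remaining int) {
	q := &retryQuota{available: capacity}
	var wg sync.WaitGroup
	var mu sync.Mutex
	for i := 0; i < retries; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := q.acquire(cost); err != nil {
				mu.Lock()
				refused++
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // all workers are done, so refused and q.available are final
	return refused, q.available
}

func main() {
	refused, remaining := simulateRetries(500, 500, 5)
	fmt.Println("retries refused:", refused, "tokens left:", remaining)
	// prints "retries refused: 400 tokens left: 0"
}
```

With these assumed numbers only 100 retries fit in the bucket regardless of scheduling order, and every later request sees "0 available, 5 requested", which is the shape of the reported error.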
Reproduction Steps
The sample below includes commented-out code that customises the HTTP client settings. I used it to see how those settings change the behaviour, but couldn't find any that solve the perceived problem.
package database

import (
	"context"
	"crypto/md5"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"log"
	// "net"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	// "github.com/aws/aws-sdk-go-v2/aws/transport/http"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/stretchr/testify/assert"
	"go.uber.org/goleak"
)

// func TestMain(m *testing.M) {
// 	goleak.VerifyTestMain(m)
// }

func TestConnectionReuseForPutContentHash(t *testing.T) {
	defer goleak.VerifyNone(t)
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	svc := databaseConnection(ctx)
	table := "testtable"
	n := runtime.NumGoroutine()
	var wg sync.WaitGroup
	limit := 500
	success := make(chan int, limit)
	s := 0
	// done is closed once the collector goroutine has drained success,
	// so s can be read afterwards without a data race.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := range success {
			s = s + i
		}
	}()
	for i := 0; i < limit; i++ {
		wg.Add(1)
		go func(c chan int) {
			defer wg.Done()
			entry := NewHashEntry(
				fakeHash(),
				"175767cb005f8809beb731fd853d0343",
			)
			err := putItem(ctx, svc, table, entry)
			if err == nil {
				c <- 1
			}
		}(success)
	}
	wg.Wait()
	cancel()
	close(success)
	<-done // wait for the collector goroutine to exit before checking counts
	assert.Equal(t, n, runtime.NumGoroutine(), "Number of goroutines differs from expected")
	assert.Equal(t, limit, s, "Number of successful items processed differs from requested")
	// assert.Equal(t, true, false, "forced assert to be sure the test is performing")
}

func fakeHash() string {
	hash := md5.New()
	// combine a timestamp and random bytes so the hash is unlikely to repeat,
	// avoiding partition-key collisions
	s := fmt.Sprintf("%d%s", time.Now().UnixNano(), pseudo_uuid())
	hash.Write([]byte(s))
	hashInBytes := hash.Sum(nil)[:16]
	returnHashString := hex.EncodeToString(hashInBytes)
	return returnHashString
}

// Note - NOT RFC4122 compliant
func pseudo_uuid() (uuid string) {
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		fmt.Println("Error: ", err)
		return
	}
	uuid = fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
	return
}

func databaseConnection(ctx context.Context) *dynamodb.Client {
	// Load the default SDK configuration, which picks up credentials from
	// the shared credentials file ~/.aws/credentials and the region from
	// the shared configuration file ~/.aws/config.
	// httpCustomClient := http.
	// 	NewBuildableClient().
	// 	WithTimeout(time.Second * 5).
	// 	WithDialerOptions(func(d *net.Dialer) {
	// 		d.KeepAlive = -1
	// 		d.Timeout = time.Millisecond * 1000
	// 	})
	// cfg, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(httpCustomClient))
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		// continuing with a zero-value config would only fail later, so stop here
		log.Fatalf("unable to load SDK config, %v", err)
	}
	return dynamodb.NewFromConfig(cfg)
}

// putItem marshals any struct into a DynamoDB item and writes it to table.
func putItem(ctx context.Context, svc *dynamodb.Client, table string, item interface{}) error {
	av, err := attributevalue.MarshalMap(item)
	if err != nil {
		log.Printf("Got error marshalling new item: %s", err)
		return err
	}
	tableName := table
	input := &dynamodb.PutItemInput{
		Item:      av,
		TableName: aws.String(tableName),
	}
	_, err = svc.PutItem(ctx, input)
	if err != nil {
		log.Printf("Got error calling PutItem: %s", err)
	}
	return err
}

func NewHashEntry(hashvalue string, filenamehash string) *HashEntry {
	return &HashEntry{
		Pk: hashvalue,
		Sk: filenamehash,
	}
}

type HashEntry struct {
	Pk string `dynamodbav:"pk"`
	Sk string `dynamodbav:"sk"`
}
Possible Solution
No response
Additional Information/Context
The error is reproducible on both Linux and macOS.
AWS Go SDK version used
Go 1.18 (and Go 1.16)
Compiler and Version used
go version go1.18 darwin/amd64
Operating System and version
macOS 12.2.1 and Amazon Linux 2