Skip to content

Commit f4fb7e4

Browse files
committed
source-kinesis: don't try to make more than 5 requests per second
Kinesis has a 5 transactions per second limit for a single shard, and a GetRecords call counts against that. Previously the backoff & retry logic of the SDK was used exclusively for managing this, but that's not ideal since the retry behavior is rather aggressive, and any time you exceed the provisioned throughput the server will close the connection. This was actually causing a lot of connection churn and occasional DNS resolution failures. The fix here is to use a client-side rate limiter that respects the 5 TPS limit. Experimentally I have observed that this nearly eliminates the connection churn, and should improve the stability of the connector.
1 parent 02a20fe commit f4fb7e4

File tree

2 files changed

+8
-10
lines changed

2 files changed

+8
-10
lines changed

source-kinesis/capture.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
log "github.com/sirupsen/logrus"
1515
"golang.org/x/exp/maps"
1616
"golang.org/x/sync/errgroup"
17+
"golang.org/x/time/rate"
1718
)
1819

1920
type capture struct {
@@ -345,8 +346,13 @@ func (c *capture) readShard(
345346
break
346347
}
347348

349+
// Respect the kinesis 5 TPS rate limit.
350+
limiter := rate.NewLimiter(rate.Every(time.Second), 5)
348351
var didLogNoData bool
349352
for {
353+
if err := limiter.Wait(ctx); err != nil {
354+
return fmt.Errorf("waiting for rate limiter: %w", err)
355+
}
350356
res, err := c.client.GetRecords(ctx, &kinesis.GetRecordsInput{
351357
ShardIterator: iterator,
352358
StreamARN: &stream.arn,
@@ -375,19 +381,11 @@ func (c *capture) readShard(
375381
if *res.MillisBehindLatest != 0 && len(res.Records) == 0 {
376382
ll.WithField("MillisBehindLatest", *res.MillisBehindLatest).Info("shard is not caught up but returned no new data")
377383
didLogNoData = true
378-
}
379-
380-
if *res.MillisBehindLatest == 0 && len(res.Records) == 0 {
384+
} else if *res.MillisBehindLatest == 0 && len(res.Records) == 0 {
381385
if didLogNoData {
382386
ll.Info("shard is caught up")
383387
didLogNoData = false
384388
}
385-
select {
386-
case <-ctx.Done():
387-
return ctx.Err()
388-
case <-time.After(1 * time.Second):
389-
// Small delay to avoid hot-looping on a shard with no new data.
390-
}
391389
}
392390
}
393391
}

source-kinesis/connection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import (
77
"slices"
88
"sync"
99

10+
"github.com/aws/aws-sdk-go-v2/aws"
1011
"github.com/aws/aws-sdk-go-v2/aws/ratelimit"
1112
"github.com/aws/aws-sdk-go-v2/aws/retry"
1213
"github.com/aws/aws-sdk-go-v2/service/kinesis"
1314
"golang.org/x/sync/errgroup"
1415

15-
"github.com/aws/aws-sdk-go-v2/aws"
1616
awsConfig "github.com/aws/aws-sdk-go-v2/config"
1717
"github.com/aws/aws-sdk-go-v2/credentials"
1818
)

0 commit comments

Comments
 (0)