Skip to content

Commit 9c7f881

Browse files
authored
[Fix] GRPC rate limit and add exponential backoff for CD (#2125)
* update rate limit to use new method Signed-off-by: pxp928 <[email protected]> * udpate grpc rate limit package Signed-off-by: pxp928 <[email protected]> * udpate grpc rate limit unit tests Signed-off-by: pxp928 <[email protected]> * update deps.dev unit test Signed-off-by: pxp928 <[email protected]> * update deps.dev to check already queried map Signed-off-by: pxp928 <[email protected]> * update grpc rate limit unit test Signed-off-by: pxp928 <[email protected]> * add expoential backoff for CD Signed-off-by: pxp928 <[email protected]> * update deps.dev unit test Signed-off-by: pxp928 <[email protected]> --------- Signed-off-by: pxp928 <[email protected]>
1 parent 2b018e2 commit 9c7f881

File tree

13 files changed

+249
-405
lines changed

13 files changed

+249
-405
lines changed

cmd/guaccollect/cmd/deps_dev.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ you have access to read and write to the respective blob store.`,
9898
os.Exit(1)
9999
}
100100
// Register collector
101-
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency, nil)
101+
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency)
102102
if err != nil {
103103
logger.Fatalf("unable to register oci collector: %v", err)
104104
}

cmd/guaccollect/cmd/osv.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,6 @@ func initializeNATsandCertifier(ctx context.Context, blobAddr, pubsubAddr string
220220
// Collect
221221
errHandler := func(err error) bool {
222222
if err == nil {
223-
logger.Info("certifier ended gracefully")
224223
return true
225224
}
226225
logger.Errorf("certifier ended with error: %v", err)

cmd/guacone/cmd/deps_dev.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ var depsDevCmd = &cobra.Command{
7272
transport := cli.HTTPHeaderTransport(ctx, opts.headerFile, http.DefaultTransport)
7373

7474
// Register collector
75-
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency, nil)
75+
depsDevCollector, err := deps_dev.NewDepsCollector(ctx, opts.dataSource, opts.poll, opts.retrieveDependencies, 30*time.Second, opts.addedLatency)
7676
if err != nil {
7777
logger.Fatalf("unable to register depsdev collector: %v", err)
7878
}

cmd/guacone/cmd/osv.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ var osvCmd = &cobra.Command{
181181
// Collect
182182
errHandler := func(err error) bool {
183183
if err == nil {
184-
logger.Info("certifier ended gracefully")
185184
return true
186185
}
187186
logger.Errorf("certifier ended with error: %v", err)

cmd/guacone/cmd/scorecard.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ var scorecardCmd = &cobra.Command{
143143
// Collect
144144
errHandler := func(err error) bool {
145145
if err == nil {
146-
logger.Info("certifier ended gracefully")
147146
return true
148147
}
149148
logger.Errorf("certifier ended with error: %v", err)

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ require (
9696
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect
9797
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect
9898
github.com/aws/smithy-go v1.20.4 // indirect
99+
github.com/benbjohnson/clock v1.3.0 // indirect
99100
github.com/beorn7/perks v1.0.1 // indirect
100101
github.com/bombsimon/logrusr/v2 v2.0.1 // indirect
101102
github.com/bradleyfalzon/ghinstallation/v2 v2.8.0 // indirect
@@ -332,6 +333,7 @@ require (
332333
github.com/tikv/client-go/v2 v2.0.8-0.20231115083414-7c96dfd783fb
333334
github.com/vektah/gqlparser/v2 v2.5.16
334335
go.uber.org/mock v0.4.0
336+
go.uber.org/ratelimit v0.3.1
335337
gocloud.dev v0.39.0
336338
gocloud.dev/pubsub/kafkapubsub v0.37.0
337339
gocloud.dev/pubsub/rabbitpubsub v0.39.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.5/go.mod h1:vmSqFK+BVIwVpDAGZB3Co
165165
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
166166
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
167167
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
168+
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
169+
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
168170
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
169171
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
170172
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
@@ -872,6 +874,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i
872874
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
873875
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
874876
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
877+
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
878+
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
875879
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
876880
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
877881
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=

pkg/certifier/certify/certify.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func Certify(ctx context.Context, query certifier.QueryComponents, emitter certi
7878
}
7979

8080
go func() {
81-
wrappedOperation := retryWithBackoff(backoffOperation)
81+
wrappedOperation := retryWithBackoff(ctx, backoffOperation)
8282
errChan <- wrappedOperation()
8383
}()
8484

@@ -191,7 +191,8 @@ func generateDocuments(ctx context.Context, collectedComponent interface{}, emit
191191
type retryFunc func() error
192192

193193
// retryWithBackoff retries the given operation with exponential backoff
194-
func retryWithBackoff(operation retryFunc) retryFunc {
194+
func retryWithBackoff(ctx context.Context, operation retryFunc) retryFunc {
195+
logger := logging.FromContext(ctx)
195196
return func() error {
196197
var lastError error
197198
var urlErr *url.Error
@@ -203,7 +204,7 @@ func retryWithBackoff(operation retryFunc) retryFunc {
203204
}
204205
if errors.As(err, &urlErr) {
205206
secRetry := math.Pow(2, float64(i))
206-
fmt.Printf("Retrying operation in %f seconds\n", secRetry)
207+
logger.Infof("Retrying operation in %f seconds\n", secRetry)
207208
delay := time.Duration(secRetry) * baseDelay
208209
time.Sleep(delay)
209210
lastError = err

pkg/certifier/clearlydefined/clearlydefined.go

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"io"
24+
"math"
2425
"net/http"
2526
"strings"
2627
"time"
@@ -49,6 +50,8 @@ var rateLimitInterval = 30 * time.Second
4950
const (
5051
PRODUCER_ID string = "guacsec/guac"
5152
CDCollector string = "clearlydefined"
53+
maxRetries = 10
54+
baseDelay = 1 * time.Second
5255
)
5356

5457
var ErrComponentTypeMismatch error = errors.New("rootComponent type is not []*root_package.PackageNode")
@@ -72,7 +75,7 @@ func NewClearlyDefinedHTTPClient(limiter *rate.Limiter) *http.Client {
7275
}
7376

7477
// getDefinitions uses the coordinates to query clearly defined for license definition
75-
func getDefinitions(_ context.Context, client *http.Client, purls []string, coordinates []string) (map[string]*attestation.Definition, error) {
78+
func getDefinitions(ctx context.Context, client *http.Client, purls []string, coordinates []string) (map[string]*attestation.Definition, error) {
7679

7780
coordinateToPurl := make(map[string]string)
7881
for i, purl := range purls {
@@ -87,30 +90,17 @@ func getDefinitions(_ context.Context, client *http.Client, purls []string, coor
8790
return nil, fmt.Errorf("error marshalling coordinates: %w", err)
8891
}
8992

90-
// retries if a 429 is encountered. This could occur even with the rate limiting
91-
// as multiple services may be hitting it.
92-
var resp *http.Response
93-
maxRetries := 5
94-
for retries := 0; retries < maxRetries; retries++ {
95-
// Make the POST request
96-
resp, err = client.Post("https://api.clearlydefined.io/definitions", "application/json", bytes.NewBuffer(jsonData))
97-
if err != nil {
98-
return nil, fmt.Errorf("error making POST request: %w", err)
99-
}
100-
defer resp.Body.Close()
101-
102-
if resp.StatusCode != http.StatusOK {
103-
if resp.StatusCode != http.StatusTooManyRequests {
104-
// otherwise return an error
105-
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
106-
}
107-
} else {
108-
break
109-
}
93+
// retries if a 429 is encountered
94+
backoffOperation := func() (*http.Response, error) {
95+
return client.Post("https://api.clearlydefined.io/definitions", "application/json", bytes.NewBuffer(jsonData))
96+
}
11097

111-
// Retry after a delay if status code is 429
112-
time.Sleep(5 * time.Second)
98+
wrappedOperation := retryWithBackoff(ctx, backoffOperation)
99+
resp, err := wrappedOperation()
100+
if err != nil {
101+
return nil, fmt.Errorf("clearly defined POST request failed with error: %w", err)
113102
}
103+
defer resp.Body.Close()
114104

115105
body, err := io.ReadAll(resp.Body)
116106
if err != nil {
@@ -300,3 +290,35 @@ func createAttestation(purl string, definition *attestation.Definition, currentT
300290

301291
return attestation
302292
}
293+
294+
// retryFunc is a function that can be retried
295+
type retryFunc func() (*http.Response, error)
296+
297+
// retryWithBackoff retries the given operation with exponential backoff
298+
func retryWithBackoff(ctx context.Context, operation retryFunc) retryFunc {
299+
logger := logging.FromContext(ctx)
300+
return func() (*http.Response, error) {
301+
var collectedResp *http.Response
302+
for i := 0; i < maxRetries; i++ {
303+
resp, err := operation()
304+
if err != nil {
305+
return nil, fmt.Errorf("error making POST request: %w", err)
306+
}
307+
if resp.StatusCode != http.StatusOK {
308+
if resp.StatusCode != http.StatusTooManyRequests {
309+
// otherwise return an error
310+
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
311+
} else {
312+
secRetry := math.Pow(2, float64(i))
313+
logger.Infof("Retrying operation in %f seconds\n", secRetry)
314+
delay := time.Duration(secRetry) * baseDelay
315+
time.Sleep(delay)
316+
}
317+
} else {
318+
collectedResp = resp
319+
break
320+
}
321+
}
322+
return collectedResp, nil
323+
}
324+
}

pkg/clients/grpcRateLimiter.go

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,61 +18,39 @@ package clients
1818
import (
1919
"context"
2020

21+
grpc_ratelimit "github.com/grpc-ecosystem/go-grpc-middleware/ratelimit"
2122
"github.com/guacsec/guac/pkg/logging"
22-
23-
"golang.org/x/time/rate"
23+
"go.uber.org/ratelimit"
2424
"google.golang.org/grpc"
25+
"google.golang.org/grpc/codes"
26+
"google.golang.org/grpc/status"
2527
)
2628

27-
// RateLimitedClient is a wrapper around grpc.ClientConn that adds rate limiting
28-
// functionality to gRPC calls. It uses a rate.Limiter to control the rate of
29-
// outgoing requests.
30-
type RateLimitedClient struct {
31-
ClientConn *grpc.ClientConn
32-
Limiter *rate.Limiter
29+
type limiter struct {
30+
ratelimit.Limiter
3331
}
3432

35-
// Invoke performs a gRPC call on the wrapped grpc.ClientConn, applying
36-
// rate limiting before making the call. If the rate limit is exceeded, it waits
37-
// until the limiter allows the request.
38-
func (c *RateLimitedClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
39-
logger := logging.FromContext(ctx)
40-
if !c.Limiter.Allow() {
41-
logger.Debugf("Rate limit exceeded for method: %s", method)
42-
if err := c.Limiter.Wait(ctx); err != nil {
43-
return err
44-
}
45-
}
46-
return c.ClientConn.Invoke(ctx, method, args, reply, opts...)
33+
// Limit blocks to ensure that RPS is met
34+
func (l *limiter) Limit() bool {
35+
l.Take()
36+
return false
4737
}
4838

49-
// NewStream creates a new stream on the wrapped grpc.ClientConn, applying rate
50-
// limiting before creating the stream. If the rate limit is exceeded, it waits
51-
// until the limiter allows the request.
52-
func (c *RateLimitedClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
53-
logger := logging.FromContext(ctx)
54-
if !c.Limiter.Allow() {
55-
logger.Debugf("Rate limit exceeded for method: %s", method)
56-
if err := c.Limiter.Wait(ctx); err != nil {
57-
return nil, err
58-
}
39+
// NewLimiter return new go-grpc Limiter, specified the number of requests you want to limit as a counts per second.
40+
func NewLimiter(count int) grpc_ratelimit.Limiter {
41+
return &limiter{
42+
Limiter: ratelimit.New(count),
5943
}
60-
return c.ClientConn.NewStream(ctx, desc, method, opts...)
6144
}
6245

63-
// NewRateLimitedClient creates a new RateLimitedClient that wraps the provided
64-
// grpc.ClientConn and uses the provided rate.Limiter to control the rate of
65-
// outgoing requests. It returns a grpc.ClientConnInterface that can be used
66-
// wherever a grpc.ClientConn is expected.
67-
//
68-
// Parameters:
69-
// - conn: The underlying grpc.ClientConn to wrap. This is typically an instance
70-
// of grpc.ClientConn created using grpc.NewClient or any custom implementation of
71-
// grpc.ClientConnInterface.
72-
// - limiter: The rate.Limiter to use for controlling the rate of outgoing requests.
73-
func NewRateLimitedClient(conn *grpc.ClientConn, limiter *rate.Limiter) grpc.ClientConnInterface {
74-
return &RateLimitedClient{
75-
ClientConn: conn,
76-
Limiter: limiter,
46+
// UnaryClientInterceptor return server unary interceptor that limit requests.
47+
func UnaryClientInterceptor(limiter grpc_ratelimit.Limiter) grpc.UnaryClientInterceptor {
48+
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
49+
logger := logging.FromContext(ctx)
50+
if limiter.Limit() {
51+
logger.Infof("Rate limit exceeded for method: %s", method)
52+
return status.Errorf(codes.ResourceExhausted, "%s have been rejected by rate limiting.", method)
53+
}
54+
return invoker(ctx, method, req, reply, cc, opts...)
7755
}
7856
}

0 commit comments

Comments
 (0)