Commit 2b018e2

retry on network error for certifiers (#2122)
* retry on network error for certifiers
  Signed-off-by: pxp928 <[email protected]>
* exit after the retry fails
  Signed-off-by: pxp928 <[email protected]>
* return err if not a url error
  Signed-off-by: pxp928 <[email protected]>
1 parent 0c72777 commit 2b018e2
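
For a sense of the schedule this commit introduces: the constants added below (`maxRetries = 10`, `baseDelay = 1 * time.Second`) and the delay formula `2^i * baseDelay` give waits of 1 s up to 512 s. A minimal standalone sketch of that arithmetic (not code from this commit):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Constants as added in pkg/certifier/certify/certify.go.
	const maxRetries = 10
	const baseDelay = 1 * time.Second

	// Reproduce retryWithBackoff's delay formula: 2^i * baseDelay.
	var total time.Duration
	for i := 0; i < maxRetries; i++ {
		delay := time.Duration(math.Pow(2, float64(i))) * baseDelay
		total += delay
		fmt.Printf("attempt %d fails -> sleep %v\n", i+1, delay)
	}
	fmt.Printf("worst-case total sleep before giving up: %v\n", total) // 17m3s
}
```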

File tree

2 files changed: +61 -9 lines changed

- cmd/guaccollect/cmd/osv.go
- pkg/certifier/certify/certify.go

cmd/guaccollect/cmd/osv.go

Lines changed: 2 additions & 2 deletions

```diff
@@ -224,8 +224,8 @@ func initializeNATsandCertifier(ctx context.Context, blobAddr, pubsubAddr string
 			return true
 		}
 		logger.Errorf("certifier ended with error: %v", err)
-		// Continue to emit any documents still in the docChan
-		return true
+		// exit the loop but drain the channel first
+		return false
 	}
 
 	ctx, cf := context.WithCancel(ctx)
```
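
The handler now returns false on a certifier error, which stops the loop; the "drain the channel first" part is the non-blocking drain added to certify.go below. A minimal standalone sketch of that pattern, with a local buffered channel and made-up payloads standing in for compChan:

```go
package main

import "fmt"

func main() {
	// Buffered channel standing in for compChan (hypothetical payloads).
	compChan := make(chan interface{}, 4)
	compChan <- "component-a"
	compChan <- "component-b"

	// len() on a buffered channel reports queued items, so this loop
	// consumes whatever is already buffered without blocking once empty.
	for len(compChan) > 0 {
		d := <-compChan
		fmt.Printf("emitting document for %v\n", d)
	}
}
```

In the commit, the drain runs only after GetComponents has already reported on errChan, so no producer is still filling the channel when the len check happens.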

pkg/certifier/certify/certify.go

Lines changed: 59 additions & 7 deletions

```diff
@@ -17,7 +17,10 @@ package certify
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"math"
+	"net/url"
 	"time"
 
 	"github.com/guacsec/guac/pkg/certifier"
@@ -28,6 +31,8 @@ import (
 
 const (
 	BufferChannelSize int = 1000
+	maxRetries            = 10
+	baseDelay             = 1 * time.Second
 )
 
 var (
@@ -62,8 +67,19 @@ func Certify(ctx context.Context, query certifier.QueryComponents, emitter certi
 	// logger
 	logger := logging.FromContext(ctx)
 
+	// define the GetComponents operation to be retried on failure (if gql server is not up)
+	backoffOperation := func() error {
+		err := query.GetComponents(ctx, compChan)
+		if err != nil {
+			logger.Errorf("GetComponents failed with error: %v", err)
+			return fmt.Errorf("GetComponents failed with error: %w", err)
+		}
+		return nil
+	}
+
 	go func() {
-		errChan <- query.GetComponents(ctx, compChan)
+		wrappedOperation := retryWithBackoff(backoffOperation)
+		errChan <- wrappedOperation()
 	}()
 
 	componentsCaptured := false
@@ -75,19 +91,17 @@ func Certify(ctx context.Context, query certifier.QueryComponents, emitter certi
 			}
 		case err := <-errChan:
 			if !handleErr(err) {
+				// drain channel before exiting
+				drainComponentChannel(compChan, ctx, emitter, handleErr)
 				return err
 			}
 			componentsCaptured = true
 		case <-ctx.Done():
 			componentsCaptured = true
 		}
 	}
-	for len(compChan) > 0 {
-		d := <-compChan
-		if err := generateDocuments(ctx, d, emitter, handleErr); err != nil {
-			logger.Errorf("generate certifier documents error: %v", err)
-		}
-	}
+	// drain channel before exiting
+	drainComponentChannel(compChan, ctx, emitter, handleErr)
 	return nil
 }
 
@@ -116,6 +130,16 @@ func Certify(ctx context.Context, query certifier.QueryComponents, emitter certi
 	return nil
 }
 
+func drainComponentChannel(compChan chan interface{}, ctx context.Context, emitter certifier.Emitter, handleErr certifier.ErrHandler) {
+	logger := logging.FromContext(ctx)
+	for len(compChan) > 0 {
+		d := <-compChan
+		if err := generateDocuments(ctx, d, emitter, handleErr); err != nil {
+			logger.Errorf("generate certifier documents error: %v", err)
+		}
+	}
+}
+
 // generateDocuments runs CertifyVulns as a goroutine to scan and generates attestations that
 // are emitted as processor documents to be ingested
 func generateDocuments(ctx context.Context, collectedComponent interface{}, emitter certifier.Emitter, handleErr certifier.ErrHandler) error {
@@ -162,3 +186,31 @@ func generateDocuments(ctx context.Context, collectedComponent interface{}, emit
 	}
 	return nil
 }
+
+// retryFunc is a function that can be retried
+type retryFunc func() error
+
+// retryWithBackoff retries the given operation with exponential backoff
+func retryWithBackoff(operation retryFunc) retryFunc {
+	return func() error {
+		var lastError error
+		var urlErr *url.Error
+
+		for i := 0; i < maxRetries; i++ {
+			err := operation()
+			if err == nil {
+				return nil
+			}
+			if errors.As(err, &urlErr) {
+				secRetry := math.Pow(2, float64(i))
+				fmt.Printf("Retrying operation in %f seconds\n", secRetry)
+				delay := time.Duration(secRetry) * baseDelay
+				time.Sleep(delay)
+				lastError = err
+			} else {
+				return err
+			}
+		}
+		return lastError
+	}
+}
```
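
A quick standalone check of the retry predicate: errors.As matches a *url.Error even through the `fmt.Errorf("...: %w", err)` wrapping that backoffOperation applies, while any other error is returned immediately without retrying. The URL and error text below are made-up illustrations:

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

func main() {
	// net/http clients return *url.Error on network failures; wrap it
	// the way backoffOperation does.
	netErr := fmt.Errorf("GetComponents failed with error: %w", &url.Error{
		Op:  "Post",
		URL: "http://localhost:8080/query", // hypothetical gql endpoint
		Err: errors.New("connection refused"),
	})

	var urlErr *url.Error
	fmt.Println(errors.As(netErr, &urlErr)) // true: would be retried

	// A non-network error does not match, so retryWithBackoff returns it
	// on the first attempt.
	otherErr := errors.New("malformed component")
	fmt.Println(errors.As(otherErr, &urlErr)) // false: returned immediately
}
```

Note that, as written, retryWithBackoff also sleeps after the final failed attempt before returning lastError.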
