Skip to content

Commit 2bf6847

Browse files
committed
Change to using retry for mirror requests
Signed-off-by: Philip Laine <philip.laine@gmail.com>
1 parent 28a9316 commit 2bf6847

File tree

2 files changed

+85
-80
lines changed

2 files changed

+85
-80
lines changed

pkg/registry/registry.go

Lines changed: 81 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/avast/retry-go/v4"
1617
"github.com/go-logr/logr"
1718

1819
"github.com/spegel-org/spegel/internal/option"
@@ -257,9 +258,6 @@ func (r *Registry) mirrorHandler(rw httpx.ResponseWriter, req *http.Request, dis
257258
oci.DistributionKindManifest: oci.ErrCodeManifestUnknown,
258259
}[dist.Kind]
259260

260-
// Resume range for when blobs fail midway through copying.
261-
var resumeRng *httpx.Range
262-
263261
lookupCtx, lookupCancel := context.WithTimeout(req.Context(), r.resolveTimeout)
264262
defer lookupCancel()
265263
balancer, err := r.router.Lookup(lookupCtx, dist.Identifier(), r.resolveRetries)
@@ -268,12 +266,23 @@ func (r *Registry) mirrorHandler(rw httpx.ResponseWriter, req *http.Request, dis
268266
rw.WriteError(http.StatusNotFound, errors.Join(respErr, err))
269267
return
270268
}
271-
for range r.resolveRetries {
269+
270+
// Resume range for when blobs fail midway through copying.
271+
var resumeRng *httpx.Range
272+
273+
retryOpts := []retry.Option{
274+
retry.Context(req.Context()),
275+
retry.Attempts(uint(r.resolveRetries)),
276+
retry.DelayType(retry.FixedDelay),
277+
retry.Delay(0),
278+
retry.OnRetry(func(attempt uint, err error) {
279+
log.Error(err, "retrying mirror request", "attempt", attempt)
280+
}),
281+
}
282+
err = retry.Do(func() error {
272283
peer, err := balancer.Next()
273284
if err != nil {
274-
respErr := oci.NewDistributionError(errCode, fmt.Sprintf("could not find peer for %s", dist.Identifier()), mirrorDetails)
275-
rw.WriteError(http.StatusNotFound, errors.Join(respErr, lookupCtx.Err()))
276-
return
285+
return retry.Unrecoverable(err)
277286
}
278287

279288
mirrorDetails.Attempts += 1
@@ -297,86 +306,82 @@ func (r *Registry) mirrorHandler(rw httpx.ResponseWriter, req *http.Request, dis
297306
fetchOpts = append(fetchOpts, oci.WithFetchHeader(httpx.HeaderRange, h))
298307
}
299308

300-
done := func() bool {
301-
log := log.WithValues("attempt", mirrorDetails.Attempts, "path", req.URL.Path, "mirror", peer)
302-
303-
fetchCtx := req.Context()
304-
if req.Method == http.MethodHead {
305-
var reqCancel context.CancelFunc
306-
fetchCtx, reqCancel = context.WithTimeout(req.Context(), 1*time.Second)
307-
defer reqCancel()
308-
} else if req.Method == http.MethodGet && dist.Kind == oci.DistributionKindManifest {
309-
var reqCancel context.CancelFunc
310-
fetchCtx, reqCancel = context.WithTimeout(req.Context(), 2*time.Second)
311-
defer reqCancel()
312-
}
313-
314-
rc, desc, err := r.ociClient.Fetch(fetchCtx, req.Method, dist, fetchOpts...)
315-
if err != nil {
316-
log.Error(err, "request to mirror failed, retrying with next")
317-
balancer.Remove(peer)
318-
return false
319-
}
320-
defer httpx.DrainAndClose(rc)
309+
fetchCtx := req.Context()
310+
if req.Method == http.MethodHead {
311+
var reqCancel context.CancelFunc
312+
fetchCtx, reqCancel = context.WithTimeout(req.Context(), 1*time.Second)
313+
defer reqCancel()
314+
} else if req.Method == http.MethodGet && dist.Kind == oci.DistributionKindManifest {
315+
var reqCancel context.CancelFunc
316+
fetchCtx, reqCancel = context.WithTimeout(req.Context(), 2*time.Second)
317+
defer reqCancel()
318+
}
321319

322-
if !rw.HeadersWritten() {
323-
oci.WriteDescriptorToHeader(desc, rw.Header())
320+
rc, desc, err := r.ociClient.Fetch(fetchCtx, req.Method, dist, fetchOpts...)
321+
if err != nil {
322+
balancer.Remove(peer)
323+
return fmt.Errorf("request to mirror failed: %w", err)
324+
}
325+
defer httpx.DrainAndClose(rc)
326+
327+
if !rw.HeadersWritten() {
328+
oci.WriteDescriptorToHeader(desc, rw.Header())
329+
330+
switch dist.Kind {
331+
case oci.DistributionKindManifest:
332+
rw.WriteHeader(http.StatusOK)
333+
case oci.DistributionKindBlob:
334+
rng, err := httpx.ParseRangeHeader(req.Header, desc.Size)
335+
if err != nil {
336+
return retry.Unrecoverable(err)
337+
}
338+
resumeRng = rng
324339

325-
switch dist.Kind {
326-
case oci.DistributionKindManifest:
340+
rw.Header().Set(httpx.HeaderAcceptRanges, httpx.RangeUnit)
341+
if rng == nil {
327342
rw.WriteHeader(http.StatusOK)
328-
case oci.DistributionKindBlob:
329-
rng, err := httpx.ParseRangeHeader(req.Header, desc.Size)
330-
if err != nil {
331-
rw.WriteError(http.StatusBadRequest, err)
332-
return true
333-
}
334-
resumeRng = rng
335-
336-
rw.Header().Set(httpx.HeaderAcceptRanges, httpx.RangeUnit)
337-
if rng == nil {
338-
rw.WriteHeader(http.StatusOK)
339-
} else {
340-
rw.Header().Set(httpx.HeaderContentType, httpx.ContentTypeBinary)
341-
rw.Header().Set(httpx.HeaderContentRange, httpx.ContentRangeFromRange(*rng, desc.Size).String())
342-
rw.Header().Set(httpx.HeaderContentLength, strconv.FormatInt(rng.Size(), 10))
343-
rw.WriteHeader(http.StatusPartialContent)
344-
}
343+
} else {
344+
rw.Header().Set(httpx.HeaderContentType, httpx.ContentTypeBinary)
345+
rw.Header().Set(httpx.HeaderContentRange, httpx.ContentRangeFromRange(*rng, desc.Size).String())
346+
rw.Header().Set(httpx.HeaderContentLength, strconv.FormatInt(rng.Size(), 10))
347+
rw.WriteHeader(http.StatusPartialContent)
345348
}
346349
}
347-
if req.Method == http.MethodHead {
348-
return true
349-
}
350+
}
351+
if req.Method == http.MethodHead {
352+
return nil
353+
}
350354

351-
//nolint: errcheck // Ignore
352-
buf := r.bufferPool.Get().(*[]byte)
353-
defer r.bufferPool.Put(buf)
354-
n, err := io.CopyBuffer(rw, rc, *buf)
355-
if err != nil {
356-
switch dist.Kind {
357-
case oci.DistributionKindManifest:
358-
log.Error(err, "copying of manifest data failed")
359-
return true
360-
case oci.DistributionKindBlob:
361-
if resumeRng == nil {
362-
resumeRng = &httpx.Range{
363-
End: desc.Size - 1,
364-
}
355+
//nolint: errcheck // Ignore
356+
buf := r.bufferPool.Get().(*[]byte)
357+
defer r.bufferPool.Put(buf)
358+
n, err := io.CopyBuffer(rw, rc, *buf)
359+
if err != nil {
360+
switch dist.Kind {
361+
case oci.DistributionKindManifest:
362+
return retry.Unrecoverable(fmt.Errorf("copying of manifest data failed: %w", err))
363+
case oci.DistributionKindBlob:
364+
if resumeRng == nil {
365+
resumeRng = &httpx.Range{
366+
End: desc.Size - 1,
365367
}
366-
resumeRng.Start += n
367-
log.Error(err, "copying of blob data failed, retrying with offset")
368-
return false
369368
}
369+
resumeRng.Start += n
370+
return fmt.Errorf("copying of blob data failed: %w", err)
370371
}
371-
return true
372-
}()
373-
if done {
374-
return
375372
}
373+
return nil
374+
}, retryOpts...)
375+
if err != nil {
376+
if !rw.HeadersWritten() {
377+
respErr := oci.NewDistributionError(errCode, fmt.Sprintf("all request retries exhausted for %s", dist.Identifier()), mirrorDetails)
378+
if mirrorDetails.Attempts == 0 {
379+
respErr = oci.NewDistributionError(errCode, fmt.Sprintf("could not find peer for %s", dist.Identifier()), mirrorDetails)
380+
}
381+
rw.WriteError(http.StatusNotFound, errors.Join(respErr, err))
382+
}
383+
return
376384
}
377-
378-
respErr := oci.NewDistributionError(errCode, fmt.Sprintf("all request retries exhausted for %s", dist.Identifier()), mirrorDetails)
379-
rw.WriteError(http.StatusNotFound, errors.Join(respErr, err))
380385
}
381386

382387
func (r *Registry) manifestHandler(rw httpx.ResponseWriter, req *http.Request, dist oci.DistributionPath) {

pkg/registry/registry_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,10 +304,10 @@ func TestRegistryHandler(t *testing.T) {
304304
key: "sha256:ef3a5e9aba91d942f5f888b4e855e785395387aab0f122a6e49d0eaea215e98d",
305305
distributionKind: oci.DistributionKindBlob,
306306
expectedStatus: http.StatusNotFound,
307-
expectedBody: []byte(`{"errors":[{"code":"BLOB_UNKNOWN","detail":{"attempts":1},"message":"could not find peer for sha256:ef3a5e9aba91d942f5f888b4e855e785395387aab0f122a6e49d0eaea215e98d"}]}`),
307+
expectedBody: []byte(`{"errors":[{"code":"BLOB_UNKNOWN","detail":{"attempts":1},"message":"all request retries exhausted for sha256:ef3a5e9aba91d942f5f888b4e855e785395387aab0f122a6e49d0eaea215e98d"}]}`),
308308
expectedHeaders: http.Header{
309309
httpx.HeaderContentType: {httpx.ContentTypeJSON},
310-
httpx.HeaderContentLength: {"168"},
310+
httpx.HeaderContentLength: {"178"},
311311
},
312312
},
313313
{
@@ -327,10 +327,10 @@ func TestRegistryHandler(t *testing.T) {
327327
key: "sha256:ac73670af3abed54ac6fb4695131f4099be9fbe39d6076c5d0264a6bbdae9d83",
328328
distributionKind: oci.DistributionKindManifest,
329329
expectedStatus: http.StatusNotFound,
330-
expectedBody: []byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","detail":{"attempts":1},"message":"could not find peer for sha256:ac73670af3abed54ac6fb4695131f4099be9fbe39d6076c5d0264a6bbdae9d83"}]}`),
330+
expectedBody: []byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","detail":{"attempts":1},"message":"all request retries exhausted for sha256:ac73670af3abed54ac6fb4695131f4099be9fbe39d6076c5d0264a6bbdae9d83"}]}`),
331331
expectedHeaders: http.Header{
332332
httpx.HeaderContentType: {httpx.ContentTypeJSON},
333-
httpx.HeaderContentLength: {"172"},
333+
httpx.HeaderContentLength: {"182"},
334334
},
335335
},
336336
{

0 commit comments

Comments
 (0)