Skip to content

Commit 320fa34

Browse files
authored
S3 backend: make actual use of (Options).ClientTimeout (#133)
* S3 backend: make actual use of (Options).ClientTimeout * flip bad isList boolean when we're actually listing * add test * avoid deferring in doStore * update ClientTimeout option doc * add tests and update S3 stream implementation * make sure saving the marker is included in the timing out operation * restore returning ErrClosed * share backend for tests * write marker in separate timeout context
1 parent 70bbf17 commit 320fa34

3 files changed

Lines changed: 169 additions & 56 deletions

File tree

backends/s3/s3.go

Lines changed: 64 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,14 @@ const (
4343
DefaultSecretsRefreshInterval = 15 * time.Second
4444
// DefaultDisableContentMd5 : disable sending the Content-MD5 header
4545
DefaultDisableContentMd5 = false
46+
// DefaultClientTimeout is the default value for [(Options).ClientTimeout].
47+
DefaultClientTimeout = 15 * time.Minute
4648
)
4749

50+
// ErrClientTimeout is returned when a Store, Read, Delete or List operation
51+
// reaches the amount set in the [(Options).ClientTimeout] option (default [DefaultClientTimeout]).
52+
var ErrClientTimeout = errors.New("S3 client timed out")
53+
4854
// Options describes the storage options for the S3 backend
4955
type Options struct {
5056
// AccessKey and SecretKey are statically defined here.
@@ -140,11 +146,8 @@ type Options struct {
140146
// wait for a TLS handshake. Default if unset: 10s
141147
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`
142148

143-
// ClientTimeout specifies a time limit for requests made by this
144-
// HTTP Client. The timeout includes connection time, any
145-
// redirects, and reading the response body. The timer remains
146-
// running after Get, Head, Post, or Do return and will
147-
// interrupt reading of the Response.Body.
149+
// ClientTimeout specifies a time limit for operations on the S3 Backend.
150+
// If [UseUpdateMarker] is set, saving the marker is considered part of the operation.
148151
// Default if unset: 15m
149152
ClientTimeout time.Duration `yaml:"client_timeout"`
150153

@@ -243,23 +246,21 @@ func recordMinioDurationMetric(method string, start time.Time) {
243246
metricCallHistogram.WithLabelValues(method).Observe(elapsed.Seconds())
244247
}
245248

246-
func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
247-
var blobs simpleblob.BlobList
248-
249+
func (b *Backend) doList(ctx context.Context, prefix string) (blobs simpleblob.BlobList, err error) {
250+
ctx, cancel := b.clientTimeoutContext(ctx)
251+
defer cancel()
249252
defer recordMinioDurationMetric("list", time.Now())
250253

251254
// Runes to strip from blob names for GlobalPrefix
252255
// This is fine, because we can trust the API to only return with the prefix.
253-
// TODO: trust but verify
254256
gpEndIndex := len(b.opt.GlobalPrefix)
255257

256-
objCh := b.client.ListObjects(ctx, b.opt.Bucket, minio.ListObjectsOptions{
258+
objIter := b.client.ListObjectsIter(ctx, b.opt.Bucket, minio.ListObjectsOptions{
257259
Prefix: prefix,
258260
Recursive: !b.opt.PrefixFolders && !b.opt.HideFolders,
259261
})
260-
for obj := range objCh {
261-
// Handle error returned by MinIO client
262-
if err := convertMinioError(obj.Err, true); err != nil {
262+
for obj := range objIter {
263+
if err = convertError(ctx, obj.Err, true); err != nil {
263264
metricCallErrors.WithLabelValues("list").Inc()
264265
metricCallErrorsType.WithLabelValues("list", errorToMetricsLabel(err)).Inc()
265266
return nil, err
@@ -295,6 +296,8 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
295296
// configured in b.
296297
func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
297298
name = b.prependGlobalPrefix(name)
299+
ctx, cancel := b.clientTimeoutContext(ctx)
300+
defer cancel()
298301

299302
r, err := b.doLoadReader(ctx, name)
300303
if err != nil {
@@ -303,20 +306,20 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
303306
defer r.Close()
304307

305308
p, err := io.ReadAll(r)
306-
if err = convertMinioError(err, false); err != nil {
309+
if err = convertError(ctx, err, false); err != nil {
307310
return nil, err
308311
}
309312
return p, nil
310313
}
311314

312-
func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, error) {
315+
func (b *Backend) doLoadReader(ctx context.Context, name string) (*minio.Object, error) {
313316
metricCalls.WithLabelValues("load").Inc()
314317
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()
315318

316319
defer recordMinioDurationMetric("load", time.Now())
317320

318321
obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{})
319-
if err = convertMinioError(err, false); err != nil {
322+
if err = convertError(ctx, err, false); err != nil {
320323
if !errors.Is(err, os.ErrNotExist) {
321324
metricCallErrors.WithLabelValues("load").Inc()
322325
metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc()
@@ -327,7 +330,7 @@ func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser,
327330
return nil, os.ErrNotExist
328331
}
329332
info, err := obj.Stat()
330-
if err = convertMinioError(err, false); err != nil {
333+
if err = convertError(ctx, err, false); err != nil {
331334
if !errors.Is(err, os.ErrNotExist) {
332335
metricCallErrors.WithLabelValues("load").Inc()
333336
metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc()
@@ -345,9 +348,9 @@ func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser,
345348
// Store sets the content of the object identified by name to the content
346349
// of data, in the S3 Bucket configured in b.
347350
func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
348-
// Prepend global prefix
349351
name = b.prependGlobalPrefix(name)
350-
352+
ctx, cancel := b.clientTimeoutContext(ctx)
353+
defer cancel()
351354
info, err := b.doStore(ctx, name, data)
352355
if err != nil {
353356
return err
@@ -375,20 +378,20 @@ func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, s
375378

376379
// minio accepts size == -1, meaning the size is unknown.
377380
info, err := b.client.PutObject(ctx, b.opt.Bucket, name, r, size, putObjectOptions)
378-
err = convertMinioError(err, false)
379-
if err != nil {
381+
if err = convertError(ctx, err, false); err != nil {
380382
metricCallErrors.WithLabelValues("store").Inc()
381383
metricCallErrorsType.WithLabelValues("store", errorToMetricsLabel(err)).Inc()
384+
return info, err
382385
}
383-
return info, err
386+
return info, nil
384387
}
385388

386389
// Delete removes the object identified by name from the S3 Bucket
387390
// configured in b.
388391
func (b *Backend) Delete(ctx context.Context, name string) error {
389-
// Prepend global prefix
390392
name = b.prependGlobalPrefix(name)
391-
393+
ctx, cancel := b.clientTimeoutContext(ctx)
394+
defer cancel()
392395
if err := b.doDelete(ctx, name); err != nil {
393396
return err
394397
}
@@ -401,9 +404,42 @@ func (b *Backend) doDelete(ctx context.Context, name string) error {
401404
defer recordMinioDurationMetric("delete", time.Now())
402405

403406
err := b.client.RemoveObject(ctx, b.opt.Bucket, name, minio.RemoveObjectOptions{})
404-
if err = convertMinioError(err, false); err != nil {
407+
if err = convertError(ctx, err, false); err != nil {
405408
metricCallErrors.WithLabelValues("delete").Inc()
406409
metricCallErrorsType.WithLabelValues("delete", errorToMetricsLabel(err)).Inc()
410+
return err
411+
}
412+
return nil
413+
}
414+
415+
// clientTimeoutContext wraps [context.WithTimeoutCause] with the values and options that caracterise a client timeout.
416+
func (b *Backend) clientTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) {
417+
return context.WithTimeoutCause(ctx, getOpt(b.opt.ClientTimeout, DefaultClientTimeout), ErrClientTimeout)
418+
}
419+
420+
// convertError returns a more informative error from err.
421+
// It may be converted to a [minio.ErrorResponse],
422+
// or an [ErrClientTimeout] when ctx was issued by [(*Backend).contextWithTimeout].
423+
func convertError(ctx context.Context, err error, isList bool) error {
424+
if err == nil {
425+
return nil
426+
}
427+
// Try to get a more specific error.
428+
if ctx.Err() != nil {
429+
err = context.Cause(ctx)
430+
} else {
431+
errRes := minio.ToErrorResponse(err)
432+
switch errRes.Code {
433+
case "BucketAlreadyOwnedByYou":
434+
// This is the desired outcome if we work on already existing bucket.
435+
return nil
436+
case "NoSuchKey":
437+
// NoSuchKey in a list means the marker is missing.
438+
if !isList {
439+
// This error does not reflect an upstream issue, so no metrics.
440+
return fmt.Errorf("%w: %s", os.ErrNotExist, err.Error())
441+
}
442+
}
407443
}
408444
return err
409445
}
@@ -471,11 +507,6 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
471507
TLSClientConfig: tlsConfig,
472508
ForceAttemptHTTP2: true,
473509
}
474-
hc := &http.Client{
475-
Transport: transport,
476-
// includes reading response body!
477-
Timeout: getOpt(opt.ClientTimeout, 15*time.Minute),
478-
}
479510

480511
// Some of the following calls require a short running context
481512
ctx, cancel := context.WithTimeout(ctx, opt.InitTimeout)
@@ -511,7 +542,7 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
511542
cfg := &minio.Options{
512543
Creds: creds,
513544
Secure: useSSL,
514-
Transport: hc.Transport,
545+
Transport: transport,
515546
Region: opt.Region,
516547
}
517548

@@ -536,7 +567,7 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
536567

537568
err := client.MakeBucket(ctx, opt.Bucket, minio.MakeBucketOptions{Region: opt.Region})
538569
if err != nil {
539-
if err := convertMinioError(err, false); err != nil {
570+
if err := convertError(ctx, err, false); err != nil {
540571
return nil, err
541572
}
542573
}
@@ -559,27 +590,6 @@ func (b *Backend) setGlobalPrefix(prefix string) {
559590
b.markerName = b.prependGlobalPrefix(UpdateMarkerFilename)
560591
}
561592

562-
// convertMinioError takes an error, possibly a minio.ErrorResponse
563-
// and turns it into a well known error when possible.
564-
// If error is not well known, it is returned as is.
565-
// If error is considered to be ignorable, nil is returned.
566-
func convertMinioError(err error, isList bool) error {
567-
if err == nil {
568-
return nil
569-
}
570-
errRes := minio.ToErrorResponse(err)
571-
// We need to differentiate between a missing bucket and a missing key,
572-
// because a missing bucket is the result of missing rights or a deletion from the outside.
573-
// Thus, we do not use `errRes.StatusCode` that would be == 404 for either case.
574-
if !isList && errRes.Code == "NoSuchKey" {
575-
return fmt.Errorf("%w: %s", os.ErrNotExist, err.Error())
576-
}
577-
if errRes.Code == "BucketAlreadyOwnedByYou" {
578-
return nil
579-
}
580-
return err
581-
}
582-
583593
// errorToMetricsLabel converts an error into a prometheus label.
584594
// If error is a NotExist error, "NotFound" is returned.
585595
// If error is a timeout, "Timeout" is returned.
@@ -596,6 +606,7 @@ func errorToMetricsLabel(err error) string {
596606
}
597607
var netError *net.OpError
598608
if errors.Is(err, context.DeadlineExceeded) ||
609+
errors.Is(err, ErrClientTimeout) ||
599610
(errors.As(err, &netError) && netError.Timeout()) {
600611
return "Timeout"
601612
}

backends/s3/s3_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package s3
22

33
import (
44
"context"
5+
"io"
56
"testing"
67
"time"
78

@@ -221,3 +222,75 @@ func TestHideFolders(t *testing.T) {
221222
assert.Equal(t, []string{"baz"}, ls.Names())
222223
})
223224
}
225+
226+
// TestClientTimeout makes sure the ClientTimeout option is taken into consideration.
227+
func TestClientTimeout(t *testing.T) {
228+
b := getBackend(t.Context(), t)
229+
t.Run("basic", func(t *testing.T) {
230+
b.opt.ClientTimeout = time.Microsecond
231+
ctx, _ := b.clientTimeoutContext(t.Context())
232+
time.Sleep(time.Second)
233+
require.ErrorIs(t, context.Cause(ctx), ErrClientTimeout)
234+
})
235+
t.Run("crud", func(t *testing.T) {
236+
b.opt.ClientTimeout = time.Microsecond
237+
ctx := t.Context()
238+
assert.ErrorIs(t, b.Store(ctx, "crudTest", []byte("123")), ErrClientTimeout)
239+
_, err := b.Load(ctx, "crudTest")
240+
assert.ErrorIs(t, err, ErrClientTimeout)
241+
assert.ErrorIs(t, b.Delete(ctx, "delete"), ErrClientTimeout)
242+
_, err = b.List(ctx, "")
243+
assert.ErrorIs(t, err, ErrClientTimeout)
244+
})
245+
t.Run("stream write", func(t *testing.T) {
246+
b.opt.ClientTimeout = time.Millisecond
247+
ctx := t.Context()
248+
w, err := b.NewWriter(ctx, "failOnWriteTest")
249+
require.NoError(t, err)
250+
time.Sleep(5 * time.Millisecond)
251+
_, err = w.Write([]byte("123"))
252+
require.ErrorIs(t, err, ErrClientTimeout)
253+
require.ErrorIs(t, w.Close(), ErrClientTimeout)
254+
})
255+
t.Run("stream read", func(t *testing.T) {
256+
b.opt.ClientTimeout = 0 // Reset
257+
ctx := t.Context()
258+
require.NoError(t, b.Store(ctx, "failOnReadTest", []byte("123"))) // avoid ErrNoExist
259+
260+
b.opt.ClientTimeout = time.Millisecond
261+
r, err := b.NewReader(ctx, "failOnReadTest")
262+
require.NoError(t, err)
263+
time.Sleep(5 * time.Millisecond)
264+
_, err = r.Read(make([]byte, 1))
265+
require.ErrorIs(t, err, ErrClientTimeout)
266+
require.ErrorIs(t, r.Close(), ErrClientTimeout)
267+
})
268+
}
269+
270+
// Ensure that in Minio Client, context cancellation aborts
271+
// an ongoing PutObject or GetObject operation.
272+
func TestMinioKeepsContext(t *testing.T) {
273+
t.Parallel()
274+
t.Run("putObject", func(t *testing.T) {
275+
t.Parallel()
276+
ctx, cancel := context.WithCancel(t.Context())
277+
b := getBackend(ctx, t)
278+
_, err := b.client.PutObject(ctx, b.opt.Bucket, "putObject", readerOnce(cancel), -1, minio.PutObjectOptions{})
279+
require.ErrorIs(t, err, context.Canceled)
280+
})
281+
t.Run("getObject", func(t *testing.T) {
282+
t.Parallel()
283+
ctx, cancel := context.WithCancel(t.Context())
284+
b := getBackend(ctx, t)
285+
require.NoError(t, b.Store(ctx, "getObject", []byte("123"))) // avoid ErrNoExist
286+
obj, err := b.client.GetObject(ctx, b.opt.Bucket, "getObject", minio.GetObjectOptions{})
287+
require.NoError(t, err)
288+
cancel()
289+
_, err = io.Copy(io.Discard, obj)
290+
require.ErrorIs(t, err, context.Canceled)
291+
})
292+
}
293+
294+
type readerOnce context.CancelFunc
295+
296+
func (f readerOnce) Read(p []byte) (int, error) { f(); return len(p), nil }

backends/s3/stream.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,44 @@ package s3
33
import (
44
"context"
55
"io"
6+
7+
"github.com/minio/minio-go/v7"
68
)
79

810
// NewReader satisfies StreamReader and provides a read streaming interface to
911
// a blob located on an S3 server.
1012
func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, error) {
13+
ctx, cancel := b.clientTimeoutContext(ctx)
1114
name = b.prependGlobalPrefix(name)
1215
r, err := b.doLoadReader(ctx, name)
1316
if err != nil {
1417
return nil, err
1518
}
16-
return r, nil
19+
return &readWrapper{r, ctx, cancel}, nil
20+
}
21+
22+
// A readWrapper implements io.ReadCloser and allows keeping the context around.
23+
type readWrapper struct {
24+
obj *minio.Object
25+
ctx context.Context
26+
cancel context.CancelFunc
27+
}
28+
29+
func (r *readWrapper) Read(b []byte) (n int, err error) {
30+
n, err = r.obj.Read(b)
31+
if err == context.DeadlineExceeded {
32+
return n, context.Cause(r.ctx)
33+
}
34+
return n, err
35+
}
36+
37+
func (r *readWrapper) Close() (err error) {
38+
err = r.obj.Close()
39+
if err == nil {
40+
err = context.Cause(r.ctx)
41+
}
42+
r.cancel()
43+
return err
1744
}
1845

1946
// NewWriter satisfies StreamWriter and provides a write streaming interface to
@@ -24,9 +51,11 @@ func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, e
2451
pr, pw := io.Pipe()
2552
go func() {
2653
// This call returns when the pipe is closed, or when an error occurs.
27-
info, err := b.doStoreReader(ctx, name, pr, -1)
54+
ctx1, _ := b.clientTimeoutContext(ctx)
55+
info, err := b.doStoreReader(ctx1, name, pr, -1)
2856
if err == nil {
29-
_ = b.setMarker(ctx, name, info.ETag, false)
57+
ctx2, _ := b.clientTimeoutContext(ctx)
58+
_ = b.setMarker(ctx2, name, info.ETag, false)
3059
}
3160
_ = pr.CloseWithError(err)
3261
cancel()

0 commit comments

Comments
 (0)