
Commit 5ef35aa

fix(v2): segment upload timeout (#4097)
1 parent 3ff7247 commit 5ef35aa

File tree: 4 files changed, +36 −27

  pkg/experiment/ingester/client/client.go
  pkg/experiment/ingester/client/client_test.go
  pkg/experiment/ingester/segment.go
  pkg/experiment/ingester/service.go

Diff for: pkg/experiment/ingester/client/client.go

+1 −2

@@ -63,12 +63,11 @@ func isClientError(err error) bool {
 	switch status.Code(err) {
 	case codes.InvalidArgument,
 		codes.Canceled,
-		codes.DeadlineExceeded,
 		codes.PermissionDenied,
 		codes.Unauthenticated:
 		return true
 	default:
-		return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
+		return errors.Is(err, context.Canceled)
 	}
 }
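The practical effect: a DeadlineExceeded status is no longer classified as a client error, and the removed errors.Is(err, context.DeadlineExceeded) branch means a deadline surfaced as a plain context error is treated the same way, so deadline failures become eligible for retry upstream while cancellation stays a hard client-side failure. A minimal, self-contained sketch of the resulting classification, not taken from the repository; only the grpc status and codes packages are assumed:

// Sketch: mirrors the updated isClientError logic so the behaviour change is
// easy to see in isolation. The main function and example values are illustrative only.
package main

import (
	"context"
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func isClientError(err error) bool {
	switch status.Code(err) {
	case codes.InvalidArgument,
		codes.Canceled,
		codes.PermissionDenied,
		codes.Unauthenticated:
		return true
	default:
		// DeadlineExceeded now falls through to here and is no longer a client error.
		return errors.Is(err, context.Canceled)
	}
}

func main() {
	fmt.Println(isClientError(status.Error(codes.DeadlineExceeded, "deadline"))) // false: may be retried
	fmt.Println(isClientError(status.Error(codes.Canceled, "canceled")))         // true: not retried
	fmt.Println(isClientError(status.Error(codes.Internal, "boom")))             // false: may be retried
}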

Diff for: pkg/experiment/ingester/client/client_test.go

+17 −5

@@ -67,8 +67,9 @@ type segwriterClientSuite struct {
 }
 
 func (s *segwriterClientSuite) SetupTest() {
-	s.listener = bufconn.Listen(256 << 10)
-	s.dialer = func(context.Context, string) (net.Conn, error) { return s.listener.Dial() }
+	listener := bufconn.Listen(256 << 10)
+	s.listener = listener
+	s.dialer = func(context.Context, string) (net.Conn, error) { return listener.Dial() }
 	s.server = grpc.NewServer()
 	s.service = new(segwriterServerMock)
 	segmentwriterv1.RegisterSegmentWriterServiceServer(s.server, s.service)
@@ -93,7 +94,7 @@ func (s *segwriterClientSuite) SetupTest() {
 	s.done = make(chan struct{})
 	go func() {
 		defer close(s.done)
-		s.Require().NoError(s.server.Serve(s.listener))
+		s.Require().NoError(s.server.Serve(listener))
 	}()
 
 	// Wait for the server
@@ -163,13 +164,24 @@ func (s *segwriterClientSuite) Test_Push_ClientError_Cancellation() {
 	s.Assert().Equal(codes.Canceled.String(), status.Code(err).String())
 }
 
-func (s *segwriterClientSuite) Test_Push_ClientError_Deadline() {
+func (s *segwriterClientSuite) Test_Push_Client_Deadline() {
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
+	defer cancel()
+	_, err := s.client.Push(ctx, &segmentwriterv1.PushRequest{})
+	s.Assert().ErrorIs(err, context.DeadlineExceeded)
+}
+
+func (s *segwriterClientSuite) Test_Push_NonClient_Deadline() {
 	s.service.On("Push", mock.Anything, mock.Anything).
 		Return(new(segmentwriterv1.PushResponse), context.DeadlineExceeded).
 		Once()
 
+	s.service.On("Push", mock.Anything, mock.Anything).
+		Return(new(segmentwriterv1.PushResponse), nil).
+		Once()
+
 	_, err := s.client.Push(context.Background(), &segmentwriterv1.PushRequest{})
-	s.Assert().Equal(codes.DeadlineExceeded.String(), status.Code(err).String())
+	s.Assert().NoError(err)
 }
 
 func (s *segwriterClientSuite) Test_Push_ClientError_InvalidArgument() {
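The SetupTest change captures the bufconn listener in a local variable so the Serve goroutine never reads s.listener, which the next SetupTest may reassign. A rough sketch of that in-memory gRPC wiring outside the suite, assuming only grpc-go and its bufconn test package; the helper name and signature here are illustrative, not the repository's:

// Sketch: start a gRPC server on an in-memory bufconn listener and return a
// client connection to it, closing over the local listener variable.
package example

import (
	"context"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/test/bufconn"
)

func newBufconnClient(ctx context.Context, register func(*grpc.Server)) (*grpc.ClientConn, func(), error) {
	listener := bufconn.Listen(256 << 10) // in-memory listener, no real network
	server := grpc.NewServer()
	register(server)
	go func() { _ = server.Serve(listener) }() // runs until Stop is called

	conn, err := grpc.DialContext(ctx, "bufconn",
		grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return listener.Dial() }),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return nil, nil, err
	}
	cleanup := func() { _ = conn.Close(); server.Stop(); _ = listener.Close() }
	return conn, cleanup, nil
}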

Diff for: pkg/experiment/ingester/segment.go

+16 −18

@@ -581,6 +581,12 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
 		WithLabelValues(s.sshard).
 		Observe(float64(len(blockData)))
 
+	if sw.config.UploadTimeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, sw.config.UploadTimeout)
+		defer cancel()
+	}
+
 	// To mitigate tail latency issues, we use a hedged upload strategy:
 	// if the request is not completed within a certain time, we trigger
 	// a second upload attempt. Upload errors are retried explicitly and
@@ -604,8 +610,8 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
 	}
 	// Retry on all errors.
 	retries := backoff.New(ctx, retryConfig)
-	for retries.Ongoing() {
-		if attemptErr = sw.uploadWithTimeout(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
+	for retries.Ongoing() && ctx.Err() == nil {
+		if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
 			break
 		}
 		retries.Wait()
@@ -628,22 +634,7 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
 	return nil
 }
 
-func (sw *segmentsWriter) uploadWithTimeout(ctx context.Context, path string, r io.Reader) error {
-	if sw.config.UploadTimeout > 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, sw.config.UploadTimeout)
-		defer cancel()
-	}
-	return sw.bucket.Upload(ctx, path, r)
-}
-
 func (sw *segmentsWriter) storeMetadata(ctx context.Context, meta *metastorev1.BlockMeta, s *segment) error {
-	if sw.config.MetadataUpdateTimeout > 0 {
-		var cancel context.CancelFunc
-		ctx, cancel = context.WithTimeout(ctx, sw.config.MetadataUpdateTimeout)
-		defer cancel()
-	}
-
 	start := time.Now()
 	var err error
 	defer func() {
@@ -653,7 +644,14 @@ func (sw *segmentsWriter) storeMetadata(ctx context.Context, meta *metastorev1.B
 		s.debuginfo.storeMetaDuration = time.Since(start)
 	}()
 
-	if _, err = sw.metastore.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta}); err == nil {
+	mdCtx := ctx
+	if sw.config.MetadataUpdateTimeout > 0 {
+		var cancel context.CancelFunc
+		mdCtx, cancel = context.WithTimeout(mdCtx, sw.config.MetadataUpdateTimeout)
+		defer cancel()
+	}
+
+	if _, err = sw.metastore.AddBlock(mdCtx, &metastorev1.AddBlockRequest{Block: meta}); err == nil {
 		return nil
 	}
 
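The structural change here: UploadTimeout now wraps the entire retry and hedge sequence inside uploadBlock rather than each individual attempt, the loop exits early once the context expires, and storeMetadata derives a child context (mdCtx) instead of shadowing the caller's context. A reduced sketch of that shape using only the standard library; the real code uses the backoff package and hedging, and uploadWithRetries with its parameters is hypothetical:

// Sketch: a single deadline bounds the whole retry loop (attempts plus backoff
// waits), instead of a fresh per-attempt deadline inside each try.
package example

import (
	"context"
	"time"
)

func uploadWithRetries(ctx context.Context, timeout time.Duration, maxRetries int, attempt func(context.Context) error) error {
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout) // one deadline for the whole operation
		defer cancel()
	}
	var err error
	delay := 50 * time.Millisecond
	for i := 0; i < maxRetries && ctx.Err() == nil; i++ {
		if err = attempt(ctx); err == nil {
			return nil
		}
		select {
		case <-time.After(delay): // crude stand-in for the backoff package
		case <-ctx.Done():
		}
		delay *= 2
	}
	if ctxErr := ctx.Err(); ctxErr != nil && err == nil {
		err = ctxErr
	}
	return err
}

With the previous per-attempt timeout, each retry could run for the full UploadTimeout on its own; wrapping the loop caps the total, which is why the default grows to 2s below and its help text now reads "including retries".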

Diff for: pkg/experiment/ingester/service.go

+2 −2

@@ -71,15 +71,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix+".", f, util.Logger)
 	f.DurationVar(&cfg.SegmentDuration, prefix+".segment-duration", defaultSegmentDuration, "Timeout when flushing segments to bucket.")
 	f.UintVar(&cfg.FlushConcurrency, prefix+".flush-concurrency", 0, "Number of concurrent flushes. Defaults to the number of CPUs, but not less than 8.")
-	f.DurationVar(&cfg.UploadTimeout, prefix+".upload-timeout", time.Second, "Timeout for upload requests.")
+	f.DurationVar(&cfg.UploadTimeout, prefix+".upload-timeout", 2*time.Second, "Timeout for upload requests, including retries.")
 	f.IntVar(&cfg.UploadMaxRetries, prefix+".upload-max-retries", 3, "Number of times to backoff and retry before failing.")
 	f.DurationVar(&cfg.UploadMinBackoff, prefix+".upload-retry-min-period", 50*time.Millisecond, "Minimum delay when backing off.")
 	f.DurationVar(&cfg.UploadMaxBackoff, prefix+".upload-retry-max-period", defaultSegmentDuration, "Maximum delay when backing off.")
 	f.DurationVar(&cfg.UploadHedgeAfter, prefix+".upload-hedge-after", defaultSegmentDuration, "Time after which to hedge the upload request.")
 	f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second.")
 	f.UintVar(&cfg.UploadHedgeRateBurst, prefix+".upload-hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
 	f.BoolVar(&cfg.MetadataDLQEnabled, prefix+".metadata-dlq-enabled", true, "Enables dead letter queue (DLQ) for metadata. If the metadata update fails, it will be stored and updated asynchronously.")
-	f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", time.Second, "Timeout for metadata update requests.")
+	f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", 2*time.Second, "Timeout for metadata update requests.")
 }
 
 type Limits interface {
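Both defaults move from 1s to 2s because the upload timeout now covers the retry sequence as a whole (see segment.go above). The upload-hedge-* flags feed the hedged upload strategy described in the uploadBlock comment; a bare-bones sketch of that idea, standard library only, where hedged, hedgeAfter and attempt are illustrative names and the rate limiting controlled by upload-hedge-rate-max and -burst is omitted:

// Sketch: if the primary attempt has not finished after hedgeAfter, start a
// second attempt in parallel and use whichever succeeds first.
package example

import (
	"context"
	"time"
)

func hedged(ctx context.Context, hedgeAfter time.Duration, attempt func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	results := make(chan error, 2) // buffered: a late attempt never blocks or leaks
	run := func() { results <- attempt(ctx) }
	go run()

	hedgeTimer := time.NewTimer(hedgeAfter)
	defer hedgeTimer.Stop()

	select {
	case err := <-results:
		return err // primary finished before the hedge delay
	case <-ctx.Done():
		return ctx.Err()
	case <-hedgeTimer.C:
		go run() // hedge: start a second attempt in parallel
	}

	// Two attempts in flight: return the first success, or the second error.
	if err := <-results; err == nil {
		return nil
	}
	return <-results
}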
