Skip to content

Commit 0a8d383

Browse files
authored
compactor: allow grace period for final status update (#15113)
#### What this PR does This adds a grace period for compactor workers to send final status updates when the parent context of the scheduler executor is canceled (e.g. during shutdown). The purpose is for compactors to be able to request job reassignment before shutting down. This avoids having to wait for lease expiration if restarts occur mid-job, and during a rollout it would help avoid the confusing situation of having more active jobs than running compactors. <img width="2420" height="800" alt="image" src="https://github.com/user-attachments/assets/26a68ffd-0b33-4dba-80c4-40fd90d96bb7" /> My original attempt at fixing this was to check `if ctx.Err() != nil`, then pass a different context, but that subtly never provided the grace period if the shutdown happened during the final update itself. This approach watches the context and starts a timer only if the context cancellation occurs. This restricts when the deadline is imposed so that it does not interfere with the normal retry behavior. #### Which issue(s) this PR fixes or relates to Fixes N/A #### Checklist - [x] Tests updated. - [ ] Documentation added. - [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [ ] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.
1 parent ac77c4f commit 0a8d383

2 files changed

Lines changed: 102 additions & 24 deletions

File tree

pkg/compactor/executor.go

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ import (
4040
)
4141

4242
var (
43-
errCompactionJobHasNoBlocks = errors.New("compaction job has no blocks")
44-
errNoBlockMetadataProvided = errors.New("no block metadata provided")
45-
errJobCanceledByScheduler = errors.New("job canceled by scheduler")
43+
errCompactionJobHasNoBlocks = errors.New("compaction job has no blocks")
44+
errNoBlockMetadataProvided = errors.New("no block metadata provided")
45+
errJobCanceledByScheduler = errors.New("job canceled by scheduler")
46+
errFinalStatusGracePeriodTimeout = errors.New("final status grace period timed out")
4647
)
4748

4849
// compactionExecutor defines how compaction work is executed.
@@ -82,25 +83,27 @@ func (e *standaloneExecutor) stop() error {
8283
}
8384

8485
var (
85-
errInvalidSchedulerEndpoint = fmt.Errorf("invalid compactor.scheduler-client.scheduler-endpoint, required when compactor.scheduler-client.enabled is true")
86-
errInvalidSchedulerUpdateInterval = fmt.Errorf("invalid compactor.scheduler-client.update-interval, interval must be positive")
87-
errInvalidSchedulerLeasingMinBackoff = fmt.Errorf("invalid compactor.scheduler-client.leasing-min-backoff, must be positive")
88-
errInvalidSchedulerLeasingMaxBackoff = fmt.Errorf("invalid compactor.scheduler-client.leasing-max-backoff, must be greater than min backoff")
89-
errInvalidSchedulerUpdateMinBackoff = fmt.Errorf("invalid compactor.scheduler-client.update-min-backoff, must be positive")
90-
errInvalidSchedulerUpdateMaxBackoff = fmt.Errorf("invalid compactor.scheduler-client.update-max-backoff, must be greater than min backoff")
86+
errInvalidSchedulerEndpoint = fmt.Errorf("invalid compactor.scheduler-client.scheduler-endpoint, required when compactor.scheduler-client.enabled is true")
87+
errInvalidSchedulerUpdateInterval = fmt.Errorf("invalid compactor.scheduler-client.update-interval, interval must be positive")
88+
errInvalidSchedulerLeasingMinBackoff = fmt.Errorf("invalid compactor.scheduler-client.leasing-min-backoff, must be positive")
89+
errInvalidSchedulerLeasingMaxBackoff = fmt.Errorf("invalid compactor.scheduler-client.leasing-max-backoff, must be greater than min backoff")
90+
errInvalidSchedulerUpdateMinBackoff = fmt.Errorf("invalid compactor.scheduler-client.update-min-backoff, must be positive")
91+
errInvalidSchedulerUpdateMaxBackoff = fmt.Errorf("invalid compactor.scheduler-client.update-max-backoff, must be greater than min backoff")
92+
errInvalidSchedulerTerminatingFinalStatusTimeout = fmt.Errorf("invalid compactor.scheduler-client.terminating-final-status-timeout, must be positive")
9193
)
9294

9395
type SchedulerClientConfig struct {
94-
Enabled bool `yaml:"enabled" category:"experimental"`
95-
SchedulerEndpoint string `yaml:"scheduler_endpoint" category:"experimental"`
96-
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" category:"experimental"`
97-
LeasingMinBackoff time.Duration `yaml:"leasing_min_backoff" category:"experimental"`
98-
LeasingMaxBackoff time.Duration `yaml:"leasing_max_backoff" category:"experimental"`
99-
UpdateInterval time.Duration `yaml:"update_interval" category:"experimental"`
100-
UpdateMinBackoff time.Duration `yaml:"update_min_backoff" category:"experimental"`
101-
UpdateMaxBackoff time.Duration `yaml:"update_max_backoff" category:"experimental"`
102-
CompactionDirCleanupInterval time.Duration `yaml:"compaction_dir_cleanup_interval" category:"experimental"`
103-
MetadataCacheConfig mimir_tsdb.MetadataCacheConfig `yaml:"metadata_cache" category:"experimental"`
96+
Enabled bool `yaml:"enabled" category:"experimental"`
97+
SchedulerEndpoint string `yaml:"scheduler_endpoint" category:"experimental"`
98+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" category:"experimental"`
99+
LeasingMinBackoff time.Duration `yaml:"leasing_min_backoff" category:"experimental"`
100+
LeasingMaxBackoff time.Duration `yaml:"leasing_max_backoff" category:"experimental"`
101+
UpdateInterval time.Duration `yaml:"update_interval" category:"experimental"`
102+
UpdateMinBackoff time.Duration `yaml:"update_min_backoff" category:"experimental"`
103+
UpdateMaxBackoff time.Duration `yaml:"update_max_backoff" category:"experimental"`
104+
CompactionDirCleanupInterval time.Duration `yaml:"compaction_dir_cleanup_interval" category:"experimental"`
105+
MetadataCacheConfig mimir_tsdb.MetadataCacheConfig `yaml:"metadata_cache" category:"experimental"`
106+
TerminatingFinalStatusTimeout time.Duration `yaml:"terminating_final_status_timeout" category:"experimental"`
104107
}
105108

106109
func (cfg *SchedulerClientConfig) RegisterFlags(f *flag.FlagSet) {
@@ -113,6 +116,7 @@ func (cfg *SchedulerClientConfig) RegisterFlags(f *flag.FlagSet) {
113116
f.DurationVar(&cfg.UpdateMinBackoff, flagPrefix+"update-min-backoff", 1*time.Second, "Minimum backoff time for compaction executor retries when sending scheduler status updates.")
114117
f.DurationVar(&cfg.UpdateMaxBackoff, flagPrefix+"update-max-backoff", 32*time.Second, "Maximum backoff time for compaction executor retries when sending scheduler status updates.")
115118
f.DurationVar(&cfg.CompactionDirCleanupInterval, flagPrefix+"compaction-dir-cleanup-interval", 30*time.Minute, "Defines how frequently to clean up the compaction working directory. The directory is cleaned on startup and then only when this interval has elapsed since the last cleanup. Set to 0 to disable periodic cleanup.")
119+
f.DurationVar(&cfg.TerminatingFinalStatusTimeout, flagPrefix+"terminating-final-status-timeout", 30*time.Second, "Timeout for sending a final job status update to the scheduler when the parent context is canceled (e.g. during shutdown).")
116120
cfg.GRPCClientConfig.RegisterFlagsWithPrefix(flagPrefix+"grpc-client-config", f)
117121
cfg.MetadataCacheConfig.RegisterFlagsWithPrefix(f, flagPrefix+"metadata-cache.")
118122
}
@@ -142,6 +146,9 @@ func (cfg *SchedulerClientConfig) Validate() error {
142146
if err := cfg.MetadataCacheConfig.Validate(); err != nil {
143147
return err
144148
}
149+
if cfg.TerminatingFinalStatusTimeout <= 0 {
150+
return errInvalidSchedulerTerminatingFinalStatusTimeout
151+
}
145152
return nil
146153
}
147154

@@ -332,24 +339,38 @@ func (e *schedulerExecutor) startJobStatusUpdater(ctx context.Context, c *Multit
332339
}
333340
}
334341

335-
// sendFinalJobStatus sends a final status update to the scheduler with retry policy.
342+
// sendFinalJobStatus sends a final status update to the scheduler with a retry policy.
336343
// Compaction jobs send final statuses on completion, planning jobs only on failure for reassignment.
344+
// If ctx is canceled (e.g. during shutdown), attempting to send a final status can continue for up to TerminatingFinalStatusTimeout.
337345
func (e *schedulerExecutor) sendFinalJobStatus(ctx context.Context, key *compactorschedulerpb.JobKey, spec *compactorschedulerpb.JobSpec, status compactorschedulerpb.UpdateType) {
346+
graceCtx, cancel := context.WithCancelCause(context.WithoutCancel(ctx))
347+
defer cancel(nil)
348+
stop := context.AfterFunc(ctx, func() {
349+
timer := time.NewTimer(e.cfg.TerminatingFinalStatusTimeout)
350+
defer timer.Stop()
351+
select {
352+
case <-timer.C:
353+
cancel(errFinalStatusGracePeriodTimeout)
354+
case <-graceCtx.Done():
355+
}
356+
})
357+
defer stop()
358+
338359
jobId := key.Id
339360
jobTenant := spec.Tenant
340361

341362
var err error
342363
switch spec.JobType {
343364
case compactorschedulerpb.JOB_TYPE_COMPACTION:
344365
req := &compactorschedulerpb.UpdateCompactionJobRequest{Key: key, Tenant: spec.Tenant, Update: status}
345-
err = e.retryable.WithContext(ctx).Run(func() error {
346-
_, err := e.schedulerClient.UpdateCompactionJob(ctx, req)
366+
err = e.retryable.WithContext(graceCtx).Run(func() error {
367+
_, err := e.schedulerClient.UpdateCompactionJob(graceCtx, req)
347368
return err
348369
})
349370
case compactorschedulerpb.JOB_TYPE_PLANNING:
350371
req := &compactorschedulerpb.UpdatePlanJobRequest{Key: key, Tenant: spec.Tenant, Update: status}
351-
err = e.retryable.WithContext(ctx).Run(func() error {
352-
_, err := e.schedulerClient.UpdatePlanJob(ctx, req)
372+
err = e.retryable.WithContext(graceCtx).Run(func() error {
373+
_, err := e.schedulerClient.UpdatePlanJob(graceCtx, req)
353374
return err
354375
})
355376
default:

pkg/compactor/executor_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,63 @@ func TestSchedulerExecutor_JobCancellationOn_NotFoundResponse(t *testing.T) {
568568
require.Equal(t, 2, mockSchedulerClient.GetUpdateJobCallCount(), "should have sent exactly 2 updates: one successful, then one NOT_FOUND")
569569
}
570570

571+
func TestSchedulerExecutor_TerminatingFinalJobStatus(t *testing.T) {
572+
key := &compactorschedulerpb.JobKey{Id: "test-job"}
573+
spec := &compactorschedulerpb.JobSpec{
574+
Tenant: "tenant",
575+
JobType: compactorschedulerpb.JOB_TYPE_PLANNING,
576+
}
577+
578+
t.Run("canceled_context_uses_fresh_context_for_update", func(t *testing.T) {
579+
mock := &mockCompactorSchedulerClient{
580+
UpdatePlanJobFunc: func(ctx context.Context, _ *compactorschedulerpb.UpdatePlanJobRequest) (*compactorschedulerpb.UpdateJobResponse, error) {
581+
assert.NoError(t, ctx.Err())
582+
return &compactorschedulerpb.UpdateJobResponse{}, nil
583+
},
584+
}
585+
exec := newTestSchedulerExecutor(t, makeTestCompactorConfig(), mock)
586+
587+
canceledCtx, cancel := context.WithCancel(context.Background())
588+
cancel()
589+
590+
exec.sendFinalJobStatus(canceledCtx, key, spec, compactorschedulerpb.UPDATE_TYPE_REASSIGN)
591+
require.Equal(t, 1, mock.GetUpdateJobCallCount())
592+
})
593+
594+
t.Run("canceled_context_returns_after_final_status_timeout", func(t *testing.T) {
595+
mock := &mockCompactorSchedulerClient{
596+
UpdatePlanJobFunc: func(ctx context.Context, _ *compactorschedulerpb.UpdatePlanJobRequest) (*compactorschedulerpb.UpdateJobResponse, error) {
597+
<-ctx.Done()
598+
return nil, ctx.Err()
599+
},
600+
}
601+
cfg := makeTestCompactorConfig()
602+
cfg.SchedulerClientConfig.TerminatingFinalStatusTimeout = time.Millisecond // arbitrary
603+
604+
exec := newTestSchedulerExecutor(t, cfg, mock)
605+
cancelledCtx, cancel := context.WithCancel(context.Background())
606+
cancel()
607+
608+
synctest.Test(t, func(t *testing.T) {
609+
done := make(chan struct{})
610+
go func() {
611+
exec.sendFinalJobStatus(cancelledCtx, key, spec, compactorschedulerpb.UPDATE_TYPE_REASSIGN)
612+
close(done) // unblock select
613+
}()
614+
615+
// Advance synctest time past timeout
616+
time.Sleep(cfg.SchedulerClientConfig.TerminatingFinalStatusTimeout + time.Millisecond)
617+
synctest.Wait()
618+
619+
select {
620+
case <-done:
621+
default:
622+
t.Fatal("sendFinalJobStatus should have returned after FinalStatusTimeout elapsed")
623+
}
624+
})
625+
})
626+
}
627+
571628
func TestSchedulerExecutor_ExecuteCompactionJob_InvalidInput(t *testing.T) {
572629
tests := map[string]struct {
573630
spec *compactorschedulerpb.JobSpec

0 commit comments

Comments
 (0)