Skip to content

Commit 8f9a24b

Browse files
authored
compactor: skip final status update if job not found by scheduler (#15094)
#### What this PR does When the compaction worker is sending job status updates to the compactor scheduler, the scheduler can reply "I have no idea what job you are talking about" (`NotFound`) if a lease expired. Previously, the worker would then still reply with another message to say "Please reassign this job that you don't know about" and the scheduler would then again reply "I have no idea what job you are talking about". This change makes the worker skip the useless reassignment reply by recognizing that the job context was canceled by the status update goroutine with a sentinel error. #### Which issue(s) this PR fixes or relates to Fixes N/A #### Checklist - [x] Tests updated. - [ ] Documentation added. - [ ] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [ ] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Low Risk** > Low risk: narrowly changes scheduler-mode job status reporting to avoid sending terminal updates after a `NotFound` lease cancellation, with targeted tests covering both job types. > > **Overview** > When the scheduler responds `NotFound` to a periodic keep-alive, the compactor now treats the job as *canceled by the scheduler* and **skips sending any final status/reassign update** for that job. > > This introduces a sentinel cancellation cause (`errJobCanceledByScheduler`) and adds a new test that simulates mid-flight cancellation for both compaction and planning jobs, asserting only the heartbeat update is attempted and no final status is sent. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 42ad3c7. Bugbot is set up for automated code reviews on this repo. 
Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 0e6241d commit 8f9a24b

2 files changed

Lines changed: 102 additions & 5 deletions

File tree

pkg/compactor/executor.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
var (
4343
errCompactionJobHasNoBlocks = errors.New("compaction job has no blocks")
4444
errNoBlockMetadataProvided = errors.New("no block metadata provided")
45+
errJobCanceledByScheduler = errors.New("job canceled by scheduler")
4546
)
4647

4748
// compactionExecutor defines how compaction work is executed.
@@ -317,7 +318,7 @@ func (e *schedulerExecutor) startJobStatusUpdater(ctx context.Context, c *Multit
317318
// Check if the job was canceled from the scheduler side (not found response)
318319
if grpcutil.ErrorToStatusCode(err) == codes.NotFound {
319320
level.Info(e.logger).Log("msg", "job canceled by scheduler, stopping work", "job_id", jobId, "tenant", jobTenant)
320-
cancelJob(err) // Cancel the job context to stop the main work
321+
cancelJob(errJobCanceledByScheduler) // Cancel the job context to stop the main work
321322
return
322323
}
323324
level.Warn(e.logger).Log("msg", "failed to send keep-alive update", "job_id", jobId, "tenant", jobTenant, "err", err)
@@ -418,7 +419,9 @@ func (e *schedulerExecutor) leaseAndExecuteJob(ctx context.Context, c *Multitena
418419
wg.Wait()
419420
if err != nil {
420421
level.Warn(e.logger).Log("msg", "failed to execute job", "job_id", jobID, "tenant", jobTenant, "job_type", jobType, "err", err)
421-
e.sendFinalJobStatus(ctx, resp.Key, resp.Spec, status)
422+
if !errors.Is(context.Cause(jobCtx), errJobCanceledByScheduler) {
423+
e.sendFinalJobStatus(ctx, resp.Key, resp.Spec, status)
424+
}
422425
return true, err
423426
}
424427
e.sendFinalJobStatus(ctx, resp.Key, resp.Spec, status)
@@ -430,12 +433,14 @@ func (e *schedulerExecutor) leaseAndExecuteJob(ctx context.Context, c *Multitena
430433
wg.Wait()
431434
if planErr != nil {
432435
level.Warn(e.logger).Log("msg", "failed to execute planning job", "job_id", jobID, "tenant", jobTenant, "job_type", jobType, "err", planErr)
433-
// Planning jobs only send final status updates on failure
434-
e.sendFinalJobStatus(ctx, resp.Key, resp.Spec, compactorschedulerpb.UPDATE_TYPE_REASSIGN)
436+
if !errors.Is(context.Cause(jobCtx), errJobCanceledByScheduler) {
437+
// Only send an update on failure if the scheduler still thinks we own the job.
438+
e.sendFinalJobStatus(ctx, resp.Key, resp.Spec, compactorschedulerpb.UPDATE_TYPE_REASSIGN)
439+
}
435440
return true, planErr
436441
}
437442

438-
// For planning jobs, no final status update is sent - results are communicated via PlannedJobs
443+
// For planning jobs, no final status update is sent on success. Completion is communicated via PlannedJobs.
439444
if err := e.sendPlannedJobs(ctx, resp.Key, resp.Spec, plannedJobs); err != nil {
440445
level.Warn(e.logger).Log("msg", "failed to send planned jobs", "job_id", jobID, "tenant", jobTenant, "num_jobs", len(plannedJobs), "err", err)
441446
return true, err

pkg/compactor/executor_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"io"
1011
"math/rand"
1112
"os"
1213
"path/filepath"
@@ -920,3 +921,94 @@ func TestBuildCompactionJobFromMetas(t *testing.T) {
920921
})
921922
}
922923
}
924+
925+
type blockingBucket struct {
926+
objstore.Bucket
927+
}
928+
929+
func (b *blockingBucket) Get(ctx context.Context, _ string) (io.ReadCloser, error) {
930+
// Blocks compaction jobs
931+
<-ctx.Done()
932+
return nil, ctx.Err()
933+
}
934+
935+
func (b *blockingBucket) Iter(ctx context.Context, _ string, _ func(string) error, _ ...objstore.IterOption) error {
936+
// Blocks planning jobs
937+
<-ctx.Done()
938+
return ctx.Err()
939+
}
940+
941+
func TestSchedulerExecutor_SchedulerCancellation_SkipsFinalStatus(t *testing.T) {
942+
tests := map[string]struct {
943+
leaseResponse *compactorschedulerpb.LeaseJobResponse
944+
setupUpdate func(*mockCompactorSchedulerClient)
945+
}{
946+
"compaction": {
947+
leaseResponse: &compactorschedulerpb.LeaseJobResponse{
948+
Key: &compactorschedulerpb.JobKey{Id: "compaction"},
949+
Spec: &compactorschedulerpb.JobSpec{
950+
Tenant: "tenant",
951+
Job: &compactorschedulerpb.CompactionJob{BlockIds: [][]byte{testBlockID1.Bytes()}},
952+
JobType: compactorschedulerpb.JOB_TYPE_COMPACTION,
953+
},
954+
},
955+
setupUpdate: func(m *mockCompactorSchedulerClient) {
956+
m.UpdateJobFunc = func(_ context.Context, _ *compactorschedulerpb.UpdateCompactionJobRequest) (*compactorschedulerpb.UpdateJobResponse, error) {
957+
return nil, status.Error(codes.NotFound, "not found")
958+
}
959+
},
960+
},
961+
"planning": {
962+
leaseResponse: &compactorschedulerpb.LeaseJobResponse{
963+
Key: &compactorschedulerpb.JobKey{Id: "planning"},
964+
Spec: &compactorschedulerpb.JobSpec{
965+
Tenant: "tenant",
966+
JobType: compactorschedulerpb.JOB_TYPE_PLANNING,
967+
},
968+
},
969+
setupUpdate: func(m *mockCompactorSchedulerClient) {
970+
m.UpdatePlanJobFunc = func(_ context.Context, _ *compactorschedulerpb.UpdatePlanJobRequest) (*compactorschedulerpb.UpdateJobResponse, error) {
971+
return nil, status.Error(codes.NotFound, "not found")
972+
}
973+
},
974+
},
975+
}
976+
977+
for testName, tc := range tests {
978+
t.Run(testName, func(t *testing.T) {
979+
mockSchedulerClient := &mockCompactorSchedulerClient{}
980+
mockSchedulerClient.LeaseJobFunc = func(_ context.Context, _ *compactorschedulerpb.LeaseJobRequest) (*compactorschedulerpb.LeaseJobResponse, error) {
981+
return tc.leaseResponse, nil
982+
}
983+
tc.setupUpdate(mockSchedulerClient)
984+
985+
cfg := makeTestCompactorConfig()
986+
cfg.SchedulerClientConfig.UpdateInterval = 1 * time.Millisecond // arbitrary
987+
988+
schedulerExec := newTestSchedulerExecutor(t, cfg, mockSchedulerClient)
989+
c := prepareCompactorForExecutorTest(t, cfg, &blockingBucket{Bucket: objstore.NewInMemBucket()}, newMockConfigProvider())
990+
991+
synctest.Test(t, func(t *testing.T) {
992+
errCh := make(chan error, 1)
993+
go func() {
994+
_, err := schedulerExec.leaseAndExecuteJob(context.Background(), c, "compactor-1")
995+
errCh <- err
996+
}()
997+
998+
// Wait until both goroutines get caught:
999+
// 1. The job is blocked in a bucket call
1000+
// 2. The heartbeat is waiting on its ticker
1001+
synctest.Wait()
1002+
1003+
// Advance synctest time so a heartbeat is sent.
1004+
// The scheduler will return NotFound in response, which cancels the job context,
1005+
// which then frees the goroutine from the blocking bucket.
1006+
time.Sleep(cfg.SchedulerClientConfig.UpdateInterval + time.Millisecond)
1007+
synctest.Wait()
1008+
1009+
require.Error(t, <-errCh)
1010+
require.Equal(t, 1, mockSchedulerClient.GetUpdateJobCallCount()) // no final status sent
1011+
})
1012+
})
1013+
}
1014+
}

0 commit comments

Comments (0)