Skip to content

Commit 95576e2

Browse files
authored
[r351] Block-builder-scheduler: fix bugs in handling of partitions with no commit (#12130) (#12165)
Backport 0a75686 from #12130
1 parent f241084 commit 95576e2

4 files changed

Lines changed: 129 additions & 39 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
* [BUGFIX] Distributor: Validate the RW2 symbols field and reject invalid requests that don't have an empty string as the first symbol. #11953
1616
* [BUGFIX] Distributor: Check `max_inflight_push_requests_bytes` before decompressing incoming requests. #11967
1717
* [BUGFIX] Query-frontend: Allow limit parameter to be 0 in label queries to explicitly request unlimited results. #12054
18+
* [BUGFIX] Distributor: Fix a possible panic in the OTLP push path while handling a gRPC status error. #12072
19+
* [BUGFIX] Query-frontend: Evaluate experimental duration expressions before sharding, splitting, and caching. Otherwise, the result is not correct. #12038
20+
* [BUGFIX] Block-builder-scheduler: Fix bugs in handling of partitions with no commit. #12130
1821

1922
### Mixin
2023

pkg/blockbuilder/scheduler/metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type schedulerMetrics struct {
1111
updateScheduleDuration prometheus.Histogram
1212
partitionStartOffset *prometheus.GaugeVec
1313
partitionCommittedOffset *prometheus.GaugeVec
14+
partitionPlannedOffset *prometheus.GaugeVec
1415
partitionEndOffset *prometheus.GaugeVec
1516
flushFailed prometheus.Counter
1617
fetchOffsetsFailed prometheus.Counter
@@ -41,6 +42,10 @@ func newSchedulerMetrics(reg prometheus.Registerer) schedulerMetrics {
4142
Name: "cortex_blockbuilder_scheduler_partition_committed_offset",
4243
Help: "The observed committed offset of each partition.",
4344
}, []string{"partition"}),
45+
partitionPlannedOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
46+
Name: "cortex_blockbuilder_scheduler_partition_planned_offset",
47+
Help: "The planned offset of each partition.",
48+
}, []string{"partition"}),
4449
flushFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
4550
Name: "cortex_blockbuilder_scheduler_flush_failed_total",
4651
Help: "The total number of Kafka flushes that failed.",

pkg/blockbuilder/scheduler/scheduler.go

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (s *partitionState) updateEndOffset(end int64, ts time.Time, jobSize time.D
251251
case bucketBefore:
252252
// New bucket is before our current one. This should only happen if our
253253
// Kafka's end offsets aren't monotonically increasing.
254-
return nil, fmt.Errorf("time went backwards: %s < %s (%d, %d)", s.jobBucket, newJobBucket, s.offset, end)
254+
return nil, fmt.Errorf("time went backwards: %s < %s (%d, %d)", newJobBucket, s.jobBucket, s.offset, end)
255255
case bucketSame:
256256
// Observation is in the currently tracked bucket. No action needed.
257257
case bucketAfter:
@@ -401,11 +401,13 @@ func (s *BlockBuilderScheduler) getPartitionState(topic string, partition int32)
401401
pendingJobs: list.New(),
402402
planned: &advancingOffset{
403403
name: "planned",
404+
off: offsetEmpty,
404405
metrics: &s.metrics,
405406
logger: s.logger,
406407
},
407408
committed: &advancingOffset{
408409
name: "committed",
410+
off: offsetEmpty,
409411
metrics: &s.metrics,
410412
logger: s.logger,
411413
},
@@ -520,8 +522,9 @@ func (s *BlockBuilderScheduler) consumptionOffsets(ctx context.Context, topic st
520522

521523
var resumeOffset int64
522524

523-
if planned := ps.planned.offset(); planned > 0 {
524-
s.metrics.partitionCommittedOffset.WithLabelValues(partStr).Set(float64(planned))
525+
if !ps.planned.empty() {
526+
planned := ps.planned.offset()
527+
s.metrics.partitionPlannedOffset.WithLabelValues(partStr).Set(float64(planned))
525528
resumeOffset = planned
526529
} else {
527530
// No planned offset for this partition. Resume from fallback offset instead.
@@ -653,35 +656,44 @@ func (s *BlockBuilderScheduler) fetchCommittedOffsets(ctx context.Context) (kadm
653656
return kadm.Offsets{}, lastErr
654657
}
655658

656-
func (s *BlockBuilderScheduler) snapCommitted() kadm.Offsets {
659+
// snapOffsets returns a snapshot of the committed and planned offsets for all partitions.
660+
func (s *BlockBuilderScheduler) snapOffsets() (kadm.Offsets, kadm.Offsets) {
657661
cp := make(kadm.Offsets)
662+
pp := make(kadm.Offsets)
658663

659664
s.mu.Lock()
660665
defer s.mu.Unlock()
661666

662667
for _, ps := range s.partState {
663-
cp.AddOffset(ps.topic, ps.partition, ps.committed.offset(), 0)
668+
if !ps.committed.empty() {
669+
cp.AddOffset(ps.topic, ps.partition, ps.committed.offset(), 0)
670+
}
671+
if !ps.planned.empty() {
672+
pp.AddOffset(ps.topic, ps.partition, ps.planned.offset(), 0)
673+
}
664674
}
665675

666-
return cp
676+
return cp, pp
667677
}
668678

669679
// flushOffsetsToKafka flushes the committed offsets to Kafka and updates relevant metrics.
670680
func (s *BlockBuilderScheduler) flushOffsetsToKafka(ctx context.Context) error {
671681
// TODO: only flush if dirty.
672-
offsets := s.snapCommitted()
682+
committed, planned := s.snapOffsets()
673683

674-
offsets.Each(func(o kadm.Offset) {
684+
committed.Each(func(o kadm.Offset) {
675685
s.metrics.partitionCommittedOffset.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.At))
676686
})
687+
planned.Each(func(o kadm.Offset) {
688+
s.metrics.partitionPlannedOffset.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.At))
689+
})
677690

678-
err := s.adminClient.CommitAllOffsets(ctx, s.cfg.ConsumerGroup, offsets)
691+
err := s.adminClient.CommitAllOffsets(ctx, s.cfg.ConsumerGroup, committed)
679692
if err != nil {
680693
return fmt.Errorf("commit offsets: %w", err)
681694
}
682695

683-
level.Debug(s.logger).Log("msg", "flushed offsets to Kafka", "offsets", offsetsStr(offsets))
684-
696+
level.Debug(s.logger).Log("msg", "flushed offsets to Kafka", "offsets", offsetsStr(committed))
685697
return nil
686698
}
687699

@@ -958,6 +970,8 @@ type advancingOffset struct {
958970
logger log.Logger
959971
}
960972

973+
const offsetEmpty int64 = -1
974+
961975
// advance moves the offset forward by the given job spec. Advancements are
962976
// expected to be monotonically increasing and contiguous. Advance will not
963977
// allow backwards movement. If a gap is detected, a warning is logged and a
@@ -988,13 +1002,19 @@ func (o *advancingOffset) set(offset int64) {
9881002
o.off = offset
9891003
}
9901004

1005+
// empty returns true if the offset is empty and uninitialized.
1006+
func (o *advancingOffset) empty() bool {
1007+
return o.off == offsetEmpty
1008+
}
1009+
9911010
// validNextSpec returns true if the given job spec is valid to be added to the
9921011
// offset. It is valid if the start offset is the same as the current offset.
1012+
// We also allow transitioning out of an empty offset without calling it a gap.
9931013
func (o *advancingOffset) validNextSpec(spec schedulerpb.JobSpec) bool {
994-
return o.off == spec.StartOffset
1014+
return o.off == spec.StartOffset || o.empty()
9951015
}
9961016

9971017
// beyondSpec returns true if the offset is beyond the given job spec.
9981018
func (o *advancingOffset) beyondSpec(spec schedulerpb.JobSpec) bool {
999-
return spec.EndOffset <= o.off
1019+
return !o.empty() && spec.EndOffset <= o.off
10001020
}

pkg/blockbuilder/scheduler/scheduler_test.go

Lines changed: 88 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -370,23 +370,23 @@ func TestAssignJobSkipsObsoleteOffsets_PriorScheduler(t *testing.T) {
370370

371371
func TestObservations(t *testing.T) {
372372
sched, _ := mustScheduler(t, 10)
373-
// Initially we're in observation mode. We have Kafka's start offsets, but no client jobs.
373+
// Initially we're in observation mode. We have Kafka's commit offsets, but no client jobs.
374374

375375
sched.getPartitionState("ingest", 1).initCommit(5000)
376376
sched.getPartitionState("ingest", 2).initCommit(800)
377377
sched.getPartitionState("ingest", 3).initCommit(974)
378378
sched.getPartitionState("ingest", 4).initCommit(500)
379379
sched.getPartitionState("ingest", 5).initCommit(12000)
380-
// no 6
381-
// no 7
380+
sched.getPartitionState("ingest", 6) // no commit for 6
381+
sched.getPartitionState("ingest", 7) // no commit for 7
382382
sched.getPartitionState("ingest", 8).initCommit(1000)
383383
sched.getPartitionState("ingest", 9).initCommit(1000)
384384

385385
{
386386
nq := newJobQueue(988*time.Hour, noOpJobCreationPolicy[schedulerpb.JobSpec]{}, 2, sched.metrics, test.NewTestingLogger(t))
387387
sched.jobs = nq
388388
sched.finalizeObservations()
389-
require.Len(t, nq.jobs, 0, "No observations, no jobs")
389+
require.Empty(t, nq.jobs, "No observations, no jobs")
390390
}
391391

392392
type observation struct {
@@ -448,13 +448,12 @@ func TestObservations(t *testing.T) {
448448
mkJob(inProgress, "w103", 5, "ingest/5/12000", 33, 12000, 13000, maybeBadEpoch, errBadEpoch)
449449
mkJob(inProgress, "w104", 5, "ingest/5/12000", 34, 12000, 13000, nil, nil)
450450

451-
// Partition 6 has a complete job, but wasn't among the offsets we learned
452-
// from Kafka. We'll drop this as the start offset is nonzero, but not-found
453-
// completed jobs are ignored on update.
451+
// Partition 6 has a complete job but had no commit at startup. We allow
452+
// transitioning from empty to commit to any offset.
454453
mkJob(complete, "w0", 6, "ingest/6/500", 48, 500, 600, nil, nil)
455-
// Partition 7 has an in-progress job, but wasn't among the offsets we
456-
// learned from Kafka. We'll drop this one, too.
457-
mkJob(inProgress, "w1", 7, "ingest/7/92874", 52, 92874, 93874, nil, errJobNotFound)
454+
// Partition 7 has an in-progress job, but had no commit at startup. We
455+
// honor this job and allow it to influence the planned/resumption offset.
456+
mkJob(inProgress, "w1", 7, "ingest/7/92874", 52, 92874, 93874, nil, nil)
458457

459458
// Partition 8 has a number of reports and has a hole that should not be passed.
460459
mkJob(complete, "w0", 8, "ingest/8/1000", 53, 1000, 1100, nil, nil)
@@ -487,11 +486,11 @@ func TestObservations(t *testing.T) {
487486
}
488487

489488
if errors.Is(expectedErr, maybeBadEpoch) {
490-
assert.True(t, errors.Is(err, errBadEpoch) || err == nil, "job %V: expected either bad epoch or no error, got %v", c.key, err)
489+
require.True(t, errors.Is(err, errBadEpoch) || err == nil, "job %V: expected either bad epoch or no error, got %v", c.key, err)
491490
} else if expectedErr != nil {
492-
assert.ErrorIs(t, err, expectedErr, "job %V: expected %v, got %v", c.key, expectedErr, err)
491+
require.ErrorIs(t, err, expectedErr, "job %V: expected %v, got %v", c.key, expectedErr, err)
493492
} else {
494-
assert.NoError(t, err, "job %V: expected no error", c.key)
493+
require.NoError(t, err, "job %V: expected no error", c.key)
495494
}
496495
}
497496
})
@@ -504,15 +503,17 @@ func TestObservations(t *testing.T) {
504503
sched.requireOffset(t, "ingest", 3, 974, "ingest/3 should be unchanged - no updates")
505504
sched.requireOffset(t, "ingest", 4, 900, "ingest/4 should be moved forward to account for the completed jobs")
506505
sched.requireOffset(t, "ingest", 5, 12000, "ingest/5 has nothing new completed")
507-
sched.requireOffset(t, "ingest", 6, 0, "ingest/6 should not have been added to the offsets as there was a gap")
508-
sched.requireOffset(t, "ingest", 7, 0, "ingest/7 should not have been added to the offsets as there was a gap")
506+
sched.requireOffset(t, "ingest", 6, 600, "ingest/6 allowed to move the commit")
507+
sched.requireOffset(t, "ingest", 7, offsetEmpty, "ingest/7 has an in-progress job, but had no commit at startup")
509508
sched.requireOffset(t, "ingest", 8, 1300, "ingest/8 should be committed only until the gap")
510509
sched.requireOffset(t, "ingest", 9, 1300, "ingest/9 should be committed only until the gap")
511510
}
512511

513512
sendUpdates()
514513
sched.completeObservationMode(context.Background())
515514

515+
verifyCommits()
516+
516517
// Make sure the resumption offsets account for the gaps.
517518
offs, err := sched.consumptionOffsets(context.Background(), "ingest", time.Now())
518519
require.NoError(t, err)
@@ -523,15 +524,13 @@ func TestObservations(t *testing.T) {
523524
{topic: "ingest", partition: 3, resume: 974},
524525
{topic: "ingest", partition: 4, resume: 900},
525526
{topic: "ingest", partition: 5, resume: 13000},
526-
{topic: "ingest", partition: 6, resume: 0},
527-
{topic: "ingest", partition: 7, resume: 0},
527+
{topic: "ingest", partition: 6, resume: 600},
528+
{topic: "ingest", partition: 7, resume: 93874},
528529
{topic: "ingest", partition: 8, resume: 1300},
529530
{topic: "ingest", partition: 9, resume: 1300},
530531
}, offs)
531532

532-
verifyCommits()
533-
534-
require.Len(t, sched.jobs.jobs, 3, "should be 3 in-progress jobs")
533+
require.Len(t, sched.jobs.jobs, 4, "should be 4 in-progress jobs")
535534
require.Equal(t, 65, int(sched.jobs.epoch))
536535

537536
// Verify that the same set of updates can be sent now that we're out of
@@ -595,11 +594,17 @@ func TestKafkaFlush(t *testing.T) {
595594
ctx := context.Background()
596595
sched.completeObservationMode(ctx)
597596

598-
flushAndRequireOffsets := func(topic string, offsets map[int32]int64, args ...interface{}) {
597+
flushAndRequireOffsets := func(topic string, offsets map[int32]int64, args ...any) {
599598
require.NoError(t, sched.flushOffsetsToKafka(ctx))
600599

601600
offs, err := sched.fetchCommittedOffsets(ctx)
602601
require.NoError(t, err)
602+
offcount := 0
603+
offs.Each(func(o kadm.Offset) {
604+
offcount++
605+
})
606+
require.Equal(t, len(offsets), offcount)
607+
603608
for partition, expected := range offsets {
604609
o, ok := offs.Lookup(topic, partition)
605610
require.True(t, ok, args...)
@@ -609,24 +614,41 @@ func TestKafkaFlush(t *testing.T) {
609614

610615
flushAndRequireOffsets("ingest", map[int32]int64{}, "no group found -> no offsets")
611616

617+
_ = sched.getPartitionState("ingest", 0)
618+
// (No commit yet for p0.)
619+
612620
p1 := sched.getPartitionState("ingest", 1)
613-
p1.committed.set(2000)
621+
p1.initCommit(2000)
614622
flushAndRequireOffsets("ingest", map[int32]int64{
615623
1: 2000,
616624
})
617625

618626
p4 := sched.getPartitionState("ingest", 4)
619-
p4.committed.set(65535)
627+
p4.initCommit(65535)
620628
flushAndRequireOffsets("ingest", map[int32]int64{
621629
1: 2000,
622630
4: 65535,
623631
})
624632

625-
p1.committed.set(4000)
633+
p1.initCommit(4000)
626634
flushAndRequireOffsets("ingest", map[int32]int64{
627635
1: 4000,
628636
4: 65535,
629637
}, "should be able to advance an existing offset")
638+
639+
reg := sched.register.(*prometheus.Registry)
640+
require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(
641+
`# HELP cortex_blockbuilder_scheduler_partition_committed_offset The observed committed offset of each partition.
642+
# TYPE cortex_blockbuilder_scheduler_partition_committed_offset gauge
643+
cortex_blockbuilder_scheduler_partition_committed_offset{partition="1"} 4000
644+
cortex_blockbuilder_scheduler_partition_committed_offset{partition="4"} 65535
645+
`), "cortex_blockbuilder_scheduler_partition_committed_offset"), "should only modify commit gauge for non-empty commit offsets")
646+
require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(
647+
`# HELP cortex_blockbuilder_scheduler_partition_planned_offset The planned offset of each partition.
648+
# TYPE cortex_blockbuilder_scheduler_partition_planned_offset gauge
649+
cortex_blockbuilder_scheduler_partition_planned_offset{partition="1"} 4000
650+
cortex_blockbuilder_scheduler_partition_planned_offset{partition="4"} 65535
651+
`), "cortex_blockbuilder_scheduler_partition_planned_offset"), "should only modify planned gauge for non-empty planned offsets")
630652
}
631653

632654
func TestUpdateSchedule(t *testing.T) {
@@ -1235,8 +1257,7 @@ func TestBlockBuilderScheduler_EnqueuePendingJobs_GapDetection(t *testing.T) {
12351257

12361258
assert.Equal(t, 3, pt.pendingJobs.Len())
12371259
assert.Equal(t, 0, sched.jobs.count())
1238-
pt.planned.offset()
1239-
assert.Equal(t, int64(0), pt.planned.offset())
1260+
assert.True(t, pt.planned.empty())
12401261
sched.enqueuePendingJobs()
12411262
assert.Equal(t, 0, pt.pendingJobs.Len())
12421263
assert.Equal(t, 3, sched.jobs.count())
@@ -1293,3 +1314,44 @@ func TestBlockBuilderScheduler_EnqueuePendingJobs_GapDetection(t *testing.T) {
12931314
requireGaps(t, reg, part, 2, commitGaps, "expected %d commit gaps at job %d", commitGaps, j)
12941315
}
12951316
}
1317+
1318+
func TestBlockBuilderScheduler_NoCommit_NoGap(t *testing.T) {
1319+
sched, _ := mustScheduler(t, 4)
1320+
reg := sched.register.(*prometheus.Registry)
1321+
1322+
const part int32 = 1
1323+
requireGaps(t, reg, part, 0, 0)
1324+
1325+
pp := sched.getPartitionState("ingest", part)
1326+
require.True(t, pp.planned.empty())
1327+
require.True(t, pp.committed.empty())
1328+
1329+
k := jobKey{"myjob5", 5}
1330+
spec := schedulerpb.JobSpec{
1331+
Topic: "ingest",
1332+
Partition: part,
1333+
StartOffset: 10,
1334+
EndOffset: 20,
1335+
}
1336+
1337+
pp.planned.advance(k, spec)
1338+
requireGaps(t, reg, part, 0, 0, "advancing an empty planned offset should not register a gap")
1339+
1340+
pp.committed.advance(k, spec)
1341+
requireGaps(t, reg, part, 0, 0, "advancing an empty committed offset should not register a gap")
1342+
1343+
// Now create a gap:
1344+
k2 := jobKey{"myjob7", 23}
1345+
spec2 := schedulerpb.JobSpec{
1346+
Topic: "ingest",
1347+
Partition: part,
1348+
StartOffset: 40,
1349+
EndOffset: 50,
1350+
}
1351+
1352+
pp.planned.advance(k2, spec2)
1353+
requireGaps(t, reg, part, 1, 0, "a gap after a non-empty planned offset should register a gap")
1354+
1355+
pp.committed.advance(k2, spec2)
1356+
requireGaps(t, reg, part, 1, 1, "a gap after a non-empty committed offset should register a gap")
1357+
}

0 commit comments

Comments
 (0)