@@ -370,23 +370,23 @@ func TestAssignJobSkipsObsoleteOffsets_PriorScheduler(t *testing.T) {
370370
371371func TestObservations (t * testing.T ) {
372372 sched , _ := mustScheduler (t , 10 )
373- // Initially we're in observation mode. We have Kafka's start offsets, but no client jobs.
373+ // Initially we're in observation mode. We have Kafka's commit offsets, but no client jobs.
374374
375375 sched .getPartitionState ("ingest" , 1 ).initCommit (5000 )
376376 sched .getPartitionState ("ingest" , 2 ).initCommit (800 )
377377 sched .getPartitionState ("ingest" , 3 ).initCommit (974 )
378378 sched .getPartitionState ("ingest" , 4 ).initCommit (500 )
379379 sched .getPartitionState ("ingest" , 5 ).initCommit (12000 )
380- // no 6
381- // no 7
380+ sched . getPartitionState ( "ingest" , 6 ) // no commit for 6
381+ sched . getPartitionState ( "ingest" , 7 ) // no commit for 7
382382 sched .getPartitionState ("ingest" , 8 ).initCommit (1000 )
383383 sched .getPartitionState ("ingest" , 9 ).initCommit (1000 )
384384
385385 {
386386 nq := newJobQueue (988 * time .Hour , noOpJobCreationPolicy [schedulerpb.JobSpec ]{}, 2 , sched .metrics , test .NewTestingLogger (t ))
387387 sched .jobs = nq
388388 sched .finalizeObservations ()
389- require .Len (t , nq .jobs , 0 , "No observations, no jobs" )
389+ require .Empty (t , nq .jobs , "No observations, no jobs" )
390390 }
391391
392392 type observation struct {
@@ -448,13 +448,12 @@ func TestObservations(t *testing.T) {
448448 mkJob (inProgress , "w103" , 5 , "ingest/5/12000" , 33 , 12000 , 13000 , maybeBadEpoch , errBadEpoch )
449449 mkJob (inProgress , "w104" , 5 , "ingest/5/12000" , 34 , 12000 , 13000 , nil , nil )
450450
451- // Partition 6 has a complete job, but wasn't among the offsets we learned
452- // from Kafka. We'll drop this as the start offset is nonzero, but not-found
453- // completed jobs are ignored on update.
451+ // Partition 6 has a complete job but had no commit at startup. We allow
452+ // transitioning from empty to commit to any offset.
454453 mkJob (complete , "w0" , 6 , "ingest/6/500" , 48 , 500 , 600 , nil , nil )
455- // Partition 7 has an in-progress job, but wasn't among the offsets we
456- // learned from Kafka. We'll drop this one, too .
457- mkJob (inProgress , "w1" , 7 , "ingest/7/92874" , 52 , 92874 , 93874 , nil , errJobNotFound )
454+ // Partition 7 has an in-progress job, but had no commit at startup. We
455+ // honor this job and allow it to influence the planned/resumption offset .
456+ mkJob (inProgress , "w1" , 7 , "ingest/7/92874" , 52 , 92874 , 93874 , nil , nil )
458457
459458 // Partition 8 has a number of reports and has a hole that should should not be passed.
460459 mkJob (complete , "w0" , 8 , "ingest/8/1000" , 53 , 1000 , 1100 , nil , nil )
@@ -487,11 +486,11 @@ func TestObservations(t *testing.T) {
487486 }
488487
489488 if errors .Is (expectedErr , maybeBadEpoch ) {
490- assert .True (t , errors .Is (err , errBadEpoch ) || err == nil , "job %V: expected either bad epoch or no error, got %v" , c .key , err )
489+ require .True (t , errors .Is (err , errBadEpoch ) || err == nil , "job %V: expected either bad epoch or no error, got %v" , c .key , err )
491490 } else if expectedErr != nil {
492- assert .ErrorIs (t , err , expectedErr , "job %V: expected %v, got %v" , c .key , expectedErr , err )
491+ require .ErrorIs (t , err , expectedErr , "job %V: expected %v, got %v" , c .key , expectedErr , err )
493492 } else {
494- assert .NoError (t , err , "job %V: expected no error" , c .key )
493+ require .NoError (t , err , "job %V: expected no error" , c .key )
495494 }
496495 }
497496 })
@@ -504,15 +503,17 @@ func TestObservations(t *testing.T) {
504503 sched .requireOffset (t , "ingest" , 3 , 974 , "ingest/3 should be unchanged - no updates" )
505504 sched .requireOffset (t , "ingest" , 4 , 900 , "ingest/4 should be moved forward to account for the completed jobs" )
506505 sched .requireOffset (t , "ingest" , 5 , 12000 , "ingest/5 has nothing new completed" )
507- sched .requireOffset (t , "ingest" , 6 , 0 , "ingest/6 should not have been added to the offsets as there was a gap " )
508- sched .requireOffset (t , "ingest" , 7 , 0 , "ingest/7 should not have been added to the offsets as there was a gap " )
506+ sched .requireOffset (t , "ingest" , 6 , 600 , "ingest/6 allowed to move the commit " )
507+ sched .requireOffset (t , "ingest" , 7 , offsetEmpty , "ingest/7 has an in-progress job, but had no commit at startup " )
509508 sched .requireOffset (t , "ingest" , 8 , 1300 , "ingest/8 should be committed only until the gap" )
510509 sched .requireOffset (t , "ingest" , 9 , 1300 , "ingest/9 should be committed only until the gap" )
511510 }
512511
513512 sendUpdates ()
514513 sched .completeObservationMode (context .Background ())
515514
515+ verifyCommits ()
516+
516517 // Make sure the resumption offsets account for the gaps.
517518 offs , err := sched .consumptionOffsets (context .Background (), "ingest" , time .Now ())
518519 require .NoError (t , err )
@@ -523,15 +524,13 @@ func TestObservations(t *testing.T) {
523524 {topic : "ingest" , partition : 3 , resume : 974 },
524525 {topic : "ingest" , partition : 4 , resume : 900 },
525526 {topic : "ingest" , partition : 5 , resume : 13000 },
526- {topic : "ingest" , partition : 6 , resume : 0 },
527- {topic : "ingest" , partition : 7 , resume : 0 },
527+ {topic : "ingest" , partition : 6 , resume : 600 },
528+ {topic : "ingest" , partition : 7 , resume : 93874 },
528529 {topic : "ingest" , partition : 8 , resume : 1300 },
529530 {topic : "ingest" , partition : 9 , resume : 1300 },
530531 }, offs )
531532
532- verifyCommits ()
533-
534- require .Len (t , sched .jobs .jobs , 3 , "should be 3 in-progress jobs" )
533+ require .Len (t , sched .jobs .jobs , 4 , "should be 4 in-progress jobs" )
535534 require .Equal (t , 65 , int (sched .jobs .epoch ))
536535
537536 // Verify that the same set of updates can be sent now that we're out of
@@ -595,11 +594,17 @@ func TestKafkaFlush(t *testing.T) {
595594 ctx := context .Background ()
596595 sched .completeObservationMode (ctx )
597596
598- flushAndRequireOffsets := func (topic string , offsets map [int32 ]int64 , args ... interface {} ) {
597+ flushAndRequireOffsets := func (topic string , offsets map [int32 ]int64 , args ... any ) {
599598 require .NoError (t , sched .flushOffsetsToKafka (ctx ))
600599
601600 offs , err := sched .fetchCommittedOffsets (ctx )
602601 require .NoError (t , err )
602+ offcount := 0
603+ offs .Each (func (o kadm.Offset ) {
604+ offcount ++
605+ })
606+ require .Equal (t , len (offsets ), offcount )
607+
603608 for partition , expected := range offsets {
604609 o , ok := offs .Lookup (topic , partition )
605610 require .True (t , ok , args ... )
@@ -609,24 +614,41 @@ func TestKafkaFlush(t *testing.T) {
609614
610615 flushAndRequireOffsets ("ingest" , map [int32 ]int64 {}, "no group found -> no offsets" )
611616
617+ _ = sched .getPartitionState ("ingest" , 0 )
618+ // (No commit yet for p0.)
619+
612620 p1 := sched .getPartitionState ("ingest" , 1 )
613- p1 .committed . set (2000 )
621+ p1 .initCommit (2000 )
614622 flushAndRequireOffsets ("ingest" , map [int32 ]int64 {
615623 1 : 2000 ,
616624 })
617625
618626 p4 := sched .getPartitionState ("ingest" , 4 )
619- p4 .committed . set (65535 )
627+ p4 .initCommit (65535 )
620628 flushAndRequireOffsets ("ingest" , map [int32 ]int64 {
621629 1 : 2000 ,
622630 4 : 65535 ,
623631 })
624632
625- p1 .committed . set (4000 )
633+ p1 .initCommit (4000 )
626634 flushAndRequireOffsets ("ingest" , map [int32 ]int64 {
627635 1 : 4000 ,
628636 4 : 65535 ,
629637 }, "should be able to advance an existing offset" )
638+
639+ reg := sched .register .(* prometheus.Registry )
640+ require .NoError (t , promtest .GatherAndCompare (reg , strings .NewReader (
641+ `# HELP cortex_blockbuilder_scheduler_partition_committed_offset The observed committed offset of each partition.
642+ # TYPE cortex_blockbuilder_scheduler_partition_committed_offset gauge
643+ cortex_blockbuilder_scheduler_partition_committed_offset{partition="1"} 4000
644+ cortex_blockbuilder_scheduler_partition_committed_offset{partition="4"} 65535
645+ ` ), "cortex_blockbuilder_scheduler_partition_committed_offset" ), "should only modify commit gauge for non-empty commit offsets" )
646+ require .NoError (t , promtest .GatherAndCompare (reg , strings .NewReader (
647+ `# HELP cortex_blockbuilder_scheduler_partition_planned_offset The planned offset of each partition.
648+ # TYPE cortex_blockbuilder_scheduler_partition_planned_offset gauge
649+ cortex_blockbuilder_scheduler_partition_planned_offset{partition="1"} 4000
650+ cortex_blockbuilder_scheduler_partition_planned_offset{partition="4"} 65535
651+ ` ), "cortex_blockbuilder_scheduler_partition_planned_offset" ), "should only modify planned gauge for non-empty planned offsets" )
630652}
631653
632654func TestUpdateSchedule (t * testing.T ) {
@@ -1235,8 +1257,7 @@ func TestBlockBuilderScheduler_EnqueuePendingJobs_GapDetection(t *testing.T) {
12351257
12361258 assert .Equal (t , 3 , pt .pendingJobs .Len ())
12371259 assert .Equal (t , 0 , sched .jobs .count ())
1238- pt .planned .offset ()
1239- assert .Equal (t , int64 (0 ), pt .planned .offset ())
1260+ assert .True (t , pt .planned .empty ())
12401261 sched .enqueuePendingJobs ()
12411262 assert .Equal (t , 0 , pt .pendingJobs .Len ())
12421263 assert .Equal (t , 3 , sched .jobs .count ())
@@ -1293,3 +1314,44 @@ func TestBlockBuilderScheduler_EnqueuePendingJobs_GapDetection(t *testing.T) {
12931314 requireGaps (t , reg , part , 2 , commitGaps , "expected %d commit gaps at job %d" , commitGaps , j )
12941315 }
12951316}
1317+
1318+ func TestBlockBuilderScheduler_NoCommit_NoGap (t * testing.T ) {
1319+ sched , _ := mustScheduler (t , 4 )
1320+ reg := sched .register .(* prometheus.Registry )
1321+
1322+ const part int32 = 1
1323+ requireGaps (t , reg , part , 0 , 0 )
1324+
1325+ pp := sched .getPartitionState ("ingest" , part )
1326+ require .True (t , pp .planned .empty ())
1327+ require .True (t , pp .committed .empty ())
1328+
1329+ k := jobKey {"myjob5" , 5 }
1330+ spec := schedulerpb.JobSpec {
1331+ Topic : "ingest" ,
1332+ Partition : part ,
1333+ StartOffset : 10 ,
1334+ EndOffset : 20 ,
1335+ }
1336+
1337+ pp .planned .advance (k , spec )
1338+ requireGaps (t , reg , part , 0 , 0 , "advancing an empty planned offset should not register a gap" )
1339+
1340+ pp .committed .advance (k , spec )
1341+ requireGaps (t , reg , part , 0 , 0 , "advancing an empty committed offset should not register a gap" )
1342+
1343+ // Now create a gap:
1344+ k2 := jobKey {"myjob7" , 23 }
1345+ spec2 := schedulerpb.JobSpec {
1346+ Topic : "ingest" ,
1347+ Partition : part ,
1348+ StartOffset : 40 ,
1349+ EndOffset : 50 ,
1350+ }
1351+
1352+ pp .planned .advance (k2 , spec2 )
1353+ requireGaps (t , reg , part , 1 , 0 , "a gap after a non-empty planned offset should register a gap" )
1354+
1355+ pp .committed .advance (k2 , spec2 )
1356+ requireGaps (t , reg , part , 1 , 1 , "a gap after a non-empty committed offset should register a gap" )
1357+ }
0 commit comments