Skip to content

Commit a535887

Browse files
authored
feat: add tenant status check to scheduler executeJob (#2152)
* feat: add tenant status check to scheduler executeJob
* fix: apply tenant status check in catch-up path and strengthen empty-tenant test
* fix: correct catch-up skipped record timestamp and executedCount tracking
* test: add catch-up path coverage for tenant status checker

---------

Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 557f49f commit a535887

4 files changed

Lines changed: 565 additions & 34 deletions

File tree

shared/platform/scheduler/catchup.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ func (s *CronScheduler) catchUpSchedule(ctx context.Context, sched Schedule, now
115115
// Too old to execute: record as MISSED for audit trail.
116116
s.recordMissedWindow(schedCtx, sched, nextTime)
117117
missedCount++
118-
} else {
119-
// Within catch-up age: execute.
118+
} else if s.catchUpWindowEligible(schedCtx, sched, nextTime) {
119+
// Within catch-up age and tenant is active: execute.
120120
s.executeCatchUpWindow(schedCtx, sched, nextTime)
121121
executedCount++
122122
}
@@ -140,6 +140,45 @@ func (s *CronScheduler) catchUpSchedule(ctx context.Context, sched Schedule, now
140140
// itself blocks until catch-up completes or the context is cancelled, so the
141141
// lifecycle already knows work is in progress. Context cancellation (from Stop)
142142
// is checked between iterations in catchUpSchedule.
143+
// catchUpWindowEligible checks tenant status before executing a catch-up window.
144+
// Unlike tenantIsEligible used in executeJob, this records the skipped execution
145+
// with the actual catch-up window timestamp rather than time.Now().
146+
func (s *CronScheduler) catchUpWindowEligible(ctx context.Context, sched Schedule, scheduledAt time.Time) bool {
147+
if s.statusChecker == nil || sched.TenantID == "" {
148+
return true
149+
}
150+
active, err := s.statusChecker.IsActive(ctx, sched.TenantID)
151+
if err != nil {
152+
s.logger.Error("failed to check tenant status for catch-up",
153+
"schedule_id", sched.ID,
154+
"error", err)
155+
return true // fail open
156+
}
157+
if !active {
158+
s.logger.Info("skipping catch-up window for inactive tenant",
159+
"schedule_id", sched.ID,
160+
"tenant_id", sched.TenantID,
161+
"scheduled_at", scheduledAt)
162+
if s.store != nil {
163+
exec := Execution{
164+
ID: uuid.New(),
165+
SchedulerName: s.config.Name,
166+
ScheduleID: sched.ID,
167+
ScheduledAt: scheduledAt,
168+
Status: ExecutionStatusSkipped,
169+
ErrorMessage: strPtr("tenant not active"),
170+
}
171+
if err := s.store.RecordExecution(ctx, exec); err != nil {
172+
s.logger.Error("failed to record catch-up skipped execution",
173+
"schedule_id", sched.ID,
174+
"error", err)
175+
}
176+
}
177+
return false
178+
}
179+
return true
180+
}
181+
143182
func (s *CronScheduler) executeCatchUpWindow(ctx context.Context, sched Schedule, scheduledAt time.Time) {
144183
// Acquire per-schedule lock (consistent with normal executeJob).
145184
if s.lock != nil {

shared/platform/scheduler/catchup_test.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package scheduler_test
22

33
import (
44
"context"
5+
"errors"
56
"log/slog"
67
"sync"
78
"testing"
@@ -14,6 +15,8 @@ import (
1415
"github.com/stretchr/testify/require"
1516
)
1617

18+
// errStatusUnavailable is the error the stub tenant status checker is configured
// to return in tests that exercise the status-check failure path.
var errStatusUnavailable = errors.New("status service unavailable")
19+
1720
// secondsParser is a cron.Parser matching the seconds-level cron runner used in tests.
1821
var secondsParser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
1922

@@ -412,6 +415,223 @@ func TestCatchUp_MultipleSchedules_IndependentCatchUp(t *testing.T) {
412415
s.Stop()
413416
}
414417

418+
func TestCatchUp_TenantStatusChecker_ActiveTenant_CatchUpExecutes(t *testing.T) {
419+
cronExpr := "0 */10 * * * *"
420+
now := time.Now().UTC()
421+
lastExecTime := now.Add(-30 * time.Minute)
422+
423+
store := &stubExecutionStore{}
424+
_ = store.RecordExecution(context.Background(), scheduler.Execution{
425+
SchedulerName: "test-scheduler",
426+
ScheduleID: "sched-1",
427+
ScheduledAt: lastExecTime,
428+
Status: scheduler.ExecutionStatusCompleted,
429+
})
430+
431+
provider := &stubProvider{
432+
schedules: []scheduler.Schedule{
433+
{ID: "sched-1", CronExpr: cronExpr, TenantID: "tenant1"},
434+
},
435+
}
436+
executor := &stubExecutor{executeCh: make(chan struct{}, 100)}
437+
lock := &stubLock{acquired: true}
438+
checker := &stubTenantStatusChecker{active: true}
439+
440+
expected := expectedWindowCount(t, cronExpr, lastExecTime, now)
441+
require.Greater(t, expected, 0)
442+
443+
s := newTestScheduler(provider, executor, lock,
444+
scheduler.CronSchedulerConfig{
445+
Name: "test-scheduler",
446+
RefreshInterval: time.Hour,
447+
ShutdownTimeout: 2 * time.Second,
448+
MaxCatchUpAge: time.Hour,
449+
},
450+
scheduler.WithCronExecutionStore(store),
451+
scheduler.WithTenantStatusChecker(checker),
452+
)
453+
454+
ctx, cancel := context.WithCancel(context.Background())
455+
go func() {
456+
_ = s.Start(ctx)
457+
}()
458+
459+
err := await.New().AtMost(5 * time.Second).PollInterval(50 * time.Millisecond).Until(func() bool {
460+
return executor.callCount.Load() >= int32(expected)
461+
})
462+
require.NoError(t, err)
463+
464+
assert.GreaterOrEqual(t, int(executor.callCount.Load()), expected)
465+
466+
cancel()
467+
s.Stop()
468+
}
469+
470+
func TestCatchUp_TenantStatusChecker_InactiveTenant_CatchUpSkipped(t *testing.T) {
471+
cronExpr := "0 */10 * * * *"
472+
now := time.Now().UTC()
473+
lastExecTime := now.Add(-30 * time.Minute)
474+
475+
store := &stubExecutionStore{}
476+
_ = store.RecordExecution(context.Background(), scheduler.Execution{
477+
SchedulerName: "test-scheduler",
478+
ScheduleID: "sched-1",
479+
ScheduledAt: lastExecTime,
480+
Status: scheduler.ExecutionStatusCompleted,
481+
})
482+
483+
provider := &stubProvider{
484+
schedules: []scheduler.Schedule{
485+
{ID: "sched-1", CronExpr: cronExpr, TenantID: "inactive-tenant"},
486+
},
487+
}
488+
executor := &stubExecutor{}
489+
lock := &stubLock{acquired: true}
490+
checker := &stubTenantStatusChecker{active: false}
491+
492+
expected := expectedWindowCount(t, cronExpr, lastExecTime, now)
493+
require.Greater(t, expected, 0)
494+
495+
s := newTestScheduler(provider, executor, lock,
496+
scheduler.CronSchedulerConfig{
497+
Name: "test-scheduler",
498+
RefreshInterval: time.Hour,
499+
ShutdownTimeout: 2 * time.Second,
500+
MaxCatchUpAge: time.Hour,
501+
},
502+
scheduler.WithCronExecutionStore(store),
503+
scheduler.WithTenantStatusChecker(checker),
504+
)
505+
506+
ctx, cancel := context.WithCancel(context.Background())
507+
go func() {
508+
_ = s.Start(ctx)
509+
}()
510+
511+
// Wait until all catch-up windows are recorded as SKIPPED (one per window)
512+
err := await.New().AtMost(5 * time.Second).PollInterval(50 * time.Millisecond).Until(func() bool {
513+
count := 0
514+
for _, e := range store.getExecutions() {
515+
if e.Status == scheduler.ExecutionStatusSkipped &&
516+
e.ErrorMessage != nil && *e.ErrorMessage == "tenant not active" {
517+
count++
518+
}
519+
}
520+
return count >= expected
521+
})
522+
require.NoError(t, err)
523+
524+
assert.Equal(t, int32(0), executor.callCount.Load(),
525+
"executor must not be called for inactive tenant during catch-up")
526+
527+
cancel()
528+
s.Stop()
529+
}
530+
531+
func TestCatchUp_TenantStatusChecker_CheckError_CatchUpProceeds(t *testing.T) {
532+
cronExpr := "0 */10 * * * *"
533+
now := time.Now().UTC()
534+
lastExecTime := now.Add(-30 * time.Minute)
535+
536+
store := &stubExecutionStore{}
537+
_ = store.RecordExecution(context.Background(), scheduler.Execution{
538+
SchedulerName: "test-scheduler",
539+
ScheduleID: "sched-1",
540+
ScheduledAt: lastExecTime,
541+
Status: scheduler.ExecutionStatusCompleted,
542+
})
543+
544+
provider := &stubProvider{
545+
schedules: []scheduler.Schedule{
546+
{ID: "sched-1", CronExpr: cronExpr, TenantID: "tenant1"},
547+
},
548+
}
549+
executor := &stubExecutor{executeCh: make(chan struct{}, 100)}
550+
lock := &stubLock{acquired: true}
551+
checker := &stubTenantStatusChecker{err: errStatusUnavailable}
552+
553+
expected := expectedWindowCount(t, cronExpr, lastExecTime, now)
554+
require.Greater(t, expected, 0)
555+
556+
s := newTestScheduler(provider, executor, lock,
557+
scheduler.CronSchedulerConfig{
558+
Name: "test-scheduler",
559+
RefreshInterval: time.Hour,
560+
ShutdownTimeout: 2 * time.Second,
561+
MaxCatchUpAge: time.Hour,
562+
},
563+
scheduler.WithCronExecutionStore(store),
564+
scheduler.WithTenantStatusChecker(checker),
565+
)
566+
567+
ctx, cancel := context.WithCancel(context.Background())
568+
go func() {
569+
_ = s.Start(ctx)
570+
}()
571+
572+
// Fail open: catch-up proceeds despite status check error
573+
err := await.New().AtMost(5 * time.Second).PollInterval(50 * time.Millisecond).Until(func() bool {
574+
return executor.callCount.Load() >= int32(expected)
575+
})
576+
require.NoError(t, err)
577+
578+
assert.GreaterOrEqual(t, int(executor.callCount.Load()), expected)
579+
580+
cancel()
581+
s.Stop()
582+
}
583+
584+
func TestCatchUp_TenantStatusChecker_NoChecker_CatchUpProceeds(t *testing.T) {
585+
cronExpr := "0 */10 * * * *"
586+
now := time.Now().UTC()
587+
lastExecTime := now.Add(-30 * time.Minute)
588+
589+
store := &stubExecutionStore{}
590+
_ = store.RecordExecution(context.Background(), scheduler.Execution{
591+
SchedulerName: "test-scheduler",
592+
ScheduleID: "sched-1",
593+
ScheduledAt: lastExecTime,
594+
Status: scheduler.ExecutionStatusCompleted,
595+
})
596+
597+
provider := &stubProvider{
598+
schedules: []scheduler.Schedule{
599+
{ID: "sched-1", CronExpr: cronExpr, TenantID: "tenant1"},
600+
},
601+
}
602+
executor := &stubExecutor{executeCh: make(chan struct{}, 100)}
603+
lock := &stubLock{acquired: true}
604+
605+
expected := expectedWindowCount(t, cronExpr, lastExecTime, now)
606+
require.Greater(t, expected, 0)
607+
608+
// No WithTenantStatusChecker option
609+
s := newTestScheduler(provider, executor, lock,
610+
scheduler.CronSchedulerConfig{
611+
Name: "test-scheduler",
612+
RefreshInterval: time.Hour,
613+
ShutdownTimeout: 2 * time.Second,
614+
MaxCatchUpAge: time.Hour,
615+
},
616+
scheduler.WithCronExecutionStore(store),
617+
)
618+
619+
ctx, cancel := context.WithCancel(context.Background())
620+
go func() {
621+
_ = s.Start(ctx)
622+
}()
623+
624+
err := await.New().AtMost(5 * time.Second).PollInterval(50 * time.Millisecond).Until(func() bool {
625+
return executor.callCount.Load() >= int32(expected)
626+
})
627+
require.NoError(t, err)
628+
629+
assert.GreaterOrEqual(t, int(executor.callCount.Load()), expected)
630+
631+
cancel()
632+
s.Stop()
633+
}
634+
415635
func TestCatchUp_LockNotAcquired_SkipsCatchUp(t *testing.T) {
416636
// When the catch-up lock cannot be acquired, no catch-up should happen.
417637
store := &stubExecutionStore{}

0 commit comments

Comments
 (0)