Skip to content

Commit fd52378

Browse files
authored
feat: add concurrency limits and refresh jitter to scheduler (#2150)
* feat: add concurrency limits and refresh jitter to scheduler Add global and per-tenant concurrency semaphores to prevent DB connection pool exhaustion when schedules align, and stop a single noisy tenant from consuming all execution slots. Add configurable jitter to schedule refresh interval to prevent thundering herd across replicas. Execution order: global semaphore -> tenant context -> per-tenant semaphore -> distributed lock -> execute. New config fields with defaults: - MaxConcurrentExecutions: 20 - MaxConcurrentPerTenant: 3 - RefreshJitterMax: 0 (opt-in) * fix: address review feedback on scheduler resilience - Fix doc comment: RefreshJitterMax default is 0 (disabled), not 10s - Use context-aware select instead of time.Sleep for jitter to avoid blocking shutdown - Move tenant context setup before global semaphore so skipped execution audit records are properly tenant-scoped * fix: increase test timeouts for slow CI runners The concurrency semaphore tests used 5-second timeouts for waiting on cron ticks, which is too tight on resource-constrained shared runners. Increased to 15s for initial execution waits and 10s for condition polling to eliminate flaky failures. * fix: fix flaky global semaphore test and remove duplicate secondsParser - Buffer the blocked channel to match MaxConcurrentExecutions so executor signals are never lost to the default branch - Increase test timeouts for CI runners (15s wait, 10s await) - Remove duplicate secondsParser declaration (already in catchup_test.go) - Remove unused WithCronParser from cron_test.go (only needed for catch-up) --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent b93c419 commit fd52378

2 files changed

Lines changed: 357 additions & 13 deletions

File tree

shared/platform/scheduler/cron.go

Lines changed: 105 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"log/slog"
7+
"math/rand/v2"
78
"sync"
89
"time"
910

@@ -44,6 +45,9 @@ type CronSchedulerConfig struct {
4445
Name string
4546
// RefreshInterval is how often to reload schedules from the provider.
4647
RefreshInterval time.Duration
48+
// RefreshJitterMax is the maximum random jitter added after each refresh tick
49+
// to prevent thundering herd across replicas. Default: 0 (disabled).
50+
RefreshJitterMax time.Duration
4751
// ShutdownTimeout is the maximum time to wait for in-flight jobs during shutdown.
4852
ShutdownTimeout time.Duration
4953
// ExecutionTimeout is the maximum time a single job execution can take.
@@ -52,9 +56,16 @@ type CronSchedulerConfig struct {
5256
// Windows older than this are recorded as MISSED but not executed.
5357
// Default: 1 hour.
5458
MaxCatchUpAge time.Duration
59+
// MaxConcurrentExecutions is the maximum number of jobs that can execute
60+
// concurrently across all tenants. Default: 20.
61+
MaxConcurrentExecutions int
62+
// MaxConcurrentPerTenant is the maximum number of jobs that can execute
63+
// concurrently for a single tenant. Default: 3.
64+
MaxConcurrentPerTenant int
5565
}
5666

57-
func (c CronSchedulerConfig) withDefaults() CronSchedulerConfig {
67+
// WithDefaults returns a copy of the config with zero-value fields set to defaults.
68+
func (c CronSchedulerConfig) WithDefaults() CronSchedulerConfig {
5869
if c.Name == "" {
5970
c.Name = "cron-scheduler"
6071
}
@@ -70,6 +81,12 @@ func (c CronSchedulerConfig) withDefaults() CronSchedulerConfig {
7081
if c.MaxCatchUpAge <= 0 {
7182
c.MaxCatchUpAge = time.Hour
7283
}
84+
if c.MaxConcurrentExecutions <= 0 {
85+
c.MaxConcurrentExecutions = 20
86+
}
87+
if c.MaxConcurrentPerTenant <= 0 {
88+
c.MaxConcurrentPerTenant = 3
89+
}
7390
return c
7491
}
7592

@@ -90,6 +107,10 @@ type CronScheduler struct {
90107
mu sync.Mutex
91108
entryIDs map[string]cron.EntryID
92109
schedules map[string]Schedule
110+
111+
semaphore chan struct{}
112+
tenantSemaphores map[string]chan struct{}
113+
tenantSemMu sync.Mutex
93114
}
94115

95116
// NewCronScheduler creates a new CronScheduler. The store parameter is optional;
@@ -102,7 +123,7 @@ func NewCronScheduler(
102123
logger *slog.Logger,
103124
opts ...CronSchedulerOption,
104125
) *CronScheduler {
105-
config = config.withDefaults()
126+
config = config.WithDefaults()
106127
if logger == nil {
107128
logger = slog.Default()
108129
}
@@ -115,16 +136,18 @@ func NewCronScheduler(
115136
)
116137

117138
s := &CronScheduler{
118-
lifecycle: NewWorkerLifecycle(logger),
119-
provider: provider,
120-
executor: executor,
121-
lock: lock,
122-
config: config,
123-
logger: logger.With("component", config.Name),
124-
cron: cronRunner,
125-
parser: defaultParser,
126-
entryIDs: make(map[string]cron.EntryID),
127-
schedules: make(map[string]Schedule),
139+
lifecycle: NewWorkerLifecycle(logger),
140+
provider: provider,
141+
executor: executor,
142+
lock: lock,
143+
config: config,
144+
logger: logger.With("component", config.Name),
145+
cron: cronRunner,
146+
parser: defaultParser,
147+
entryIDs: make(map[string]cron.EntryID),
148+
schedules: make(map[string]Schedule),
149+
semaphore: make(chan struct{}, config.MaxConcurrentExecutions),
150+
tenantSemaphores: make(map[string]chan struct{}),
128151
}
129152

130153
for _, opt := range opts {
@@ -195,6 +218,14 @@ func (s *CronScheduler) Start(ctx context.Context) error {
195218
s.logger.Info("cron scheduler stopping", "name", s.config.Name)
196219
return nil
197220
case <-refreshTicker.C:
221+
if s.config.RefreshJitterMax > 0 {
222+
jitter := time.Duration(rand.Int64N(int64(s.config.RefreshJitterMax)))
223+
select {
224+
case <-workCtx.Done():
225+
continue
226+
case <-time.After(jitter):
227+
}
228+
}
198229
if err := s.refreshSchedules(workCtx); err != nil {
199230
s.logger.Error("failed to refresh schedules", "error", err)
200231
}
@@ -290,19 +321,68 @@ func (s *CronScheduler) addSchedule(sched Schedule) (cron.EntryID, error) {
290321
return entryID, nil
291322
}
292323

324+
// acquireGlobalSemaphore tries to acquire the global concurrency semaphore.
325+
// Returns a release function and true if acquired, or nil and false if the limit is reached.
326+
func (s *CronScheduler) acquireGlobalSemaphore(ctx context.Context, schedule Schedule) (func(), bool) {
327+
select {
328+
case s.semaphore <- struct{}{}:
329+
return func() { <-s.semaphore }, true
330+
default:
331+
s.logger.Warn("global concurrency limit reached, skipping",
332+
"schedule_id", schedule.ID,
333+
"tenant_id", schedule.TenantID)
334+
s.recordExecution(ctx, schedule, ExecutionStatusSkipped, nil, strPtr("concurrency limit reached"))
335+
return nil, false
336+
}
337+
}
338+
339+
// acquireTenantSemaphore tries to acquire the per-tenant concurrency semaphore.
340+
// Returns a release function and true if acquired, or nil and false if the limit is reached.
341+
func (s *CronScheduler) acquireTenantSemaphore(ctx context.Context, schedule Schedule) (func(), bool) {
342+
if schedule.TenantID == "" {
343+
return func() {}, true
344+
}
345+
tenantSem := s.getOrCreateTenantSemaphore(schedule.TenantID)
346+
select {
347+
case tenantSem <- struct{}{}:
348+
return func() { <-tenantSem }, true
349+
default:
350+
s.logger.Warn("per-tenant concurrency limit reached, skipping",
351+
"schedule_id", schedule.ID,
352+
"tenant_id", schedule.TenantID)
353+
s.recordExecution(ctx, schedule, ExecutionStatusSkipped, nil,
354+
strPtr(fmt.Sprintf("per-tenant concurrency limit reached for tenant %s", schedule.TenantID)))
355+
return nil, false
356+
}
357+
}
358+
293359
// executeJob runs a single scheduled job with distributed locking and audit trail.
294360
func (s *CronScheduler) executeJob(schedule Schedule) {
295361
s.lifecycle.ExecuteGuarded(func() {
296362
ctx, cancel := context.WithTimeout(context.Background(), s.config.ExecutionTimeout)
297363
defer cancel()
298364

299-
// Propagate tenant context so ExecutionStore can scope to the correct schema
365+
// Propagate tenant context first so all audit records are properly scoped
300366
if schedule.TenantID != "" {
301367
if tid, err := tenant.NewTenantID(schedule.TenantID); err == nil {
302368
ctx = tenant.WithTenant(ctx, tid)
303369
}
304370
}
305371

372+
// Acquire global concurrency semaphore
373+
releaseGlobal, ok := s.acquireGlobalSemaphore(ctx, schedule)
374+
if !ok {
375+
return
376+
}
377+
defer releaseGlobal()
378+
379+
// Acquire per-tenant concurrency semaphore
380+
releaseTenant, ok := s.acquireTenantSemaphore(ctx, schedule)
381+
if !ok {
382+
return
383+
}
384+
defer releaseTenant()
385+
306386
// Acquire distributed lock
307387
if s.lock != nil {
308388
acquired, release, err := s.lock.Acquire(ctx, schedule.TenantID, s.lockKey(schedule.ID))
@@ -345,6 +425,18 @@ func (s *CronScheduler) executeJob(schedule Schedule) {
345425
})
346426
}
347427

428+
// getOrCreateTenantSemaphore lazily creates a per-tenant semaphore channel.
429+
func (s *CronScheduler) getOrCreateTenantSemaphore(tenantID string) chan struct{} {
430+
s.tenantSemMu.Lock()
431+
defer s.tenantSemMu.Unlock()
432+
sem, ok := s.tenantSemaphores[tenantID]
433+
if !ok {
434+
sem = make(chan struct{}, s.config.MaxConcurrentPerTenant)
435+
s.tenantSemaphores[tenantID] = sem
436+
}
437+
return sem
438+
}
439+
348440
func (s *CronScheduler) lockKey(scheduleID string) string {
349441
return fmt.Sprintf("%s:%s", s.config.Name, scheduleID)
350442
}

0 commit comments

Comments
 (0)