Commit f4c6d14

feat: add scheduler lifecycle monitoring <> #785 (#889)
* git commit -m "feat: add SchedulerStarted and SchedulerStopped monitoring"
* fix lint issue
* Update errors.go
  Co-authored-by: John Roesler <[email protected]>
* feat() updated with remaining metrics & events (JobRegistered/JobUnregistered, JobStarted/JobRunning/JobFailed/JobCompleted)
* feat: enhance scheduler and job observability by adding new monitor events for lifecycle, performance, and concurrency limits.
* docs: expand metrics section to include scheduler lifecycle events and `SchedulerMonitor` details with Prometheus example
* refactor: conditionally send scheduler notifications only when a scheduler monitor is configured.

---------

Co-authored-by: John Roesler <[email protected]>
1 parent 9cc3be7 commit f4c6d14

10 files changed: +946 -9 lines changed

README.md

Lines changed: 41 additions & 1 deletion
@@ -169,12 +169,52 @@ The Logger interface can be implemented with your desired logging library.
169169
The provided NewLogger uses the standard library's log package.
170170

171171
### Metrics
172-
Metrics may be collected from the execution of each job.
172+
Metrics may be collected from the execution of each job and scheduler lifecycle events.
173173
- [**Monitor**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#Monitor):
174174
- [**MonitorStatus**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonitorStatus) (includes status and error (if any) of the Job)
175175
A monitor can be used to collect metrics for each job from a scheduler.
176176
- Implementations: [go-co-op monitors](https://github.com/go-co-op?q=-monitor&type=all&language=&sort=)
177177
(don't see what you need? request on slack to get a repo created to contribute it!)
178+
- [**SchedulerMonitor**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#SchedulerMonitor):
179+
A scheduler monitor provides comprehensive observability into scheduler and job lifecycle events.
180+
181+
**Available Metrics:**
182+
- **Scheduler Lifecycle**: `SchedulerStarted`, `SchedulerStopped`, `SchedulerShutdown`
183+
- **Job Management**: `JobRegistered`, `JobUnregistered` - track jobs added/removed from scheduler
184+
- **Job Execution**: `JobStarted`, `JobRunning`, `JobCompleted`, `JobFailed` - monitor job execution flow
185+
- **Performance**: `JobExecutionTime`, `JobSchedulingDelay` - measure job duration and scheduling lag
186+
- **Concurrency**: `ConcurrencyLimitReached` - detect when singleton or limit mode constraints are hit
187+
188+
**Derived Metrics** (calculable from events):
189+
- Error rate: `JobFailed / (JobCompleted + JobFailed)`
190+
- Average execution time: from `JobExecutionTime` events
191+
- Active jobs: `JobRegistered - JobUnregistered`
192+
- Current queue depth: `JobStarted - (JobCompleted + JobFailed)`
193+
194+
**Example - Prometheus Integration:**
195+
```go
196+
type PrometheusMonitor struct {
197+
jobsCompleted prometheus.Counter
198+
jobsFailed prometheus.Counter
199+
executionTime prometheus.Histogram
200+
schedulingDelay prometheus.Histogram
201+
}
202+
203+
func (p *PrometheusMonitor) JobExecutionTime(job gocron.Job, duration time.Duration) {
204+
p.executionTime.Observe(duration.Seconds())
205+
}
206+
207+
func (p *PrometheusMonitor) JobSchedulingDelay(job gocron.Job, scheduled, actual time.Time) {
208+
if delay := actual.Sub(scheduled); delay > 0 {
209+
p.schedulingDelay.Observe(delay.Seconds())
210+
}
211+
}
212+
213+
// Initialize scheduler with monitor
214+
s, _ := gocron.NewScheduler(gocron.WithSchedulerMonitor(monitor))
215+
```
216+
217+
**Use Cases:** Prometheus metrics, custom dashboards, alerting systems, performance monitoring
178218

179219
### Testing
180220
The gocron library is set up to enable testing.
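The "Derived Metrics" formulas added to the README can be sketched in plain Go. This is only an illustration: `eventCounts` and its fields are hypothetical names for tallies a `SchedulerMonitor` implementation might keep, not types provided by gocron. The last formula, which the README calls queue depth, counts jobs that have started but not yet finished, so the sketch names it `inFlight`.

```go
package main

import "fmt"

// eventCounts is a hypothetical tally of monitor callbacks.
// It is not part of gocron; each field counts calls to the
// callback of the same name.
type eventCounts struct {
	registered, unregistered int
	started, completed       int
	failed                   int
}

// errorRate implements JobFailed / (JobCompleted + JobFailed).
// It returns 0 when no job has finished, avoiding a division by zero.
func (c eventCounts) errorRate() float64 {
	finished := c.completed + c.failed
	if finished == 0 {
		return 0
	}
	return float64(c.failed) / float64(finished)
}

// activeJobs implements JobRegistered - JobUnregistered.
func (c eventCounts) activeJobs() int {
	return c.registered - c.unregistered
}

// inFlight implements JobStarted - (JobCompleted + JobFailed).
func (c eventCounts) inFlight() int {
	return c.started - (c.completed + c.failed)
}

func main() {
	c := eventCounts{registered: 5, unregistered: 1, started: 10, completed: 6, failed: 2}
	fmt.Printf("error rate=%.2f active=%d in-flight=%d\n",
		c.errorRate(), c.activeJobs(), c.inFlight())
}
```

The zero-finished guard matters in practice: a dashboard that evaluates the error rate before the first job completes would otherwise report NaN.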

errors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var (
4848
ErrWithDistributedLockerNil = errors.New("gocron: WithDistributedLocker: locker must not be nil")
4949
ErrWithDistributedJobLockerNil = errors.New("gocron: WithDistributedJobLocker: locker must not be nil")
5050
ErrWithIdentifierNil = errors.New("gocron: WithIdentifier: identifier must not be nil")
51+
ErrSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: monitor must not be nil")
5152
ErrWithLimitConcurrentJobsZero = errors.New("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
5253
ErrWithLocationNil = errors.New("gocron: WithLocation: location must not be nil")
5354
ErrWithLoggerNil = errors.New("gocron: WithLogger: logger must not be nil")
@@ -59,6 +60,7 @@ var (
5960
ErrStartTimeLaterThanEndTime = errors.New("gocron: WithStartDateTime: start must not be later than end")
6061
ErrStopTimeEarlierThanStartTime = errors.New("gocron: WithStopDateTime: end must not be earlier than start")
6162
ErrWithStopTimeoutZeroOrNegative = errors.New("gocron: WithStopTimeout: timeout must be greater than 0")
63+
ErrWithSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: scheduler monitor cannot be nil")
6264
ErrWithLimitedRunsZero = errors.New("gocron: WithLimitedRuns: limit must be greater than 0")
6365
)
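Note that this diff adds two sentinels for the same condition, `ErrSchedulerMonitorNil` and `ErrWithSchedulerMonitorNil`, with different messages; which one `WithSchedulerMonitor` returns is not visible in this excerpt. The sketch below shows the general sentinel-error pattern that callers would rely on, using `errors.Is` so the check still works when the error is wrapped. All names in it (`errMonitorNil`, `withMonitor`, `newConfig`, `isMonitorNil`) are hypothetical stand-ins, not gocron API.

```go
package main

import (
	"errors"
	"fmt"
)

// errMonitorNil mirrors the sentinel style used in errors.go.
// The name and message are hypothetical.
var errMonitorNil = errors.New("example: withMonitor: monitor must not be nil")

// monitor and config are minimal stand-ins for illustration only.
type monitor interface{ SchedulerStarted() }

type config struct{ mon monitor }

// option is a functional option that can reject invalid input.
type option func(*config) error

// withMonitor rejects a nil monitor with the sentinel error.
func withMonitor(m monitor) option {
	return func(c *config) error {
		if m == nil {
			return errMonitorNil
		}
		c.mon = m
		return nil
	}
}

// newConfig applies options in order and wraps the first failure.
func newConfig(opts ...option) (*config, error) {
	c := &config{}
	for _, o := range opts {
		if err := o(c); err != nil {
			return nil, fmt.Errorf("applying option: %w", err)
		}
	}
	return c, nil
}

// isMonitorNil reports whether err is, or wraps, the sentinel.
func isMonitorNil(err error) bool {
	return errors.Is(err, errMonitorNil)
}

func main() {
	_, err := newConfig(withMonitor(nil))
	fmt.Println(isMonitorNil(err))
}
```

With two exported sentinels for one condition, a caller matching on the wrong one would silently miss the error, which is a reason to settle on a single variable.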
6466

executor.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type executor struct {
5656
monitor Monitor
5757
// monitorStatus for reporting metrics
5858
monitorStatus MonitorStatus
59+
// reference to parent scheduler for lifecycle notifications
60+
scheduler *scheduler
5961
}
6062

6163
type jobIn struct {
@@ -155,6 +157,15 @@ func (e *executor) start() {
155157
// all runners are busy, reschedule the work for later
156158
// which means we just skip it here and do nothing
157159
// TODO when metrics are added, this should increment a rescheduled metric
160+
// Notify concurrency limit reached if monitor is configured
161+
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
162+
ctx2, cancel2 := context.WithCancel(executorCtx)
163+
job := requestJobCtx(ctx2, jIn.id, e.jobOutRequest)
164+
cancel2()
165+
if job != nil {
166+
e.scheduler.notifyConcurrencyLimitReached("limit", e.scheduler.jobFromInternalJob(*job))
167+
}
168+
}
158169
e.sendOutForRescheduling(&jIn)
159170
}
160171
} else {
@@ -209,6 +220,10 @@ func (e *executor) start() {
209220
// which means we just skip it here and do nothing
210221
e.incrementJobCounter(*j, SingletonRescheduled)
211222
e.sendOutForRescheduling(&jIn)
223+
// Notify concurrency limit reached if monitor is configured
224+
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
225+
e.scheduler.notifyConcurrencyLimitReached("singleton", e.scheduler.jobFromInternalJob(*j))
226+
}
212227
}
213228
} else {
214229
// wait mode, fill up that queue (buffered channel, so it's ok)
@@ -416,18 +431,36 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
416431

417432
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
418433

434+
// Notify job started
435+
actualStartTime := time.Now()
436+
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
437+
jobObj := e.scheduler.jobFromInternalJob(j)
438+
e.scheduler.notifyJobStarted(jobObj)
439+
// Notify scheduling delay if job had a scheduled time
440+
if len(j.nextScheduled) > 0 {
441+
e.scheduler.notifyJobSchedulingDelay(jobObj, j.nextScheduled[0], actualStartTime)
442+
}
443+
}
444+
419445
err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name)
420446
if err != nil {
421447
e.sendOutForRescheduling(&jIn)
422-
423448
select {
424449
case e.jobsOutCompleted <- j.id:
425450
case <-e.ctx.Done():
426451
}
427-
452+
// Notify job failed (before actual run)
453+
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
454+
e.scheduler.notifyJobFailed(e.scheduler.jobFromInternalJob(j), err)
455+
}
428456
return
429457
}
430458

459+
// Notify job running
460+
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
461+
e.scheduler.notifyJobRunning(e.scheduler.jobFromInternalJob(j))
462+
}
463+
431464
// For intervalFromCompletion, we need to reschedule AFTER the job completes,
432465
// not before. For regular jobs, we reschedule before execution (existing behavior).
433466
if !j.intervalFromCompletion {
@@ -448,11 +481,25 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
448481
if err != nil {
449482
_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
450483
e.incrementJobCounter(j, Fail)
451-
e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err)
484+
endTime := time.Now()
485+
e.recordJobTimingWithStatus(startTime, endTime, j, Fail, err)
486+
// Notify job failed
487+
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
488+
jobObj := e.scheduler.jobFromInternalJob(j)
489+
e.scheduler.notifyJobFailed(jobObj, err)
490+
e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime))
491+
}
452492
} else {
453493
_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
454494
e.incrementJobCounter(j, Success)
455-
e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil)
495+
endTime := time.Now()
496+
e.recordJobTimingWithStatus(startTime, endTime, j, Success, nil)
497+
// Notify job completed
498+
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
499+
jobObj := e.scheduler.jobFromInternalJob(j)
500+
e.scheduler.notifyJobCompleted(jobObj)
501+
e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime))
502+
}
456503
}
457504

458505
// For intervalFromCompletion, reschedule AFTER the job completes
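The guard `e.scheduler != nil && e.scheduler.schedulerMonitor != nil` is repeated at every notification site in this file. One possible way to centralize it, shown here as a sketch rather than as what the commit does, is a helper with a nil-safe pointer receiver. The types `lifecycleMonitor`, `notifier`, and `recorder` are hypothetical and reduced to a single callback; the job is represented by a plain name instead of a gocron `Job`.

```go
package main

import (
	"fmt"
	"time"
)

// lifecycleMonitor is a hypothetical, reduced stand-in for a
// scheduler monitor with only the scheduling-delay callback.
type lifecycleMonitor interface {
	JobSchedulingDelay(name string, scheduled, actual time.Time)
}

// notifier owns the optional monitor (hypothetical type).
type notifier struct{ mon lifecycleMonitor }

// schedulingDelay forwards the event only when both the notifier
// and its monitor are set, so call sites need no guard of their own.
func (n *notifier) schedulingDelay(name string, scheduled, actual time.Time) {
	if n == nil || n.mon == nil {
		return
	}
	n.mon.JobSchedulingDelay(name, scheduled, actual)
}

// recorder collects observed delays for the demonstration.
type recorder struct{ delays []time.Duration }

func (r *recorder) JobSchedulingDelay(_ string, scheduled, actual time.Time) {
	r.delays = append(r.delays, actual.Sub(scheduled))
}

// demo calls the helper on a nil notifier (a no-op) and on a
// configured one, then returns what the recorder saw.
func demo() []time.Duration {
	now := time.Now()

	var unset *notifier
	unset.schedulingDelay("job", now, now) // safe: nil receiver is handled

	rec := &recorder{}
	n := &notifier{mon: rec}
	n.schedulingDelay("job", now, now.Add(150*time.Millisecond))
	return rec.delays
}

func main() {
	fmt.Println(demo())
}
```

The trade-off is that arguments are still evaluated before the helper runs, so work such as building the public job object would happen even when no monitor is configured; the inline guards in the diff avoid that cost.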

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/go-co-op/gocron/v2
22

3-
go 1.24.0
3+
go 1.21.4
44

55
require (
66
github.com/google/uuid v1.6.0
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/go-co-op/gocron/v2"
8+
)
9+
10+
type DebugMonitor struct {
11+
startCount int
12+
stopCount int
13+
jobRegCount int
14+
jobUnregCount int
15+
jobStartCount int
16+
jobRunningCount int
17+
jobCompletCount int
18+
jobFailCount int
19+
}
20+
21+
func (m *DebugMonitor) SchedulerStarted() {
22+
m.startCount++
23+
fmt.Printf("✓ SchedulerStarted() called (total: %d)\n", m.startCount)
24+
}
25+
26+
func (m *DebugMonitor) SchedulerShutdown() {
27+
m.stopCount++
28+
fmt.Printf("✓ SchedulerShutdown() called (total: %d)\n", m.stopCount)
29+
}
30+
31+
func (m *DebugMonitor) JobRegistered(job *gocron.Job) {
32+
m.jobRegCount++
33+
fmt.Printf("✓ JobRegistered() called (total: %d) - Job ID: %s\n", m.jobRegCount, (*job).ID())
34+
}
35+
36+
func (m *DebugMonitor) JobUnregistered(job *gocron.Job) {
37+
m.jobUnregCount++
38+
fmt.Printf("✓ JobUnregistered() called (total: %d) - Job ID: %s\n", m.jobUnregCount, (*job).ID())
39+
}
40+
41+
func (m *DebugMonitor) JobStarted(job *gocron.Job) {
42+
m.jobStartCount++
43+
fmt.Printf("✓ JobStarted() called (total: %d) - Job ID: %s\n", m.jobStartCount, (*job).ID())
44+
}
45+
46+
func (m *DebugMonitor) JobRunning(job *gocron.Job) {
47+
m.jobRunningCount++
48+
fmt.Printf("✓ JobRunning() called (total: %d) - Job ID: %s\n", m.jobRunningCount, (*job).ID())
49+
}
50+
51+
func (m *DebugMonitor) JobCompleted(job *gocron.Job) {
52+
m.jobCompletCount++
53+
fmt.Printf("✓ JobCompleted() called (total: %d) - Job ID: %s\n", m.jobCompletCount, (*job).ID())
54+
}
55+
56+
func (m *DebugMonitor) JobFailed(job *gocron.Job, err error) {
57+
m.jobFailCount++
58+
fmt.Printf("✓ JobFailed() called (total: %d) - Job ID: %s, Error: %v\n", m.jobFailCount, (*job).ID(), err)
59+
}
60+
61+
func main() {
62+
// ONE monitor, multiple scheduler instances
63+
monitor := &DebugMonitor{}
64+
65+
fmt.Println("=== Cycle 1 (Scheduler Instance 1) ===")
66+
s1, err := gocron.NewScheduler(
67+
gocron.WithSchedulerMonitor(monitor),
68+
)
69+
if err != nil {
70+
panic(err)
71+
}
72+
73+
// Create and register some test jobs
74+
fmt.Println("Creating jobs...")
75+
_, err = s1.NewJob(
76+
gocron.DurationJob(1*time.Second),
77+
gocron.NewTask(func() { fmt.Println("Job 1 running") }),
78+
)
79+
if err != nil {
80+
panic(err)
81+
}
82+
83+
_, err = s1.NewJob(
84+
gocron.DurationJob(2*time.Second),
85+
gocron.NewTask(func() error {
86+
fmt.Println("Job 2 executing and returning error")
87+
return fmt.Errorf("simulated job failure")
88+
}), // This job will fail with error
89+
)
90+
if err != nil {
91+
panic(err)
92+
}
93+
94+
fmt.Println("Calling Start()...")
95+
s1.Start()
96+
time.Sleep(3 * time.Second) // Wait for jobs to execute
97+
98+
fmt.Println("Calling Shutdown()...")
99+
err = s1.Shutdown()
100+
if err != nil {
101+
fmt.Printf("Shutdown error: %v\n", err)
102+
}
103+
104+
fmt.Println("\n=== Cycle 2 (Job Updates) ===")
105+
s2, err := gocron.NewScheduler(
106+
gocron.WithSchedulerMonitor(monitor),
107+
)
108+
if err != nil {
109+
panic(err)
110+
}
111+
112+
fmt.Println("Creating and updating jobs...")
113+
job3, err := s2.NewJob(
114+
gocron.DurationJob(1*time.Second),
115+
gocron.NewTask(func() { fmt.Println("Job 3 running") }),
116+
)
117+
if err != nil {
118+
panic(err)
119+
}
120+
121+
// Update the job
122+
_, err = s2.Update(
123+
job3.ID(),
124+
gocron.DurationJob(2*time.Second),
125+
gocron.NewTask(func() { fmt.Println("Job 3 updated") }),
126+
)
127+
if err != nil {
128+
panic(err)
129+
}
130+
131+
fmt.Println("Calling Start()...")
132+
s2.Start()
133+
time.Sleep(3 * time.Second)
134+
135+
fmt.Println("Calling Shutdown()...")
136+
err = s2.Shutdown()
137+
if err != nil {
138+
fmt.Printf("Shutdown error: %v\n", err)
139+
}
140+
141+
fmt.Println("\n=== Summary ===")
142+
fmt.Printf("Total Scheduler Starts: %d\n", monitor.startCount)
143+
fmt.Printf("Total Scheduler Stops: %d\n", monitor.stopCount)
144+
fmt.Printf("Total Jobs Registered: %d\n", monitor.jobRegCount)
145+
fmt.Printf("Total Jobs Unregistered: %d\n", monitor.jobUnregCount)
146+
fmt.Printf("Total Jobs Started: %d\n", monitor.jobStartCount)
147+
fmt.Printf("Total Jobs Running: %d\n", monitor.jobRunningCount)
148+
fmt.Printf("Total Jobs Completed: %d\n", monitor.jobCompletCount)
149+
fmt.Printf("Total Jobs Failed: %d\n", monitor.jobFailCount)
150+
}
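`DebugMonitor` above increments plain `int` fields. If monitor callbacks can arrive from more than one goroutine, which seems plausible since the notifications are issued from the executor's `runJob`, those increments would race. A minimal sketch of a race-free variant using `sync/atomic` follows; `atomicCounts` and `simulate` are hypothetical names, and the callbacks take no arguments to keep the example free of gocron types.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicCounts is a hypothetical replacement for plain int counters.
// atomic.Int64 (Go 1.19+) makes each increment safe under concurrency.
type atomicCounts struct {
	started   atomic.Int64
	completed atomic.Int64
}

func (c *atomicCounts) JobStarted()   { c.started.Add(1) }
func (c *atomicCounts) JobCompleted() { c.completed.Add(1) }

// simulate fires n concurrent start/complete pairs, standing in for
// jobs that run on separate goroutines, and returns the final totals.
func simulate(n int) (started, completed int64) {
	var c atomicCounts
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.JobStarted()
			c.JobCompleted()
		}()
	}
	wg.Wait()
	return c.started.Load(), c.completed.Load()
}

func main() {
	s, c := simulate(100)
	fmt.Println(s, c)
}
```

Running the original debug program with `go run -race` would be one way to check whether the plain counters are in fact accessed concurrently.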

gocron-monitor-test/go.mod

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module test
2+
3+
go 1.21.4
4+
5+
require github.com/go-co-op/gocron/v2 v2.17.0
6+
7+
require (
8+
github.com/google/uuid v1.6.0 // indirect
9+
github.com/jonboulle/clockwork v0.5.0 // indirect
10+
github.com/robfig/cron/v3 v3.0.1 // indirect
11+
)
12+
13+
replace github.com/go-co-op/gocron/v2 => ../

gocron-monitor-test/go.sum

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
4+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
5+
github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
6+
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
7+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
8+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
9+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
10+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
11+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
12+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
13+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
14+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
15+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
16+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
