diff --git a/chain.go b/chain.go index 9c087b7b..885dba63 100644 --- a/chain.go +++ b/chain.go @@ -5,6 +5,8 @@ import ( "runtime" "sync" "time" + + "github.com/benbjohnson/clock" ) // JobWrapper decorates the given Job with some behavior. @@ -24,9 +26,12 @@ func NewChain(c ...JobWrapper) Chain { // Then decorates the given job with all JobWrappers in the chain. // // This: -// NewChain(m1, m2, m3).Then(job) +// +// NewChain(m1, m2, m3).Then(job) +// // is equivalent to: -// m1(m2(m3(job))) +// +// m1(m2(m3(job))) func (c Chain) Then(j Job) Job { for i := range c.wrappers { j = c.wrappers[len(c.wrappers)-i-1](j) @@ -59,13 +64,19 @@ func Recover(logger Logger) JobWrapper { // previous one is complete. Jobs running after a delay of more than a minute // have the delay logged at Info. func DelayIfStillRunning(logger Logger) JobWrapper { + return DelayIfStillRunningWithClock(logger, clock.New()) +} + +// DelayIfStillRunningWithClock behaves identically to DelayIfStillRunning but +// uses the provided Clock for measuring the delay, for use in testing. +func DelayIfStillRunningWithClock(logger Logger, clk clock.Clock) JobWrapper { return func(j Job) Job { var mu sync.Mutex return FuncJob(func() { - start := time.Now() + start := clk.Now() mu.Lock() defer mu.Unlock() - if dur := time.Since(start); dur > time.Minute { + if dur := clk.Since(start); dur > time.Minute { logger.Info("delay", "duration", dur) } j.Run() @@ -77,13 +88,13 @@ func DelayIfStillRunning(logger Logger) JobWrapper { // still running. It logs skips to the given logger at Info level. func SkipIfStillRunning(logger Logger) JobWrapper { return func(j Job) Job { - var ch = make(chan struct{}, 1) + ch := make(chan struct{}, 1) ch <- struct{}{} return FuncJob(func() { select { case v := <-ch: - defer func() { ch <- v }() j.Run() + ch <- v default: logger.Info("skip") } diff --git a/chain_test.go b/chain_test.go index ec910975..380d654e 100644 --- a/chain_test.go +++ b/chain_test.go @@ -104,7 +104,7 @@ func TestChainDelayIfStillRunning(t *testing.T) { var j countJob wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) go wrappedJob.Run() - time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete. + time.Sleep(50 * time.Millisecond) // Give the job 50 ms to complete. if c := j.Done(); c != 1 { t.Errorf("expected job run once, immediately, got %d", c) } @@ -115,10 +115,10 @@ func TestChainDelayIfStillRunning(t *testing.T) { wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + time.Sleep(10* time.Millisecond) go wrappedJob.Run() }() - time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + time.Sleep(100 * time.Millisecond) // Give both jobs 100 ms to complete. if c := j.Done(); c != 2 { t.Errorf("expected job run twice, immediately, got %d", c) } @@ -126,24 +126,24 @@ func TestChainDelayIfStillRunning(t *testing.T) { t.Run("second run delayed if first not done", func(t *testing.T) { var j countJob - j.delay = 10 * time.Millisecond + j.delay = 100 * time.Millisecond wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + time.Sleep(10 * time.Millisecond) go wrappedJob.Run() }() - // After 5ms, the first job is still in progress, and the second job was + // After 50 ms, the first job is still in progress, and the second job was // run but should be waiting for it to finish. - time.Sleep(5 * time.Millisecond) + time.Sleep(50 * time.Millisecond) started, done := j.Started(), j.Done() if started != 1 || done != 0 { t.Error("expected first job started, but not finished, got", started, done) } // Verify that the second job completes. - time.Sleep(25 * time.Millisecond) + time.Sleep(200 * time.Millisecond) started, done = j.Started(), j.Done() if started != 2 || done != 2 { t.Error("expected both jobs done, got", started, done) @@ -158,7 +158,7 @@ func TestChainSkipIfStillRunning(t *testing.T) { var j countJob wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) go wrappedJob.Run() - time.Sleep(2 * time.Millisecond) // Give the job 2ms to complete. + time.Sleep(50 * time.Millisecond) // Give the job 50ms to complete. if c := j.Done(); c != 1 { t.Errorf("expected job run once, immediately, got %d", c) } @@ -169,10 +169,10 @@ func TestChainSkipIfStillRunning(t *testing.T) { wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + time.Sleep(10* time.Millisecond) go wrappedJob.Run() }() - time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + time.Sleep(100 * time.Millisecond) // Give both jobs 100ms to complete. if c := j.Done(); c != 2 { t.Errorf("expected job run twice, immediately, got %d", c) } @@ -180,24 +180,24 @@ func TestChainSkipIfStillRunning(t *testing.T) { t.Run("second run skipped if first not done", func(t *testing.T) { var j countJob - j.delay = 10 * time.Millisecond + j.delay = 100 * time.Millisecond wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + time.Sleep(10 * time.Millisecond) go wrappedJob.Run() }() - // After 5ms, the first job is still in progress, and the second job was + // After 50ms, the first job is still in progress, and the second job was // aleady skipped. - time.Sleep(5 * time.Millisecond) + time.Sleep(50 * time.Millisecond) started, done := j.Started(), j.Done() if started != 1 || done != 0 { t.Error("expected first job started, but not finished, got", started, done) } // Verify that the first job completes and second does not run. - time.Sleep(25 * time.Millisecond) + time.Sleep(200 * time.Millisecond) started, done = j.Started(), j.Done() if started != 1 || done != 1 { t.Error("expected second job skipped, got", started, done) diff --git a/cron.go b/cron.go index c7e91766..6345df24 100644 --- a/cron.go +++ b/cron.go @@ -5,6 +5,8 @@ import ( "sort" "sync" "time" + + "github.com/benbjohnson/clock" ) // Cron keeps track of any number of entries, invoking the associated func as @@ -24,6 +26,7 @@ type Cron struct { parser ScheduleParser nextID EntryID jobWaiter sync.WaitGroup + clk clock.Clock } // ScheduleParser is an interface for schedule spec parsers that return a Schedule @@ -97,17 +100,17 @@ func (s byTime) Less(i, j int) bool { // // Available Settings // -// Time Zone -// Description: The time zone in which schedules are interpreted -// Default: time.Local +// Time Zone +// Description: The time zone in which schedules are interpreted +// Default: time.Local // -// Parser -// Description: Parser converts cron spec strings into cron.Schedules. -// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron +// Parser +// Description: Parser converts cron spec strings into cron.Schedules. +// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron // -// Chain -// Description: Wrap submitted jobs to customize behavior. -// Default: A chain that recovers panics and logs them to stderr. +// Chain +// Description: Wrap submitted jobs to customize behavior. +// Default: A chain that recovers panics and logs them to stderr. // // See "cron.With*" to modify the default behavior. func New(opts ...Option) *Cron { @@ -123,6 +126,7 @@ func New(opts ...Option) *Cron { logger: DefaultLogger, location: time.Local, parser: standardParser, + clk: clock.New(), } for _, opt := range opts { opt(c) @@ -250,13 +254,13 @@ func (c *Cron) run() { // Determine the next entry to run. sort.Sort(byTime(c.entries)) - var timer *time.Timer + var timer *clock.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. - timer = time.NewTimer(100000 * time.Hour) + timer = c.clk.Timer(100000 * time.Hour) } else { - timer = time.NewTimer(c.entries[0].Next.Sub(now)) + timer = c.clk.Timer(c.entries[0].Next.Sub(now)) } for { @@ -315,7 +319,7 @@ func (c *Cron) startJob(j Job) { // now returns current time in c location func (c *Cron) now() time.Time { - return time.Now().In(c.location) + return c.clk.Now().In(c.location) } // Stop stops the cron scheduler if it is running; otherwise it does nothing. @@ -337,7 +341,7 @@ func (c *Cron) Stop() context.Context { // entrySnapshot returns a copy of the current cron entry list. func (c *Cron) entrySnapshot() []Entry { - var entries = make([]Entry, len(c.entries)) + entries := make([]Entry, len(c.entries)) for i, e := range c.entries { entries[i] = *e } diff --git a/cron_test.go b/cron_test.go index 36f06bf7..32ba24dd 100644 --- a/cron_test.go +++ b/cron_test.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "testing" "time" + + "github.com/benbjohnson/clock" ) // Many tests schedule a job for every second, and then wait at most a second @@ -388,7 +390,7 @@ func TestBlockingRun(t *testing.T) { cron := newWithSeconds() cron.AddFunc("* * * * * ?", func() { wg.Done() }) - var unblockChan = make(chan struct{}) + unblockChan := make(chan struct{}) go func() { cron.Run() @@ -407,7 +409,7 @@ func TestBlockingRun(t *testing.T) { // Test that double-running is a no-op func TestStartNoop(t *testing.T) { - var tickChan = make(chan struct{}, 2) + tickChan := make(chan struct{}, 2) cron := newWithSeconds() cron.AddFunc("* * * * * ?", func() { @@ -667,8 +669,26 @@ func TestStopAndWait(t *testing.T) { case <-time.After(time.Millisecond): t.Error("context not done even when cron Stop is completed") } + }) +} +func TestMockClock(t *testing.T) { + clk := clock.NewMock() + clk.Set(time.Now()) + cron := New(WithClock(clk)) + counter := 0 + cron.AddFunc("@every 1s", func() { + counter += 1 }) + cron.Start() + defer cron.Stop() + for i := 0; i <= 10; i++ { + clk.Add(1 * time.Second) + } + time.Sleep(100 * time.Millisecond) + if counter != 10 { + t.Errorf("expected 10 calls, got %d", counter) + } } func TestMultiThreadedStartAndStop(t *testing.T) { diff --git a/doc.go b/doc.go index fa5d08b4..a36d2a6e 100644 --- a/doc.go +++ b/doc.go @@ -13,7 +13,7 @@ Import it in your program as: It requires Go 1.11 or later due to usage of Go Modules. -Usage +# Usage Callers may register Funcs to be invoked on a given schedule. Cron will run them in their own goroutines. @@ -36,7 +36,23 @@ them in their own goroutines. .. c.Stop() // Stop the scheduler (does not stop any jobs already running). -CRON Expression Format +# Time mocking + +import "github.com/benbjohnson/clock" + +clk := clock.NewMock() + +c := cron.New(cron.WithClock(clk)) + + c.AddFunc("@every 1h", func() { + fmt.Println("Every hour") + }) + +c.Start() + +clk.Add(1 * time.Hour) + +# CRON Expression Format A cron expression represents a set of times, using 5 space-separated fields. @@ -54,7 +70,7 @@ Month and Day-of-week field values are case insensitive. "SUN", "Sun", and The specific interpretation of the format is based on the Cron Wikipedia page: https://en.wikipedia.org/wiki/Cron -Alternative Formats +# Alternative Formats Alternative Cron expression formats support other fields like seconds. You can implement that by creating a custom Parser as follows. @@ -73,7 +89,7 @@ parser you saw earlier, except that its seconds field is REQUIRED: That emulates Quartz, the most popular alternative Cron schedule format: http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html -Special Characters +# Special Characters Asterisk ( * ) @@ -105,7 +121,7 @@ Question mark ( ? ) Question mark may be used instead of '*' for leaving either day-of-month or day-of-week blank. -Predefined schedules +# Predefined schedules You may use one of several pre-defined schedules in place of a cron expression. @@ -117,12 +133,12 @@ You may use one of several pre-defined schedules in place of a cron expression. @daily (or @midnight) | Run once a day, midnight | 0 0 * * * @hourly | Run once an hour, beginning of hour | 0 * * * * -Intervals +# Intervals You may also schedule a job to execute at fixed intervals, starting at the time it's added or cron is run. This is supported by formatting the cron spec like this: - @every + @every where "duration" is a string accepted by time.ParseDuration (http://golang.org/pkg/time/#ParseDuration). @@ -134,13 +150,13 @@ Note: The interval does not take the job runtime into account. For example, if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, it will have only 2 minutes of idle time between each run. -Time zones +# Time zones By default, all interpretation and scheduling is done in the machine's local time zone (time.Local). You can specify a different time zone on construction: - cron.New( - cron.WithLocation(time.UTC)) + cron.New( + cron.WithLocation(time.UTC)) Individual cron schedules may also override the time zone they are to be interpreted in by providing an additional space-separated field at the beginning @@ -169,7 +185,7 @@ The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility. Be aware that jobs scheduled during daylight-savings leap-ahead transitions will not be run! -Job Wrappers +# Job Wrappers A Cron runner may be configured with a chain of job wrappers to add cross-cutting functionality to all submitted jobs. For example, they may be used @@ -192,7 +208,7 @@ Install wrappers for individual jobs by explicitly wrapping them: cron.SkipIfStillRunning(logger), ).Then(job) -Thread safety +# Thread safety Since the Cron service runs concurrently with the calling code, some amount of care must be taken to ensure proper synchronization. @@ -200,7 +216,7 @@ care must be taken to ensure proper synchronization. All cron methods are designed to be correctly synchronized as long as the caller ensures that invocations have a clear happens-before ordering between them. -Logging +# Logging Cron defines a Logger interface that is a subset of the one defined in github.com/go-logr/logr. It has two logging levels (Info and Error), and @@ -216,16 +232,15 @@ Activate it with a one-off logger as follows: cron.WithLogger( cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags)))) - -Implementation +# Implementation Cron entries are stored in an array, sorted by their next activation time. Cron sleeps until the next job is due to be run. Upon waking: - - it runs each entry that is active on that second - - it calculates the next run times for the jobs that were run - - it re-sorts the array of entries by next activation time. - - it goes to sleep until the soonest job. + - it runs each entry that is active on that second + - it calculates the next run times for the jobs that were run + - it re-sorts the array of entries by next activation time. + - it goes to sleep until the soonest job. */ package cron diff --git a/go.mod b/go.mod index 8c95bf47..85f1a2d6 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/robfig/cron/v3 go 1.12 + +require github.com/benbjohnson/clock v1.3.0 diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..c284fd6d --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/option.go b/option.go index 09e4278e..569efb5a 100644 --- a/option.go +++ b/option.go @@ -2,6 +2,8 @@ package cron import ( "time" + + "github.com/benbjohnson/clock" ) // Option represents a modification to the default behavior of a Cron. @@ -43,3 +45,10 @@ func WithLogger(logger Logger) Option { c.logger = logger } } + +// WithClock uses the provided clock to track time. +func WithClock(clk clock.Clock) Option { + return func(c *Cron) { + c.clk = clk + } +} diff --git a/option_test.go b/option_test.go index 8aef1682..aa5884af 100644 --- a/option_test.go +++ b/option_test.go @@ -15,7 +15,7 @@ func TestWithLocation(t *testing.T) { } func TestWithParser(t *testing.T) { - var parser = NewParser(Dow) + parser := NewParser(Dow) c := New(WithParser(parser)) if c.parser != parser { t.Error("expected provided parser") @@ -24,7 +24,7 @@ func TestWithParser(t *testing.T) { func TestWithVerboseLogger(t *testing.T) { var buf syncWriter - var logger = log.New(&buf, "", log.LstdFlags) + logger := log.New(&buf, "", log.LstdFlags) c := New(WithLogger(VerbosePrintfLogger(logger))) if c.logger.(printfLogger).logger != logger { t.Error("expected provided logger") diff --git a/parser.go b/parser.go index 8da6547a..9208d5cf 100644 --- a/parser.go +++ b/parser.go @@ -56,18 +56,17 @@ type Parser struct { // // Examples // -// // Standard parser without descriptors -// specParser := NewParser(Minute | Hour | Dom | Month | Dow) -// sched, err := specParser.Parse("0 0 15 */3 *") +// // Standard parser without descriptors +// specParser := NewParser(Minute | Hour | Dom | Month | Dow) +// sched, err := specParser.Parse("0 0 15 */3 *") // -// // Same as above, just excludes time fields -// specParser := NewParser(Dom | Month | Dow) -// sched, err := specParser.Parse("15 */3 *") -// -// // Same as above, just makes Dow optional -// specParser := NewParser(Dom | Month | DowOptional) -// sched, err := specParser.Parse("15 */3") +// // Same as above, just excludes time fields +// specParser := NewParser(Dom | Month | Dow) +// sched, err := specParser.Parse("15 */3 *") // +// // Same as above, just makes Dow optional +// specParser := NewParser(Dom | Month | DowOptional) +// sched, err := specParser.Parse("15 */3") func NewParser(options ParseOption) Parser { optionals := 0 if options&DowOptional > 0 { @@ -91,7 +90,7 @@ func (p Parser) Parse(spec string) (Schedule, error) { } // Extract timezone if present - var loc = time.Local + loc := time.Local if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") { var err error i := strings.Index(spec, " ") @@ -247,7 +246,9 @@ func getField(field string, r bounds) (uint64, error) { } // getRange returns the bits indicated by the given expression: -// number | number "-" number [ "/" number ] +// +// number | number "-" number [ "/" number ] +// // or error parsing range. func getRange(expr string, r bounds) (uint64, error) { var ( @@ -418,7 +419,6 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { Dow: all(dow), Location: loc, }, nil - } const every = "@every " diff --git a/parser_test.go b/parser_test.go index 41c8c520..4bf8bd8a 100644 --- a/parser_test.go +++ b/parser_test.go @@ -119,7 +119,7 @@ func TestBits(t *testing.T) { } func TestParseScheduleErrors(t *testing.T) { - var tests = []struct{ expr, err string }{ + tests := []struct{ expr, err string }{ {"* 5 j * * *", "failed to parse int from"}, {"@every Xm", "failed to parse duration"}, {"@unrecognized", "unrecognized descriptor"}, diff --git a/spec_test.go b/spec_test.go index 1b8a503e..0f1be546 100644 --- a/spec_test.go +++ b/spec_test.go @@ -219,7 +219,7 @@ func getTime(value string) time.Time { return time.Time{} } - var location = time.Local + location := time.Local if strings.HasPrefix(value, "TZ=") { parts := strings.Fields(value) loc, err := time.LoadLocation(parts[0][len("TZ="):]) @@ -230,7 +230,7 @@ func getTime(value string) time.Time { value = parts[1] } - var layouts = []string{ + layouts := []string{ "Mon Jan 2 15:04 2006", "Mon Jan 2 15:04:05 2006", }