Skip to content

feat: support overriding the Prev time for a new job/entry #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ type Entry struct {
// Valid returns true if this is not the zero entry.
func (e Entry) Valid() bool { return e.ID != 0 }

// ScheduleFirst is used for the initial scheduling. If a Prev value has been
// included with the Entry, it will be used in place of "now" to allow schedules
// to be preserved across process restarts.
func (e Entry) ScheduleFirst(now time.Time) time.Time {
if !e.Prev.IsZero() {
return e.Schedule.Next(e.Prev)
} else {
return e.Schedule.Next(now)
}
}

// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry
Expand Down Expand Up @@ -138,24 +149,24 @@ func (f FuncJob) Run() { f() }
// AddFunc adds a func to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
func (c *Cron) AddFunc(spec string, cmd func(), entryOpts ...EntryOption) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd), entryOpts...)
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
func (c *Cron) AddJob(spec string, cmd Job, entryOpts ...EntryOption) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
return c.Schedule(schedule, cmd, entryOpts...), nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
func (c *Cron) Schedule(schedule Schedule, cmd Job, entryOpts ...EntryOption) EntryID {
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
Expand All @@ -165,6 +176,9 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
for _, fn := range entryOpts {
fn(entry)
}
if !c.running {
c.entries = append(c.entries, entry)
} else {
Expand All @@ -173,6 +187,18 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
return entry.ID
}

// EntryOption is a hook which allows the Entry to be altered before being
// committed internally.
type EntryOption func(*Entry)

// EntryPrev allows setting the Prev time to allow interval-based schedules to
// preserve their timeline even in the face of process restarts.
func WithPrev(prev time.Time) EntryOption {
return func(e *Entry) {
e.Prev = prev
}
}

// Entries returns a snapshot of the cron entries.
func (c *Cron) Entries() []Entry {
c.runningMu.Lock()
Expand Down Expand Up @@ -242,7 +268,7 @@ func (c *Cron) run() {
// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
entry.Next = entry.ScheduleFirst(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}

Expand Down Expand Up @@ -279,7 +305,7 @@ func (c *Cron) run() {
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
newEntry.Next = newEntry.ScheduleFirst(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

Expand Down
14 changes: 14 additions & 0 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,20 @@ func TestStopAndWait(t *testing.T) {
})
}

func TestJobWithCustomPrev(t *testing.T) {
cron := New()
var calls int64
// running every 3s, but starting 2s in the past
// expected timeline: 1s ... 4s ... stop (2 calls)
// if prev was ignored, the func would only be called once (at 3s)
cron.AddFunc("@every 3s", func() { atomic.AddInt64(&calls, 1) }, WithPrev(time.Now().Add(-2*time.Second)))
cron.Start()
time.Sleep(5 * time.Second)
if atomic.LoadInt64(&calls) != 2 {
t.Errorf("called %d times, expected 2\n", calls)
}
}

func TestMultiThreadedStartAndStop(t *testing.T) {
cron := New()
go cron.Run()
Expand Down