Skip to content

Commit 4b3d450

Browse files
committed
perf: use min-heap from robfig/pull/423
1 parent 1742845 commit 4b3d450

File tree

3 files changed

+71
-22
lines changed

3 files changed

+71
-22
lines changed

cron.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package cron
22

33
import (
4+
"container/heap"
45
"context"
5-
"sort"
66
"sync"
77
"time"
88
)
@@ -11,7 +11,7 @@ import (
1111
// specified by the schedule. It may be started, stopped, and the entries may
1212
// be inspected while running.
1313
type Cron struct {
14-
entries []*Entry
14+
entries EntryHeap
1515
chain Chain
1616
stop chan struct{}
1717
add chan *Entry
@@ -97,17 +97,17 @@ func (s byTime) Less(i, j int) bool {
9797
//
9898
// Available Settings
9999
//
100-
// Time Zone
101-
// Description: The time zone in which schedules are interpreted
102-
// Default: time.Local
100+
// Time Zone
101+
// Description: The time zone in which schedules are interpreted
102+
// Default: time.Local
103103
//
104-
// Parser
105-
// Description: Parser converts cron spec strings into cron.Schedules.
106-
// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
104+
// Parser
105+
// Description: Parser converts cron spec strings into cron.Schedules.
106+
// Default: Accepts this spec: https://en.wikipedia.org/wiki/Cron
107107
//
108-
// Chain
109-
// Description: Wrap submitted jobs to customize behavior.
110-
// Default: A chain that recovers panics and logs them to stderr.
108+
// Chain
109+
// Description: Wrap submitted jobs to customize behavior.
110+
// Default: A chain that recovers panics and logs them to stderr.
111111
//
112112
// See "cron.With*" to modify the default behavior.
113113
func New(opts ...Option) *Cron {
@@ -166,7 +166,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
166166
Job: cmd,
167167
}
168168
if !c.running {
169-
c.entries = append(c.entries, entry)
169+
heap.Push(&c.entries, entry)
170170
} else {
171171
c.add <- entry
172172
}
@@ -241,14 +241,19 @@ func (c *Cron) run() {
241241

242242
// Figure out the next activation times for each entry.
243243
now := c.now()
244-
for _, entry := range c.entries {
244+
sortedEntries := new(EntryHeap)
245+
for len(c.entries) > 0 {
246+
entry := heap.Pop(&c.entries).(*Entry)
245247
entry.Next = entry.Schedule.Next(now)
248+
heap.Push(sortedEntries, entry)
246249
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
247250
}
251+
c.entries = *sortedEntries
248252

249253
for {
250254
// Determine the next entry to run.
251-
sort.Sort(byTime(c.entries))
255+
// User min-heap no need sort anymore
256+
// sort.Sort(byTime(c.entries))
252257

253258
var timer *time.Timer
254259
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
@@ -266,21 +271,24 @@ func (c *Cron) run() {
266271
c.logger.Info("wake", "now", now)
267272

268273
// Run every entry whose next time was less than now
269-
for _, e := range c.entries {
274+
for {
275+
e := c.entries.Peek()
270276
if e.Next.After(now) || e.Next.IsZero() {
271277
break
272278
}
279+
e = heap.Pop(&c.entries).(*Entry)
273280
c.startJob(e.WrappedJob)
274281
e.Prev = e.Next
275282
e.Next = e.Schedule.Next(now)
283+
heap.Push(&c.entries, e)
276284
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
277285
}
278286

279287
case newEntry := <-c.add:
280288
timer.Stop()
281289
now = c.now()
282290
newEntry.Next = newEntry.Schedule.Next(now)
283-
c.entries = append(c.entries, newEntry)
291+
heap.Push(&c.entries, newEntry)
284292
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
285293

286294
case replyChan := <-c.snapshot:
@@ -345,11 +353,10 @@ func (c *Cron) entrySnapshot() []Entry {
345353
}
346354

347355
func (c *Cron) removeEntry(id EntryID) {
348-
var entries []*Entry
349-
for _, e := range c.entries {
350-
if e.ID != id {
351-
entries = append(entries, e)
356+
for idx, e := range c.entries {
357+
if e.ID == id {
358+
heap.Remove(&c.entries, idx)
359+
return
352360
}
353361
}
354-
c.entries = entries
355362
}

cron_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cron
22

33
import (
44
"bytes"
5+
"container/heap"
56
"fmt"
67
"log"
78
"strings"
@@ -466,9 +467,13 @@ func TestJob(t *testing.T) {
466467
expecteds := []string{"job2", "job4", "job5", "job1", "job3", "job0"}
467468

468469
var actuals []string
469-
for _, entry := range cron.Entries() {
470+
clone := new(EntryHeap)
471+
for len(cron.entries) > 0 {
472+
entry := heap.Pop(&cron.entries).(*Entry)
470473
actuals = append(actuals, entry.Job.(testJob).name)
474+
heap.Push(clone, entry)
471475
}
476+
cron.entries = *clone
472477

473478
for i, expected := range expecteds {
474479
if actuals[i] != expected {

entry_heap.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package cron
2+
3+
type EntryHeap []*Entry
4+
5+
func (h *EntryHeap) Less(i, j int) bool {
6+
if (*h)[i].Next.IsZero() {
7+
return false
8+
}
9+
if (*h)[j].Next.IsZero() {
10+
return true
11+
}
12+
return (*h)[i].Next.Before((*h)[j].Next)
13+
}
14+
15+
func (h *EntryHeap) Swap(i, j int) {
16+
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
17+
}
18+
19+
func (h *EntryHeap) Len() int {
20+
return len(*h)
21+
}
22+
23+
func (h *EntryHeap) Pop() (v interface{}) {
24+
*h, v = (*h)[:h.Len()-1], (*h)[h.Len()-1]
25+
return
26+
}
27+
28+
func (h *EntryHeap) Push(v interface{}) {
29+
*h = append(*h, v.(*Entry))
30+
}
31+
32+
func (h *EntryHeap) Peek() *Entry {
33+
if len(*h) == 0 {
34+
return nil
35+
}
36+
return (*h)[0]
37+
}

0 commit comments

Comments
 (0)