Skip to content

Commit 442b466

Browse files
tasks: improved scheduler
Restart the daily timers appropiately if the clock has drifted, for example when the computer is put to sleep.
1 parent 24c770c commit 442b466

File tree

2 files changed

+187
-90
lines changed

2 files changed

+187
-90
lines changed

daemon/tasks/scheduler/daily.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/evilsocket/opensnitch/daemon/log"
9+
)
10+
11+
// if the computer enters sleep mode, the duration of the sleep is substracted to time.Sleep()
12+
// Example:
13+
// - the timer fires at 18:51; it'll be checked again in 1h and will fire in 24h.
14+
// - last check at 00:51
15+
// - computer put to sleep at 01:51
16+
// - it wakes up at 10:31, having slept for 8h.
17+
// - the timer attempts to fire at 10:31, exactly 8h before the deadline.
18+
func timeHasDrifted(now, tms *time.Time) bool {
19+
return now.Minute() != tms.Minute() && now.Second() != tms.Second()
20+
}
21+
22+
func waitToStart(ctx context.Context, id int, t string, wait time.Duration, tms *time.Time, drifted chan struct{}) (bool, bool) {
23+
now := time.Now()
24+
for {
25+
select {
26+
case <-ctx.Done():
27+
goto Exit
28+
case <-time.After(wait):
29+
realNow := time.Now()
30+
log.Debug("[tasks-scheduler] %d, %s ticker ready: %s - after: %s", id, t, realNow.Format(time.DateTime), now.Format(time.DateTime))
31+
goto Continue
32+
case <-drifted:
33+
//stopTimer(tck)
34+
goto Reschedule
35+
}
36+
}
37+
Exit:
38+
return true, false
39+
Continue:
40+
return false, false
41+
Reschedule:
42+
return false, true
43+
}
44+
45+
// calcDailyTicker calculates the amount of time to wait until the timer must start.
46+
func calcDailyTicker(tm string) (*time.Time, time.Duration) {
47+
// support 2 formats when specifying times:
48+
// - 15:04:05
49+
// - 15:04 -> assume seconds == 00
50+
tms, err := time.Parse("15:04:05", tm)
51+
if err != nil {
52+
tms, err = time.Parse("15:04", tm)
53+
if err != nil {
54+
log.Error("[tasks-scheduler] invalid daily ticker time: %s", err)
55+
return nil, time.Millisecond
56+
}
57+
}
58+
wait := time.Millisecond
59+
now := time.Now().Round(0)
60+
tmd := time.Date(
61+
now.Year(), now.Month(), now.Day(),
62+
tms.Hour(), tms.Minute(), tms.Second(), 0,
63+
now.Location(),
64+
)
65+
// if the Ticker is created before the time, wait until the ticker
66+
if tmd.Before(now) {
67+
wait = (24 * time.Hour) - now.Sub(tmd)
68+
} else if tmd.After(now) {
69+
wait = time.Until(tmd)
70+
}
71+
log.Debug("[tasks-scheduler] NewDailyTicker scheduled %s, waiting to start: %s", tm, wait)
72+
73+
return &tmd, wait
74+
}
75+
76+
// NewDailyTicker creates a new ticker.
77+
func NewDailyTicker(tm string) (*time.Ticker, *time.Time, time.Duration) {
78+
tms, wait := calcDailyTicker(tm)
79+
return time.NewTicker(wait), tms, wait
80+
}
81+
82+
func (s *Scheduler) stopTimer(id int, t *time.Ticker) {
83+
t.Stop()
84+
s.mu.Lock()
85+
delete(s.Tickers, id)
86+
s.mu.Unlock()
87+
}
88+
89+
// Instruct the timers to stop.
90+
// Mainly when the clock has drifted.
91+
func (s *Scheduler) restartTimers(drifted chan struct{}) {
92+
timers := len(s.Config.Time)
93+
for range timers {
94+
select {
95+
case drifted <- struct{}{}:
96+
default:
97+
log.Trace("[tasks-scheduler] restartTimers() unable to deliver")
98+
}
99+
}
100+
}
101+
102+
// SetupDailyTimers creates the daily timers that will fire every 24h at the configured hour.
103+
// We create the timers and wait for the remaining time from now until the configured hour.
104+
// From that on, the timer will be scheduled to tick every 24h.
105+
func (s *Scheduler) SetupDailyTimers() {
106+
var wg sync.WaitGroup
107+
drifted := make(chan struct{})
108+
109+
for id, t := range s.Config.Time {
110+
wg.Add(1)
111+
go func(drifted chan struct{}) {
112+
defer wg.Done()
113+
114+
Reschedule:
115+
tck, tms, wait := NewDailyTicker(t)
116+
if tck == nil {
117+
log.Error("[tasks-scheduler] invalid timer %d-%s", id, t)
118+
return
119+
}
120+
// save tickers to stop them later when stopping the scheduler.
121+
s.mu.Lock()
122+
s.Tickers[id] = tck
123+
s.mu.Unlock()
124+
125+
exit, resched := waitToStart(s.Ctx, id, t, wait, tms, drifted)
126+
if exit {
127+
goto Exit
128+
}
129+
if resched {
130+
s.stopTimer(id, tck)
131+
goto Reschedule
132+
}
133+
134+
log.Debug("[tasks-scheduler] %d, %s daily ticker started", id, t)
135+
for {
136+
select {
137+
case <-s.Ctx.Done():
138+
goto Exit
139+
case <-drifted:
140+
now := time.Now()
141+
log.Debug("[tasks-scheduler] %d, %s running ticker drifted, now: %v", id, t, now.Format(time.DateTime))
142+
s.stopTimer(id, tck)
143+
goto Reschedule
144+
case now := <-tck.C:
145+
realNow := time.Now()
146+
//log.Debug("[tasks-scheduler] %d, %s tick now: %s real-now: %s tms: %s", id, t, now.Format(time.DateTime), realNow.Format(time.DateTime), tms.Format(time.DateTime))
147+
// these timers are scheduled every hour, so the minute and second should match.
148+
// If they don't, the clock has drifted.
149+
if timeHasDrifted(&realNow, tms) {
150+
log.Debug("[tasks-scheduler] %d, %s tick out-of-sync, rescheduling: %s", id, t, realNow.Format(time.DateTime))
151+
s.restartTimers(drifted)
152+
s.stopTimer(id, tck)
153+
goto Reschedule
154+
}
155+
156+
today := int(now.Weekday())
157+
for _, wd := range s.Config.Weekday {
158+
if wd != today {
159+
continue
160+
}
161+
//log.Debug("[tasks-scheduler] %d, %s tick is today %d", id, t, c)
162+
if realNow.Hour() == tms.Hour() {
163+
log.Debug("[tasks-scheduler] %d, %s ticker fired", id, t)
164+
s.TickChan <- now
165+
tck.Reset(1 * time.Hour)
166+
}
167+
}
168+
}
169+
}
170+
171+
Exit:
172+
// wait for ticks while the tickers are active.
173+
// stop the ticker only when stopping the scheduler.
174+
tck.Stop()
175+
log.Debug("[tasks-scheduler] scheduler timer %d stopped", id)
176+
}(drifted)
177+
}
178+
wg.Wait()
179+
180+
log.Debug("[tasks-scheduler] SetupDailyTimers() finished")
181+
}

daemon/tasks/scheduler/scheduler.go

Lines changed: 6 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type Config struct {
3939
type Scheduler struct {
4040
Ctx context.Context
4141
Cancel context.CancelFunc
42-
Tickers []*time.Ticker
42+
Tickers map[int]*time.Ticker
4343
Ticker *time.Ticker
4444
TickChan chan time.Time
4545
ticky chan time.Time
@@ -54,100 +54,14 @@ func New(ctx context.Context, cancel context.CancelFunc, config Config) *Schedul
5454
Cancel: cancel,
5555
TickChan: make(chan time.Time),
5656
ticky: make(chan time.Time),
57+
Tickers: make(map[int]*time.Ticker),
5758
Config: config,
5859
mu: &sync.RWMutex{},
5960
}
6061

6162
return sched
6263
}
6364

64-
// NewDailyTicker creates a new ticker.
65-
func NewDailyTicker(tm string) (*time.Ticker, time.Duration) {
66-
tms, err := time.Parse("15:04:05", tm)
67-
if err != nil {
68-
tms, err = time.Parse("15:04", tm)
69-
if err != nil {
70-
log.Debug("[tasks-scheduler] invalid daily ticker time: %s", err)
71-
return nil, time.Millisecond
72-
}
73-
}
74-
wait := time.Millisecond
75-
now := time.Now()
76-
tmd := time.Date(
77-
now.Year(), now.Month(), now.Day(),
78-
tms.Hour(), tms.Minute(), tms.Second(), 0,
79-
now.Location(),
80-
)
81-
// if the Ticker is created before the time, wait until the ticker
82-
if tmd.Before(now) {
83-
wait = (24 * time.Hour) - now.Sub(tmd)
84-
} else if tmd.After(now) {
85-
wait = time.Until(tmd)
86-
}
87-
log.Debug("[tasks-scheduler] NewDailyTicker scheduled, waiting to start: %s", wait)
88-
89-
return time.NewTicker(wait), wait
90-
}
91-
92-
// SetupDailyTimers creates the daily timers that will fire every 24h at the configured hour.
93-
// We create the timers and wait for the remaining time from now until the configured hour.
94-
// From that on, the timer will be scheduled to tick every 24h.
95-
func (s *Scheduler) SetupDailyTimers() {
96-
var wg sync.WaitGroup
97-
98-
for id, t := range s.Config.Time {
99-
wg.Add(1)
100-
go func() {
101-
defer wg.Done()
102-
103-
tck, wait := NewDailyTicker(t)
104-
if tck == nil {
105-
log.Error("[tasks-scheduler] invalid timer %d-%s", id, t)
106-
return
107-
}
108-
// save tickers to stop them later when stopping the scheduler.
109-
s.mu.Lock()
110-
s.Tickers = append(s.Tickers, tck)
111-
s.mu.Unlock()
112-
113-
// wait for ticks while the tickers are active.
114-
go func() {
115-
// stop the ticker only when stopping the scheduler.
116-
defer tck.Stop()
117-
log.Debug("[tasks-scheduler] %d, %s daily ticker started", id, t)
118-
for {
119-
select {
120-
case <-s.Ctx.Done():
121-
goto Exit
122-
case now := <-tck.C:
123-
for _, wd := range s.Config.Weekday {
124-
if wd == int(now.Weekday()) {
125-
s.TickChan <- now
126-
}
127-
}
128-
}
129-
}
130-
Exit:
131-
log.Debug("[tasks-scheduler] scheduler timer %d stopped", id)
132-
}()
133-
134-
// wait the remaining time until the configured hour. Then set the ticker to repeat every 24h.
135-
if wait > time.Millisecond {
136-
log.Debug("[tasks-scheduler] scheduler %d-%s timer MUST wait: %v", id, t, wait)
137-
time.Sleep(wait)
138-
// FIXME: if the computer is suspended, the timers do not take into account the time spent
139-
// suspended.
140-
tck.Reset(24 * time.Hour)
141-
}
142-
143-
}()
144-
}
145-
wg.Wait()
146-
<-s.Ctx.Done()
147-
148-
log.Debug("[tasks-scheduler] SetupDailyTimers() finished")
149-
}
150-
15165
func (s *Scheduler) Start() {
15266
log.Debug("[tasks-scheduler] Start()")
15367

@@ -240,12 +154,14 @@ func (s *Scheduler) Start() {
240154

241155
func (s *Scheduler) Stop() {
242156
if len(s.Tickers) > 0 {
243-
for _, t := range s.Tickers {
157+
for id, t := range s.Tickers {
244158
if t != nil {
245159
t.Stop()
246160
}
161+
t = nil
162+
delete(s.Tickers, id)
247163
}
248-
s.Tickers = make([]*time.Ticker, 0)
164+
s.Tickers = make(map[int]*time.Ticker)
249165
}
250166
if s.Ticker != nil {
251167
s.Ticker.Stop()

0 commit comments

Comments
 (0)