File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -2,21 +2,19 @@ package scheduler
22
33import (
44 "context"
5+ "errors"
56 "sync"
67)
78
8- // fifo represent as FiFo scheduler.
9+ // fifo represent as FIFO scheduler.
910type fifo struct {
1011 mu sync.Mutex
11-
1212 resume chan struct {}
1313 scheduled int
1414 finished int
1515 pendings []Job
16-
1716 ctx context.Context
1817 cancel context.CancelFunc
19-
2018 finishCond * sync.Cond
2119 donec chan struct {}
2220}
@@ -35,12 +33,12 @@ func NewFIFOScheduler() Scheduler {
3533}
3634
3735// Schedule schedules a job that will be ran in FIFO order sequentially.
38- func (f * fifo ) Schedule (j Job ) {
36+ func (f * fifo ) Schedule (j Job ) error {
3937 f .mu .Lock ()
4038 defer f .mu .Unlock ()
4139
4240 if f .cancel == nil {
43- panic ( "schedule: schedule to stopped scheduler" )
41+ return errors . New ( " schedule to stopped scheduler" )
4442 }
4543
4644 if len (f .pendings ) == 0 {
@@ -50,6 +48,7 @@ func (f *fifo) Schedule(j Job) {
5048 }
5149 }
5250 f .pendings = append (f .pendings , j )
51+ return nil
5352}
5453
5554func (f * fifo ) Pending () int {
@@ -95,17 +94,18 @@ func (f *fifo) run() {
9594 }()
9695
9796 for {
98- var todo Job
97+ var task Job
9998 f .mu .Lock ()
10099 if len (f .pendings ) != 0 {
101100 f .scheduled ++
102- todo = f .pendings [0 ]
101+ task = f .pendings [0 ]
103102 }
104103 f .mu .Unlock ()
105- if todo == nil {
104+ if task == nil {
106105 select {
107106 case <- f .resume :
108107 case <- f .ctx .Done ():
108+ // naive way to "handle" stop pending tasks.
109109 f .mu .Lock ()
110110 pendings := f .pendings
111111 f .pendings = nil
@@ -117,7 +117,7 @@ func (f *fifo) run() {
117117 return
118118 }
119119 } else {
120- todo (f .ctx )
120+ task (f .ctx )
121121 f .finishCond .L .Lock ()
122122 f .finished ++
123123 f .pendings = f .pendings [1 :]
Original file line number Diff line number Diff line change @@ -8,8 +8,7 @@ type Job func(context.Context)
88// Scheduler can schedule jobs.
99type Scheduler interface {
1010 // Schedule asks the scheduler to schedule a job defined by the given func.
11- // Schedule to a stopped scheduler might panic.
12- Schedule (j Job )
11+ Schedule (j Job ) error
1312
1413 // Pending returns number of pending jobs
1514 Pending () int
Original file line number Diff line number Diff line change @@ -25,7 +25,9 @@ func TestFIFOSchedule(t *testing.T) {
2525 }
2626
2727 for _ , j := range jobs {
28- s .Schedule (j )
28+ if err := s .Schedule (j ); err != nil {
29+ t .Errorf ("schedule job %v" , err )
30+ }
2931 }
3032
3133 s .WaitFinish (100 )
You can’t perform that action at this time.
0 commit comments