Skip to content
This repository was archived by the owner on Feb 20, 2024. It is now read-only.

Commit 855c498

Browse files
committed
refactor: change (almost) everything
1 parent e396721 commit 855c498

File tree

3 files changed

+196
-256
lines changed

3 files changed

+196
-256
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module github.com/catmullet/go-workers
22

3-
go 1.15
3+
go 1.16

workers.go

Lines changed: 122 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -3,247 +3,200 @@ package workers
33
import (
44
"context"
55
"errors"
6-
"os"
7-
"os/signal"
86
"sync"
9-
"syscall"
107
"time"
118
)
129

13-
var defaultWatchSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL}
14-
15-
// Worker Contains the work function. Allows an input and output to a channel or another worker for pipeline work.
16-
// Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the
17-
// error.
10+
// Worker Contains the work function.
11+
// Work() get input and could put in outChan for followers.
12+
// ⚠️ outChan could be closed if follower is stoped before producer.
13+
// error returned can be process by afterFunc but will be ignored by default.
1814
type Worker interface {
19-
Work(in interface{}, out chan<- interface{}) error
20-
}
21-
22-
// Runner Handles the running the Worker logic.
23-
type Runner interface {
24-
BeforeFunc(func(ctx context.Context) error) Runner
25-
AfterFunc(func(ctx context.Context, err error) error) Runner
26-
SetDeadline(t time.Time) Runner
27-
SetTimeout(duration time.Duration) Runner
28-
SetFollower()
29-
Send(in interface{})
30-
InFrom(w ...Runner) Runner
31-
SetOut(chan interface{})
32-
Start() Runner
33-
Stop() chan error
34-
Wait() error
15+
Work(ctx context.Context, in interface{}, out chan<- interface{}) error
3516
}
3617

37-
type runner struct {
38-
ctx context.Context
39-
cancel context.CancelFunc
40-
inChan chan interface{}
41-
outChan chan interface{}
42-
errChan chan error
43-
signalChan chan os.Signal
44-
limiter chan struct{}
18+
type Runner struct {
19+
ctx context.Context
20+
cancel context.CancelFunc
21+
inputCtx context.Context
22+
inputCancel context.CancelFunc
23+
inChan chan interface{}
24+
outChan chan interface{}
25+
limiter chan struct{}
4526

4627
afterFunc func(ctx context.Context, err error) error
47-
workFunc func(in interface{}, out chan<- interface{}) error
28+
workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error
4829
beforeFunc func(ctx context.Context) error
4930

50-
timeout time.Duration
51-
deadline time.Duration
52-
53-
isLeader bool
54-
stopCalled bool
31+
timeout time.Duration
5532

5633
numWorkers int64
57-
lock *sync.RWMutex
58-
wg *sync.WaitGroup
59-
done *sync.Once
60-
once *sync.Once
34+
started *sync.Once
35+
done chan struct{}
6136
}
6237

6338
// NewRunner Factory function for a new Runner. The Runner will handle running the workers logic.
64-
func NewRunner(ctx context.Context, w Worker, numWorkers int64) Runner {
65-
var runnerCtx, runnerCancel = context.WithCancel(ctx)
66-
var runner = &runner{
67-
ctx: runnerCtx,
68-
cancel: runnerCancel,
69-
inChan: make(chan interface{}, numWorkers),
70-
outChan: nil,
71-
errChan: make(chan error, 1),
72-
signalChan: make(chan os.Signal, 1),
73-
limiter: make(chan struct{}, numWorkers),
74-
afterFunc: func(ctx context.Context, err error) error { return err },
75-
workFunc: w.Work,
76-
beforeFunc: func(ctx context.Context) error { return nil },
77-
numWorkers: numWorkers,
78-
isLeader: true,
79-
lock: new(sync.RWMutex),
80-
wg: new(sync.WaitGroup),
81-
once: new(sync.Once),
82-
done: new(sync.Once),
39+
func NewRunner(ctx context.Context, w Worker, numWorkers int64, buffer int64) *Runner {
40+
// runnerCtx, runnerCancel := signal.NotifyContext(ctx, defaultWatchSignals...)
41+
runnerCtx, runnerCancel := context.WithCancel(ctx)
42+
inputCtx, inputCancel := context.WithCancel(runnerCtx)
43+
44+
runner := &Runner{
45+
ctx: runnerCtx,
46+
cancel: runnerCancel,
47+
inputCtx: inputCtx,
48+
inputCancel: inputCancel,
49+
inChan: make(chan interface{}, buffer),
50+
outChan: nil,
51+
limiter: make(chan struct{}, numWorkers),
52+
afterFunc: func(ctx context.Context, err error) error { return nil },
53+
workFunc: w.Work,
54+
beforeFunc: func(ctx context.Context) error { return nil },
55+
numWorkers: numWorkers,
56+
started: new(sync.Once),
57+
done: make(chan struct{}),
8358
}
84-
runner.waitForSignal(defaultWatchSignals...)
8559
return runner
8660
}
8761

88-
// Send Send an object to the worker for processing.
89-
func (r *runner) Send(in interface{}) {
62+
var ErrInputClosed = errors.New("input closed")
63+
64+
// Send Send an object to the worker for processing if context is not Done.
65+
func (r *Runner) Send(in interface{}) error {
9066
select {
91-
case <-r.ctx.Done():
92-
return
67+
case <-r.inputCtx.Done():
68+
return ErrInputClosed
9369
case r.inChan <- in:
9470
}
71+
return nil
9572
}
9673

9774
// InFrom Set a worker to accept output from another worker(s).
98-
func (r *runner) InFrom(w ...Runner) Runner {
99-
r.SetFollower()
75+
func (r *Runner) InFrom(w ...*Runner) *Runner {
10076
for _, wr := range w {
101-
wr.SetOut(r.inChan)
77+
// in := make(chan interface{})
78+
// go func(in chan interface{}) {
79+
// for msg := range in {
80+
// if err := r.Send(msg); err != nil {
81+
// return
82+
// }
83+
// }
84+
// }(in)
85+
wr.SetOut(r.inChan) // nolint
10286
}
10387
return r
10488
}
10589

106-
// SetFollower Sets the worker as a follower and does not need to close it's in channel.
107-
func (r *runner) SetFollower() {
108-
r.lock.Lock()
109-
r.isLeader = false
110-
r.lock.Unlock()
111-
}
112-
113-
// Start Starts the worker on processing.
114-
func (r *runner) Start() Runner {
115-
r.startWork()
116-
return r
90+
// Start execute beforeFunc and launch worker processing.
91+
func (r *Runner) Start() error {
92+
r.started.Do(func() {
93+
if err := r.beforeFunc(r.ctx); err == nil {
94+
go r.work()
95+
}
96+
})
97+
return nil
11798
}
11899

119100
// BeforeFunc Function to be run before worker starts processing.
120-
func (r *runner) BeforeFunc(f func(ctx context.Context) error) Runner {
101+
func (r *Runner) BeforeFunc(f func(ctx context.Context) error) *Runner {
121102
r.beforeFunc = f
122103
return r
123104
}
124105

125106
// AfterFunc Function to be run after worker has stopped.
126-
func (r *runner) AfterFunc(f func(ctx context.Context, err error) error) Runner {
107+
// It can be used for logging and error management.
108+
// input can be retreive with context value:
109+
// ctx.Value(workers.InputKey{})
110+
// ⚠️ If an error is returned it stop Runner execution.
111+
func (r *Runner) AfterFunc(f func(ctx context.Context, err error) error) *Runner {
127112
r.afterFunc = f
128113
return r
129114
}
130115

116+
var ErrOutAlready = errors.New("out already set")
117+
131118
// SetOut Allows the setting of a workers out channel, if not already set.
132-
func (r *runner) SetOut(c chan interface{}) {
119+
func (r *Runner) SetOut(c chan interface{}) error {
133120
if r.outChan != nil {
134-
return
121+
return ErrOutAlready
135122
}
136123
r.outChan = c
124+
return nil
137125
}
138126

139-
// SetDeadline allows a time to be set when the workers should stop.
140-
// Deadline needs to be handled by the IsDone method.
141-
func (r *runner) SetDeadline(t time.Time) Runner {
142-
r.lock.Lock()
143-
defer r.lock.Unlock()
127+
// SetDeadline allows a time to be set when the Runner should stop.
128+
// ⚠️ Should only be called before Start
129+
func (r *Runner) SetDeadline(t time.Time) *Runner {
144130
r.ctx, r.cancel = context.WithDeadline(r.ctx, t)
145131
return r
146132
}
147133

148-
// SetTimeout allows a time duration to be set when the workers should stop.
149-
// Timeout needs to be handled by the IsDone method.
150-
func (r *runner) SetTimeout(duration time.Duration) Runner {
151-
r.lock.Lock()
152-
defer r.lock.Unlock()
134+
// SetWorkerTimeout allows a time duration to be set when the workers should stop.
135+
// ⚠️ Should only be called before Start
136+
func (r *Runner) SetWorkerTimeout(duration time.Duration) *Runner {
153137
r.timeout = duration
154138
return r
155139
}
156140

157-
// Wait calls stop on workers and waits for the channel to drain.
158-
// !!Should only be called when certain nothing will send to worker.
159-
func (r *runner) Wait() error {
160-
r.waitForDrain()
161-
if err := <-r.Stop(); err != nil && !errors.Is(err, context.Canceled) {
162-
return err
141+
// Wait close the input channel and waits it to drain and process.
142+
func (r *Runner) Wait() *Runner {
143+
if r.inputCtx.Err() == nil {
144+
r.inputCancel()
145+
close(r.inChan)
163146
}
164-
return nil
165-
}
166147

167-
// Stop Stops the processing of a worker and closes it's channel in.
168-
// Returns a blocking channel with type error.
169-
// !!Should only be called when certain nothing will send to worker.
170-
func (r *runner) Stop() chan error {
171-
r.done.Do(func() {
172-
if r.inChan != nil && r.isLeader {
173-
close(r.inChan)
174-
}
175-
})
176-
return r.errChan
148+
<-r.done
149+
150+
return r
177151
}
178152

179-
// IsDone returns a channel signaling the workers context has been canceled.
180-
func (r *runner) IsDone() <-chan struct{} {
181-
return r.ctx.Done()
153+
// Stop Stops the processing of a worker and waits for workers to finish.
154+
func (r *Runner) Stop() *Runner {
155+
r.cancel()
156+
r.Wait()
157+
return r
182158
}
183159

184-
// waitForSignal make sure we wait for a term signal and shutdown correctly
185-
func (r *runner) waitForSignal(signals ...os.Signal) {
186-
go func() {
187-
signal.Notify(r.signalChan, signals...)
188-
<-r.signalChan
189-
if r.cancel != nil {
190-
r.cancel()
191-
}
160+
type InputKey struct{}
161+
162+
// work starts processing input and limits worker instance number.
163+
func (r *Runner) work() {
164+
var wg sync.WaitGroup
165+
166+
defer func() {
167+
wg.Wait()
168+
r.cancel()
169+
close(r.done)
192170
}()
193-
}
194171

195-
// waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty.
196-
func (r *runner) waitForDrain() {
197-
for len(r.limiter) > 0 || len(r.inChan) > 0 {
198-
// Wait for the drain.
199-
}
200-
}
172+
for {
173+
select {
174+
case <-r.ctx.Done():
175+
return
176+
case input, open := <-r.inChan:
177+
if !open {
178+
return
179+
}
180+
wg.Add(1)
201181

202-
// startWork Runs the before function and starts processing until one of three things happen.
203-
// 1. A term signal is received or cancellation of context.
204-
// 2. Stop function is called.
205-
// 3. Worker returns an error.
206-
func (r *runner) startWork() {
207-
var err error
208-
if err = r.beforeFunc(r.ctx); err != nil {
209-
r.errChan <- err
210-
return
211-
}
212-
if r.timeout > 0 {
213-
r.ctx, r.cancel = context.WithTimeout(r.ctx, r.timeout)
214-
}
215-
r.wg.Add(1)
216-
go func() {
217-
var workerWG = new(sync.WaitGroup)
218-
var closeOnce = new(sync.Once)
219-
220-
// write out error if not nil on exit.
221-
defer func() {
222-
workerWG.Wait()
223-
r.errChan <- err
224-
closeOnce.Do(func() {
225-
if r.outChan != nil {
226-
close(r.outChan)
227-
}
228-
})
229-
r.wg.Done()
230-
}()
231-
for in := range r.inChan {
232-
input := in
233182
r.limiter <- struct{}{}
234-
workerWG.Add(1)
183+
184+
inputCtx := context.WithValue(r.ctx, InputKey{}, input)
185+
workCtx, workCancel := context.WithCancel(inputCtx)
186+
if r.timeout > 0 {
187+
workCtx, workCancel = context.WithTimeout(inputCtx, r.timeout)
188+
}
189+
235190
go func() {
236191
defer func() {
237192
<-r.limiter
238-
workerWG.Done()
193+
workCancel()
194+
wg.Done()
239195
}()
240-
if err := r.afterFunc(r.ctx, r.workFunc(input, r.outChan)); err != nil {
241-
r.once.Do(func() {
242-
r.errChan <- err
243-
r.cancel()
244-
})
196+
if err := r.afterFunc(inputCtx, r.workFunc(workCtx, input, r.outChan)); err != nil {
197+
r.cancel()
245198
}
246199
}()
247200
}
248-
}()
201+
}
249202
}

0 commit comments

Comments
 (0)