Skip to content
This repository was archived by the owner on Feb 20, 2024. It is now read-only.

Commit 5d20767

Browse files
committed
refactor: change (almost) everything
1 parent e396721 commit 5d20767

File tree

2 files changed

+194
-255
lines changed

2 files changed

+194
-255
lines changed

workers.go

Lines changed: 121 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -3,247 +3,199 @@ package workers
33
import (
44
"context"
55
"errors"
6-
"os"
7-
"os/signal"
86
"sync"
9-
"syscall"
107
"time"
118
)
129

13-
var defaultWatchSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL}
14-
15-
// Worker Contains the work function. Allows an input and output to a channel or another worker for pipeline work.
16-
// Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the
17-
// error.
10+
// Worker Contains the work function.
11+
// Work() get input and could put in outChan for followers.
12+
// ⚠️ outChan could be closed if follower is stoped before producer.
13+
// error returned can be process by afterFunc but will be ignored by default.
1814
type Worker interface {
19-
Work(in interface{}, out chan<- interface{}) error
20-
}
21-
22-
// Runner Handles the running the Worker logic.
23-
type Runner interface {
24-
BeforeFunc(func(ctx context.Context) error) Runner
25-
AfterFunc(func(ctx context.Context, err error) error) Runner
26-
SetDeadline(t time.Time) Runner
27-
SetTimeout(duration time.Duration) Runner
28-
SetFollower()
29-
Send(in interface{})
30-
InFrom(w ...Runner) Runner
31-
SetOut(chan interface{})
32-
Start() Runner
33-
Stop() chan error
34-
Wait() error
15+
Work(ctx context.Context, in interface{}, out chan<- interface{}) error
3516
}
3617

37-
type runner struct {
38-
ctx context.Context
39-
cancel context.CancelFunc
40-
inChan chan interface{}
41-
outChan chan interface{}
42-
errChan chan error
43-
signalChan chan os.Signal
44-
limiter chan struct{}
18+
type Runner struct {
19+
ctx context.Context
20+
cancel context.CancelFunc
21+
inputCtx context.Context
22+
inputCancel context.CancelFunc
23+
inChan chan interface{}
24+
outChan chan interface{}
25+
limiter chan struct{}
4526

4627
afterFunc func(ctx context.Context, err error) error
47-
workFunc func(in interface{}, out chan<- interface{}) error
28+
workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error
4829
beforeFunc func(ctx context.Context) error
4930

50-
timeout time.Duration
51-
deadline time.Duration
52-
53-
isLeader bool
54-
stopCalled bool
31+
timeout time.Duration
5532

5633
numWorkers int64
57-
lock *sync.RWMutex
58-
wg *sync.WaitGroup
59-
done *sync.Once
60-
once *sync.Once
34+
started *sync.Once
35+
done chan struct{}
6136
}
6237

6338
// NewRunner Factory function for a new Runner. The Runner will handle running the workers logic.
64-
func NewRunner(ctx context.Context, w Worker, numWorkers int64) Runner {
65-
var runnerCtx, runnerCancel = context.WithCancel(ctx)
66-
var runner = &runner{
67-
ctx: runnerCtx,
68-
cancel: runnerCancel,
69-
inChan: make(chan interface{}, numWorkers),
70-
outChan: nil,
71-
errChan: make(chan error, 1),
72-
signalChan: make(chan os.Signal, 1),
73-
limiter: make(chan struct{}, numWorkers),
74-
afterFunc: func(ctx context.Context, err error) error { return err },
75-
workFunc: w.Work,
76-
beforeFunc: func(ctx context.Context) error { return nil },
77-
numWorkers: numWorkers,
78-
isLeader: true,
79-
lock: new(sync.RWMutex),
80-
wg: new(sync.WaitGroup),
81-
once: new(sync.Once),
82-
done: new(sync.Once),
39+
func NewRunner(ctx context.Context, w Worker, numWorkers int64, buffer int64) *Runner {
40+
runnerCtx, runnerCancel := context.WithCancel(ctx)
41+
inputCtx, inputCancel := context.WithCancel(runnerCtx)
42+
43+
runner := &Runner{
44+
ctx: runnerCtx,
45+
cancel: runnerCancel,
46+
inputCtx: inputCtx,
47+
inputCancel: inputCancel,
48+
inChan: make(chan interface{}, buffer),
49+
outChan: nil,
50+
limiter: make(chan struct{}, numWorkers),
51+
afterFunc: func(ctx context.Context, err error) error { return nil },
52+
workFunc: w.Work,
53+
beforeFunc: func(ctx context.Context) error { return nil },
54+
numWorkers: numWorkers,
55+
started: new(sync.Once),
56+
done: make(chan struct{}),
8357
}
84-
runner.waitForSignal(defaultWatchSignals...)
8558
return runner
8659
}
8760

88-
// Send Send an object to the worker for processing.
89-
func (r *runner) Send(in interface{}) {
61+
var ErrInputClosed = errors.New("input closed")
62+
63+
// Send Send an object to the worker for processing if context is not Done.
64+
func (r *Runner) Send(in interface{}) error {
9065
select {
91-
case <-r.ctx.Done():
92-
return
66+
case <-r.inputCtx.Done():
67+
return ErrInputClosed
9368
case r.inChan <- in:
9469
}
70+
return nil
9571
}
9672

9773
// InFrom Set a worker to accept output from another worker(s).
98-
func (r *runner) InFrom(w ...Runner) Runner {
99-
r.SetFollower()
74+
func (r *Runner) InFrom(w ...*Runner) *Runner {
10075
for _, wr := range w {
101-
wr.SetOut(r.inChan)
76+
// in := make(chan interface{})
77+
// go func(in chan interface{}) {
78+
// for msg := range in {
79+
// if err := r.Send(msg); err != nil {
80+
// return
81+
// }
82+
// }
83+
// }(in)
84+
wr.SetOut(r.inChan) // nolint
10285
}
10386
return r
10487
}
10588

106-
// SetFollower Sets the worker as a follower and does not need to close it's in channel.
107-
func (r *runner) SetFollower() {
108-
r.lock.Lock()
109-
r.isLeader = false
110-
r.lock.Unlock()
111-
}
112-
113-
// Start Starts the worker on processing.
114-
func (r *runner) Start() Runner {
115-
r.startWork()
116-
return r
89+
// Start execute beforeFunc and launch worker processing.
90+
func (r *Runner) Start() error {
91+
r.started.Do(func() {
92+
if err := r.beforeFunc(r.ctx); err == nil {
93+
go r.work()
94+
}
95+
})
96+
return nil
11797
}
11898

11999
// BeforeFunc Function to be run before worker starts processing.
120-
func (r *runner) BeforeFunc(f func(ctx context.Context) error) Runner {
100+
func (r *Runner) BeforeFunc(f func(ctx context.Context) error) *Runner {
121101
r.beforeFunc = f
122102
return r
123103
}
124104

125105
// AfterFunc Function to be run after worker has stopped.
126-
func (r *runner) AfterFunc(f func(ctx context.Context, err error) error) Runner {
106+
// It can be used for logging and error management.
107+
// input can be retreive with context value:
108+
// ctx.Value(workers.InputKey{})
109+
// ⚠️ If an error is returned it stop Runner execution.
110+
func (r *Runner) AfterFunc(f func(ctx context.Context, err error) error) *Runner {
127111
r.afterFunc = f
128112
return r
129113
}
130114

115+
var ErrOutAlready = errors.New("out already set")
116+
131117
// SetOut Allows the setting of a workers out channel, if not already set.
132-
func (r *runner) SetOut(c chan interface{}) {
118+
func (r *Runner) SetOut(c chan interface{}) error {
133119
if r.outChan != nil {
134-
return
120+
return ErrOutAlready
135121
}
136122
r.outChan = c
123+
return nil
137124
}
138125

139-
// SetDeadline allows a time to be set when the workers should stop.
140-
// Deadline needs to be handled by the IsDone method.
141-
func (r *runner) SetDeadline(t time.Time) Runner {
142-
r.lock.Lock()
143-
defer r.lock.Unlock()
126+
// SetDeadline allows a time to be set when the Runner should stop.
127+
// ⚠️ Should only be called before Start
128+
func (r *Runner) SetDeadline(t time.Time) *Runner {
144129
r.ctx, r.cancel = context.WithDeadline(r.ctx, t)
145130
return r
146131
}
147132

148-
// SetTimeout allows a time duration to be set when the workers should stop.
149-
// Timeout needs to be handled by the IsDone method.
150-
func (r *runner) SetTimeout(duration time.Duration) Runner {
151-
r.lock.Lock()
152-
defer r.lock.Unlock()
133+
// SetWorkerTimeout allows a time duration to be set when the workers should stop.
134+
// ⚠️ Should only be called before Start
135+
func (r *Runner) SetWorkerTimeout(duration time.Duration) *Runner {
153136
r.timeout = duration
154137
return r
155138
}
156139

157-
// Wait calls stop on workers and waits for the channel to drain.
158-
// !!Should only be called when certain nothing will send to worker.
159-
func (r *runner) Wait() error {
160-
r.waitForDrain()
161-
if err := <-r.Stop(); err != nil && !errors.Is(err, context.Canceled) {
162-
return err
140+
// Wait close the input channel and waits it to drain and process.
141+
func (r *Runner) Wait() *Runner {
142+
if r.inputCtx.Err() == nil {
143+
r.inputCancel()
144+
close(r.inChan)
163145
}
164-
return nil
165-
}
166146

167-
// Stop Stops the processing of a worker and closes it's channel in.
168-
// Returns a blocking channel with type error.
169-
// !!Should only be called when certain nothing will send to worker.
170-
func (r *runner) Stop() chan error {
171-
r.done.Do(func() {
172-
if r.inChan != nil && r.isLeader {
173-
close(r.inChan)
174-
}
175-
})
176-
return r.errChan
147+
<-r.done
148+
149+
return r
177150
}
178151

179-
// IsDone returns a channel signaling the workers context has been canceled.
180-
func (r *runner) IsDone() <-chan struct{} {
181-
return r.ctx.Done()
152+
// Stop Stops the processing of a worker and waits for workers to finish.
153+
func (r *Runner) Stop() *Runner {
154+
r.cancel()
155+
r.Wait()
156+
return r
182157
}
183158

184-
// waitForSignal make sure we wait for a term signal and shutdown correctly
185-
func (r *runner) waitForSignal(signals ...os.Signal) {
186-
go func() {
187-
signal.Notify(r.signalChan, signals...)
188-
<-r.signalChan
189-
if r.cancel != nil {
190-
r.cancel()
191-
}
159+
type InputKey struct{}
160+
161+
// work starts processing input and limits worker instance number.
162+
func (r *Runner) work() {
163+
var wg sync.WaitGroup
164+
165+
defer func() {
166+
wg.Wait()
167+
r.cancel()
168+
close(r.done)
192169
}()
193-
}
194170

195-
// waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty.
196-
func (r *runner) waitForDrain() {
197-
for len(r.limiter) > 0 || len(r.inChan) > 0 {
198-
// Wait for the drain.
199-
}
200-
}
171+
for {
172+
select {
173+
case <-r.ctx.Done():
174+
return
175+
case input, open := <-r.inChan:
176+
if !open {
177+
return
178+
}
179+
wg.Add(1)
201180

202-
// startWork Runs the before function and starts processing until one of three things happen.
203-
// 1. A term signal is received or cancellation of context.
204-
// 2. Stop function is called.
205-
// 3. Worker returns an error.
206-
func (r *runner) startWork() {
207-
var err error
208-
if err = r.beforeFunc(r.ctx); err != nil {
209-
r.errChan <- err
210-
return
211-
}
212-
if r.timeout > 0 {
213-
r.ctx, r.cancel = context.WithTimeout(r.ctx, r.timeout)
214-
}
215-
r.wg.Add(1)
216-
go func() {
217-
var workerWG = new(sync.WaitGroup)
218-
var closeOnce = new(sync.Once)
219-
220-
// write out error if not nil on exit.
221-
defer func() {
222-
workerWG.Wait()
223-
r.errChan <- err
224-
closeOnce.Do(func() {
225-
if r.outChan != nil {
226-
close(r.outChan)
227-
}
228-
})
229-
r.wg.Done()
230-
}()
231-
for in := range r.inChan {
232-
input := in
233181
r.limiter <- struct{}{}
234-
workerWG.Add(1)
182+
183+
inputCtx := context.WithValue(r.ctx, InputKey{}, input)
184+
workCtx, workCancel := context.WithCancel(inputCtx)
185+
if r.timeout > 0 {
186+
workCtx, workCancel = context.WithTimeout(inputCtx, r.timeout)
187+
}
188+
235189
go func() {
236190
defer func() {
237191
<-r.limiter
238-
workerWG.Done()
192+
workCancel()
193+
wg.Done()
239194
}()
240-
if err := r.afterFunc(r.ctx, r.workFunc(input, r.outChan)); err != nil {
241-
r.once.Do(func() {
242-
r.errChan <- err
243-
r.cancel()
244-
})
195+
if err := r.afterFunc(inputCtx, r.workFunc(workCtx, input, r.outChan)); err != nil {
196+
r.cancel()
245197
}
246198
}()
247199
}
248-
}()
200+
}
249201
}

0 commit comments

Comments
 (0)