Skip to content

Commit 026a898

Browse files
committed
Improve job dispatcher
1 parent 54a9560 commit 026a898

2 files changed

Lines changed: 30 additions & 28 deletions

File tree

workers/workQueue/dispatcher.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,42 @@ import (
66
"time"
77
)
88

9+
const DefaultRetryTimeout = 60 * time.Second
10+
911
type JobDispatcher struct {
10-
jobPool *IndexJobPool
11-
input chan Work // channel to receive work
12-
end chan bool // channel to spin down workers
13-
workerChannel chan chan Work
12+
retryTimeout time.Duration
13+
jobPool *IndexJobPool
14+
inputChan chan Work // channel to receive work
15+
endChan chan bool // channel to spin down workers
16+
workerChan chan chan Work // channel to send work to workers
17+
EmptyQueueChan chan bool // channel to communicate that queue was consumed
1418
}
1519

1620
type DispatcherConfig struct {
17-
StartIndex int64
18-
EndIndex int64
19-
JobsTopic string
21+
JobsTopic string
2022
}
2123

22-
func NewJobDispatcher(cfg DispatcherConfig) JobDispatcher {
24+
func NewJobDispatcher() JobDispatcher {
2325
d := JobDispatcher{
24-
jobPool: NewJobPool(PoolConfig{
25-
StartHeight: cfg.StartIndex,
26-
EndHeight: cfg.EndIndex,
27-
}),
28-
workerChannel: make(chan chan Work),
29-
input: make(chan Work),
30-
end: make(chan bool),
26+
retryTimeout: DefaultRetryTimeout,
27+
jobPool: NewJobPool(),
28+
workerChan: make(chan chan Work),
29+
inputChan: make(chan Work),
30+
endChan: make(chan bool),
31+
EmptyQueueChan: make(chan bool),
3132
}
3233

3334
return d
3435
}
3536

37+
func (j JobDispatcher) SetRetryTimeout(timeout time.Duration) {
38+
j.retryTimeout = timeout
39+
}
40+
3641
func (j JobDispatcher) BuildWorkers(count int, constructor WorkerConstructor) {
3742
for i := 0; i < count; i++ {
3843
workerId := fmt.Sprintf("worker.%d", i)
39-
worker := constructor(workerId, j.workerChannel)
44+
worker := constructor(workerId, j.workerChan)
4045
worker.Worker.Start()
4146
}
4247
}
@@ -48,11 +53,12 @@ func (j JobDispatcher) Start() {
4853
work := j.jobPool.GetNewJob()
4954
if work.JobId == -1 {
5055
zap.S().Infof("*** No more jobs on JobPool, waiting.... ***")
51-
time.Sleep(60 * time.Second)
56+
j.EmptyQueueChan <- true
57+
time.Sleep(j.retryTimeout)
5258
continue
5359
}
5460

55-
j.input <- work
61+
j.inputChan <- work
5662
}
5763
}
5864

@@ -64,12 +70,12 @@ func (j JobDispatcher) dispatch() {
6470
go func() {
6571
for {
6672
select {
67-
case <-j.end:
68-
fmt.Println("JobDispatcher received 'end'")
73+
case <-j.endChan:
74+
fmt.Println("JobDispatcher received 'endChan'")
6975
return
70-
case work := <-j.input:
71-
worker := <-j.workerChannel // wait for available channel
72-
worker <- work // dispatch work to worker
76+
case work := <-j.inputChan:
77+
worker := <-j.workerChan // wait for available channel
78+
worker <- work // dispatch work to worker
7379
}
7480
}
7581
}()

workers/workQueue/pool.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,12 @@ type PoolConfig struct {
1515
EndHeight int64
1616
}
1717

18-
func NewJobPool(c PoolConfig) *IndexJobPool {
18+
func NewJobPool() *IndexJobPool {
1919
pool := &IndexJobPool{
2020
mutex: sync.Mutex{},
2121
queue: queue.New(),
2222
}
2323

24-
for i := c.StartHeight; i <= c.EndHeight; i++ {
25-
pool.queue.Add(Work{JobId: i})
26-
}
27-
2824
return pool
2925
}
3026

0 commit comments

Comments
 (0)