Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit 23d517d

Browse files
committed
polishing the behaviour
1 parent cf131cc commit 23d517d

File tree

3 files changed

+32
-49
lines changed

3 files changed

+32
-49
lines changed

broker/ephemeral/broker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (b *Broker) Register(pipe *jobs.Pipeline) error {
3737
return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
3838
}
3939

40-
b.queues[pipe] = newQueue(pipe.Integer("concurrency", 0))
40+
b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0))
4141

4242
return nil
4343
}

broker/ephemeral/queue.go

+31-37
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88
)
99

1010
type queue struct {
11-
active int32
12-
st *jobs.Stat
11+
on int32
12+
state *jobs.Stat
1313

1414
// job pipeline
1515
concurPool chan interface{}
1616
jobs chan *entry
1717

18-
// active operations
18+
// on operations
1919
muw sync.Mutex
2020
wg sync.WaitGroup
2121

@@ -34,56 +34,50 @@ type entry struct {
3434
}
3535

3636
// create new queue
37-
func newQueue(concurrency int) *queue {
38-
q := &queue{st: &jobs.Stat{}, jobs: make(chan *entry)}
37+
func newQueue(maxConcur int) *queue {
38+
q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)}
3939

40-
if concurrency != 0 {
41-
q.concurPool = make(chan interface{}, concurrency)
42-
for i := 0; i < concurrency; i++ {
40+
if maxConcur != 0 {
41+
q.concurPool = make(chan interface{}, maxConcur)
42+
for i := 0; i < maxConcur; i++ {
4343
q.concurPool <- nil
4444
}
4545
}
4646

4747
return q
4848
}
4949

50-
// associate queue with new do pool
51-
func (q *queue) configure(execPool chan jobs.Handler, err jobs.ErrorHandler) error {
52-
q.execPool = execPool
53-
q.errHandler = err
54-
55-
return nil
56-
}
57-
5850
// serve consumers
5951
func (q *queue) serve() {
6052
q.wait = make(chan interface{})
61-
atomic.StoreInt32(&q.active, 1)
53+
atomic.StoreInt32(&q.on, 1)
6254

6355
for {
6456
e := q.consume()
6557
if e == nil {
58+
q.wg.Wait()
6659
return
6760
}
6861

6962
if q.concurPool != nil {
7063
<-q.concurPool
7164
}
7265

73-
atomic.AddInt64(&q.st.Active, 1)
66+
atomic.AddInt64(&q.state.Active, 1)
7467
h := <-q.execPool
75-
go func(e *entry) {
68+
69+
go func(h jobs.Handler, e *entry) {
70+
defer q.wg.Done()
71+
7672
q.do(h, e)
77-
atomic.AddInt64(&q.st.Active, ^int64(0))
73+
atomic.AddInt64(&q.state.Active, ^int64(0))
7874

7975
q.execPool <- h
8076

8177
if q.concurPool != nil {
8278
q.concurPool <- nil
8379
}
84-
85-
q.wg.Done()
86-
}(e)
80+
}(h, e)
8781
}
8882
}
8983

@@ -107,14 +101,14 @@ func (q *queue) do(h jobs.Handler, e *entry) {
107101
err := h(e.id, e.job)
108102

109103
if err == nil {
110-
atomic.AddInt64(&q.st.Queue, ^int64(0))
104+
atomic.AddInt64(&q.state.Queue, ^int64(0))
111105
return
112106
}
113107

114108
q.errHandler(e.id, e.job, err)
115109

116110
if !e.job.Options.CanRetry(e.attempt) {
117-
atomic.AddInt64(&q.st.Queue, ^int64(0))
111+
atomic.AddInt64(&q.state.Queue, ^int64(0))
118112
return
119113
}
120114

@@ -123,35 +117,35 @@ func (q *queue) do(h jobs.Handler, e *entry) {
123117

124118
// stop the queue consuming
125119
func (q *queue) stop() {
126-
if atomic.LoadInt32(&q.active) == 0 {
120+
if atomic.LoadInt32(&q.on) == 0 {
127121
return
128122
}
129123

130-
atomic.StoreInt32(&q.active, 0)
131-
132124
close(q.wait)
133-
125+
134126
q.muw.Lock()
135127
q.wg.Wait()
136-
q.muw.Unlock()
128+
q.muw.Unlock()
129+
130+
atomic.StoreInt32(&q.on, 0)
137131
}
138132

139133
// add job to the queue
140134
func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
141135
if delay == 0 {
142-
atomic.AddInt64(&q.st.Queue, 1)
136+
atomic.AddInt64(&q.state.Queue, 1)
143137
go func() {
144138
q.jobs <- &entry{id: id, job: j, attempt: attempt}
145139
}()
146140

147141
return
148142
}
149143

150-
atomic.AddInt64(&q.st.Delayed, 1)
144+
atomic.AddInt64(&q.state.Delayed, 1)
151145
go func() {
152146
time.Sleep(delay)
153-
atomic.AddInt64(&q.st.Delayed, ^int64(0))
154-
atomic.AddInt64(&q.st.Queue, 1)
147+
atomic.AddInt64(&q.state.Delayed, ^int64(0))
148+
atomic.AddInt64(&q.state.Queue, 1)
155149

156150
q.jobs <- &entry{id: id, job: j, attempt: attempt}
157151
}()
@@ -160,8 +154,8 @@ func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
160154
func (q *queue) stat() *jobs.Stat {
161155
return &jobs.Stat{
162156
InternalName: ":memory:",
163-
Queue: atomic.LoadInt64(&q.st.Queue),
164-
Active: atomic.LoadInt64(&q.st.Active),
165-
Delayed: atomic.LoadInt64(&q.st.Delayed),
157+
Queue: atomic.LoadInt64(&q.state.Queue),
158+
Active: atomic.LoadInt64(&q.state.Active),
159+
Delayed: atomic.LoadInt64(&q.state.Delayed),
166160
}
167161
}

broker/ephemeral/queue_test.go

-11
This file was deleted.

0 commit comments

Comments
 (0)