Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit 24934cc

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents dbc1290 + aca4676 commit 24934cc

21 files changed

+299
-278
lines changed

broker.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ type Broker interface {
55
// Register broker pipeline.
66
Register(pipe *Pipeline) error
77

8-
// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
8+
// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
99
// the service is started!
1010
Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error
1111

@@ -33,7 +33,7 @@ type Stat struct {
3333
// InternalName defines internal broker specific pipeline name.
3434
InternalName string
3535

36-
// Consuming indicates that pipeline is consuming jobs.
36+
// Consuming indicates that pipeline is pipelines jobs.
3737
Consuming bool
3838

3939
// testQueue defines number of pending jobs.

broker/amqp/queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,6 @@ func (q *queue) inspect(cp *chanPool) (*amqp.Queue, error) {
261261
// throw handles service, server and pool events.
262262
func (q *queue) report(err error) {
263263
if err != nil {
264-
q.lsn(jobs.EventPipelineError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
264+
q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
265265
}
266266
}

broker/beanstalk/broker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (b *Broker) Serve() (err error) {
6161

6262
for _, t := range b.tubes {
6363
if t.execPool != nil {
64-
go t.serve(connFactory(b.cfg))
64+
go t.serve(b.cfg)
6565
}
6666
}
6767

broker/beanstalk/broker_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package beanstalk
22

33
import (
4-
"github.com/kr/beanstalk"
4+
"github.com/beanstalkd/go-beanstalk"
55
"github.com/spiral/jobs"
66
"github.com/stretchr/testify/assert"
77
"testing"

broker/beanstalk/tube.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,6 @@ func (t *tube) doStat(cn *conn) (stat *jobs.Stat, err error) {
245245
// report tube specific error
246246
func (t *tube) report(err error) {
247247
if err != nil {
248-
t.lsn(jobs.EventPipelineError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
248+
t.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: t.pipe, Caused: err})
249249
}
250250
}

broker/ephemeral/broker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (b *Broker) Register(pipe *jobs.Pipeline) error {
3737
return fmt.Errorf("queue `%s` has already been registered", pipe.Name())
3838
}
3939

40-
b.queues[pipe] = newQueue(pipe.Integer("concurrency", 0))
40+
b.queues[pipe] = newQueue(pipe.Integer("maxThreads", 0))
4141

4242
return nil
4343
}

broker/ephemeral/queue.go

+31-37
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88
)
99

1010
type queue struct {
11-
active int32
12-
st *jobs.Stat
11+
on int32
12+
state *jobs.Stat
1313

1414
// job pipeline
1515
concurPool chan interface{}
1616
jobs chan *entry
1717

18-
// active operations
18+
// on operations
1919
muw sync.Mutex
2020
wg sync.WaitGroup
2121

@@ -34,56 +34,50 @@ type entry struct {
3434
}
3535

3636
// create new queue
37-
func newQueue(concurrency int) *queue {
38-
q := &queue{st: &jobs.Stat{}, jobs: make(chan *entry)}
37+
func newQueue(maxConcur int) *queue {
38+
q := &queue{state: &jobs.Stat{}, jobs: make(chan *entry)}
3939

40-
if concurrency != 0 {
41-
q.concurPool = make(chan interface{}, concurrency)
42-
for i := 0; i < concurrency; i++ {
40+
if maxConcur != 0 {
41+
q.concurPool = make(chan interface{}, maxConcur)
42+
for i := 0; i < maxConcur; i++ {
4343
q.concurPool <- nil
4444
}
4545
}
4646

4747
return q
4848
}
4949

50-
// associate queue with new do pool
51-
func (q *queue) configure(execPool chan jobs.Handler, err jobs.ErrorHandler) error {
52-
q.execPool = execPool
53-
q.errHandler = err
54-
55-
return nil
56-
}
57-
5850
// serve consumers
5951
func (q *queue) serve() {
6052
q.wait = make(chan interface{})
61-
atomic.StoreInt32(&q.active, 1)
53+
atomic.StoreInt32(&q.on, 1)
6254

6355
for {
6456
e := q.consume()
6557
if e == nil {
58+
q.wg.Wait()
6659
return
6760
}
6861

6962
if q.concurPool != nil {
7063
<-q.concurPool
7164
}
7265

73-
atomic.AddInt64(&q.st.Active, 1)
66+
atomic.AddInt64(&q.state.Active, 1)
7467
h := <-q.execPool
75-
go func(e *entry) {
68+
69+
go func(h jobs.Handler, e *entry) {
70+
defer q.wg.Done()
71+
7672
q.do(h, e)
77-
atomic.AddInt64(&q.st.Active, ^int64(0))
73+
atomic.AddInt64(&q.state.Active, ^int64(0))
7874

7975
q.execPool <- h
8076

8177
if q.concurPool != nil {
8278
q.concurPool <- nil
8379
}
84-
85-
q.wg.Done()
86-
}(e)
80+
}(h, e)
8781
}
8882
}
8983

@@ -107,14 +101,14 @@ func (q *queue) do(h jobs.Handler, e *entry) {
107101
err := h(e.id, e.job)
108102

109103
if err == nil {
110-
atomic.AddInt64(&q.st.Queue, ^int64(0))
104+
atomic.AddInt64(&q.state.Queue, ^int64(0))
111105
return
112106
}
113107

114108
q.errHandler(e.id, e.job, err)
115109

116110
if !e.job.Options.CanRetry(e.attempt) {
117-
atomic.AddInt64(&q.st.Queue, ^int64(0))
111+
atomic.AddInt64(&q.state.Queue, ^int64(0))
118112
return
119113
}
120114

@@ -123,35 +117,35 @@ func (q *queue) do(h jobs.Handler, e *entry) {
123117

124118
// stop the queue consuming
125119
func (q *queue) stop() {
126-
if atomic.LoadInt32(&q.active) == 0 {
120+
if atomic.LoadInt32(&q.on) == 0 {
127121
return
128122
}
129123

130-
atomic.StoreInt32(&q.active, 0)
131-
132124
close(q.wait)
133-
125+
134126
q.muw.Lock()
135127
q.wg.Wait()
136-
q.muw.Unlock()
128+
q.muw.Unlock()
129+
130+
atomic.StoreInt32(&q.on, 0)
137131
}
138132

139133
// add job to the queue
140134
func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
141135
if delay == 0 {
142-
atomic.AddInt64(&q.st.Queue, 1)
136+
atomic.AddInt64(&q.state.Queue, 1)
143137
go func() {
144138
q.jobs <- &entry{id: id, job: j, attempt: attempt}
145139
}()
146140

147141
return
148142
}
149143

150-
atomic.AddInt64(&q.st.Delayed, 1)
144+
atomic.AddInt64(&q.state.Delayed, 1)
151145
go func() {
152146
time.Sleep(delay)
153-
atomic.AddInt64(&q.st.Delayed, ^int64(0))
154-
atomic.AddInt64(&q.st.Queue, 1)
147+
atomic.AddInt64(&q.state.Delayed, ^int64(0))
148+
atomic.AddInt64(&q.state.Queue, 1)
155149

156150
q.jobs <- &entry{id: id, job: j, attempt: attempt}
157151
}()
@@ -160,8 +154,8 @@ func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
160154
func (q *queue) stat() *jobs.Stat {
161155
return &jobs.Stat{
162156
InternalName: ":memory:",
163-
Queue: atomic.LoadInt64(&q.st.Queue),
164-
Active: atomic.LoadInt64(&q.st.Active),
165-
Delayed: atomic.LoadInt64(&q.st.Delayed),
157+
Queue: atomic.LoadInt64(&q.state.Queue),
158+
Active: atomic.LoadInt64(&q.state.Active),
159+
Delayed: atomic.LoadInt64(&q.state.Delayed),
166160
}
167161
}

broker/ephemeral/queue_test.go

-11
This file was deleted.

broker/sqs/broker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (b *Broker) Stop() {
9696
q.stop()
9797
}
9898

99-
close(b.wait)
99+
b.wait <- nil
100100
<-b.stopped
101101
}
102102

broker/sqs/job.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111

1212
var jobAttributes = []*string{
1313
aws.String("rr-job"),
14-
aws.String("rr-delay"),
1514
aws.String("rr-maxAttempts"),
15+
aws.String("rr-delay"),
1616
aws.String("rr-timeout"),
1717
aws.String("rr-retryDelay"),
1818
}
@@ -42,6 +42,12 @@ func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
4242
}
4343
attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])
4444

45+
for _, attr := range jobAttributes {
46+
if _, ok := msg.MessageAttributes[*attr]; !ok {
47+
return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
48+
}
49+
}
50+
4551
j = &jobs.Job{
4652
Job: *msg.MessageAttributes["rr-job"].StringValue,
4753
Payload: *msg.Body,

broker/sqs/queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,6 @@ func (q *queue) stat(s *sqs.SQS) (stat *jobs.Stat, err error) {
261261
// throw handles service, server and pool events.
262262
func (q *queue) report(err error) {
263263
if err != nil {
264-
q.lsn(jobs.EventPipelineError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
264+
q.lsn(jobs.EventPipeError, &jobs.PipelineError{Pipeline: q.pipe, Caused: err})
265265
}
266266
}

broker_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (b *testBroker) Register(pipe *Pipeline) error {
4545

4646
// Serve broker pipelines.
4747
func (b *testBroker) Serve() error {
48-
// start consuming
48+
// start pipelines
4949
b.mu.Lock()
5050
for _, q := range b.queues {
5151
if q.execPool != nil {
@@ -72,7 +72,7 @@ func (b *testBroker) Stop() {
7272
return
7373
}
7474

75-
// stop all consuming
75+
// stop all pipelines
7676
for _, q := range b.queues {
7777
q.stop()
7878
}
@@ -81,7 +81,7 @@ func (b *testBroker) Stop() {
8181
<-b.stopped
8282
}
8383

84-
// Consume configures pipeline to be consumed. With execPool to nil to disable consuming. Method can be called before
84+
// Consume configures pipeline to be consumed. With execPool to nil to disable pipelines. Method can be called before
8585
// the service is started!
8686
func (b *testBroker) Consume(pipe *Pipeline, execPool chan Handler, errHandler ErrorHandler) error {
8787
b.mu.Lock()
@@ -180,7 +180,7 @@ type testQueue struct {
180180
// job pipeline
181181
jobs chan *entry
182182

183-
// active operations
183+
// pipelines operations
184184
muw sync.Mutex
185185
wg sync.WaitGroup
186186

@@ -267,7 +267,7 @@ func (q *testQueue) do(h Handler, e *entry) {
267267
q.push(e.id, e.job, e.attempt+1, e.job.Options.RetryDuration())
268268
}
269269

270-
// stop the testQueue consuming
270+
// stop the testQueue pipelines
271271
func (q *testQueue) stop() {
272272
if atomic.LoadInt32(&q.active) == 0 {
273273
return

cmd/rr-jobs/jobs/debug.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,23 @@ func (s *debugger) listener(event int, ctx interface{}) {
5252
}
5353

5454
switch event {
55-
case jobs.EventPushComplete:
55+
case jobs.EventPushOK:
5656
e := ctx.(*jobs.JobEvent)
5757
s.logger.Info(util.Sprintf(
58-
"job.<magenta+h>PUSH</reset> <white+hb>%s</reset> <gray+hb>%s</reset>",
58+
"job.<yellow+h>PUSH</reset> <white+hb>%s</reset> <gray+hb>%s</reset>",
5959
e.Job.Job,
6060
e.ID,
6161
))
6262

63-
// case jobs.EventJobReceived:
63+
// case jobs.EventJobStart:
6464
// e := ctx.(*jobs.JobEvent)
6565
// s.logger.Info(util.Sprintf(
6666
// "job.<cyan+h>RECV</reset> <white+hb>%s</reset> <gray+hb>%s</reset>",
6767
// e.Job.Job,
6868
// e.ID,
6969
// ))
7070

71-
case jobs.EventJobComplete:
71+
case jobs.EventJobOK:
7272
e := ctx.(*jobs.JobEvent)
7373
s.logger.Info(util.Sprintf(
7474
"job.<green+h>DONE</reset> <green>%s</reset> <gray+hb>%s</reset> %s",
@@ -95,39 +95,39 @@ func (s *debugger) listener(event int, ctx interface{}) {
9595
e.Error(),
9696
))
9797

98-
// case jobs.EventPipelineConsume:
98+
// case jobs.EventPipeConsume:
9999
// e := ctx.(*jobs.Pipeline)
100100
// s.logger.Info(util.Sprintf(
101101
// "[%s]: resuming {<yellow+hb>%s</reset>}",
102102
// e.Broker(),
103103
// e.Name(),
104104
// ))
105105

106-
case jobs.EventPipelineConsuming:
106+
case jobs.EventPipeActive:
107107
e := ctx.(*jobs.Pipeline)
108108
s.logger.Info(util.Sprintf(
109109
"[%s]: resumed {<green+hb>%s</reset>}",
110110
e.Broker(),
111111
e.Name(),
112112
))
113113

114-
// case jobs.EventPipelineStop:
114+
// case jobs.EventPipeStop:
115115
// e := ctx.(*jobs.Pipeline)
116116
// s.logger.Info(util.Sprintf(
117117
// "[%s]: stopping {<yellow+hb>%s</reset>}",
118118
// e.Broker(),
119119
// e.Name(),
120120
// ))
121121

122-
case jobs.EventPipelineStopped:
122+
case jobs.EventPipeStopped:
123123
e := ctx.(*jobs.Pipeline)
124124
s.logger.Debugf(util.Sprintf(
125125
"[%s]: stopped {<magenta+hb>%s</reset>}",
126126
e.Broker(),
127127
e.Name(),
128128
))
129129

130-
case jobs.EventPipelineError:
130+
case jobs.EventPipeError:
131131
e := ctx.(*jobs.PipelineError)
132132
s.logger.Error(util.Sprintf(
133133
"[%s]: <red>{%s}</reset> <red+hb>%s</reset>",

0 commit comments

Comments
 (0)