Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit f4f63c6

Browse files
bors[bot]rustatian
andauthored
Merge #41
41: Jobs 2.1.4 r=48d90782 a=48d90782 Co-authored-by: Valery Piashchynski <[email protected]>
2 parents eecc711 + 4a4d8cd commit f4f63c6

File tree

4 files changed

+38
-10
lines changed

4 files changed

+38
-10
lines changed

CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CHANGELOG
2+
=========
3+
4+
## v2.1.4 (23.03.2020)
5+
- Replaced std encoding/json package with the https://github.com/json-iterator/go
6+
- Added BORS and GHA support
7+
- Added new exponential backoff mechanism for AMQP and Beanstalk modules
8+
- Fixed few concurrency issues with goroutines usage in loops
9+
- Other under-the-hood improvements with tests and code simplify
10+
- RR 1.7.0

broker/amqp/conn.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type chanPool struct {
1414
tout time.Duration
1515
url string
1616

17-
mu sync.Mutex
17+
mu *sync.Mutex
1818

1919
conn *amqp.Connection
2020
channels map[string]*channel
@@ -42,6 +42,7 @@ func newConn(url string, tout time.Duration) (*chanPool, error) {
4242
url: url,
4343
tout: tout,
4444
conn: conn,
45+
mu: &sync.Mutex{},
4546
channels: make(map[string]*channel),
4647
wait: make(chan interface{}),
4748
connected: make(chan interface{}),

broker/amqp/consume_test.go

+21-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ func TestBroker_Consume_Job(t *testing.T) {
1414
if err != nil {
1515
t.Fatal(err)
1616
}
17-
b.Register(pipe)
17+
err = b.Register(pipe)
18+
if err != nil {
19+
t.Fatal(err)
20+
}
1821

1922
ready := make(chan interface{})
2023
b.Listen(func(event int, ctx interface{}) {
@@ -57,7 +60,10 @@ func TestBroker_ConsumeAfterStart_Job(t *testing.T) {
5760
if err != nil {
5861
t.Fatal(err)
5962
}
60-
b.Register(pipe)
63+
err = b.Register(pipe)
64+
if err != nil {
65+
t.Fatal(err)
66+
}
6167

6268
ready := make(chan interface{})
6369
b.Listen(func(event int, ctx interface{}) {
@@ -101,7 +107,10 @@ func TestBroker_Consume_Delayed(t *testing.T) {
101107
t.Fatal(err)
102108
}
103109

104-
b.Register(pipe)
110+
err = b.Register(pipe)
111+
if err != nil {
112+
t.Fatal(err)
113+
}
105114

106115
ready := make(chan interface{})
107116
b.Listen(func(event int, ctx interface{}) {
@@ -140,7 +149,7 @@ func TestBroker_Consume_Delayed(t *testing.T) {
140149

141150
elapsed := time.Since(start)
142151
assert.True(t, elapsed >= time.Second)
143-
assert.True(t, elapsed < 2*time.Second)
152+
assert.True(t, elapsed < 3*time.Second)
144153
}
145154

146155
func TestBroker_Consume_Errored(t *testing.T) {
@@ -149,7 +158,10 @@ func TestBroker_Consume_Errored(t *testing.T) {
149158
if err != nil {
150159
t.Fatal(err)
151160
}
152-
b.Register(pipe)
161+
err = b.Register(pipe)
162+
if err != nil {
163+
t.Fatal(err)
164+
}
153165

154166
ready := make(chan interface{})
155167
b.Listen(func(event int, ctx interface{}) {
@@ -195,7 +207,10 @@ func TestBroker_Consume_Errored_Attempts(t *testing.T) {
195207
if err != nil {
196208
t.Fatal(err)
197209
}
198-
b.Register(pipe)
210+
err = b.Register(pipe)
211+
if err != nil {
212+
t.Fatal(err)
213+
}
199214

200215
ready := make(chan interface{})
201216
b.Listen(func(event int, ctx interface{}) {

broker/amqp/stat_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package amqp
33
import (
44
"github.com/spiral/jobs/v2"
55
"github.com/stretchr/testify/assert"
6+
"sync"
67
"testing"
78
)
89

@@ -40,20 +41,21 @@ func TestBroker_Stat(t *testing.T) {
4041

4142
assert.NoError(t, b.Consume(pipe, exec, func(id string, j *jobs.Job, err error) {}))
4243

43-
waitJob := make(chan interface{})
44+
wg := &sync.WaitGroup{}
45+
wg.Add(1)
4446
exec <- func(id string, j *jobs.Job) error {
47+
defer wg.Done()
4548
assert.Equal(t, jid, id)
4649
assert.Equal(t, "body", j.Payload)
4750

4851
stat, err := b.Stat(pipe)
4952
assert.NoError(t, err)
5053
assert.Equal(t, int64(1), stat.Active)
5154

52-
close(waitJob)
5355
return nil
5456
}
5557

56-
<-waitJob
58+
wg.Wait()
5759
stat, err = b.Stat(pipe)
5860
assert.NoError(t, err)
5961
assert.Equal(t, int64(0), stat.Queue)

0 commit comments

Comments
 (0)