Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit eecc711

Browse files
bors[bot]rustatian
andauthored
Merge #39
39: Replace json std package with github.com/json-iterator/go r=48d90782 a=48d90782 1. Refactor amqp/conn, move redial from config to conn.go 2. Remove dialing function, as not concurrent safe, refactor it 3. Simplify conn.notify function 4. Add a simple threshold limiter to BS/conn.go. Limit located in the constants.go file in beanstalk package Co-authored-by: Valery Piashchynski <[email protected]>
2 parents e85ad6b + ce1f712 commit eecc711

14 files changed

+51
-46
lines changed

.github/workflows/ci-build.yml

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ jobs:
5959
run: |
6060
docker-compose -f ./tests/docker-compose.yml up -d
6161
vendor/bin/spiral-cs check src tests
62+
go mod vendor
63+
composer update
6264
go test -v -race -cover -coverprofile=jobs.txt -covermode=atomic
6365
go test -v -race -cover ./broker/amqp -coverprofile=amqp.txt -covermode=atomic
6466
go test -v -race -cover ./broker/ephemeral -coverprofile=ephemeral.txt -covermode=atomic

Makefile

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ install: all
55
uninstall:
66
rm -f /usr/local/bin/rr-jobs
77
test:
8+
go mod vendor
89
composer update
910
go test -v -race -cover
1011
go test -v -race -cover ./broker/amqp

broker/amqp/broker.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ func (b *Broker) Register(pipe *jobs.Pipeline) error {
5656
func (b *Broker) Serve() (err error) {
5757
b.mu.Lock()
5858

59-
if b.publish, err = newConn(b.cfg.dial, b.cfg.TimeoutDuration()); err != nil {
59+
if b.publish, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
6060
b.mu.Unlock()
6161
return err
6262
}
6363
defer b.publish.Close()
6464

65-
if b.consume, err = newConn(b.cfg.dial, b.cfg.TimeoutDuration()); err != nil {
65+
if b.consume, err = newConn(b.cfg.Addr, b.cfg.TimeoutDuration()); err != nil {
6666
b.mu.Unlock()
6767
return err
6868
}

broker/amqp/config.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package amqp
33
import (
44
"fmt"
55
"github.com/spiral/roadrunner/service"
6-
"github.com/streadway/amqp"
76
"time"
87
)
98

@@ -29,7 +28,7 @@ func (c *Config) Hydrate(cfg service.Config) error {
2928
return nil
3029
}
3130

32-
// TimeoutDuration returns number of seconds allowed to allocate the publish.
31+
// TimeoutDuration returns number of seconds allowed to redial
3332
func (c *Config) TimeoutDuration() time.Duration {
3433
timeout := c.Timeout
3534
if timeout == 0 {
@@ -38,8 +37,3 @@ func (c *Config) TimeoutDuration() time.Duration {
3837

3938
return time.Duration(timeout) * time.Second
4039
}
41-
42-
// dial dials to AMQP.
43-
func (c *Config) dial() (*amqp.Connection, error) {
44-
return amqp.Dial(c.Addr)
45-
}

broker/amqp/config_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package amqp
22

33
import (
4-
"encoding/json"
4+
json "github.com/json-iterator/go"
55
"github.com/spiral/roadrunner/service"
66
"github.com/stretchr/testify/assert"
77
"testing"

broker/amqp/conn.go

+31-22
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import (
1010

1111
// manages set of AMQP channels
1212
type chanPool struct {
13-
mu sync.Mutex
13+
// timeout to backoff redial
14+
tout time.Duration
15+
url string
16+
17+
mu sync.Mutex
18+
1419
conn *amqp.Connection
1520
channels map[string]*channel
1621
wait chan interface{}
@@ -19,35 +24,39 @@ type chanPool struct {
1924

2025
// manages single channel
2126
type channel struct {
22-
ch *amqp.Channel
27+
ch *amqp.Channel
2328
// todo unused
2429
//consumer string
25-
confirm chan amqp.Confirmation
26-
signal chan error
30+
confirm chan amqp.Confirmation
31+
signal chan error
2732
}
2833

29-
type dialer func() (*amqp.Connection, error)
30-
3134
// newConn creates new watched AMQP connection
32-
func newConn(dial dialer, tout time.Duration) (*chanPool, error) {
33-
conn, err := dial()
35+
func newConn(url string, tout time.Duration) (*chanPool, error) {
36+
conn, err := dial(url)
3437
if err != nil {
3538
return nil, err
3639
}
3740

3841
cp := &chanPool{
42+
url: url,
43+
tout: tout,
3944
conn: conn,
4045
channels: make(map[string]*channel),
4146
wait: make(chan interface{}),
4247
connected: make(chan interface{}),
4348
}
4449

4550
close(cp.connected)
46-
go cp.watch(dial, conn.NotifyClose(make(chan *amqp.Error)))
47-
51+
go cp.watch()
4852
return cp, nil
4953
}
5054

55+
// dial dials to AMQP.
56+
func dial(url string) (*amqp.Connection, error) {
57+
return amqp.Dial(url)
58+
}
59+
5160
// Close gracefully closes all underlying channels and connection.
5261
func (cp *chanPool) Close() error {
5362
cp.mu.Lock()
@@ -91,14 +100,16 @@ func (cp *chanPool) waitConnected() chan interface{} {
91100
}
92101

93102
// watch manages connection state and reconnects if needed
94-
func (cp *chanPool) watch(dial dialer, errors chan *amqp.Error) {
103+
func (cp *chanPool) watch() {
95104
for {
96105
select {
97106
case <-cp.wait:
98107
// connection has been closed
99108
return
100-
case err := <-errors:
109+
// here we are waiting for the errors from amqp connection
110+
case err := <-cp.conn.NotifyClose(make(chan *amqp.Error)):
101111
cp.mu.Lock()
112+
// clear connected, since connections are dead
102113
cp.connected = make(chan interface{})
103114

104115
// broadcast error to all consume to let them for the tryReconnect
@@ -109,20 +120,20 @@ func (cp *chanPool) watch(dial dialer, errors chan *amqp.Error) {
109120
// disable channel allocation while server is dead
110121
cp.conn = nil
111122
cp.channels = nil
112-
cp.mu.Unlock()
113123

114124
// initialize the backoff
115125
expb := backoff.NewExponentialBackOff()
116-
expb.MaxInterval = DefaultMaxInterval
126+
expb.MaxInterval = cp.tout
127+
cp.mu.Unlock()
117128

118-
//reconnect function
129+
// reconnect function
119130
reconnect := func() error {
120131
cp.mu.Lock()
121-
defer cp.mu.Unlock()
122-
conn, err := dial()
132+
conn, err := dial(cp.url)
123133
if err != nil {
124134
// still failing
125135
fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error()))
136+
cp.mu.Unlock()
126137
return err
127138
}
128139

@@ -134,18 +145,16 @@ func (cp *chanPool) watch(dial dialer, errors chan *amqp.Error) {
134145
cp.conn = conn
135146
// re-init the channels
136147
cp.channels = make(map[string]*channel)
137-
errors = cp.conn.NotifyClose(make(chan *amqp.Error))
148+
cp.mu.Unlock()
138149
return nil
139150
}
140151

141-
152+
// start backoff retry
142153
errb := backoff.Retry(reconnect, expb)
143154
if errb != nil {
144155
fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error()))
145156
// reconnection failed
146-
cp.mu.Lock()
147157
close(cp.connected)
148-
cp.mu.Unlock()
149158
return
150159
}
151160
close(cp.connected)
@@ -162,7 +171,7 @@ func (cp *chanPool) channel(name string) (*channel, error) {
162171
if dead {
163172
// wait for connection restoration (doubled the timeout duration)
164173
select {
165-
case <-time.NewTimer(DefaultMaxInterval * 2).C:
174+
case <-time.NewTimer(cp.tout * 2).C:
166175
return nil, fmt.Errorf("connection is dead")
167176
case <-cp.connected:
168177
// connected

broker/amqp/constants.go

-6
This file was deleted.

broker/beanstalk/config_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package beanstalk
22

33
import (
4-
"encoding/json"
4+
json "github.com/json-iterator/go"
55
"github.com/spiral/roadrunner/service"
66
"github.com/stretchr/testify/assert"
77
"testing"

broker/beanstalk/conn.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,17 @@ func (cn *conn) release(err error) error {
111111
// watch and reconnect if dead
112112
func (cn *conn) watch(network, addr string) {
113113
cn.free <- nil
114+
t := time.NewTicker(WatchThrottleLimit)
115+
defer t.Stop()
114116
for {
115117
select {
116118
case <-cn.dead:
119+
// simple throttle limiter
120+
<-t.C
117121
// try to reconnect
118122
// TODO add logging here
119123
expb := backoff.NewExponentialBackOff()
120-
expb.MaxInterval = time.Second * 5
124+
expb.MaxInterval = cn.tout
121125

122126
reconnect := func() error {
123127
conn, err := beanstalk.Dial(network, addr)

broker/beanstalk/constants.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ package beanstalk
22

33
import "time"
44

5-
// DefaultMaxInterval is the max reconnect time interval
6-
const DefaultMaxInterval = 30 * time.Second
5+
// WatchThrottleLimit is used to limit reconnection occurrence in watch function
6+
const WatchThrottleLimit = time.Second

broker/sqs/config_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package sqs
22

33
import (
4-
"encoding/json"
4+
json "github.com/json-iterator/go"
55
"github.com/spiral/roadrunner/service"
66
"github.com/stretchr/testify/assert"
77
"testing"

config_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package jobs
22

33
import (
4-
"encoding/json"
4+
json "github.com/json-iterator/go"
55
"github.com/spiral/roadrunner/service"
66
"github.com/stretchr/testify/assert"
77
"testing"

go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/spiral/jobs/v2
22

3-
go 1.13
3+
go 1.14
44

55
require (
66
github.com/aws/aws-sdk-go v1.16.14
@@ -12,6 +12,7 @@ require (
1212
github.com/cpuguy83/go-md2man v1.0.10 // indirect
1313
github.com/dustin/go-humanize v1.0.0
1414
github.com/gofrs/uuid v3.1.0+incompatible
15+
github.com/json-iterator/go v1.1.9
1516
github.com/kr/beanstalk v0.0.0-20180818045031-cae1762e4858 // indirect
1617
github.com/olekukonko/tablewriter v0.0.4
1718
github.com/prometheus/client_golang v1.5.0

job.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package jobs
22

3-
import "encoding/json"
3+
import json "github.com/json-iterator/go"
44

55
// Handler handles job execution.
66
type Handler func(id string, j *Job) error

0 commit comments

Comments
 (0)