Skip to content
This repository was archived by the owner on Jan 15, 2022. It is now read-only.

Commit e85ad6b

Browse files
bors[bot]rustatian
andauthored
Merge #37
37: Backoff mechanism r=48d90782 a=48d90782 Co-authored-by: Valery Piashchynski <[email protected]>
2 parents 0f64187 + 00d8ff7 commit e85ad6b

File tree

8 files changed

+74
-42
lines changed

8 files changed

+74
-42
lines changed

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ vendor/
66
clover.xml
77
.DS_Store
88
tests/.rr-test.yml
9-
go.sum
9+
go.sum
10+
psr-worker.php
11+
.rr-sample.yaml

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
[![Go Report Card](https://goreportcard.com/badge/github.com/spiral/jobs)](https://goreportcard.com/report/github.com/spiral/jobs)
66
[![Codecov](https://codecov.io/gh/spiral/jobs/branch/master/graph/badge.svg)](https://codecov.io/gh/spiral/jobs/)
77

8+
89
## Documentation
910
* [Installation and Configuration](https://spiral.dev/docs/queue-configuration)
1011
* [Console Commands](https://spiral.dev/docs/queue-commands)

broker/amqp/conn.go

+34-33
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package amqp
22

33
import (
44
"fmt"
5+
"github.com/cenkalti/backoff/v4"
56
"github.com/streadway/amqp"
67
"sync"
78
"time"
89
)
910

1011
// manages set of AMQP channels
1112
type chanPool struct {
12-
tout time.Duration
1313
mu sync.Mutex
1414
conn *amqp.Connection
1515
channels map[string]*channel
@@ -36,7 +36,6 @@ func newConn(dial dialer, tout time.Duration) (*chanPool, error) {
3636
}
3737

3838
cp := &chanPool{
39-
tout: tout,
4039
conn: conn,
4140
channels: make(map[string]*channel),
4241
wait: make(chan interface{}),
@@ -112,46 +111,48 @@ func (cp *chanPool) watch(dial dialer, errors chan *amqp.Error) {
112111
cp.channels = nil
113112
cp.mu.Unlock()
114113

115-
conn, errChan := cp.reconnect(dial)
116-
if conn == nil {
114+
// initialize the backoff
115+
expb := backoff.NewExponentialBackOff()
116+
expb.MaxInterval = DefaultMaxInterval
117+
118+
//reconnect function
119+
reconnect := func() error {
120+
cp.mu.Lock()
121+
defer cp.mu.Unlock()
122+
conn, err := dial()
123+
if err != nil {
124+
// still failing
125+
fmt.Println(fmt.Sprintf("error during the amqp dialing, %s", err.Error()))
126+
return err
127+
}
128+
129+
// TODO ADD LOGGING
130+
fmt.Println("------amqp successfully redialed------")
131+
132+
// here we are reconnected
133+
// replace the connection
134+
cp.conn = conn
135+
// re-init the channels
136+
cp.channels = make(map[string]*channel)
137+
errors = cp.conn.NotifyClose(make(chan *amqp.Error))
138+
return nil
139+
}
140+
141+
142+
errb := backoff.Retry(reconnect, expb)
143+
if errb != nil {
144+
fmt.Println(fmt.Sprintf("backoff Retry error, %s", errb.Error()))
145+
// reconnection failed
117146
cp.mu.Lock()
118147
close(cp.connected)
119148
cp.mu.Unlock()
120-
121-
// interrupted
122149
return
123150
}
124-
125-
cp.mu.Lock()
126-
cp.conn = conn
127-
cp.channels = make(map[string]*channel)
128-
errors = errChan
129-
cp.mu.Unlock()
130-
131151
close(cp.connected)
132152
}
133153
}
134154
}
135155

136-
func (cp *chanPool) reconnect(dial dialer) (conn *amqp.Connection, errors chan *amqp.Error) {
137-
for {
138-
select {
139-
case <-cp.wait:
140-
// connection has been cancelled is not possible
141-
return nil, nil
142-
143-
case <-time.NewTimer(cp.tout).C:
144-
conn, err := dial()
145-
if err != nil {
146-
// still failing
147-
continue
148-
}
149-
150-
return conn, conn.NotifyClose(make(chan *amqp.Error))
151-
}
152-
}
153-
}
154-
155156
// channel allocates new channel on amqp connection
156157
func (cp *chanPool) channel(name string) (*channel, error) {
157158
cp.mu.Lock()
@@ -161,7 +162,7 @@ func (cp *chanPool) channel(name string) (*channel, error) {
161162
if dead {
162163
// wait for connection restoration (doubled the timeout duration)
163164
select {
164-
case <-time.NewTimer(cp.tout * 2).C:
165+
case <-time.NewTimer(DefaultMaxInterval * 2).C:
165166
return nil, fmt.Errorf("connection is dead")
166167
case <-cp.connected:
167168
// connected

broker/amqp/constants.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package amqp
2+
3+
import "time"
4+
5+
// DefaultMaxInterval is the max reconnect time interval
6+
const DefaultMaxInterval = 30 * time.Second

broker/beanstalk/conn.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package beanstalk
33
import (
44
"fmt"
55
"github.com/beanstalkd/go-beanstalk"
6+
"github.com/cenkalti/backoff/v4"
67
"strings"
78
"sync"
89
"time"
@@ -114,15 +115,30 @@ func (cn *conn) watch(network, addr string) {
114115
select {
115116
case <-cn.dead:
116117
// try to reconnect
117-
if conn, err := beanstalk.Dial(network, addr); err == nil {
118+
// TODO add logging here
119+
expb := backoff.NewExponentialBackOff()
120+
expb.MaxInterval = time.Second * 5
121+
122+
reconnect := func() error {
123+
conn, err := beanstalk.Dial(network, addr)
124+
if err != nil {
125+
fmt.Println(fmt.Sprintf("redial: error during the beanstalk dialing, %s", err.Error()))
126+
return err
127+
}
128+
129+
// TODO ADD LOGGING
130+
fmt.Println("------beanstalk successfully redialed------")
131+
118132
cn.conn = conn
119133
cn.free <- nil
120-
continue
134+
return nil
121135
}
122136

123-
// retry later
124-
time.Sleep(cn.tout)
125-
cn.dead <- nil
137+
err := backoff.Retry(reconnect, expb)
138+
if err != nil {
139+
fmt.Println(fmt.Sprintf("redial failed: %s", err.Error()))
140+
cn.dead <- nil
141+
}
126142

127143
case <-cn.stop:
128144
cn.lock.L.Lock()

broker/beanstalk/constants.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package beanstalk
2+
3+
import "time"
4+
5+
// DefaultMaxInterval is the max reconnect time interval
6+
const DefaultMaxInterval = 30 * time.Second

broker/sqs/job.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ var jobAttributes = []*string{
1919

2020
// pack job metadata into headers
2121
func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
22-
msg := &sqs.SendMessageInput{
22+
return &sqs.SendMessageInput{
2323
QueueUrl: url,
2424
DelaySeconds: aws.Int64(int64(j.Options.Delay)),
2525
MessageBody: aws.String(j.Payload),
@@ -31,8 +31,6 @@ func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
3131
"rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
3232
},
3333
}
34-
35-
return msg
3634
}
3735

3836
// unpack restores jobs.Options

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ require (
66
github.com/aws/aws-sdk-go v1.16.14
77
github.com/beanstalkd/go-beanstalk v0.0.0-20180822062812-53ecdaa3bcfb
88
github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37
9+
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
10+
github.com/cenkalti/backoff/v4 v4.0.0
911
github.com/coreos/go-etcd v2.0.0+incompatible // indirect
1012
github.com/cpuguy83/go-md2man v1.0.10 // indirect
1113
github.com/dustin/go-humanize v1.0.0

0 commit comments

Comments
 (0)