Skip to content

Commit 93f1717

Browse files
welllezhangleivearutop
authored
113 Add support for Redis clusters (wellle#148)
* fix using another wrapper to fit redis cluster and redis single node; fix using {} to replace [] - untested * update test files with new openConn; add queue_cluster_test.go welle: Remove hardcoded connection options. * Bring back the original OpenConnection() for compatibility Add new function OpenConnectionWithOptions() for the new approach. * Allow cluster tests to run against local cluster * Remove RedisClusterWrapper again Rename RedisSingleWrapper back to RedisWrapper * Bring back original OpenConnectionWithRedisClient() * Add OpenClusterConnection() To allow opening RMQ connections which use the Redis hash tags {} instead of []. This is required to make rmq work with Redis clusters. This commit also reverts the behavior of all other OpenConnection[...] functions to behave as before by still using [] instead of {}. This switch is done by using different Redis key templates. For example instead of rmq::connection::{connection}::queue::[{queue}]::consumers we would use rmq::connection::{connection}::queue::{{queue}}::consumers when using OpenClusterConnection() * Document OpenClusterConnection() in README * Use safe accessors in tests * Update deps * Create Redis cluster in CI (wellle#150) --------- Co-authored-by: zhanglei <[email protected]> Co-authored-by: Viacheslav Poturaev <[email protected]>
1 parent 52c05b0 commit 93f1717

14 files changed

+1267
-165
lines changed

.github/workflows/test.yml

+12-21
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,21 @@ env:
1717
COV_GO_VERSION: 1.19.x # Version of Go to collect coverage
1818
TARGET_DELTA_COV: 90 # Target coverage of changed lines, in percents
1919
REDIS_ADDR: "localhost:6379"
20+
REDIS_CLUSTER_ADDR: "localhost:30001,localhost:30002,localhost:30003,localhost:30004,localhost:30005,localhost:30006"
2021
jobs:
2122
test:
2223
strategy:
2324
matrix:
2425
go-version: [ 1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x ]
2526
runs-on: ubuntu-latest
2627

27-
services:
28-
redis:
29-
image: redis
30-
options: >-
31-
--health-cmd "redis-cli ping"
32-
--health-interval 10s
33-
--health-timeout 5s
34-
--health-retries 5
35-
ports:
36-
- 6379:6379
3728
steps:
38-
- name: Install Go stable
29+
- name: Install Go
3930
if: matrix.go-version != 'tip'
4031
uses: actions/setup-go@v3
4132
with:
4233
go-version: ${{ matrix.go-version }}
4334

44-
- name: Install Go tip
45-
if: matrix.go-version == 'tip'
46-
run: |
47-
curl -sL https://storage.googleapis.com/go-build-snap/go/linux-amd64/$(git ls-remote https://github.com/golang/go.git HEAD | awk '{print $1;}').tar.gz -o gotip.tar.gz
48-
ls -lah gotip.tar.gz
49-
mkdir -p ~/sdk/gotip
50-
tar -C ~/sdk/gotip -xzf gotip.tar.gz
51-
~/sdk/gotip/bin/go version
52-
echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV
53-
5435
- name: Checkout code
5536
uses: actions/checkout@v2
5637

@@ -77,6 +58,16 @@ jobs:
7758
# Use base sha for PR or new commit hash for master/main push in test result key.
7859
key: ${{ runner.os }}-unit-test-coverage-${{ (github.event.pull_request.base.sha != github.event.after) && github.event.pull_request.base.sha || github.event.after }}
7960

61+
- name: Prepare Redis
62+
run: |
63+
sudo apt-get update && sudo apt-get install -y lsb-release curl gpg
64+
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
65+
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
66+
sudo apt-get update && sudo apt-get install -y redis
67+
./testdata/create-cluster.sh start
68+
yes yes | ./testdata/create-cluster.sh create
69+
sleep 5
70+
8071
- name: Run test for base code
8172
if: matrix.go-version == env.COV_GO_VERSION && env.RUN_BASE_COVERAGE == 'on' && steps.base-coverage.outputs.cache-hit != 'true' && github.event.pull_request.base.sha != ''
8273
run: |

README.md

+25-16
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,10 @@
11
[![Build Status](https://github.com/adjust/rmq/workflows/test/badge.svg)](https://github.com/adjust/rmq/actions?query=branch%3Amaster+workflow%3Atest)
22
[![GoDoc](https://pkg.go.dev/badge/github.com/adjust/rmq)](https://pkg.go.dev/github.com/adjust/rmq)
33

4-
---
5-
6-
**Note**: We recently updated rmq to expose Redis errors instead of panicking.
7-
This is a major change as almost all functions now return errors. It's
8-
recommended to switch to the latest version `rmq/v4` so rmq won't crash your
9-
services anymore on Redis errors.
10-
11-
If you don't want to upgrade yet, you can continue using `rmq/v2`.
12-
13-
---
14-
154
## Overview
165

176
rmq is short for Redis message queue. It's a message queue system written in Go
18-
and backed by Redis. It's similar to [redismq][redismq], but implemented
19-
independently with a different interface in mind.
20-
21-
[redismq]: https://github.com/adjust/redismq
7+
and backed by Redis.
228

239
## Basic Usage
2410

@@ -49,7 +35,11 @@ It's also possible to access a Redis listening on a Unix socket:
4935
connection, err := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1, errChan)
5036
```
5137

52-
For more flexible setup you can also create your own Redis client:
38+
For more flexible setup you can pass Redis options or create your own Redis client:
39+
40+
```go
41+
connection, err := OpenConnectionWithRedisOptions("my service", redisOptions, errChan)
42+
```
5343

5444
```go
5545
connection, err := OpenConnectionWithRedisClient("my service", redisClient, errChan)
@@ -63,6 +53,25 @@ the `OpenConnection()` functions rmq will send those background errors to this
6353
channel so you can handle them asynchronously. For more details about this and
6454
handling suggestions see the section about handling background errors below.
6555

56+
#### Connecting to a Redis cluster
57+
58+
In order to connect to a Redis cluster please use `OpenClusterConnection()`:
59+
60+
```go
61+
redisClusterOptions := &redis.ClusterOptions{ /* ... */ }
62+
redisClusterClient := redis.NewClusterClient(redisClusterOptions)
63+
connection, err := OpenClusterConnection("my service", redisClusterClient, errChan)
64+
```
65+
66+
Note that such an rmq cluster connection uses different Redis than rmq connections
67+
opened by `OpenConnection()` or similar. If you have used a Redis instance
68+
with `OpenConnection()` then it is NOT SAFE to reuse that rmq system by connecting
69+
to it via `OpenClusterConnection()`. The cluster state won't be compatible and
70+
this will likely lead to data loss.
71+
72+
If you've previously used `OpenConnection()` or similar you should only consider
73+
using `OpenClusterConnection()` with a fresh Redis cluster.
74+
6675
### Queues
6776

6877
Once we have a connection we can use it to finally access queues. Each queue

cleaner_test.go

+6-19
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,23 @@
11
package rmq
22

33
import (
4-
"os"
54
"testing"
65
"time"
76

8-
"github.com/alicebob/miniredis/v2"
97
"github.com/stretchr/testify/assert"
108
"github.com/stretchr/testify/require"
119
)
1210

13-
func testRedis(t testing.TB) (addr string, closer func()) {
14-
t.Helper()
15-
16-
if redisAddr, ok := os.LookupEnv("REDIS_ADDR"); ok {
17-
return redisAddr, func() {}
18-
}
19-
20-
mr := miniredis.RunT(t)
21-
return mr.Addr(), mr.Close
22-
}
23-
2411
func TestCleaner(t *testing.T) {
25-
redisAddr, closer := testRedis(t)
12+
redisOptions, closer := testRedis(t)
2613
defer closer()
2714

28-
flushConn, err := OpenConnection("cleaner-flush", "tcp", redisAddr, 1, nil)
15+
flushConn, err := OpenConnectionWithRedisOptions("cleaner-flush", redisOptions, nil)
2916
assert.NoError(t, err)
3017
assert.NoError(t, flushConn.stopHeartbeat())
3118
assert.NoError(t, flushConn.flushDb())
3219

33-
conn, err := OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
20+
conn, err := OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil)
3421
assert.NoError(t, err)
3522
queues, err := conn.GetOpenQueues()
3623
assert.NoError(t, err)
@@ -91,7 +78,7 @@ func TestCleaner(t *testing.T) {
9178
assert.NoError(t, conn.stopHeartbeat())
9279
time.Sleep(time.Millisecond)
9380

94-
conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
81+
conn, err = OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil)
9582
assert.NoError(t, err)
9683
queue, err = conn.OpenQueue("q1")
9784
assert.NoError(t, err)
@@ -138,7 +125,7 @@ func TestCleaner(t *testing.T) {
138125
assert.NoError(t, conn.stopHeartbeat())
139126
time.Sleep(time.Millisecond)
140127

141-
cleanerConn, err := OpenConnection("cleaner-conn", "tcp", redisAddr, 1, nil)
128+
cleanerConn, err := OpenConnectionWithRedisOptions("cleaner-conn", redisOptions, nil)
142129
assert.NoError(t, err)
143130
cleaner := NewCleaner(cleanerConn)
144131
returned, err := cleaner.Clean()
@@ -149,7 +136,7 @@ func TestCleaner(t *testing.T) {
149136
assert.NoError(t, err)
150137
assert.Len(t, queues, 2)
151138

152-
conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
139+
conn, err = OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil)
153140
assert.NoError(t, err)
154141
queue, err = conn.OpenQueue("q1")
155142
assert.NoError(t, err)

connection.go

+49-17
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,15 @@ type Connection interface {
4646
// Connection is the entry point. Use a connection to access queues, consumers and deliveries
4747
// Each connection has a single heartbeat shared among all consumers
4848
type redisConnection struct {
49-
Name string
50-
heartbeatKey string // key to keep alive
51-
queuesKey string // key to list of queues consumed by this connection
49+
Name string
50+
heartbeatKey string // key to keep alive
51+
queuesKey string // key to list of queues consumed by this connection
52+
53+
consumersTemplate string
54+
unackedTemplate string
55+
readyTemplate string
56+
rejectedTemplate string
57+
5258
redisClient RedisClient
5359
errChan chan<- error
5460
heartbeatStop chan chan struct{}
@@ -62,11 +68,16 @@ type redisConnection struct {
6268

6369
// OpenConnection opens and returns a new connection
6470
func OpenConnection(tag string, network string, address string, db int, errChan chan<- error) (Connection, error) {
65-
redisClient := redis.NewClient(&redis.Options{Network: network, Addr: address, DB: db})
66-
return OpenConnectionWithRedisClient(tag, redisClient, errChan)
71+
return OpenConnectionWithRedisOptions(tag, &redis.Options{Network: network, Addr: address, DB: db}, errChan)
72+
}
73+
74+
// OpenConnectionWithRedisOptions allows you to pass more flexible options
75+
func OpenConnectionWithRedisOptions(tag string, redisOption *redis.Options, errChan chan<- error) (Connection, error) {
76+
return OpenConnectionWithRedisClient(tag, redis.NewClient(redisOption), errChan)
6777
}
6878

6979
// OpenConnectionWithRedisClient opens and returns a new connection
80+
// This can be used to passa redis.ClusterClient.
7081
func OpenConnectionWithRedisClient(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error) {
7182
return OpenConnectionWithRmqRedisClient(tag, RedisWrapper{redisClient}, errChan)
7283
}
@@ -77,18 +88,31 @@ func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (Connec
7788
return OpenConnectionWithRmqRedisClient(tag, NewTestRedisClient(), errChan)
7889
}
7990

80-
// If you would like to use a redis client other than the ones supported in the constructors above, you can implement
81-
// the RedisClient interface yourself
91+
// OpenConnectionWithRmqRedisClient: If you would like to use a redis client other than the ones
92+
// supported in the constructors above, you can implement the RedisClient interface yourself
8293
func OpenConnectionWithRmqRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (Connection, error) {
94+
return openConnection(tag, redisClient, false, errChan)
95+
}
96+
97+
// OpenClusterConnection: Same as OpenConnectionWithRedisClient, but using Redis hash tags {} instead of [].
98+
func OpenClusterConnection(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error) {
99+
return openConnection(tag, RedisWrapper{redisClient}, true, errChan)
100+
}
101+
102+
func openConnection(tag string, redisClient RedisClient, useRedisHashTags bool, errChan chan<- error) (Connection, error) {
83103
name := fmt.Sprintf("%s-%s", tag, RandomString(6))
84104

85105
connection := &redisConnection{
86-
Name: name,
87-
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
88-
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
89-
redisClient: redisClient,
90-
errChan: errChan,
91-
heartbeatStop: make(chan chan struct{}, 1),
106+
Name: name,
107+
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
108+
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
109+
consumersTemplate: getTemplate(connectionQueueConsumersBaseTemplate, useRedisHashTags),
110+
unackedTemplate: getTemplate(connectionQueueUnackedBaseTemplate, useRedisHashTags),
111+
readyTemplate: getTemplate(queueReadyBaseTemplate, useRedisHashTags),
112+
rejectedTemplate: getTemplate(queueRejectedBaseTemplate, useRedisHashTags),
113+
redisClient: redisClient,
114+
errChan: errChan,
115+
heartbeatStop: make(chan chan struct{}, 1),
92116
}
93117

94118
if err := connection.updateHeartbeat(); err != nil { // checks the connection
@@ -243,10 +267,14 @@ func (connection *redisConnection) getConnections() ([]string, error) {
243267
// hijackConnection reopens an existing connection for inspection purposes without starting a heartbeat
244268
func (connection *redisConnection) hijackConnection(name string) Connection {
245269
return &redisConnection{
246-
Name: name,
247-
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
248-
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
249-
redisClient: connection.redisClient,
270+
Name: name,
271+
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
272+
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
273+
consumersTemplate: connection.consumersTemplate,
274+
unackedTemplate: connection.unackedTemplate,
275+
readyTemplate: connection.readyTemplate,
276+
rejectedTemplate: connection.rejectedTemplate,
277+
redisClient: connection.redisClient,
250278
}
251279
}
252280

@@ -280,6 +308,10 @@ func (connection *redisConnection) openQueue(name string) Queue {
280308
name,
281309
connection.Name,
282310
connection.queuesKey,
311+
connection.consumersTemplate,
312+
connection.unackedTemplate,
313+
connection.readyTemplate,
314+
connection.rejectedTemplate,
283315
connection.redisClient,
284316
connection.errChan,
285317
)

go.mod

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,19 @@ module github.com/adjust/rmq/v5
33
go 1.17
44

55
require (
6-
github.com/alicebob/miniredis/v2 v2.30.0
6+
github.com/alicebob/miniredis/v2 v2.30.4
77
github.com/redis/go-redis/v9 v9.0.3
88
github.com/stretchr/testify v1.7.0
99
)
1010

1111
require (
12-
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
12+
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
1313
github.com/cespare/xxhash/v2 v2.2.0 // indirect
1414
github.com/davecgh/go-spew v1.1.0 // indirect
1515
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
1616
github.com/kr/pretty v0.1.0 // indirect
1717
github.com/pmezard/go-difflib v1.0.0 // indirect
18-
github.com/yuin/gopher-lua v1.0.0 // indirect
18+
github.com/yuin/gopher-lua v1.1.0 // indirect
1919
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
2020
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
2121
)

go.sum

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
21
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
3-
github.com/alicebob/miniredis/v2 v2.30.0 h1:uA3uhDbCxfO9+DI/DuGeAMr9qI+noVWwGPNTFuKID5M=
4-
github.com/alicebob/miniredis/v2 v2.30.0/go.mod h1:84TWKZlxYkfgMucPBf5SOQBYJceZeQRFIaQgNMiCX6Q=
2+
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
3+
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
4+
github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo=
5+
github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
56
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
67
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
78
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
@@ -27,9 +28,8 @@ github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDO
2728
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2829
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
2930
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
30-
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
31-
github.com/yuin/gopher-lua v1.0.0 h1:pQCf0LN67Kf7M5u7vRd40A8M1I8IMLrxlqngUJgZ0Ow=
32-
github.com/yuin/gopher-lua v1.0.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
31+
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
32+
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
3333
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
3434
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3535
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=

queue.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ type redisQueue struct {
4646
connectionName string
4747
queuesKey string // key to list of queues consumed by this connection
4848
consumersKey string // key to set of consumers using this connection
49+
unackedKey string // key to list of currently consuming deliveries
4950
readyKey string // key to list of ready deliveries
5051
rejectedKey string // key to list of rejected deliveries
51-
unackedKey string // key to list of currently consuming deliveries
5252
pushKey string // key to list of pushed deliveries
5353
redisClient RedisClient
5454
errChan chan<- error
@@ -64,22 +64,21 @@ type redisQueue struct {
6464
}
6565

6666
func newQueue(
67-
name string,
68-
connectionName string,
69-
queuesKey string,
67+
name, connectionName, queuesKey string,
68+
consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate string,
7069
redisClient RedisClient,
7170
errChan chan<- error,
7271
) *redisQueue {
7372

74-
consumersKey := strings.Replace(connectionQueueConsumersTemplate, phConnection, connectionName, 1)
73+
consumersKey := strings.Replace(consumersTemplate, phConnection, connectionName, 1)
7574
consumersKey = strings.Replace(consumersKey, phQueue, name, 1)
7675

77-
readyKey := strings.Replace(queueReadyTemplate, phQueue, name, 1)
78-
rejectedKey := strings.Replace(queueRejectedTemplate, phQueue, name, 1)
79-
80-
unackedKey := strings.Replace(connectionQueueUnackedTemplate, phConnection, connectionName, 1)
76+
unackedKey := strings.Replace(unackedTemplate, phConnection, connectionName, 1)
8177
unackedKey = strings.Replace(unackedKey, phQueue, name, 1)
8278

79+
readyKey := strings.Replace(readyTemplate, phQueue, name, 1)
80+
rejectedKey := strings.Replace(rejectedTemplate, phQueue, name, 1)
81+
8382
consumingStopped := make(chan struct{})
8483
ackCtx, ackCancel := context.WithCancel(context.Background())
8584

@@ -88,9 +87,9 @@ func newQueue(
8887
connectionName: connectionName,
8988
queuesKey: queuesKey,
9089
consumersKey: consumersKey,
90+
unackedKey: unackedKey,
9191
readyKey: readyKey,
9292
rejectedKey: rejectedKey,
93-
unackedKey: unackedKey,
9493
redisClient: redisClient,
9594
errChan: errChan,
9695
consumingStopped: consumingStopped,

0 commit comments

Comments
 (0)