-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathconsumer.go
175 lines (146 loc) · 4.26 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package gtrs
import (
"context"
"time"
"github.com/redis/go-redis/v9"
)
// Consumer is a generic consumer interface
type Consumer[T any] interface {
Chan() <-chan Message[T]
Close()
}
type StreamIDs = map[string]string
// StreamConsumerConfig provides basic configuration for StreamConsumer.
// It can be passed as the last argument to NewConsumer.
type StreamConsumerConfig struct {
Block time.Duration // milliseconds to block before timing out. 0 means infinite
Count int64 // maximum number of entries per request. 0 means not limited
BufferSize uint // how many entries to prefetch at most
}
// StreamConsumer is a consumer that reads from one or multiple redis streams.
// The consumer has to be closed to release resources and stop goroutines.
type StreamConsumer[T any] struct {
Consumer[T]
ctx context.Context
rdb redis.Cmdable
cfg StreamConsumerConfig
fetchErrChan chan error
fetchChan chan fetchMessage
consumeChan chan Message[T] // user facing non buffered channel
seenIds StreamIDs
closeFunc func()
}
// NewConsumer creates a new StreamConsumer with optional configuration.
func NewConsumer[T any](ctx context.Context, rdb redis.Cmdable, ids StreamIDs, cfgs ...StreamConsumerConfig) *StreamConsumer[T] {
cfg := StreamConsumerConfig{
Block: 0,
Count: 100,
BufferSize: 50,
}
if len(cfgs) > 0 {
cfg = cfgs[0]
}
ctx, closeFunc := context.WithCancel(ctx)
sc := &StreamConsumer[T]{
ctx: ctx,
rdb: rdb,
cfg: cfg,
fetchErrChan: make(chan error),
fetchChan: make(chan fetchMessage, cfg.BufferSize),
consumeChan: make(chan Message[T]),
seenIds: ids,
closeFunc: closeFunc,
}
go sc.fetchLoop()
go sc.consumeLoop()
return sc
}
// Chan returns the main channel with new messages.
//
// This channel is closed when:
// - the consumer is closed
// - immediately on context cancel
// - in case of a ReadError
func (sc *StreamConsumer[T]) Chan() <-chan Message[T] {
return sc.consumeChan
}
// Close returns a StreamIds that shows, up to which entry the streams were consumed.
//
// The StreamIds can be used to construct a new StreamConsumer that will
// pick up where this left off.
func (sc *StreamConsumer[T]) Close() StreamIDs {
select {
case <-sc.ctx.Done():
default:
sc.closeFunc()
}
// Await close.
<-sc.consumeChan
return sc.seenIds
}
// fetchLoop fills the fetchChan with new stream messages.
func (sc *StreamConsumer[T]) fetchLoop() {
defer close(sc.fetchErrChan)
defer close(sc.fetchChan)
fetchedIds := copyMap(sc.seenIds)
stBuf := make([]string, 2*len(fetchedIds))
for {
// Explicit check for context cancellation.
// In case select chooses other channels over cancellation in a streak.
if checkCancel(sc.ctx) {
return
}
res, err := sc.read(fetchedIds, stBuf)
if err != nil {
sendCheckCancel(sc.ctx, sc.fetchErrChan, err)
// Don't close channels preemptively
<-sc.ctx.Done()
return
}
for _, stream := range res {
for _, rawMsg := range stream.Messages {
msg := fetchMessage{stream: stream.Stream, message: rawMsg}
sendCheckCancel(sc.ctx, sc.fetchChan, msg)
fetchedIds[stream.Stream] = rawMsg.ID
}
}
}
}
// consumeLoop forwards messages from fetchChan and errorChan to consumeChan.
func (sc *StreamConsumer[T]) consumeLoop() {
defer close(sc.consumeChan)
var msg fetchMessage
for {
// Explicit cancellation check.
if checkCancel(sc.ctx) {
return
}
// Listen for fetch message, error or context cancellation.
select {
case <-sc.ctx.Done():
return
case err := <-sc.fetchErrChan:
sendCheckCancel(sc.ctx, sc.consumeChan, Message[T]{Err: ReadError{Err: err}})
return
case msg = <-sc.fetchChan:
}
// Send message to consumer.
if sendCheckCancel(sc.ctx, sc.consumeChan, toMessage[T](msg.message, msg.stream)) {
sc.seenIds[msg.stream] = msg.message.ID
}
}
}
// read calls XREAD to read the next portion of messages from the streams.
func (sc *StreamConsumer[T]) read(fetchIds map[string]string, stBuf []string) ([]redis.XStream, error) {
idx, offset := 0, len(fetchIds)
for k, v := range fetchIds {
stBuf[idx] = k
stBuf[idx+offset] = v
idx += 1
}
return sc.rdb.XRead(sc.ctx, &redis.XReadArgs{
Streams: stBuf,
Block: sc.cfg.Block,
Count: sc.cfg.Count,
}).Result()
}