Skip to content

Commit d2aaf8d

Browse files
authored
Merge branch 'OffchainLabs:master' into master
2 parents 485eb9c + 1420803 commit d2aaf8d

File tree

7 files changed

+260
-119
lines changed

7 files changed

+260
-119
lines changed

go-ethereum

pubsub/consumer.go

Lines changed: 72 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,30 @@ import (
1919
"github.com/offchainlabs/nitro/util/stopwaiter"
2020
)
2121

22+
// lint:require-exhaustive-initialization
2223
type ConsumerConfig struct {
2324
// Timeout of result entry in Redis.
2425
ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"`
2526
// Minimum idle time after which messages will be autoclaimed
2627
IdletimeToAutoclaim time.Duration `koanf:"idletime-to-autoclaim"`
28+
// Enables retrying too long pending messages
29+
Retry bool `koanf:"retry"`
30+
// Number of message retries after which we set error response
31+
MaxRetryCount int64 `koanf:"max-retry-count"`
2732
}
2833

2934
var DefaultConsumerConfig = ConsumerConfig{
3035
ResponseEntryTimeout: time.Hour,
3136
IdletimeToAutoclaim: 5 * time.Minute,
37+
Retry: true,
38+
MaxRetryCount: -1,
3239
}
3340

3441
var TestConsumerConfig = ConsumerConfig{
3542
ResponseEntryTimeout: time.Minute,
3643
IdletimeToAutoclaim: 30 * time.Millisecond,
44+
Retry: true,
45+
MaxRetryCount: -1,
3746
}
3847

3948
var ErrAlreadySet = errors.New("redis key already set")
@@ -44,7 +53,9 @@ func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
4453

4554
func ConsumerConfigAddOptionsWithDefaults(prefix string, f *pflag.FlagSet, defaultConfig ConsumerConfig) {
4655
f.Duration(prefix+".response-entry-timeout", defaultConfig.ResponseEntryTimeout, "timeout for response entry")
47-
f.Duration(prefix+".idletime-to-autoclaim", defaultConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers")
56+
f.Duration(prefix+".idletime-to-autoclaim", defaultConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers. This option should be set to the same value for all consumers and producers.")
57+
f.Bool(prefix+".retry", defaultConfig.Retry, "enables autoclaim for this consumer, if set to false this consumer will not check messages from PEL (Pending Entries List)")
58+
f.Int64(prefix+".max-retry-count", defaultConfig.MaxRetryCount, "number of message retries after which this consumer will set an error response and Acknowledge the message (-1 = no limit)")
4859
}
4960

5061
// Consumer implements a consumer for redis stream provides heartbeat to
@@ -127,32 +138,65 @@ func decrementMsgIdByOne(msgId string) string {
127138
// Consumer first checks if there exists a pending message that is claimed by
128139
// unresponsive consumer, if not then reads from the stream.
129140
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
130-
// First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim
131-
// this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration
132141
var messages []redis.XMessage
133-
if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
134-
Stream: c.redisStream,
135-
Group: c.redisGroup,
136-
Start: "-",
137-
End: "+",
138-
Count: c.claimAmongOldestIdleN,
139-
Idle: c.cfg.IdletimeToAutoclaim,
140-
}).Result(); err != nil {
141-
if !errors.Is(err, redis.Nil) {
142-
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "penindlen", len(pendingMsgs))
143-
}
144-
} else if len(pendingMsgs) > 0 {
145-
idx := rand.Intn(len(pendingMsgs))
146-
messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
147-
Group: c.redisGroup,
148-
Consumer: c.id,
149-
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
150-
Stream: c.redisStream,
151-
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
152-
Count: 1,
153-
}).Result()
154-
if err != nil {
155-
log.Info("error from xautoclaim", "err", err)
142+
if c.cfg.Retry {
143+
// First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim
144+
// this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration
145+
if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
146+
Stream: c.redisStream,
147+
Group: c.redisGroup,
148+
Start: "-",
149+
End: "+",
150+
Count: c.claimAmongOldestIdleN,
151+
Idle: c.cfg.IdletimeToAutoclaim,
152+
}).Result(); err != nil {
153+
if !errors.Is(err, redis.Nil) {
154+
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "pendingLen", len(pendingMsgs))
155+
}
156+
} else if len(pendingMsgs) > 0 {
157+
if c.cfg.MaxRetryCount != -1 {
158+
// choose messages that didn't exceed MaxRetryCount
159+
var exceededRetries []redis.XPendingExt
160+
var filtered []redis.XPendingExt
161+
for _, msg := range pendingMsgs {
162+
if msg.RetryCount > c.cfg.MaxRetryCount {
163+
exceededRetries = append(exceededRetries, msg)
164+
} else {
165+
filtered = append(filtered, msg)
166+
}
167+
}
168+
if len(exceededRetries) > 0 {
169+
// set error for one randomly chosen pending message
170+
// * error set for only one message - avoid starving message processing
171+
// * randomly chosen - mitigation for multiple consumers trying to set error for the same msg
172+
idx := rand.Intn(len(exceededRetries))
173+
if err := c.SetError(ctx, exceededRetries[idx].ID, "too many retries"); err != nil {
174+
logger := log.Error
175+
if errors.Is(err, ErrAlreadySet) {
176+
// if error is already set, that's not a real error
177+
logger = log.Debug
178+
}
179+
logger("Failed to set error response for a message that exceeded retries limit", "err", err, "retryCount", exceededRetries[idx].RetryCount)
180+
}
181+
}
182+
pendingMsgs = filtered
183+
}
184+
if len(pendingMsgs) > 0 {
185+
// attempt auto-claiming one randomly chosen message;
186+
// random choice is a mitigation for multiple consumers trying to claim same msg
187+
idx := rand.Intn(len(pendingMsgs))
188+
messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
189+
Group: c.redisGroup,
190+
Consumer: c.id,
191+
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
192+
Stream: c.redisStream,
193+
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
194+
Count: 1,
195+
}).Result()
196+
if err != nil {
197+
log.Info("error from xautoclaim", "err", err)
198+
}
199+
}
156200
}
157201
}
158202
if len(messages) == 0 {
@@ -249,6 +293,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
249293
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
250294
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
251295
}
296+
log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID)
252297
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
253298
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
254299
}
@@ -269,6 +314,7 @@ func (c *Consumer[Request, Response]) SetError(ctx context.Context, messageID st
269314
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
270315
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
271316
}
317+
log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID)
272318
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
273319
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
274320
}

pubsub/producer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Producer[Request any, Response any] struct {
4949
once sync.Once
5050
}
5151

52+
// lint:require-exhaustive-initialization
5253
type ProducerConfig struct {
5354
// Interval duration for checking the result set by consumers.
5455
CheckResultInterval time.Duration `koanf:"check-result-interval"`

0 commit comments

Comments
 (0)