✨ feat(rabbitmq): support quorum queue and delivery limit#101
Open
lambda-hj wants to merge 1 commit intozeromicro:masterfrom
Open
✨ feat(rabbitmq): support quorum queue and delivery limit#101lambda-hj wants to merge 1 commit intozeromicro:masterfrom
lambda-hj wants to merge 1 commit intozeromicro:masterfrom
Conversation
【feat】扩展 QueueConf 支持队列类型与投递限制 【feat】新增 DeclareQueueConf 方法支持声明 Quorum 队列 【feat】支持配置死信交换机和死信路由键 ✨ feat(rabbitmq): add custom error handler for listener 【feat】新增 WithErrorHandler 选项用于自定义错误处理 【feat】允许在消费失败时记录详细的上下文信息 【feat】更新日志组件依赖从 logx 改为 logc ✅ test(rabbitmq): add tests for queue args and error handler 【test】覆盖 buildQueueArgs 的各种参数组合场景 【test】验证监听器错误处理器的调用逻辑 📝 docs(rabbitmq): add quorum queue usage guide 【docs】新增 RabbitMQ Quorum Queue 使用指南文档 【docs】详细介绍 Delivery Limit 机制与死信队列配置 【docs】提供完整的声明、消费和死信处理示例
Comment on lines
81
to
92
| go func() { | ||
| for d := range msg { | ||
| ctx := context.Background() | ||
| if err := q.handler.Consume(string(d.Body)); err != nil { | ||
| logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err) | ||
| q.errorHandler(ctx, d, err) | ||
| if !que.AutoAck { | ||
| if e := d.Nack(false, true); e != nil { | ||
| logc.Errorf(ctx, "nack failed, error: %v", e) | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
logic: successful messages are never acknowledged when AutoAck is false. this will cause all successfully processed messages to remain unacknowledged, eventually blocking the consumer.
Suggested change
| go func() { | |
| for d := range msg { | |
| ctx := context.Background() | |
| if err := q.handler.Consume(string(d.Body)); err != nil { | |
| logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err) | |
| q.errorHandler(ctx, d, err) | |
| if !que.AutoAck { | |
| if e := d.Nack(false, true); e != nil { | |
| logc.Errorf(ctx, "nack failed, error: %v", e) | |
| } | |
| } | |
| } | |
| } | |
| go func() { | |
| for d := range msg { | |
| ctx := context.Background() | |
| if err := q.handler.Consume(string(d.Body)); err != nil { | |
| q.errorHandler(ctx, d, err) | |
| if !que.AutoAck { | |
| if e := d.Nack(false, true); e != nil { | |
| logc.Errorf(ctx, "nack failed, error: %v", e) | |
| } | |
| } | |
| } else if !que.AutoAck { | |
| if e := d.Ack(false); e != nil { | |
| logc.Errorf(ctx, "ack failed, error: %v", e) | |
| } | |
| } | |
| } | |
| }() |
Prompt To Fix With AI
This is a comment left during a code review.
Path: rabbitmq/listener.go
Line: 81:92
Comment:
**logic:** successful messages are never acknowledged when `AutoAck` is false. this will cause all successfully processed messages to remain unacknowledged, eventually blocking the consumer.
```suggestion
go func() {
for d := range msg {
ctx := context.Background()
if err := q.handler.Consume(string(d.Body)); err != nil {
q.errorHandler(ctx, d, err)
if !que.AutoAck {
if e := d.Nack(false, true); e != nil {
logc.Errorf(ctx, "nack failed, error: %v", e)
}
}
} else if !que.AutoAck {
if e := d.Ack(false); e != nil {
logc.Errorf(ctx, "ack failed, error: %v", e)
}
}
}
}()
```
How can I resolve this? If you propose a fix, please make it concise.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
【feat】扩展 QueueConf 支持队列类型与投递限制
【feat】新增 DeclareQueueConf 方法支持声明 Quorum 队列
【feat】支持配置死信交换机和死信路由键
✨ feat(rabbitmq): add custom error handler for listener 【feat】新增 WithErrorHandler 选项用于自定义错误处理
【feat】允许在消费失败时记录详细的上下文信息
【feat】更新日志组件依赖从 logx 改为 logc
✅ test(rabbitmq): add tests for queue args and error handler 【test】覆盖 buildQueueArgs 的各种参数组合场景
【test】验证监听器错误处理器的调用逻辑
📝 docs(rabbitmq): add quorum queue usage guide
【docs】新增 RabbitMQ Quorum Queue 使用指南文档
【docs】详细介绍 Delivery Limit 机制与死信队列配置
【docs】提供完整的声明、消费和死信处理示例
Greptile Summary
adds support for RabbitMQ quorum queues with delivery limit and dead letter configuration to prevent poison message loops.
QueueConfwithQueueType,DeliveryLimit,DeadLetterExchange, andDeadLetterRoutingKeyfieldsDeclareQueueConf()method that usesbuildQueueArgs()to construct RabbitMQ arguments from configWithErrorHandler()to allow detailed logging of consume failureslogxtologcfor context-aware loggingCritical Issue:
listener.go:81-92only callsNack()on errors but never callsAck()on success whenAutoAckis false, which will cause all successfully processed messages to remain unacknowledged and eventually block the consumer.Confidence Score: 1/5
Ack()call inlistener.gois a critical functional bug that will cause the consumer to stop processing messages whenAutoAckis false. this breaks a core feature and makes manual ack mode unusable.rabbitmq/listener.gorequires immediate attention to add proper message acknowledgementImportant Files Changed
Sequence Diagram
sequenceDiagram participant App as Application participant Admin as RabbitMQ Admin participant Listener as RabbitMQ Listener participant Queue as RabbitMQ Queue participant Handler as Consumer Handler participant ErrorHandler as Error Handler Note over App,Admin: Setup Phase App->>Admin: DeclareQueueConf(QueueConf) Admin->>Admin: buildQueueArgs(conf) Note over Admin: Set x-queue-type=quorum<br/>x-delivery-limit=20<br/>x-dead-letter-exchange<br/>x-dead-letter-routing-key Admin->>Queue: QueueDeclare(args) Queue-->>Admin: Queue created Note over App,Queue: Consumption Phase App->>Listener: MustNewListener(conf, handler, opts) Listener->>Listener: ensureListenerOptions(opts) App->>Listener: Start() Listener->>Queue: Consume(queueName) loop For each message Queue->>Listener: Delivery (msg + headers) Note over Queue,Listener: x-delivery-count header included Listener->>Handler: Consume(message) alt Success Handler-->>Listener: nil Note over Listener: BUG: No Ack() called<br/>when AutoAck=false else Failure Handler-->>Listener: error Listener->>ErrorHandler: errorHandler(ctx, msg, err) ErrorHandler-->>Listener: (logs error) alt AutoAck=false Listener->>Queue: Nack(requeue=true) Note over Queue: Increment x-delivery-count alt delivery-count > delivery-limit Queue->>Queue: Route to dead letter exchange else delivery-count <= delivery-limit Queue->>Queue: Requeue message end end end end