From 70a5e45c3830b872e1bc03088d4bee36f10930a2 Mon Sep 17 00:00:00 2001
From: Richard Sugg
Date: Wed, 7 May 2025 07:20:09 -0400
Subject: [PATCH 1/3] do not panic during chunk dispatching if consumer
suddenly closed
---
pkg/stream/server_frame.go | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index d3a323b3..658cd62e 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -3,11 +3,12 @@ package stream
import (
"bufio"
"bytes"
- "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
- "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"hash/crc32"
"io"
"time"
+
+ "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
+ "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
)
type ReaderProtocol struct {
@@ -281,7 +282,6 @@ func (c *Client) queryPublisherSequenceFrameHandler(readProtocol *ReaderProtocol
res.data <- sequence
}
func (c *Client) handleDeliver(r *bufio.Reader) {
-
subscriptionId := readByte(r)
consumer, err := c.coordinator.GetConsumerById(subscriptionId)
consumerFound := err == nil
@@ -406,8 +406,16 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
// dispatch the messages with offset to the consumer
chunk.offsetMessages = batchConsumingMessages
+ logs.LogDebug("Dispatching %d messages to consumer %d", len(batchConsumingMessages), subscriptionId)
if consumer.getStatus() == open {
- consumer.chunkForConsumer <- chunk
+ select {
+ case consumer.chunkForConsumer <- chunk:
+ return
+ default:
+ logs.LogDebug("The consumer %s for the stream %s reports as open but is probably "+
+ "closed during chunk dispatching. Messages won't be dispatched. ",
+ consumer.GetName(), consumer.GetStreamName())
+ }
} else {
logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+
"Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())
From fc27a365fbb45d09a4ff761c9fd21e266711fa7b Mon Sep 17 00:00:00 2001
From: Richard Sugg
Date: Wed, 7 May 2025 09:39:28 -0400
Subject: [PATCH 2/3] remove unnecessary return
---
pkg/stream/server_frame.go | 1 -
1 file changed, 1 deletion(-)
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index 8d14a059..0c606361 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -410,7 +410,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
if consumer.getStatus() == open {
select {
case consumer.chunkForConsumer <- chunk:
- return
default:
logs.LogDebug("The consumer %s for the stream %s reports as open but is probably "+
"closed during chunk dispatching. Messages won't be dispatched. ",
From 38c3765755678f19e2db3169f73d34143b9b156b Mon Sep 17 00:00:00 2001
From: Richard Sugg
Date: Wed, 7 May 2025 09:43:58 -0400
Subject: [PATCH 3/3] remove unnecessary logDebug statement
---
pkg/stream/server_frame.go | 1 -
1 file changed, 1 deletion(-)
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index 0c606361..a5d48412 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -406,7 +406,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
// dispatch the messages with offset to the consumer
chunk.offsetMessages = batchConsumingMessages
- logs.LogDebug("Dispatching %d messages to consumer %d", len(batchConsumingMessages), subscriptionId)
if consumer.getStatus() == open {
select {
case consumer.chunkForConsumer <- chunk: