diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 6153eb9d..a5d48412 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -3,11 +3,12 @@ package stream import ( "bufio" "bytes" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "hash/crc32" "io" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" ) type ReaderProtocol struct { @@ -281,7 +282,6 @@ func (c *Client) queryPublisherSequenceFrameHandler(readProtocol *ReaderProtocol res.data <- sequence } func (c *Client) handleDeliver(r *bufio.Reader) { - subscriptionId := readByte(r) consumer, err := c.coordinator.GetConsumerById(subscriptionId) consumerFound := err == nil @@ -407,7 +407,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) { // dispatch the messages with offset to the consumer chunk.offsetMessages = batchConsumingMessages if consumer.getStatus() == open { - consumer.chunkForConsumer <- chunk + select { + case consumer.chunkForConsumer <- chunk: + default: + logs.LogDebug("The consumer %s for the stream %s reports as open but is probably "+ + "closed during chunk dispatching. Messages won't be dispatched. ", + consumer.GetName(), consumer.GetStreamName()) + } } else { logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())