Skip to content

Commit 585930f

Browse files
authored
Fix handling and storing offset by timestamp (#392)
Problem: Sometimes the timestamp value can be written as an offset in rabbitmq, which freezes message consumption Real case: You add a new consumer to an existing stream for which the offset is not yet set, and you do not want to reread all messages, so you specify -1 min. If there are no messages for a certain time and automatic flush is triggered, this timestamp value will be written as the offset.
1 parent 75a6cd7 commit 585930f

File tree

3 files changed

+97
-3
lines changed

3 files changed

+97
-3
lines changed

pkg/integration_test/stream_integration_test.go

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package integration_test
22

33
import (
4+
"errors"
45
"fmt"
6+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
57
"sync"
68
"time"
79

810
. "github.com/onsi/ginkgo/v2"
911
. "github.com/onsi/gomega"
1012

1113
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
12-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
1314
stream "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1415
)
1516

@@ -146,4 +147,97 @@ var _ = Describe("StreamIntegration", func() {
146147
Expect(consumer.GetOffset()).To(BeNumerically("==", expectedCurrentOffset))
147148
})
148149
})
150+
151+
Context("Initial timestamp offset when no messages exist", func() {
152+
var (
153+
addresses []string = []string{
154+
"rabbitmq-stream://guest:guest@localhost:5552/"}
155+
streamName string = "empty-test-stream"
156+
streamEnv *stream.Environment
157+
)
158+
159+
// init empty stream
160+
BeforeEach(func() {
161+
var err error
162+
streamEnv, err = stream.NewEnvironment(
163+
stream.NewEnvironmentOptions().SetUris(addresses))
164+
Expect(err).ToNot(HaveOccurred())
165+
166+
err = streamEnv.DeclareStream(streamName,
167+
stream.NewStreamOptions().SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))
168+
Expect(err).ToNot(HaveOccurred())
169+
})
170+
171+
AfterEach(func() {
172+
Expect(streamEnv.DeleteStream(streamName)).
173+
To(SatisfyAny(
174+
Succeed(),
175+
MatchError(stream.StreamDoesNotExist),
176+
))
177+
})
178+
179+
It("correctly handles offsets using timestamps when no messages exist", func() {
180+
var err error
181+
const consumerName = "timestamp-offset-consumer"
182+
183+
lastMinute := time.Now().Add(-time.Minute).UnixMilli()
184+
185+
// Implement the UpdateConsumer function to return a timestamp-based offset if no offset exists
186+
// For example, we add a new consumer to the incoming stream and don't want to reread it from the beginning.
187+
updateConsumer := func(streamName string, isActive bool) stream.OffsetSpecification {
188+
offset, err := streamEnv.QueryOffset(consumerName, streamName)
189+
if errors.Is(err, stream.OffsetNotFoundError) {
190+
return stream.OffsetSpecification{}.Timestamp(lastMinute)
191+
}
192+
193+
Expect(err).ToNot(HaveOccurred())
194+
195+
return stream.OffsetSpecification{}.Offset(offset + 1)
196+
}
197+
198+
options := stream.NewConsumerOptions().
199+
SetConsumerName(consumerName).
200+
SetAutoCommit(stream.NewAutoCommitStrategy().
201+
SetFlushInterval(time.Second)).
202+
SetSingleActiveConsumer(stream.NewSingleActiveConsumer(updateConsumer))
203+
204+
// Create the consumer
205+
consumer, err := streamEnv.NewConsumer(
206+
streamName,
207+
func(ctx stream.ConsumerContext, msg *amqp.Message) {},
208+
options,
209+
)
210+
Expect(err).NotTo(HaveOccurred())
211+
212+
// Wait for a flush without messages
213+
// An incorrect offset is stored during this flush
214+
time.Sleep(time.Millisecond * 1200)
215+
Expect(consumer.Close()).ToNot(HaveOccurred())
216+
217+
// Re-create the consumer
218+
consumeIsStarted := make(chan struct{})
219+
handleMessages := func(ctx stream.ConsumerContext, msg *amqp.Message) {
220+
close(consumeIsStarted)
221+
}
222+
223+
consumer, err = streamEnv.NewConsumer(streamName, handleMessages, options)
224+
Expect(err).NotTo(HaveOccurred())
225+
226+
producer, err := streamEnv.NewProducer(streamName, nil)
227+
Expect(err).ToNot(HaveOccurred())
228+
body := `{"name": "item-1}`
229+
err = producer.Send(amqp.NewMessage([]byte(body)))
230+
Expect(err).ToNot(HaveOccurred())
231+
232+
// check if messages are consumed
233+
select {
234+
case <-consumeIsStarted:
235+
case <-time.After(time.Second * 1):
236+
Fail("Timeout waiting for consumer to start")
237+
}
238+
239+
Expect(consumer.GetOffset()).To(BeNumerically("<=", 0))
240+
})
241+
242+
})
149243
})

pkg/stream/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ func (c *Client) declareSubscriber(streamName string,
926926
// copy the option offset to the consumer offset
927927
// the option.offset won't change ( in case we need to retrive the original configuration)
928928
// consumer.current offset will be moved when reading
929-
if !options.IsSingleActiveConsumerEnabled() {
929+
if !options.IsSingleActiveConsumerEnabled() && options.Offset.isOffset() {
930930
consumer.setCurrentOffset(options.Offset.offset)
931931
}
932932

pkg/stream/server_frame.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea
588588
isActive == 1)
589589
consumer.options.SingleActiveConsumer.offsetSpecification = responseOff
590590

591-
if isActive == 1 {
591+
if isActive == 1 && responseOff.isOffset() {
592592
consumer.setCurrentOffset(responseOff.offset)
593593
}
594594

0 commit comments

Comments
 (0)