Skip to content

Commit d1f3297

Browse files
authored
Merge branch 'main' into panic-chunk-dispatching
2 parents 70a5e45 + fa4a8a1 commit d1f3297

File tree

8 files changed

+107
-12
lines changed

8 files changed

+107
-12
lines changed

README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
<h1 align="center">RabbitMQ Stream GO Client</h1>
22

33
---
4-
<div align="center">
4+
[![Go Report Card][ReportCard-Image]][ReportCard-Url] [![Build Status][Build-Status-Image]][Build-Status-Url]
55

6-
![Build](https://github.com/rabbitmq/rabbitmq-stream-go-client/actions/workflows/build_and_test.yml/badge.svg)
7-
[![codecov](https://codecov.io/gh/rabbitmq/rabbitmq-stream-go-client/branch/main/graph/badge.svg?token=HZD4S71QIM)](https://codecov.io/gh/rabbitmq/rabbitmq-stream-go-client)
6+
[ReportCard-Url]: https://goreportcard.com/report/github.com/rabbitmq/rabbitmq-stream-go-client
7+
[ReportCard-Image]: https://goreportcard.com/badge/github.com/rabbitmq/rabbitmq-stream-go-client
8+
[Build-Status-Url]: https://github.com/rabbitmq/rabbitmq-stream-go-client/actions
9+
[Build-Status-Image]: https://github.com/rabbitmq/rabbitmq-stream-go-client/actions/workflows/build_and_test.yml/badge.svg?branch=main
810

911
Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)
10-
</div>
1112

1213
# Table of Contents
1314

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.5.3
1+
1.5.4

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/pkg/errors v0.9.1
1313
github.com/spaolacci/murmur3 v1.1.0
1414
github.com/spf13/cobra v1.9.1
15-
golang.org/x/text v0.24.0
15+
golang.org/x/text v0.25.0
1616
)
1717

1818
require (

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
5555
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
5656
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
5757
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
58-
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
59-
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
58+
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
59+
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
6060
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
6161
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
6262
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=

pkg/integration_test/stream_integration_test.go

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package integration_test
22

33
import (
4+
"errors"
45
"fmt"
6+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
57
"sync"
68
"time"
79

810
. "github.com/onsi/ginkgo/v2"
911
. "github.com/onsi/gomega"
1012

1113
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
12-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
1314
stream "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1415
)
1516

@@ -146,4 +147,97 @@ var _ = Describe("StreamIntegration", func() {
146147
Expect(consumer.GetOffset()).To(BeNumerically("==", expectedCurrentOffset))
147148
})
148149
})
150+
151+
Context("Initial timestamp offset when no messages exist", func() {
152+
var (
153+
addresses []string = []string{
154+
"rabbitmq-stream://guest:guest@localhost:5552/"}
155+
streamName string = "empty-test-stream"
156+
streamEnv *stream.Environment
157+
)
158+
159+
// init empty stream
160+
BeforeEach(func() {
161+
var err error
162+
streamEnv, err = stream.NewEnvironment(
163+
stream.NewEnvironmentOptions().SetUris(addresses))
164+
Expect(err).ToNot(HaveOccurred())
165+
166+
err = streamEnv.DeclareStream(streamName,
167+
stream.NewStreamOptions().SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))
168+
Expect(err).ToNot(HaveOccurred())
169+
})
170+
171+
AfterEach(func() {
172+
Expect(streamEnv.DeleteStream(streamName)).
173+
To(SatisfyAny(
174+
Succeed(),
175+
MatchError(stream.StreamDoesNotExist),
176+
))
177+
})
178+
179+
It("correctly handles offsets using timestamps when no messages exist", func() {
180+
var err error
181+
const consumerName = "timestamp-offset-consumer"
182+
183+
lastMinute := time.Now().Add(-time.Minute).UnixMilli()
184+
185+
// Implement the UpdateConsumer function to return a timestamp-based offset if no offset exists
186+
// For example, we add a new consumer to the incoming stream and don't want to reread it from the beginning.
187+
updateConsumer := func(streamName string, isActive bool) stream.OffsetSpecification {
188+
offset, err := streamEnv.QueryOffset(consumerName, streamName)
189+
if errors.Is(err, stream.OffsetNotFoundError) {
190+
return stream.OffsetSpecification{}.Timestamp(lastMinute)
191+
}
192+
193+
Expect(err).ToNot(HaveOccurred())
194+
195+
return stream.OffsetSpecification{}.Offset(offset + 1)
196+
}
197+
198+
options := stream.NewConsumerOptions().
199+
SetConsumerName(consumerName).
200+
SetAutoCommit(stream.NewAutoCommitStrategy().
201+
SetFlushInterval(time.Second)).
202+
SetSingleActiveConsumer(stream.NewSingleActiveConsumer(updateConsumer))
203+
204+
// Create the consumer
205+
consumer, err := streamEnv.NewConsumer(
206+
streamName,
207+
func(ctx stream.ConsumerContext, msg *amqp.Message) {},
208+
options,
209+
)
210+
Expect(err).NotTo(HaveOccurred())
211+
212+
// Wait for a flush without messages
213+
// An incorrect offset is stored during this flush
214+
time.Sleep(time.Millisecond * 1200)
215+
Expect(consumer.Close()).ToNot(HaveOccurred())
216+
217+
// Re-create the consumer
218+
consumeIsStarted := make(chan struct{})
219+
handleMessages := func(ctx stream.ConsumerContext, msg *amqp.Message) {
220+
close(consumeIsStarted)
221+
}
222+
223+
consumer, err = streamEnv.NewConsumer(streamName, handleMessages, options)
224+
Expect(err).NotTo(HaveOccurred())
225+
226+
producer, err := streamEnv.NewProducer(streamName, nil)
227+
Expect(err).ToNot(HaveOccurred())
228+
body := `{"name": "item-1}`
229+
err = producer.Send(amqp.NewMessage([]byte(body)))
230+
Expect(err).ToNot(HaveOccurred())
231+
232+
// check if messages are consumed
233+
select {
234+
case <-consumeIsStarted:
235+
case <-time.After(time.Second * 1):
236+
Fail("Timeout waiting for consumer to start")
237+
}
238+
239+
Expect(consumer.GetOffset()).To(BeNumerically("<=", 0))
240+
})
241+
242+
})
149243
})

pkg/stream/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ func (c *Client) declareSubscriber(streamName string,
926926
// copy the option offset to the consumer offset
927927
// the option.offset won't change ( in case we need to retrive the original configuration)
928928
// consumer.current offset will be moved when reading
929-
if !options.IsSingleActiveConsumerEnabled() {
929+
if !options.IsSingleActiveConsumerEnabled() && options.Offset.isOffset() {
930930
consumer.setCurrentOffset(options.Offset.offset)
931931
}
932932

pkg/stream/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919
const initBufferPublishSize = 2 + 2 + 1 + 4
2020

2121
const (
22-
ClientVersion = "1.5.3"
22+
ClientVersion = "1.5.4"
2323

2424
commandDeclarePublisher = 1
2525
commandPublish = 2

pkg/stream/server_frame.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ func (c *Client) handleConsumerUpdate(readProtocol *ReaderProtocol, r *bufio.Rea
596596
isActive == 1)
597597
consumer.options.SingleActiveConsumer.offsetSpecification = responseOff
598598

599-
if isActive == 1 {
599+
if isActive == 1 && responseOff.isOffset() {
600600
consumer.setCurrentOffset(responseOff.offset)
601601
}
602602

0 commit comments

Comments
 (0)