Skip to content

Commit 3ae90e9

Browse files
authored
Add pre-settled option to consumer (#80)
Adds pre-settled delivery mode support to consumers, enabling "fire-and-forget" or "at-most-once" semantics where messages are automatically settled by the broker without requiring explicit acknowledgment from the consumer. This is implemented through a new PreSettled option in ConsumerOptions. Changes: * Introduced IDeliveryContext interface and PreSettledDeliveryContext type that returns errors when settlement methods are called * Updated AMQP receiver link options to support pre-settled mode with appropriate settlement mode configuration * Modified test files to use the public Message() accessor method instead of direct field access * Added comprehensive test coverage and example demonstrating the pre-settled feature * Updated CI Docker image from RC to stable RabbitMQ 4.2 * Closes: #77 Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 9198f0b commit 3ae90e9

9 files changed

Lines changed: 302 additions & 45 deletions

File tree

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management-alpine}
10+
readonly rabbitmq_image=${RABBITMQ_IMAGE:-rabbitmq:4.2-management-alpine}
1111

1212
readonly docker_name_prefix='rabbitmq-amqp-go-client'
1313
readonly docker_network_name="$docker_name_prefix-network"

docs/examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
1313
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
1414
- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams.
15-
- [Web Sockets](web_sockets) - An example of how to use Web Sockets with the AMQP 1.0 client.
15+
- [Web Sockets](web_sockets) - An example of how to use Web Sockets with the AMQP 1.0 client.
16+
- [Pre-settled messages](pre_settled) - An example of how to receive pre-settled messages with the AMQP 1.0 client.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
2+
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
3+
// This example is demonstrating how to consume messages with pre-settled mode enabled.
4+
// pre-settled is valid only for classic and quorum queues.
5+
// Pre-settled mode means that messages are considered accepted by the broker as soon as they are delivered to the consumer,
6+
// without requiring explicit acceptance from the consumer side.
7+
// This is useful for scenarios where at-most-once delivery semantics are acceptable, and it can improve performance by reducing
8+
// the overhead of message acknowledgments.
9+
// Example path:https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/pre_settled/pre_settled.go
10+
package main
11+
12+
import (
13+
"context"
14+
"fmt"
15+
16+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
17+
)
18+
19+
func main() {
20+
rmq.Info("Pre-Settled example with AMQP Go AMQP 1.0 Client")
21+
22+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
23+
amqpConnection, err := env.NewConnection(context.TODO())
24+
if err != nil {
25+
rmq.Error("Error opening connection", err)
26+
return
27+
}
28+
// create queue for the example
29+
_, err = amqpConnection.Management().DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{
30+
Name: "pre-settled-queue",
31+
})
32+
if err != nil {
33+
rmq.Error("Error declaring queue", err)
34+
return
35+
}
36+
rmq.Info("Queue 'pre-settled-queue' declared")
37+
38+
// publish some messages to the queue
39+
producer, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{
40+
Queue: "pre-settled-queue",
41+
}, nil)
42+
if err != nil {
43+
rmq.Error("Error creating producer", err)
44+
return
45+
}
46+
for i := 0; i < 100; i++ {
47+
msg := rmq.NewMessage([]byte(fmt.Sprintf("Pre-settled message number %d ", i+1)))
48+
_, err := producer.Publish(context.TODO(), msg)
49+
if err != nil {
50+
rmq.Error("Error publishing message", err)
51+
return
52+
}
53+
}
54+
55+
// create consumer with pre-settled mode enabled
56+
57+
consumer, err := amqpConnection.NewConsumer(context.TODO(), "pre-settled-queue", &rmq.ConsumerOptions{
58+
PreSettled: true,
59+
})
60+
if err != nil {
61+
rmq.Error("Error creating consumer", err)
62+
return
63+
}
64+
rmq.Info("Consumer created with pre-settled mode enabled")
65+
66+
for i := 0; i < 100; i++ {
67+
dc, err := consumer.Receive(context.TODO())
68+
if err != nil {
69+
rmq.Error("Error consuming message", err)
70+
return
71+
}
72+
// here we don't need to accept the message
73+
// because pre-settled mode is enabled
74+
rmq.Info("[Consumer]", "Message received", string(dc.Message().GetData()))
75+
}
76+
77+
// clean up
78+
err = consumer.Close(context.TODO())
79+
if err != nil {
80+
rmq.Error("Error closing consumer", err)
81+
return
82+
}
83+
err = producer.Close(context.TODO())
84+
if err != nil {
85+
rmq.Error("Error closing producer", err)
86+
return
87+
}
88+
// delete the queue
89+
err = amqpConnection.Management().DeleteQueue(context.TODO(), "pre-settled-queue")
90+
if err != nil {
91+
rmq.Error("Error deleting queue", err)
92+
return
93+
}
94+
rmq.Info("Queue 'pre-settled-queue' deleted")
95+
96+
err = amqpConnection.Close(context.TODO())
97+
if err != nil {
98+
rmq.Error("Error closing connection", err)
99+
return
100+
}
101+
rmq.Info("Example finished successfully")
102+
103+
}

pkg/rabbitmqamqp/amqp_consumer.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,17 @@ import (
99
"github.com/google/uuid"
1010
)
1111

12+
// IDeliveryContext represents a delivery context for received messages.
13+
// It provides methods to access the message and settle it (accept, discard, requeue).
14+
type IDeliveryContext interface {
15+
Message() *amqp.Message
16+
Accept(ctx context.Context) error
17+
Discard(ctx context.Context, e *amqp.Error) error
18+
DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
19+
Requeue(ctx context.Context) error
20+
RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
21+
}
22+
1223
type DeliveryContext struct {
1324
receiver *amqp.Receiver
1425
message *amqp.Message
@@ -66,6 +77,36 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio
6677
})
6778
}
6879

80+
// PreSettledDeliveryContext represents a delivery context for pre-settled messages.
81+
// All settlement methods throw errors since the message is already settled.
82+
type PreSettledDeliveryContext struct {
83+
message *amqp.Message
84+
}
85+
86+
func (dc *PreSettledDeliveryContext) Message() *amqp.Message {
87+
return dc.message
88+
}
89+
90+
func (dc *PreSettledDeliveryContext) Accept(ctx context.Context) error {
91+
return fmt.Errorf("auto-settle on, message is already disposed")
92+
}
93+
94+
func (dc *PreSettledDeliveryContext) Discard(ctx context.Context, e *amqp.Error) error {
95+
return fmt.Errorf("auto-settle on, message is already disposed")
96+
}
97+
98+
func (dc *PreSettledDeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
99+
return fmt.Errorf("auto-settle on, message is already disposed")
100+
}
101+
102+
func (dc *PreSettledDeliveryContext) Requeue(ctx context.Context) error {
103+
return fmt.Errorf("auto-settle on, message is already disposed")
104+
}
105+
106+
func (dc *PreSettledDeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
107+
return fmt.Errorf("auto-settle on, message is already disposed")
108+
}
109+
69110
type consumerState byte
70111

71112
const (
@@ -124,13 +165,18 @@ func (c *Consumer) createReceiver(ctx context.Context) error {
124165
// In there is not a restart this code won't be executed.
125166
if c.options != nil {
126167
// here we assume it is a stream. So we recreate the options with the offset.
127-
c.options = &StreamConsumerOptions{
168+
streamOpts := &StreamConsumerOptions{
128169
ReceiverLinkName: c.options.linkName(),
129170
InitialCredits: c.options.initialCredits(),
130171
// we increment the offset by one to start from the next message.
131172
// because the current was already consumed.
132173
Offset: &OffsetValue{Offset: uint64(c.currentOffset + 1)},
133174
}
175+
// Preserve StreamFilterOptions if it's a StreamConsumerOptions
176+
if sco, ok := c.options.(*StreamConsumerOptions); ok {
177+
streamOpts.StreamFilterOptions = sco.StreamFilterOptions
178+
}
179+
c.options = streamOpts
134180
}
135181
}
136182
// define a variable *amqp.ReceiverOptions type
@@ -153,7 +199,7 @@ func (c *Consumer) createReceiver(ctx context.Context) error {
153199
return nil
154200
}
155201

156-
func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
202+
func (c *Consumer) Receive(ctx context.Context) (IDeliveryContext, error) {
157203
msg, err := c.receiver.Load().Receive(ctx, nil)
158204
if err != nil {
159205
return nil, err
@@ -164,6 +210,11 @@ func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
164210
c.currentOffset = msg.Annotations["x-stream-offset"].(int64)
165211
}
166212

213+
// Check if pre-settled mode is enabled
214+
if c.options != nil && c.options.preSettled() {
215+
return &PreSettledDeliveryContext{message: msg}, nil
216+
}
217+
167218
return &DeliveryContext{receiver: c.receiver.Load(), message: msg}, nil
168219
}
169220

pkg/rabbitmqamqp/amqp_consumer_stream_test.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ var _ = Describe("Consumer stream test", func() {
380380
Expect(dc.Message()).NotTo(BeNil())
381381
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, label)))
382382

383-
Expect(dc.message.ApplicationProperties).To(HaveKeyWithValue(key, value))
383+
Expect(dc.Message().ApplicationProperties).To(HaveKeyWithValue(key, value))
384384
Expect(dc.Accept(context.Background())).To(BeNil())
385385
}
386386
Expect(consumer.Close(context.Background())).To(BeNil())
@@ -495,51 +495,51 @@ var _ = Describe("Consumer stream test", func() {
495495
// we test one by one because of the date time fields
496496
// It is not possible to compare the whole structure due of the time
497497
// It is not perfect but it is enough for the test
498-
if dc.message.Properties.MessageID != nil {
499-
Expect(dc.message.Properties.MessageID).To(Equal(properties.MessageID))
498+
if dc.Message().Properties.MessageID != nil {
499+
Expect(dc.Message().Properties.MessageID).To(Equal(properties.MessageID))
500500
}
501-
if dc.message.Properties.Subject != nil {
502-
Expect(dc.message.Properties.Subject).To(Equal(properties.Subject))
501+
if dc.Message().Properties.Subject != nil {
502+
Expect(dc.Message().Properties.Subject).To(Equal(properties.Subject))
503503
}
504-
if dc.message.Properties.ReplyTo != nil {
505-
Expect(dc.message.Properties.ReplyTo).To(Equal(properties.ReplyTo))
504+
if dc.Message().Properties.ReplyTo != nil {
505+
Expect(dc.Message().Properties.ReplyTo).To(Equal(properties.ReplyTo))
506506
}
507-
if dc.message.Properties.ContentType != nil {
508-
Expect(dc.message.Properties.ContentType).To(Equal(properties.ContentType))
507+
if dc.Message().Properties.ContentType != nil {
508+
Expect(dc.Message().Properties.ContentType).To(Equal(properties.ContentType))
509509
}
510-
if dc.message.Properties.ContentEncoding != nil {
511-
Expect(dc.message.Properties.ContentEncoding).To(Equal(properties.ContentEncoding))
510+
if dc.Message().Properties.ContentEncoding != nil {
511+
Expect(dc.Message().Properties.ContentEncoding).To(Equal(properties.ContentEncoding))
512512
}
513-
if dc.message.Properties.GroupID != nil {
514-
Expect(dc.message.Properties.GroupID).To(Equal(properties.GroupID))
513+
if dc.Message().Properties.GroupID != nil {
514+
Expect(dc.Message().Properties.GroupID).To(Equal(properties.GroupID))
515515
}
516-
if dc.message.Properties.ReplyToGroupID != nil {
517-
Expect(dc.message.Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
516+
if dc.Message().Properties.ReplyToGroupID != nil {
517+
Expect(dc.Message().Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
518518
}
519-
if dc.message.Properties.GroupSequence != nil {
520-
Expect(dc.message.Properties.GroupSequence).To(Equal(properties.GroupSequence))
519+
if dc.Message().Properties.GroupSequence != nil {
520+
Expect(dc.Message().Properties.GroupSequence).To(Equal(properties.GroupSequence))
521521
}
522522

523-
if dc.message.Properties.ReplyToGroupID != nil {
524-
Expect(dc.message.Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
523+
if dc.Message().Properties.ReplyToGroupID != nil {
524+
Expect(dc.Message().Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
525525
}
526526

527527
// here we compare only the year, month and day
528528
// it is not perfect but it is enough for the test
529-
if dc.message.Properties.CreationTime != nil {
530-
Expect(dc.message.Properties.CreationTime.Year()).To(Equal(properties.CreationTime.Year()))
531-
Expect(dc.message.Properties.CreationTime.Month()).To(Equal(properties.CreationTime.Month()))
532-
Expect(dc.message.Properties.CreationTime.Day()).To(Equal(properties.CreationTime.Day()))
529+
if dc.Message().Properties.CreationTime != nil {
530+
Expect(dc.Message().Properties.CreationTime.Year()).To(Equal(properties.CreationTime.Year()))
531+
Expect(dc.Message().Properties.CreationTime.Month()).To(Equal(properties.CreationTime.Month()))
532+
Expect(dc.Message().Properties.CreationTime.Day()).To(Equal(properties.CreationTime.Day()))
533533
}
534534

535-
if dc.message.Properties.AbsoluteExpiryTime != nil {
536-
Expect(dc.message.Properties.AbsoluteExpiryTime.Year()).To(Equal(properties.AbsoluteExpiryTime.Year()))
537-
Expect(dc.message.Properties.AbsoluteExpiryTime.Month()).To(Equal(properties.AbsoluteExpiryTime.Month()))
538-
Expect(dc.message.Properties.AbsoluteExpiryTime.Day()).To(Equal(properties.AbsoluteExpiryTime.Day()))
535+
if dc.Message().Properties.AbsoluteExpiryTime != nil {
536+
Expect(dc.Message().Properties.AbsoluteExpiryTime.Year()).To(Equal(properties.AbsoluteExpiryTime.Year()))
537+
Expect(dc.Message().Properties.AbsoluteExpiryTime.Month()).To(Equal(properties.AbsoluteExpiryTime.Month()))
538+
Expect(dc.Message().Properties.AbsoluteExpiryTime.Day()).To(Equal(properties.AbsoluteExpiryTime.Day()))
539539
}
540540

541-
if dc.message.Properties.CorrelationID != nil {
542-
Expect(dc.message.Properties.CorrelationID).To(Equal(properties.CorrelationID))
541+
if dc.Message().Properties.CorrelationID != nil {
542+
Expect(dc.Message().Properties.CorrelationID).To(Equal(properties.CorrelationID))
543543
}
544544

545545
Expect(dc.Accept(context.Background())).To(BeNil())

pkg/rabbitmqamqp/amqp_consumer_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmqamqp
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/Azure/go-amqp"
78
. "github.com/onsi/ginkgo/v2"
@@ -297,3 +298,64 @@ var _ = Describe("Consumer direct reply to", func() {
297298
})
298299

299300
})
301+
302+
var _ = Describe("Consumer pre-settled", func() {
303+
It("should consume messages in pre-settled mode and throw error on settlement methods", func() {
304+
qName := generateNameWithDateTime("Consumer pre-settled should consume messages in pre-settled mode")
305+
connection, err := Dial(context.Background(), "amqp://", nil)
306+
Expect(err).To(BeNil())
307+
DeferCleanup(func() {
308+
_ = connection.Management().DeleteQueue(context.Background(), qName)
309+
_ = connection.Close(context.Background())
310+
})
311+
312+
queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
313+
Name: qName,
314+
})
315+
Expect(err).To(BeNil())
316+
Expect(queue).NotTo(BeNil())
317+
318+
// Publish messages
319+
messageCount := 100
320+
initialCredits := int32(messageCount / 10)
321+
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, nil)
322+
Expect(err).To(BeNil())
323+
for i := 0; i < messageCount; i++ {
324+
msg := &amqp.Message{
325+
Data: [][]byte{[]byte(fmt.Sprintf("message-%d", i))},
326+
}
327+
_, err = publisher.Publish(context.Background(), msg)
328+
Expect(err).To(BeNil())
329+
}
330+
331+
// Create consumer with pre-settled enabled
332+
consumer, err := connection.NewConsumer(context.Background(), qName, &ConsumerOptions{
333+
InitialCredits: initialCredits,
334+
PreSettled: true,
335+
})
336+
Expect(err).To(BeNil())
337+
Expect(consumer).NotTo(BeNil())
338+
DeferCleanup(func() {
339+
_ = consumer.Close(context.Background())
340+
})
341+
342+
// Receive all messages
343+
for i := 0; i < messageCount; i++ {
344+
dc, err := consumer.Receive(context.Background())
345+
Expect(err).To(BeNil())
346+
Expect(dc.Message()).NotTo(BeNil())
347+
Expect(string(dc.Message().Data[0])).To(Equal(fmt.Sprintf("message-%d", i)))
348+
349+
// Try to call Accept() and verify it throws an error
350+
err = dc.Accept(context.Background())
351+
Expect(err).NotTo(BeNil())
352+
Expect(err.Error()).To(ContainSubstring("auto-settle on, message is already disposed"))
353+
}
354+
355+
// Verify queue is empty (messages were consumed)
356+
queueInfo, err := connection.Management().QueueInfo(context.Background(), qName)
357+
Expect(err).To(BeNil())
358+
Expect(queueInfo).NotTo(BeNil())
359+
Expect(queueInfo.messageCount).To(Equal(uint64(0)))
360+
})
361+
})

0 commit comments

Comments
 (0)