Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set -o xtrace
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
readonly rabbitmq_image=${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management-alpine}
readonly rabbitmq_image=${RABBITMQ_IMAGE:-rabbitmq:4.2-management-alpine}

readonly docker_name_prefix='rabbitmq-amqp-go-client'
readonly docker_network_name="$docker_name_prefix-network"
Expand Down
3 changes: 2 additions & 1 deletion docs/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
- [SQL stream Filtering](sql_stream_filter) - An example of how to use SQL stream filtering with RabbitMQ Streams.
- [Web Sockets](web_sockets) - An example of how to use Web Sockets with the AMQP 1.0 client.
- [Web Sockets](web_sockets) - An example of how to use Web Sockets with the AMQP 1.0 client.
- [Pre-settled messages](pre_settled) - An example of how to receive pre-settled messages with the AMQP 1.0 client.
103 changes: 103 additions & 0 deletions docs/examples/pre_settled/pre_settled.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
// This example is demonstrating how to consume messages with pre-settled mode enabled.
// pre-settled is valid only for classic and quorum queues.
// Pre-settled mode means that messages are considered accepted by the broker as soon as they are delivered to the consumer,
// without requiring explicit acceptance from the consumer side.
// This is useful for scenarios where at-most-once delivery semantics are acceptable, and it can improve performance by reducing
// the overhead of message acknowledgments.
// Example path:https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/pre_settled/pre_settled.go
package main

import (
"context"
"fmt"

rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

func main() {
rmq.Info("Pre-Settled example with AMQP Go AMQP 1.0 Client")

env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
amqpConnection, err := env.NewConnection(context.TODO())
if err != nil {
rmq.Error("Error opening connection", err)
return
}
// create queue for the example
_, err = amqpConnection.Management().DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{
Name: "pre-settled-queue",
})
if err != nil {
rmq.Error("Error declaring queue", err)
return
}
rmq.Info("Queue 'pre-settled-queue' declared")

// publish some messages to the queue
producer, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{
Queue: "pre-settled-queue",
}, nil)
if err != nil {
rmq.Error("Error creating producer", err)
return
}
for i := 0; i < 100; i++ {
msg := rmq.NewMessage([]byte(fmt.Sprintf("Pre-settled message number %d ", i+1)))
_, err := producer.Publish(context.TODO(), msg)
if err != nil {
rmq.Error("Error publishing message", err)
return
}
}

// create consumer with pre-settled mode enabled

consumer, err := amqpConnection.NewConsumer(context.TODO(), "pre-settled-queue", &rmq.ConsumerOptions{
PreSettled: true,
})
if err != nil {
rmq.Error("Error creating consumer", err)
return
}
rmq.Info("Consumer created with pre-settled mode enabled")

for i := 0; i < 100; i++ {
dc, err := consumer.Receive(context.TODO())
if err != nil {
rmq.Error("Error consuming message", err)
return
}
// here we don't need to accept the message
// because pre-settled mode is enabled
rmq.Info("[Consumer]", "Message received", string(dc.Message().GetData()))
}

// clean up
err = consumer.Close(context.TODO())
if err != nil {
rmq.Error("Error closing consumer", err)
return
}
err = producer.Close(context.TODO())
if err != nil {
rmq.Error("Error closing producer", err)
return
}
// delete the queue
err = amqpConnection.Management().DeleteQueue(context.TODO(), "pre-settled-queue")
if err != nil {
rmq.Error("Error deleting queue", err)
return
}
rmq.Info("Queue 'pre-settled-queue' deleted")

err = amqpConnection.Close(context.TODO())
if err != nil {
rmq.Error("Error closing connection", err)
return
}
rmq.Info("Example finished successfully")

}
55 changes: 53 additions & 2 deletions pkg/rabbitmqamqp/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"github.com/google/uuid"
)

// IDeliveryContext represents a delivery context for received messages.
// It provides methods to access the message and settle it (accept, discard, requeue).
type IDeliveryContext interface {
Message() *amqp.Message
Accept(ctx context.Context) error
Discard(ctx context.Context, e *amqp.Error) error
DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
Requeue(ctx context.Context) error
RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
}

type DeliveryContext struct {
receiver *amqp.Receiver
message *amqp.Message
Expand Down Expand Up @@ -66,6 +77,36 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio
})
}

// PreSettledDeliveryContext represents a delivery context for pre-settled messages.
// All settlement methods throw errors since the message is already settled.
type PreSettledDeliveryContext struct {
message *amqp.Message
}

func (dc *PreSettledDeliveryContext) Message() *amqp.Message {
return dc.message
}

func (dc *PreSettledDeliveryContext) Accept(ctx context.Context) error {
return fmt.Errorf("auto-settle on, message is already disposed")
}

func (dc *PreSettledDeliveryContext) Discard(ctx context.Context, e *amqp.Error) error {
return fmt.Errorf("auto-settle on, message is already disposed")
}

func (dc *PreSettledDeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
return fmt.Errorf("auto-settle on, message is already disposed")
}

func (dc *PreSettledDeliveryContext) Requeue(ctx context.Context) error {
return fmt.Errorf("auto-settle on, message is already disposed")
}

func (dc *PreSettledDeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
return fmt.Errorf("auto-settle on, message is already disposed")
}

type consumerState byte

const (
Expand Down Expand Up @@ -124,13 +165,18 @@ func (c *Consumer) createReceiver(ctx context.Context) error {
// In there is not a restart this code won't be executed.
if c.options != nil {
// here we assume it is a stream. So we recreate the options with the offset.
c.options = &StreamConsumerOptions{
streamOpts := &StreamConsumerOptions{
ReceiverLinkName: c.options.linkName(),
InitialCredits: c.options.initialCredits(),
// we increment the offset by one to start from the next message.
// because the current was already consumed.
Offset: &OffsetValue{Offset: uint64(c.currentOffset + 1)},
}
// Preserve StreamFilterOptions if it's a StreamConsumerOptions
if sco, ok := c.options.(*StreamConsumerOptions); ok {
streamOpts.StreamFilterOptions = sco.StreamFilterOptions
}
c.options = streamOpts
}
}
// define a variable *amqp.ReceiverOptions type
Expand All @@ -153,7 +199,7 @@ func (c *Consumer) createReceiver(ctx context.Context) error {
return nil
}

func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
func (c *Consumer) Receive(ctx context.Context) (IDeliveryContext, error) {
msg, err := c.receiver.Load().Receive(ctx, nil)
if err != nil {
return nil, err
Expand All @@ -164,6 +210,11 @@ func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
c.currentOffset = msg.Annotations["x-stream-offset"].(int64)
}

// Check if pre-settled mode is enabled
if c.options != nil && c.options.preSettled() {
return &PreSettledDeliveryContext{message: msg}, nil
}

return &DeliveryContext{receiver: c.receiver.Load(), message: msg}, nil
}

Expand Down
58 changes: 29 additions & 29 deletions pkg/rabbitmqamqp/amqp_consumer_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ var _ = Describe("Consumer stream test", func() {
Expect(dc.Message()).NotTo(BeNil())
Expect(string(dc.Message().GetData())).To(Equal(fmt.Sprintf("Message_id:%d_label:%s", i, label)))

Expect(dc.message.ApplicationProperties).To(HaveKeyWithValue(key, value))
Expect(dc.Message().ApplicationProperties).To(HaveKeyWithValue(key, value))
Expect(dc.Accept(context.Background())).To(BeNil())
}
Expect(consumer.Close(context.Background())).To(BeNil())
Expand Down Expand Up @@ -495,51 +495,51 @@ var _ = Describe("Consumer stream test", func() {
// we test one by one because of the date time fields
// It is not possible to compare the whole structure due of the time
// It is not perfect but it is enough for the test
if dc.message.Properties.MessageID != nil {
Expect(dc.message.Properties.MessageID).To(Equal(properties.MessageID))
if dc.Message().Properties.MessageID != nil {
Expect(dc.Message().Properties.MessageID).To(Equal(properties.MessageID))
}
if dc.message.Properties.Subject != nil {
Expect(dc.message.Properties.Subject).To(Equal(properties.Subject))
if dc.Message().Properties.Subject != nil {
Expect(dc.Message().Properties.Subject).To(Equal(properties.Subject))
}
if dc.message.Properties.ReplyTo != nil {
Expect(dc.message.Properties.ReplyTo).To(Equal(properties.ReplyTo))
if dc.Message().Properties.ReplyTo != nil {
Expect(dc.Message().Properties.ReplyTo).To(Equal(properties.ReplyTo))
}
if dc.message.Properties.ContentType != nil {
Expect(dc.message.Properties.ContentType).To(Equal(properties.ContentType))
if dc.Message().Properties.ContentType != nil {
Expect(dc.Message().Properties.ContentType).To(Equal(properties.ContentType))
}
if dc.message.Properties.ContentEncoding != nil {
Expect(dc.message.Properties.ContentEncoding).To(Equal(properties.ContentEncoding))
if dc.Message().Properties.ContentEncoding != nil {
Expect(dc.Message().Properties.ContentEncoding).To(Equal(properties.ContentEncoding))
}
if dc.message.Properties.GroupID != nil {
Expect(dc.message.Properties.GroupID).To(Equal(properties.GroupID))
if dc.Message().Properties.GroupID != nil {
Expect(dc.Message().Properties.GroupID).To(Equal(properties.GroupID))
}
if dc.message.Properties.ReplyToGroupID != nil {
Expect(dc.message.Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
if dc.Message().Properties.ReplyToGroupID != nil {
Expect(dc.Message().Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
}
if dc.message.Properties.GroupSequence != nil {
Expect(dc.message.Properties.GroupSequence).To(Equal(properties.GroupSequence))
if dc.Message().Properties.GroupSequence != nil {
Expect(dc.Message().Properties.GroupSequence).To(Equal(properties.GroupSequence))
}

if dc.message.Properties.ReplyToGroupID != nil {
Expect(dc.message.Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
if dc.Message().Properties.ReplyToGroupID != nil {
Expect(dc.Message().Properties.ReplyToGroupID).To(Equal(properties.ReplyToGroupID))
}

// here we compare only the year, month and day
// it is not perfect but it is enough for the test
if dc.message.Properties.CreationTime != nil {
Expect(dc.message.Properties.CreationTime.Year()).To(Equal(properties.CreationTime.Year()))
Expect(dc.message.Properties.CreationTime.Month()).To(Equal(properties.CreationTime.Month()))
Expect(dc.message.Properties.CreationTime.Day()).To(Equal(properties.CreationTime.Day()))
if dc.Message().Properties.CreationTime != nil {
Expect(dc.Message().Properties.CreationTime.Year()).To(Equal(properties.CreationTime.Year()))
Expect(dc.Message().Properties.CreationTime.Month()).To(Equal(properties.CreationTime.Month()))
Expect(dc.Message().Properties.CreationTime.Day()).To(Equal(properties.CreationTime.Day()))
}

if dc.message.Properties.AbsoluteExpiryTime != nil {
Expect(dc.message.Properties.AbsoluteExpiryTime.Year()).To(Equal(properties.AbsoluteExpiryTime.Year()))
Expect(dc.message.Properties.AbsoluteExpiryTime.Month()).To(Equal(properties.AbsoluteExpiryTime.Month()))
Expect(dc.message.Properties.AbsoluteExpiryTime.Day()).To(Equal(properties.AbsoluteExpiryTime.Day()))
if dc.Message().Properties.AbsoluteExpiryTime != nil {
Expect(dc.Message().Properties.AbsoluteExpiryTime.Year()).To(Equal(properties.AbsoluteExpiryTime.Year()))
Expect(dc.Message().Properties.AbsoluteExpiryTime.Month()).To(Equal(properties.AbsoluteExpiryTime.Month()))
Expect(dc.Message().Properties.AbsoluteExpiryTime.Day()).To(Equal(properties.AbsoluteExpiryTime.Day()))
}

if dc.message.Properties.CorrelationID != nil {
Expect(dc.message.Properties.CorrelationID).To(Equal(properties.CorrelationID))
if dc.Message().Properties.CorrelationID != nil {
Expect(dc.Message().Properties.CorrelationID).To(Equal(properties.CorrelationID))
}

Expect(dc.Accept(context.Background())).To(BeNil())
Expand Down
62 changes: 62 additions & 0 deletions pkg/rabbitmqamqp/amqp_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmqamqp

import (
"context"
"fmt"

"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -297,3 +298,64 @@ var _ = Describe("Consumer direct reply to", func() {
})

})

var _ = Describe("Consumer pre-settled", func() {
It("should consume messages in pre-settled mode and throw error on settlement methods", func() {
qName := generateNameWithDateTime("Consumer pre-settled should consume messages in pre-settled mode")
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
DeferCleanup(func() {
_ = connection.Management().DeleteQueue(context.Background(), qName)
_ = connection.Close(context.Background())
})

queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())

// Publish messages
messageCount := 100
initialCredits := int32(messageCount / 10)
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, nil)
Expect(err).To(BeNil())
for i := 0; i < messageCount; i++ {
msg := &amqp.Message{
Data: [][]byte{[]byte(fmt.Sprintf("message-%d", i))},
}
_, err = publisher.Publish(context.Background(), msg)
Expect(err).To(BeNil())
}

// Create consumer with pre-settled enabled
consumer, err := connection.NewConsumer(context.Background(), qName, &ConsumerOptions{
InitialCredits: initialCredits,
PreSettled: true,
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
DeferCleanup(func() {
_ = consumer.Close(context.Background())
})

// Receive all messages
for i := 0; i < messageCount; i++ {
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc.Message()).NotTo(BeNil())
Expect(string(dc.Message().Data[0])).To(Equal(fmt.Sprintf("message-%d", i)))

// Try to call Accept() and verify it throws an error
err = dc.Accept(context.Background())
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("auto-settle on, message is already disposed"))
}

// Verify queue is empty (messages were consumed)
queueInfo, err := connection.Management().QueueInfo(context.Background(), qName)
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.messageCount).To(Equal(uint64(0)))
})
})
Loading