Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ docs/examples/ # Example code demonstrating usage
- `Classic` - Classic queue type
- `Stream` - Stream queue type
- `Jms` - JMS queue type // for rabbitmq tanzu
- `Delayed` - Delayed queue type // for Tanzu RabbitMQ 4.3+

### Consumer Options

Expand All @@ -78,6 +79,7 @@ docs/examples/ # Example code demonstrating usage
- **`QuorumQueueSpecification`** - Specification for quorum queues
- **`ClassicQueueSpecification`** - Specification for classic queues
- **`StreamQueueSpecification`** - Specification for stream queues
- **`DelayedQueueSpecification`** - Specification for delayed queues (Tanzu RabbitMQ 4.3+)
- **`AutoGeneratedQueueSpecification`** - It is a classic queue with auto-generated name
- **`DefaultQueueSpecification`** - Default queue specification. Server will decide the queue type based on the virtual host configuration.

Expand Down
3 changes: 2 additions & 1 deletion docs/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@


- [Getting Started](getting_started) - A simple example to get you started.
- [JMS queue](jms_queue) - Same flow as getting started, using `JmsQueueSpecification` (Tanzu RabbitMQ 4.x).
- [JMS queue](jms_queue) - Same flow as getting started, using `JMSQueueSpecification` (Tanzu RabbitMQ 4.3+).
- [Delayed queue](delayed_queue) - Same flow as getting started, using `DelayedQueueSpecification` (Tanzu RabbitMQ 4.x).
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Delayed queue entry says Tanzu RabbitMQ 4.x, while other documentation in this PR (AGENTS.md and the runtime validation error) says 4.3+. Please make the version requirement consistent across docs and code so users don’t get conflicting guidance.

Suggested change
- [Delayed queue](delayed_queue) - Same flow as getting started, using `DelayedQueueSpecification` (Tanzu RabbitMQ 4.x).
- [Delayed queue](delayed_queue) - Same flow as getting started, using `DelayedQueueSpecification` (Tanzu RabbitMQ 4.3+).

Copilot uses AI. Check for mistakes.
- [Reliable](reliable) - An example of how to deal with reconnections and error handling.
- [Streams](streams) - An example of how to use [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) with AMQP 1.0
- [Stream Filtering](streams_filtering) - An example of how to use streams [Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions)
Expand Down
199 changes: 199 additions & 0 deletions docs/examples/delayed_queue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
// This example mirrors getting_started but declares a delayed queue via DelayedQueueSpecification
// (RabbitMQ queue type "delayed"). Delayed queues are available on Tanzu RabbitMQ 4.x+; see:
// https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-on-kubernetes/4-2/tanzu-rabbitmq-kubernetes/delayed-queues.html
Comment on lines +4 to +5
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example header says delayed queues are available on Tanzu RabbitMQ 4.x+ and links to the 4-2 docs, but the client validation/error message says 4.3+. Please align the stated minimum version and the linked docs version with the actual requirement enforced by DelayedQueueSpecification.validate.

Suggested change
// (RabbitMQ queue type "delayed"). Delayed queues are available on Tanzu RabbitMQ 4.x+; see:
// https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-on-kubernetes/4-2/tanzu-rabbitmq-kubernetes/delayed-queues.html
// (RabbitMQ queue type "delayed"). Delayed queues are available on Tanzu RabbitMQ 4.3+; see:
// https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-on-kubernetes/4-3/tanzu-rabbitmq-kubernetes/delayed-queues.html

Copilot uses AI. Check for mistakes.
// example path: https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/delayed_queue/main.go

package main

import (
"context"
"errors"
"fmt"
"time"

"github.com/Azure/go-amqp"

rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

func main() {
exchangeName := "delayed-queue-go-exchange"
queueName := "delayed-queue-go-queue"
routingKey := "routing-key"

rmq.Info("Delayed queue example with AMQP Go AMQP 1.0 Client")

stateChanged := make(chan *rmq.StateChanged, 1)
go func(ch chan *rmq.StateChanged) {
for statusChanged := range ch {
rmq.Info("[connection]", "Status changed", statusChanged)
}
}(stateChanged)

env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)

amqpConnection, err := env.NewConnection(context.Background())
if err != nil {
rmq.Error("Error opening connection", err)
return
}
amqpConnection.NotifyStatusChange(stateChanged)

rmq.Info("AMQP connection opened")
management := amqpConnection.Management()
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{
Name: exchangeName,
})
if err != nil {
rmq.Error("Error declaring exchange", err)
return
}

// Declare a delayed queue (x-queue-type=delayed). Optional shovel and other arguments
// can be set on DelayedQueueSpecification (see pkg/rabbitmqamqp/entities.go).
queueInfo, err := management.DeclareQueue(context.TODO(), &rmq.DelayedQueueSpecification{
Name: queueName,
})

if err != nil {
rmq.Error("Error declaring queue", err)
return
}

bindingPath, err := management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
})

if err != nil {
rmq.Error("Error binding", err)
return
}

consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rmq.Error("Error creating consumer", err)
return
}

consumerContext, cancel := context.WithCancel(context.Background())

go func(ctx context.Context) {
for {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
rmq.Info("[Consumer] Consumer closed", "context", err)
return
}
if err != nil {
rmq.Error("[Consumer] Error receiving message", "error", err)
return
}

rmq.Info("[Consumer] Received message", "message",
fmt.Sprintf("%s", deliveryContext.Message().Data))

err = deliveryContext.Accept(context.Background())
if err != nil {
rmq.Error("[Consumer] Error accepting message", "error", err)
return
}
}
}(consumerContext)

publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
Exchange: exchangeName,
Key: routingKey,
}, nil)
if err != nil {
rmq.Error("Error creating publisher", err)
return
}

// Schedule delivery a few seconds in the future using message annotations (see Tanzu delayed queue docs).
const delayMs int64 = 3000
for i := 0; i < 5; i++ {
msg := rmq.NewMessage([]byte(fmt.Sprintf("Hello after delay #%d", i)))
if msg.Annotations == nil {
msg.Annotations = amqp.Annotations{}
}
msg.Annotations["x-opt-delivery-delay"] = delayMs

publishResult, err := publisher.Publish(context.Background(), msg)
if err != nil {
rmq.Error("Error publishing message", "error", err)
time.Sleep(1 * time.Second)
continue
}
switch publishResult.Outcome.(type) {
case *rmq.StateAccepted:
rmq.Info("[Publisher]", "Message accepted (scheduled)", publishResult.Message.Data[0])
case *rmq.StateReleased:
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
case *rmq.StateRejected:
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*rmq.StateRejected)
if stateType.Error != nil {
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
}
default:
rmq.Warn("Message state: %v", publishResult.Outcome)
}
}

println("press any key to close the connection")

var input string
_, _ = fmt.Scanln(&input)

cancel()
err = consumer.Close(context.Background())
if err != nil {
rmq.Error("[Consumer]", err)
return
}
err = publisher.Close(context.Background())
if err != nil {
rmq.Error("[Publisher]", err)
return
}

err = management.Unbind(context.TODO(), bindingPath)

if err != nil {
rmq.Error("Error unbinding", "error", err)
return
}

err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
if err != nil {
rmq.Error("Error deleting exchange", "error", err)
return
}

purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
rmq.Error("Error purging queue", "error", err)
return
}
rmq.Info("Purged messages from the queue", "count", purged)

err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
rmq.Error("Error deleting queue", "error", err)
return
}

err = env.CloseConnections(context.Background())
if err != nil {
rmq.Error("Error closing connection", "error", err)
return
}

rmq.Info("AMQP connection closed")
time.Sleep(100 * time.Millisecond)
close(stateChanged)
}
7 changes: 7 additions & 0 deletions pkg/rabbitmqamqp/amqp_connection_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
Name: q.queueName,
Arguments: q.arguments,
}
case Delayed:
return &DelayedQueueSpecification{
Name: q.queueName,
IsAutoDelete: *q.autoDelete,
IsExclusive: *q.exclusive,
Arguments: q.arguments,
}
default:
return &DefaultQueueSpecification{
Name: q.queueName,
Expand Down
2 changes: 2 additions & 0 deletions pkg/rabbitmqamqp/amqp_connection_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,12 @@ var _ = Describe("Recovery connection test", func() {
Entry("Classic queue", Classic, true, true, map[string]any{}),
Entry("Stream queue", Stream, false, false, map[string]any{}),
//Entry("JMS queue", Jms, false, false, map[string]any{}),
//Entry("Delayed queue", Delayed, false, false, map[string]any{}),
Entry("Quorum queue with arguments", Quorum, false, false, map[string]any{"x-max-length-bytes": 1000}),
Entry("Classic queue with arguments", Classic, true, true, map[string]any{"x-max-length-bytes": 1000}),
Entry("Stream queue with arguments", Stream, false, false, map[string]any{"x-max-length-bytes": 1000}),
//Entry("JMS queue with arguments", Jms, false, false, map[string]any{"x-max-length-bytes": 1000}),
Entry("Delayed queue with arguments", Delayed, false, false, map[string]any{"x-max-length-bytes": 1000}),
Comment on lines 225 to +233
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The topology recovery table adds a Delayed queue entry with arguments but leaves the no-arguments Delayed entry commented out. Re-enabling the basic Delayed case would keep coverage consistent with the other queue types and ensure recovery works when arguments are empty.

Copilot uses AI. Check for mistakes.
)

DescribeTable("Exchange record returns the expected exchange specification",
Expand Down
10 changes: 9 additions & 1 deletion pkg/rabbitmqamqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,16 @@ var _ = Describe("AMQP Queue test ", func() {
Name: queueName,
})
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("JMSQueueSpecification is only supported on Tanzu RabbitMQ 4.3 or later"))
Expect(err.Error()).To(ContainSubstring("JMSQueueSpecification is only supported on Tanzu RabbitMQ"))
})

It("should fail if declare a Delayed queue in the open source RabbitMQ", func() {
queueName := generateName("should fail if declare a Delayed queue in the open source RabbitMQ")
_, err := management.DeclareQueue(context.TODO(), &DelayedQueueSpecification{
Name: queueName,
})
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("DelayedQueueSpecification is only supported on Tanzu RabbitMQ"))
})

// default
Expand Down
Loading
Loading