Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/rabbitmqamqp/amqp_connection_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,15 @@ func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
Name: q.queueName,
Arguments: q.arguments,
}
default:
return &DefaultQueueSpecification{
Name: q.queueName,
IsAutoDelete: *q.autoDelete,
IsExclusive: *q.exclusive,
Arguments: q.arguments,
}
Comment on lines +122 to +128
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the new default branch, toIQueueSpecification dereferences q.autoDelete and q.exclusive, but queueRecoveryRecord.autoDelete/exclusive are only populated for Classic queues in AmqpManagement.DeclareQueue. For DefaultQueueSpecification records these pointers will be nil, causing a panic during topology recovery. Consider either storing autoDelete/exclusive for default queues as well, or handling nil pointers here (e.g., defaulting to false).

Copilot uses AI. Check for mistakes.
}
return nil

}

type exchangeRecoveryRecord struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/rabbitmqamqp/amqp_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueS
queueName: specification.name(),
queueType: specification.queueType(),
}
if specification.queueType() == Classic {
// valid only for classic queues or default when queue type is not specified, for quorum queues these fields are ignored and not set in the recovery record
if len(specification.queueType()) == 0 || specification.queueType() == Classic {
recoveryRecord.autoDelete = ptr(specification.isAutoDelete())
recoveryRecord.exclusive = ptr(specification.isExclusive())
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/rabbitmqamqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package rabbitmqamqp

import (
"context"
"strconv"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"strconv"
)

var _ = Describe("AMQP Queue test ", func() {
Expand Down Expand Up @@ -245,6 +246,29 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).To(Equal(ErrDoesNotExist))
Expect(result).To(BeNil())
})

// default
It("AMQP Declare Queue with DefaultQueueSpecification should succeed", func() {
queueName := generateName("AMQP Declare Queue with DefaultQueueSpecification should succeed")
queueInfo, err := management.DeclareQueue(context.TODO(), &DefaultQueueSpecification{
Name: queueName,
DeadLetterExchange: "dead-letter-exchange",
DeadLetterRoutingKey: "dead-letter-routing-key",
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.Name()).To(Equal(queueName))
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.arguments["x-dead-letter-exchange"]).To(Equal("dead-letter-exchange"))
Expect(queueInfo.arguments["x-dead-letter-routing-key"]).To(Equal("dead-letter-routing-key"))
Comment on lines +264 to +265
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest of this file uses the public accessor queueInfo.Arguments() for argument assertions; accessing the unexported field queueInfo.arguments directly makes the test more brittle to internal refactors. Prefer using Arguments() here for consistency and encapsulation.

Suggested change
Expect(queueInfo.arguments["x-dead-letter-exchange"]).To(Equal("dead-letter-exchange"))
Expect(queueInfo.arguments["x-dead-letter-routing-key"]).To(Equal("dead-letter-routing-key"))
Expect(queueInfo.Arguments()["x-dead-letter-exchange"]).To(Equal("dead-letter-exchange"))
Expect(queueInfo.Arguments()["x-dead-letter-routing-key"]).To(Equal("dead-letter-routing-key"))

Copilot uses AI. Check for mistakes.
// the default value for queue type is classic
Expect(queueInfo.Type()).To(Equal(Classic))
Comment on lines +266 to +267
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion makes the test environment-dependent: RabbitMQ can be configured per-vhost to default queue type to quorum instead of classic when no x-queue-type is provided. To avoid flakiness, assert that no explicit x-queue-type was set (or only assert the DLX args), rather than hard-coding Classic here.

Suggested change
// the default value for queue type is classic
Expect(queueInfo.Type()).To(Equal(Classic))
// ensure no explicit queue type was set; actual default depends on broker configuration
Expect(queueInfo.arguments).NotTo(HaveKey("x-queue-type"))

Copilot uses AI. Check for mistakes.

err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
})

func publishMessages(queueName string, count int, args ...string) {
Expand Down
100 changes: 98 additions & 2 deletions pkg/rabbitmqamqp/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ const (
)

/*
IQueueSpecification represents the specification of a queue
IQueueSpecification represents the specification of a queue.
The implementation of the queue specification can be used to declare a queue with the specified properties and arguments.
The client provides multiple implementations of the queue specification for different use cases,
such as DefaultQueueSpecification, QuorumQueueSpecification, ClassicQueueSpecification, and AutoGeneratedQueueSpecification.
The client implementations are helpers that cover the most common use cases, but you can implement your own queue
specification by implementing the IQueueSpecification interface.
*/
type IQueueSpecification interface {
name() string
Expand Down Expand Up @@ -66,8 +71,99 @@ func (r *ClientLocalLeaderLocator) leaderLocator() string {
return "client-local"
}

// DefaultQueueSpecification represents the specification of the default queue.
// It is used when the queue type is not specified.
// The queue type is determined by the server based on the arguments provided per virtual host configuration.
type DefaultQueueSpecification struct {
Name string
IsAutoDelete bool
IsExclusive bool
AutoExpire int64
MessageTTL int64
OverflowStrategy IOverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
MaxPriority int64
LeaderLocator ILeaderLocator
QuorumInitialGroupSize int
Arguments map[string]any
}

func (q *DefaultQueueSpecification) name() string {
return q.Name
}

func (q *DefaultQueueSpecification) isAutoDelete() bool {
return q.IsAutoDelete
}

func (q *DefaultQueueSpecification) isExclusive() bool {
return q.IsExclusive
}

func (q *DefaultQueueSpecification) queueType() TQueueType {
return ""
}

func (q *DefaultQueueSpecification) buildArguments() map[string]any {
result := q.Arguments
if result == nil {
result = map[string]any{}
}

if q.MaxLengthBytes != 0 {
result["x-max-length-bytes"] = q.MaxLengthBytes
}

if len(q.DeadLetterExchange) != 0 {
result["x-dead-letter-exchange"] = q.DeadLetterExchange
}

if len(q.DeadLetterRoutingKey) != 0 {
result["x-dead-letter-routing-key"] = q.DeadLetterRoutingKey
}

if q.AutoExpire != 0 {
result["x-expires"] = q.AutoExpire
}

if q.MessageTTL != 0 {
result["x-message-ttl"] = q.MessageTTL
}

if q.OverflowStrategy != nil {
result["x-overflow"] = q.OverflowStrategy.overflowStrategy()
}

if q.SingleActiveConsumer {
result["x-single-active-consumer"] = true
}

if q.MaxLength != 0 {
result["x-max-length"] = q.MaxLength
}

if q.MaxPriority != 0 {
result["x-max-priority"] = q.MaxPriority
}

if q.LeaderLocator != nil {
result["x-queue-leader-locator"] = q.LeaderLocator.leaderLocator()
}

if q.QuorumInitialGroupSize != 0 {
result["x-quorum-initial-group-size"] = q.QuorumInitialGroupSize
}

return result

}

/*
QuorumQueueSpecification represents the specification of the quorum queue
QuorumQueueSpecification represents the specification of the quorum queue
*/

type QuorumQueueSpecification struct {
Expand Down