Skip to content

Commit e102278

Browse files
committed
Add default queue implementation
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 3f5cc3a commit e102278

3 files changed

Lines changed: 130 additions & 3 deletions

File tree

pkg/rabbitmqamqp/amqp_connection_recovery.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@ func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
119119
Name: q.queueName,
120120
Arguments: q.arguments,
121121
}
122+
default:
123+
return &DefaultQueueSpecification{
124+
Name: q.queueName,
125+
IsAutoDelete: *q.autoDelete,
126+
IsExclusive: *q.exclusive,
127+
Arguments: q.arguments,
128+
}
122129
}
123130
return nil
124131
}

pkg/rabbitmqamqp/amqp_queue_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package rabbitmqamqp
22

33
import (
44
"context"
5+
"strconv"
6+
57
. "github.com/onsi/ginkgo/v2"
68
. "github.com/onsi/gomega"
7-
"strconv"
89
)
910

1011
var _ = Describe("AMQP Queue test ", func() {
@@ -245,6 +246,29 @@ var _ = Describe("AMQP Queue test ", func() {
245246
Expect(err).To(Equal(ErrDoesNotExist))
246247
Expect(result).To(BeNil())
247248
})
249+
250+
// default
251+
It("AMQP Declare Queue with DefaultQueueSpecification should succeed", func() {
252+
queueName := generateName("AMQP Declare Queue with DefaultQueueSpecification should succeed")
253+
queueInfo, err := management.DeclareQueue(context.TODO(), &DefaultQueueSpecification{
254+
Name: queueName,
255+
DeadLetterExchange: "dead-letter-exchange",
256+
DeadLetterRoutingKey: "dead-letter-routing-key",
257+
})
258+
Expect(err).To(BeNil())
259+
Expect(queueInfo).NotTo(BeNil())
260+
Expect(queueInfo.Name()).To(Equal(queueName))
261+
Expect(queueInfo.IsDurable()).To(BeTrue())
262+
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
263+
Expect(queueInfo.IsExclusive()).To(BeFalse())
264+
Expect(queueInfo.arguments["x-dead-letter-exchange"]).To(Equal("dead-letter-exchange"))
265+
Expect(queueInfo.arguments["x-dead-letter-routing-key"]).To(Equal("dead-letter-routing-key"))
266+
// the default value for queue type is classic
267+
Expect(queueInfo.Type()).To(Equal(Classic))
268+
269+
err = management.DeleteQueue(context.TODO(), queueName)
270+
Expect(err).To(BeNil())
271+
})
248272
})
249273

250274
func publishMessages(queueName string, count int, args ...string) {

pkg/rabbitmqamqp/entities.go

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ const (
1313
)
1414

1515
/*
16-
IQueueSpecification represents the specification of a queue
16+
IQueueSpecification represents the specification of a queue.
17+
The implementation of the queue specification can be used to declare a queue with the specified properties and arguments.
18+
The client provides multiple implementations of the queue specification for different use cases,
19+
such as DefaultQueueSpecification, QuorumQueueSpecification, ClassicQueueSpecification, and AutoGeneratedQueueSpecification.
20+
Yhe client implementations are helpers that cover the most common use cases, but you can implement your own queue
21+
specification by implementing the IQueueSpecification interface.
1722
*/
1823
type IQueueSpecification interface {
1924
name() string
@@ -66,8 +71,99 @@ func (r *ClientLocalLeaderLocator) leaderLocator() string {
6671
return "client-local"
6772
}
6873

74+
// DefaultQueueSpecification represents the specification of the default queue.
75+
// It is used when the queue type is not specified.
76+
// The queue type is determined by the server based on the arguments provided per virtual host configuration.
77+
type DefaultQueueSpecification struct {
78+
Name string
79+
IsAutoDelete bool
80+
IsExclusive bool
81+
AutoExpire int64
82+
MessageTTL int64
83+
OverflowStrategy IOverflowStrategy
84+
SingleActiveConsumer bool
85+
DeadLetterExchange string
86+
DeadLetterRoutingKey string
87+
MaxLength int64
88+
MaxLengthBytes int64
89+
MaxPriority int64
90+
LeaderLocator ILeaderLocator
91+
QuorumInitialGroupSize int
92+
Arguments map[string]any
93+
}
94+
95+
func (q *DefaultQueueSpecification) name() string {
96+
return q.Name
97+
}
98+
99+
func (q *DefaultQueueSpecification) isAutoDelete() bool {
100+
return q.IsAutoDelete
101+
}
102+
103+
func (q *DefaultQueueSpecification) isExclusive() bool {
104+
return q.IsExclusive
105+
}
106+
107+
func (q *DefaultQueueSpecification) queueType() TQueueType {
108+
return ""
109+
}
110+
111+
func (q *DefaultQueueSpecification) buildArguments() map[string]any {
112+
result := q.Arguments
113+
if result == nil {
114+
result = map[string]any{}
115+
}
116+
117+
if q.MaxLengthBytes != 0 {
118+
result["x-max-length-bytes"] = q.MaxLengthBytes
119+
}
120+
121+
if len(q.DeadLetterExchange) != 0 {
122+
result["x-dead-letter-exchange"] = q.DeadLetterExchange
123+
}
124+
125+
if len(q.DeadLetterRoutingKey) != 0 {
126+
result["x-dead-letter-routing-key"] = q.DeadLetterRoutingKey
127+
}
128+
129+
if q.AutoExpire != 0 {
130+
result["x-expires"] = q.AutoExpire
131+
}
132+
133+
if q.MessageTTL != 0 {
134+
result["x-message-ttl"] = q.MessageTTL
135+
}
136+
137+
if q.OverflowStrategy != nil {
138+
result["x-overflow"] = q.OverflowStrategy.overflowStrategy()
139+
}
140+
141+
if q.SingleActiveConsumer {
142+
result["x-single-active-consumer"] = true
143+
}
144+
145+
if q.MaxLength != 0 {
146+
result["x-max-length"] = q.MaxLength
147+
}
148+
149+
if q.MaxPriority != 0 {
150+
result["x-max-priority"] = q.MaxPriority
151+
}
152+
153+
if q.LeaderLocator != nil {
154+
result["x-queue-leader-locator"] = q.LeaderLocator.leaderLocator()
155+
}
156+
157+
if q.QuorumInitialGroupSize != 0 {
158+
result["x-quorum-initial-group-size"] = q.QuorumInitialGroupSize
159+
}
160+
161+
return result
162+
163+
}
164+
69165
/*
70-
QuorumQueueSpecification represents the specification of the quorum queue
166+
QuorumQueueSpecification represents the specification of the quorum queue
71167
*/
72168

73169
type QuorumQueueSpecification struct {

0 commit comments

Comments
 (0)