Skip to content

Commit 27e00a7

Browse files
committed
Add support for JMS queue
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 4557cb2 commit 27e00a7

14 files changed

Lines changed: 350 additions & 6 deletions

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ docs/examples/ # Example code demonstrating usage
5858
- `Quorum` - Quorum queue type
5959
- `Classic` - Classic queue type
6060
- `Stream` - Stream queue type
61+
- `Jms` - JMS queue type // for rabbitmq tanzu
6162

6263
### Consumer Options
6364

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file.
88
- [Release 0.7.0](https://github.com/rabbitmq/rabbitmq-amqp-go-client/releases/tag/v0.7.0)
99

1010
### Added
11+
- Add `docs/examples/jms_queue` example for `JmsQueueSpecification`
12+
- Add `Jms` queue type and `JmsQueueSpecification` for JMS queues
1113
- Add code documentation by @Gsantomaggio in [#86](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/86)
1214
- Add OpenTelemetry metrics support with semantic conventions by @Zerpet in [#84](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/84)
1315
- Add settings to the stream queues by @Gsantomaggio in [#87](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/87)

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,7 @@ You need the following packages to use the rabbitmq amqp client:
3535
- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop`
3636

3737

38+
### RabbitMQ Tanzu
39+
40+
Features like AMQP 1.0 over WebSocket, JMS support, and more are only available in Tanzu RabbitMQ 4.0 or later. You can get it from [here](https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-oci/4-2/tanzu-rabbitmq-oci-image/site-overview.html).
41+

docs/examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33

44
- [Getting Started](getting_started) - A simple example to get you started.
5+
- [JMS queue](jms_queue) - Same flow as getting started, using `JmsQueueSpecification` (Tanzu RabbitMQ 4.x).
56
- [Reliable](reliable) - An example of how to deal with reconnections and error handling.
67
- [Streams](streams) - An example of how to use [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) with AMQP 1.0
78
- [Stream Filtering](streams_filtering) - An example of how to use streams [Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions)

docs/examples/jms_queue/main.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
2+
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
3+
// This example mirrors getting_started but declares a JMS queue via JmsQueueSpecification
4+
// (RabbitMQ queue type "jms"). JMS queues are available on Tanzu RabbitMQ 4.x; see:
5+
// https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-oci/4-2/tanzu-rabbitmq-oci-image/site-overview.html
6+
// example path: https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/jms_queue/main.go
7+
8+
package main
9+
10+
import (
11+
"context"
12+
"errors"
13+
"fmt"
14+
"time"
15+
16+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
17+
)
18+
19+
func main() {
20+
exchangeName := "jms-queue-go-exchange"
21+
queueName := "jms-queue-go-queue"
22+
routingKey := "routing-key"
23+
24+
rmq.Info("JMS queue example with AMQP Go AMQP 1.0 Client")
25+
26+
stateChanged := make(chan *rmq.StateChanged, 1)
27+
go func(ch chan *rmq.StateChanged) {
28+
for statusChanged := range ch {
29+
rmq.Info("[connection]", "Status changed", statusChanged)
30+
}
31+
}(stateChanged)
32+
33+
env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)
34+
35+
amqpConnection, err := env.NewConnection(context.Background())
36+
if err != nil {
37+
rmq.Error("Error opening connection", err)
38+
return
39+
}
40+
amqpConnection.NotifyStatusChange(stateChanged)
41+
42+
rmq.Info("AMQP connection opened")
43+
management := amqpConnection.Management()
44+
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{
45+
Name: exchangeName,
46+
})
47+
if err != nil {
48+
rmq.Error("Error declaring exchange", err)
49+
return
50+
}
51+
52+
// Declare a JMS queue (x-queue-type=jms). Optional queue arguments go in Arguments.
53+
queueInfo, err := management.DeclareQueue(context.TODO(), &rmq.JmsQueueSpecification{
54+
Name: queueName,
55+
})
56+
57+
if err != nil {
58+
rmq.Error("Error declaring queue", err)
59+
return
60+
}
61+
62+
bindingPath, err := management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
63+
SourceExchange: exchangeName,
64+
DestinationQueue: queueName,
65+
BindingKey: routingKey,
66+
})
67+
68+
if err != nil {
69+
rmq.Error("Error binding", err)
70+
return
71+
}
72+
73+
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
74+
if err != nil {
75+
rmq.Error("Error creating consumer", err)
76+
return
77+
}
78+
79+
consumerContext, cancel := context.WithCancel(context.Background())
80+
81+
go func(ctx context.Context) {
82+
for {
83+
deliveryContext, err := consumer.Receive(ctx)
84+
if errors.Is(err, context.Canceled) {
85+
rmq.Info("[Consumer] Consumer closed", "context", err)
86+
return
87+
}
88+
if err != nil {
89+
rmq.Error("[Consumer] Error receiving message", "error", err)
90+
return
91+
}
92+
93+
rmq.Info("[Consumer] Received message", "message",
94+
fmt.Sprintf("%s", deliveryContext.Message().Data))
95+
96+
err = deliveryContext.Accept(context.Background())
97+
if err != nil {
98+
rmq.Error("[Consumer] Error accepting message", "error", err)
99+
return
100+
}
101+
}
102+
}(consumerContext)
103+
104+
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
105+
Exchange: exchangeName,
106+
Key: routingKey,
107+
}, nil)
108+
if err != nil {
109+
rmq.Error("Error creating publisher", err)
110+
return
111+
}
112+
113+
for i := 0; i < 100; i++ {
114+
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
115+
if err != nil {
116+
rmq.Error("Error publishing message", "error", err)
117+
time.Sleep(1 * time.Second)
118+
continue
119+
}
120+
switch publishResult.Outcome.(type) {
121+
case *rmq.StateAccepted:
122+
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
123+
case *rmq.StateReleased:
124+
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
125+
case *rmq.StateRejected:
126+
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
127+
stateType := publishResult.Outcome.(*rmq.StateRejected)
128+
if stateType.Error != nil {
129+
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
130+
}
131+
default:
132+
rmq.Warn("Message state: %v", publishResult.Outcome)
133+
}
134+
}
135+
136+
println("press any key to close the connection")
137+
138+
var input string
139+
_, _ = fmt.Scanln(&input)
140+
141+
cancel()
142+
err = consumer.Close(context.Background())
143+
if err != nil {
144+
rmq.Error("[Consumer]", err)
145+
return
146+
}
147+
err = publisher.Close(context.Background())
148+
if err != nil {
149+
rmq.Error("[Publisher]", err)
150+
return
151+
}
152+
153+
err = management.Unbind(context.TODO(), bindingPath)
154+
155+
if err != nil {
156+
rmq.Error("Error unbinding", "error", err)
157+
return
158+
}
159+
160+
err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
161+
if err != nil {
162+
rmq.Error("Error deleting exchange", "error", err)
163+
return
164+
}
165+
166+
purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
167+
if err != nil {
168+
rmq.Error("Error purging queue", "error", err)
169+
return
170+
}
171+
rmq.Info("Purged messages from the queue", "count", purged)
172+
173+
err = management.DeleteQueue(context.TODO(), queueInfo.Name())
174+
if err != nil {
175+
rmq.Error("Error deleting queue", "error", err)
176+
return
177+
}
178+
179+
err = env.CloseConnections(context.Background())
180+
if err != nil {
181+
rmq.Error("Error closing connection", "error", err)
182+
return
183+
}
184+
185+
rmq.Info("AMQP connection closed")
186+
time.Sleep(100 * time.Millisecond)
187+
close(stateChanged)
188+
}

docs/examples/web_sockets/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ AMQP 1.0 over WebSocket Example
33

44
This example demonstrates how to use AMQP 1.0 over WebSocket. </br>
55
## RabbitMQ Tanzu
6-
You need [Tanzu RabbitMQ 4.0](https://www.vmware.com/products/app-platform/tanzu-rabbitmq) or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.
6+
You need [Tanzu RabbitMQ 4.0](https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-oci/4-2/tanzu-rabbitmq-oci-image/site-overview.html) or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.
77

88
For more info read the blog post: [AMQP 1.0 over WebSocket](https://www.rabbitmq.com/blog/2025/04/16/amqp-websocket)
99

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,14 +408,15 @@ func dialWithMetrics(ctx context.Context, address string, connOptions *AmqpConnO
408408
return nil, err
409409
}
410410

411+
fa := newFeaturesAvailable()
411412
// create the connection
412413
conn := &AmqpConnection{
413-
management: newAmqpManagement(connOptions.TopologyRecoveryOptions),
414+
management: newAmqpManagement(connOptions.TopologyRecoveryOptions, fa),
414415
lifeCycle: NewLifeCycle(),
415416
amqpConnOptions: connOptions,
416417
entitiesTracker: newEntitiesTracker(),
417418
topologyRecoveryRecords: newTopologyRecoveryRecords(),
418-
featuresAvailable: newFeaturesAvailable(),
419+
featuresAvailable: fa,
419420
metricsCollector: metricsCollector,
420421
serverAddress: uri.Host,
421422
serverPort: uri.Port,

pkg/rabbitmqamqp/amqp_connection_recovery.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
119119
Name: q.queueName,
120120
Arguments: q.arguments,
121121
}
122+
case Jms:
123+
return &JmsQueueSpecification{
124+
Name: q.queueName,
125+
Arguments: q.arguments,
126+
}
122127
default:
123128
return &DefaultQueueSpecification{
124129
Name: q.queueName,

pkg/rabbitmqamqp/amqp_connection_recovery_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,11 @@ var _ = Describe("Recovery connection test", func() {
224224
Entry("Quorum queue", Quorum, false, false, map[string]any{}),
225225
Entry("Classic queue", Classic, true, true, map[string]any{}),
226226
Entry("Stream queue", Stream, false, false, map[string]any{}),
227+
//Entry("JMS queue", Jms, false, false, map[string]any{}),
227228
Entry("Quorum queue with arguments", Quorum, false, false, map[string]any{"x-max-length-bytes": 1000}),
228229
Entry("Classic queue with arguments", Classic, true, true, map[string]any{"x-max-length-bytes": 1000}),
229230
Entry("Stream queue with arguments", Stream, false, false, map[string]any{"x-max-length-bytes": 1000}),
231+
//Entry("JMS queue with arguments", Jms, false, false, map[string]any{"x-max-length-bytes": 1000}),
230232
)
231233

232234
DescribeTable("Exchange record returns the expected exchange specification",

pkg/rabbitmqamqp/amqp_management.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,15 @@ type AmqpManagement struct {
3434
// since recovery happens in a separate goroutine while public API methods
3535
// can be called from user goroutines.
3636
isRecovering atomic.Bool
37+
38+
featuresAvailable *featuresAvailable
3739
}
3840

39-
func newAmqpManagement(topologyRecovery TopologyRecoveryOptions) *AmqpManagement {
41+
func newAmqpManagement(topologyRecovery TopologyRecoveryOptions, featuresAvailable *featuresAvailable) *AmqpManagement {
4042
return &AmqpManagement{
4143
lifeCycle: NewLifeCycle(),
4244
topologyRecoveryOptions: topologyRecovery,
45+
featuresAvailable: featuresAvailable,
4346
}
4447
}
4548

@@ -190,6 +193,12 @@ func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueS
190193
return nil, fmt.Errorf("queue specification cannot be nil. You need to provide a valid IQueueSpecification")
191194
}
192195

196+
err := specification.validate(a.featuresAvailable)
197+
198+
if err != nil {
199+
return nil, err
200+
}
201+
193202
amqpQueue := newAmqpQueue(a, specification.name())
194203
amqpQueue.AutoDelete(specification.isAutoDelete())
195204
// TODO: config tweak to record only exclusive queues

0 commit comments

Comments
 (0)