Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ docs/examples/ # Example code demonstrating usage
- `Quorum` - Quorum queue type
- `Classic` - Classic queue type
- `Stream` - Stream queue type
- `Jms` - JMS queue type // for rabbitmq tanzu
Comment thread
Gsantomaggio marked this conversation as resolved.

### Consumer Options

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file.
- [Release 0.7.0](https://github.com/rabbitmq/rabbitmq-amqp-go-client/releases/tag/v0.7.0)

### Added
- Add `docs/examples/jms_queue` example for `JmsQueueSpecification`
- Add `Jms` queue type and `JmsQueueSpecification` for JMS queues
- Add code documentation by @Gsantomaggio in [#86](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/86)
- Add OpenTelemetry metrics support with semantic conventions by @Zerpet in [#84](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/84)
- Add settings to the stream queues by @Gsantomaggio in [#87](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/87)
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ You need the following packages to use the rabbitmq amqp client:
- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop`


### RabbitMQ Tanzu

Features like AMQP 1.0 over WebSocket, JMS support, and more are only available in Tanzu RabbitMQ 4.0 or later. You can get it from [here](https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-oci/4-2/tanzu-rabbitmq-oci-image/site-overview.html).

1 change: 1 addition & 0 deletions docs/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


- [Getting Started](getting_started) - A simple example to get you started.
- [JMS queue](jms_queue) - Same flow as getting started, using `JmsQueueSpecification` (Tanzu RabbitMQ 4.x).
- [Reliable](reliable) - An example of how to deal with reconnections and error handling.
- [Streams](streams) - An example of how to use [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) with AMQP 1.0
- [Stream Filtering](streams_filtering) - An example of how to use streams [Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions)
Expand Down
188 changes: 188 additions & 0 deletions docs/examples/jms_queue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
// This example mirrors getting_started but declares a JMS queue via JMSQueueSpecification
// (RabbitMQ queue type "jms"). JMS queues are available on Tanzu RabbitMQ 4.x; see:
// https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-oci/4-2/tanzu-rabbitmq-oci-image/site-overview.html
// example path: https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/jms_queue/main.go

package main

import (
"context"
"errors"
"fmt"
"time"

rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

func main() {
exchangeName := "jms-queue-go-exchange"
queueName := "jms-queue-go-queue"
routingKey := "routing-key"

rmq.Info("JMS queue example with AMQP Go AMQP 1.0 Client")

stateChanged := make(chan *rmq.StateChanged, 1)
go func(ch chan *rmq.StateChanged) {
for statusChanged := range ch {
rmq.Info("[connection]", "Status changed", statusChanged)
}
}(stateChanged)

env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil)

amqpConnection, err := env.NewConnection(context.Background())
if err != nil {
rmq.Error("Error opening connection", err)
return
}
amqpConnection.NotifyStatusChange(stateChanged)

rmq.Info("AMQP connection opened")
management := amqpConnection.Management()
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{
Name: exchangeName,
})
if err != nil {
rmq.Error("Error declaring exchange", err)
return
}

// Declare a JMS queue (x-queue-type=jms). Optional queue arguments go in Arguments.
queueInfo, err := management.DeclareQueue(context.TODO(), &rmq.JMSQueueSpecification{
Name: queueName,
})

if err != nil {
rmq.Error("Error declaring queue", err)
return
}

bindingPath, err := management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
})

if err != nil {
rmq.Error("Error binding", err)
return
}

consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rmq.Error("Error creating consumer", err)
return
}

consumerContext, cancel := context.WithCancel(context.Background())

go func(ctx context.Context) {
for {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
rmq.Info("[Consumer] Consumer closed", "context", err)
return
}
if err != nil {
rmq.Error("[Consumer] Error receiving message", "error", err)
return
}

rmq.Info("[Consumer] Received message", "message",
fmt.Sprintf("%s", deliveryContext.Message().Data))

err = deliveryContext.Accept(context.Background())
if err != nil {
rmq.Error("[Consumer] Error accepting message", "error", err)
return
}
}
}(consumerContext)

publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
Exchange: exchangeName,
Key: routingKey,
}, nil)
if err != nil {
rmq.Error("Error creating publisher", err)
return
}

for i := 0; i < 100; i++ {
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {
rmq.Error("Error publishing message", "error", err)
time.Sleep(1 * time.Second)
continue
}
switch publishResult.Outcome.(type) {
case *rmq.StateAccepted:
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
case *rmq.StateReleased:
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
case *rmq.StateRejected:
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
stateType := publishResult.Outcome.(*rmq.StateRejected)
if stateType.Error != nil {
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
}
default:
rmq.Warn("Message state: %v", publishResult.Outcome)
}
}

println("press any key to close the connection")

var input string
_, _ = fmt.Scanln(&input)

cancel()
err = consumer.Close(context.Background())
if err != nil {
rmq.Error("[Consumer]", err)
return
}
err = publisher.Close(context.Background())
if err != nil {
rmq.Error("[Publisher]", err)
return
}

err = management.Unbind(context.TODO(), bindingPath)

if err != nil {
rmq.Error("Error unbinding", "error", err)
return
}

err = management.DeleteExchange(context.TODO(), exchangeInfo.Name())
if err != nil {
rmq.Error("Error deleting exchange", "error", err)
return
}

purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name())
if err != nil {
rmq.Error("Error purging queue", "error", err)
return
}
rmq.Info("Purged messages from the queue", "count", purged)

err = management.DeleteQueue(context.TODO(), queueInfo.Name())
if err != nil {
rmq.Error("Error deleting queue", "error", err)
return
}

err = env.CloseConnections(context.Background())
if err != nil {
rmq.Error("Error closing connection", "error", err)
return
}

rmq.Info("AMQP connection closed")
time.Sleep(100 * time.Millisecond)
close(stateChanged)
}
2 changes: 1 addition & 1 deletion docs/examples/web_sockets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ AMQP 1.0 over WebSocket Example

This example demonstrates how to use AMQP 1.0 over WebSocket. </br>
## RabbitMQ Tanzu
You need [Tanzu RabbitMQ 4.0](https://www.vmware.com/products/app-platform/tanzu-rabbitmq) or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.
You need [Tanzu RabbitMQ 4.0](https://techdocs.broadcom.com/us/en/vmware-tanzu/data-solutions/tanzu-rabbitmq-oci/4-2/tanzu-rabbitmq-oci-image/site-overview.html) or later with the AMQP 1.0 and `rabbitmq_web_amqp` plugins enabled.

For more info read the blog post: [AMQP 1.0 over WebSocket](https://www.rabbitmq.com/blog/2025/04/16/amqp-websocket)

Expand Down
5 changes: 3 additions & 2 deletions pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,15 @@ func dialWithMetrics(ctx context.Context, address string, connOptions *AmqpConnO
return nil, err
}

fa := newFeaturesAvailable()
// create the connection
conn := &AmqpConnection{
management: newAmqpManagement(connOptions.TopologyRecoveryOptions),
management: newAmqpManagement(connOptions.TopologyRecoveryOptions, fa),
lifeCycle: NewLifeCycle(),
amqpConnOptions: connOptions,
entitiesTracker: newEntitiesTracker(),
topologyRecoveryRecords: newTopologyRecoveryRecords(),
featuresAvailable: newFeaturesAvailable(),
featuresAvailable: fa,
metricsCollector: metricsCollector,
serverAddress: uri.Host,
serverPort: uri.Port,
Expand Down
5 changes: 5 additions & 0 deletions pkg/rabbitmqamqp/amqp_connection_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func (q *queueRecoveryRecord) toIQueueSpecification() IQueueSpecification {
Name: q.queueName,
Arguments: q.arguments,
}
case Jms:
return &JMSQueueSpecification{
Name: q.queueName,
Arguments: q.arguments,
}
default:
return &DefaultQueueSpecification{
Name: q.queueName,
Expand Down
2 changes: 2 additions & 0 deletions pkg/rabbitmqamqp/amqp_connection_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,11 @@ var _ = Describe("Recovery connection test", func() {
Entry("Quorum queue", Quorum, false, false, map[string]any{}),
Entry("Classic queue", Classic, true, true, map[string]any{}),
Entry("Stream queue", Stream, false, false, map[string]any{}),
//Entry("JMS queue", Jms, false, false, map[string]any{}),
Entry("Quorum queue with arguments", Quorum, false, false, map[string]any{"x-max-length-bytes": 1000}),
Entry("Classic queue with arguments", Classic, true, true, map[string]any{"x-max-length-bytes": 1000}),
Entry("Stream queue with arguments", Stream, false, false, map[string]any{"x-max-length-bytes": 1000}),
//Entry("JMS queue with arguments", Jms, false, false, map[string]any{"x-max-length-bytes": 1000}),
Comment on lines +227 to +231
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JMS queue entries in this table test are commented out, which leaves the new Jms recovery-path (queueRecoveryRecord.toIQueueSpecification) untested. If the test is expected to pass for JMS as it does for other queue types, please re-enable these entries (or replace with an explicit Skip/conditional) so regressions are caught.

Suggested change
//Entry("JMS queue", Jms, false, false, map[string]any{}),
Entry("Quorum queue with arguments", Quorum, false, false, map[string]any{"x-max-length-bytes": 1000}),
Entry("Classic queue with arguments", Classic, true, true, map[string]any{"x-max-length-bytes": 1000}),
Entry("Stream queue with arguments", Stream, false, false, map[string]any{"x-max-length-bytes": 1000}),
//Entry("JMS queue with arguments", Jms, false, false, map[string]any{"x-max-length-bytes": 1000}),
Entry("JMS queue", Jms, false, false, map[string]any{}),
Entry("Quorum queue with arguments", Quorum, false, false, map[string]any{"x-max-length-bytes": 1000}),
Entry("Classic queue with arguments", Classic, true, true, map[string]any{"x-max-length-bytes": 1000}),
Entry("Stream queue with arguments", Stream, false, false, map[string]any{"x-max-length-bytes": 1000}),
Entry("JMS queue with arguments", Jms, false, false, map[string]any{"x-max-length-bytes": 1000}),

Copilot uses AI. Check for mistakes.
)

DescribeTable("Exchange record returns the expected exchange specification",
Expand Down
11 changes: 10 additions & 1 deletion pkg/rabbitmqamqp/amqp_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ type AmqpManagement struct {
// since recovery happens in a separate goroutine while public API methods
// can be called from user goroutines.
isRecovering atomic.Bool

featuresAvailable *featuresAvailable
}

func newAmqpManagement(topologyRecovery TopologyRecoveryOptions) *AmqpManagement {
func newAmqpManagement(topologyRecovery TopologyRecoveryOptions, featuresAvailable *featuresAvailable) *AmqpManagement {
return &AmqpManagement{
lifeCycle: NewLifeCycle(),
topologyRecoveryOptions: topologyRecovery,
featuresAvailable: featuresAvailable,
}
}

Expand Down Expand Up @@ -190,6 +193,12 @@ func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueS
return nil, fmt.Errorf("queue specification cannot be nil. You need to provide a valid IQueueSpecification")
}

err := specification.validate(a.featuresAvailable)

if err != nil {
return nil, err
}

amqpQueue := newAmqpQueue(a, specification.name())
amqpQueue.AutoDelete(specification.isAutoDelete())
// TODO: config tweak to record only exclusive queues
Expand Down
2 changes: 1 addition & 1 deletion pkg/rabbitmqamqp/amqp_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ var _ = Describe("Management tests", func() {
var management *AmqpManagement

BeforeEach(func() {
management = newAmqpManagement(TopologyRecoveryAllEnabled)
management = newAmqpManagement(TopologyRecoveryAllEnabled, newFeaturesAvailable())
management.topologyRecoveryRecords = newTopologyRecoveryRecords()
})

Expand Down
10 changes: 10 additions & 0 deletions pkg/rabbitmqamqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(result).To(BeNil())
})

It("should fail if declare a JMS queue in the open source RabbitMQ", func() {
queueName := generateName("should fail if declare a JMS queue in the open source RabbitMQ")
_, err := management.DeclareQueue(context.TODO(), &JMSQueueSpecification{
Name: queueName,
})
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("JMSQueueSpecification is only supported on Tanzu RabbitMQ 4.3 or later"))

})

// default
It("AMQP Declare Queue with DefaultQueueSpecification should succeed", func() {
queueName := generateName("AMQP Declare Queue with DefaultQueueSpecification should succeed")
Expand Down
Loading
Loading