Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ docs/examples/ # Example code demonstrating usage
- `initialCredits()` - Initial credits (defaults to 256)
- `linkFilters()` - Link filters for stream consumers
- `id()` - Consumer ID
Comment thread
Gsantomaggio marked this conversation as resolved.
- `isDirectReplyToEnable()` - Enable direct reply-to for RPC
- `validate()` - Validates the configured consumer options
- `isDirectReplyToEnable()` - Indicates whether Direct Reply-To is enabled for this consumer
- `preSettled()` - Indicates whether the consumer operates in pre-settled mode
- Settle strategy is configured via `ConsumerOptions.SettleStrategy` or `RequesterOptions.SettleStrategy` using the `ConsumerSettleStrategy` enum: `ExplicitSettle`, `DirectReplyTo`, `PreSettled`

### Queue Specifications

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ All notable changes to this project will be documented in this file.
## 0.5.0 - 2026-01-22
- [Release 0.5.0](https://github.com/rabbitmq/rabbitmq-amqp-go-client/releases/tag/v0.5.0)

### Breaking changes
- **RequesterOptions**: Replaced `DirectReplyTo bool` with `SettleStrategy ConsumerSettleStrategy`. Use `SettleStrategy: rabbitmqamqp.DirectReplyTo` for direct-reply-to, or leave zero value for default (dedicated reply queue). Aligns consumer/requester configuration with other AMQP 1.0 clients.
- **ConsumerOptions / amqp_types**: Renamed `ConsumerFeature` to `ConsumerSettleStrategy`, `DefaultSettle` to `ExplicitSettle`, and `Feature` field to `SettleStrategy`. Aligns with the unified settle strategy API used in other AMQP 1.0 clients.

### Added
- Add WebSocket transport support for AMQP 1.0 connections by @vedanthnyk25 in [#78](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/78)
- Add Sec-WebSocket-Protocol to the HTTP header by @Gsantomaggio in [#79](https://github.com/rabbitmq/rabbitmq-amqp-go-client/pull/79)
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/pre_settled/pre_settled.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func main() {
// create consumer with pre-settled mode enabled

consumer, err := amqpConnection.NewConsumer(context.TODO(), "pre-settled-queue", &rmq.ConsumerOptions{
Feature: rmq.PreSettled,
SettleStrategy: rmq.PreSettled,
})
if err != nil {
rmq.Error("Error creating consumer", err)
Expand Down
18 changes: 15 additions & 3 deletions docs/examples/rpc_echo_server/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
// RabbitMQ AMQP 1.0 Go Client: https://github.com/rabbitmq/rabbitmq-amqp-go-client
// RabbitMQ AMQP 1.0 documentation: https://www.rabbitmq.com/docs/amqp
// The example is demonstrating how to implement a simple RPC echo server and client using RabbitMQ AMQP 1.0 Go Client.
// It uses DirectReplyTo for the client to receive responses without needing to declare a reply queue.
// DirectReplyTo is the recommended way to receive replies for RPC clients.
// The server listens for messages on a request queue and responds with the same message (echo).
// The client sends messages to the request queue and waits for the echoed response.
// The example also includes graceful shutdown handling when the user presses Ctrl+C.
// Example path:https://github.com/rabbitmq/rabbitmq-amqp-go-client/tree/main/docs/examples/rpc_echo_server/main.go

package main

import (
Expand Down Expand Up @@ -66,9 +76,11 @@ func main() {

requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
RequestQueueName: requestQueue,
// Enable Direct Reply To feature
// see: https://www.rabbitmq.com/direct-reply-to.html
DirectReplyTo: true,
// Use DirectReplyTo so replies are received via RabbitMQ direct-reply-to (no reply queue declared).
// See: https://www.rabbitmq.com/docs/direct-reply-to#overview
// That's the recommended way to receive replies for RPC clients,
// as it avoids the overhead of declaring and consuming from a reply queue.
SettleStrategy: rabbitmqamqp.DirectReplyTo,
Comment on lines +79 to +83
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example hard-codes DirectReplyTo, but direct-reply-to is only supported on RabbitMQ 4.2+ (the client enforces this in consumer validation). It would help to mention the minimum RabbitMQ version (or show a fallback to ExplicitSettle) so users on 4.0/4.1 don't hit a confusing runtime error.

Copilot uses AI. Check for mistakes.
})
if err != nil {
panic(err)
Expand Down
8 changes: 2 additions & 6 deletions pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOpt

replyQueueName := options.ReplyToQueueName
queueName := ""
if !options.DirectReplyTo {
if options.SettleStrategy != DirectReplyTo {

Comment on lines +279 to 280
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RequesterOptions.SettleStrategy is treated as a boolean toggle here: any value other than DirectReplyTo will declare a reply queue. Since ConsumerSettleStrategy also includes PreSettled (and could receive unknown values), this should validate and reject unsupported strategies (or explicitly handle them) to avoid silent misconfiguration.

Suggested change
if options.SettleStrategy != DirectReplyTo {
switch options.SettleStrategy {
case DirectReplyTo:
// No reply queue is declared when using DirectReplyTo.
case PreSettled:
// PreSettled is not a valid strategy for a requester that expects replies.
return nil, fmt.Errorf("unsupported settle strategy for requester: %v", options.SettleStrategy)
default:

Copilot uses AI. Check for mistakes.
if len(replyQueueName) == 0 {
replyQueueName = generateNameWithDefaultPrefix()
Expand Down Expand Up @@ -333,13 +333,9 @@ func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOpt
done: make(chan struct{}),
}

feature := DefaultSettle
if options.DirectReplyTo {
feature = DirectReplyTo
}
// Create consumer for receiving replies
consumer, err := a.NewConsumer(ctx, queueName, &ConsumerOptions{
Feature: feature,
SettleStrategy: options.SettleStrategy,
})
if err != nil {
_ = publisher.Close(ctx) // cleanup publisher on failure
Expand Down
6 changes: 3 additions & 3 deletions pkg/rabbitmqamqp/amqp_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ = Describe("NewConsumer tests", func() {
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 10)
consumer, err := connection.NewConsumer(context.Background(), qName, &ConsumerOptions{Feature: DefaultSettle})
consumer, err := connection.NewConsumer(context.Background(), qName, &ConsumerOptions{SettleStrategy: ExplicitSettle})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
Expand Down Expand Up @@ -247,7 +247,7 @@ var _ = Describe("Consumer direct reply to", func() {
Expect(err).To(BeNil())

consumer, err := connection.NewConsumer(context.Background(), "", &ConsumerOptions{
Feature: DirectReplyTo,
SettleStrategy: DirectReplyTo,
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expand Down Expand Up @@ -331,7 +331,7 @@ var _ = Describe("Consumer pre-settled", func() {
// Create consumer with pre-settled enabled
consumer, err := connection.NewConsumer(context.Background(), qName, &ConsumerOptions{
InitialCredits: initialCredits,
Feature: PreSettled,
SettleStrategy: PreSettled,
})
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expand Down
2 changes: 1 addition & 1 deletion pkg/rabbitmqamqp/amqp_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ = Describe("AMQP Exchange test ", func() {
})

It("AMQP Exchange Declare with Default and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare and Delete with DefaultSettle should succeed"
const exchangeName = "AMQP Exchange Declare and Delete with ExplicitSettle should succeed"
exchangeInfo, err := management.DeclareExchange(context.TODO(), &DirectExchangeSpecification{
Name: exchangeName,
})
Expand Down
26 changes: 14 additions & 12 deletions pkg/rabbitmqamqp/amqp_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,17 @@ func (mo *managementOptions) preSettled() bool {
return false
}

type ConsumerFeature byte
// ConsumerSettleStrategy configures how the consumer receives and settles messages.
// Aligns with the settle strategy concept used across AMQP 1.0 clients.
type ConsumerSettleStrategy byte

const (
// DefaultSettle means that the consumer will be created with the default settings.
// message settle mode will be the default one (explicit settle) via IDeliveryContext
DefaultSettle ConsumerFeature = iota
// ExplicitSettle means that the consumer will be created with the default settings.
// Message settle mode will be explicit via IDeliveryContext (accept, discard, requeue).
ExplicitSettle ConsumerSettleStrategy = iota
// DirectReplyTo means that the consumer will be created with the direct reply to feature enabled.
// see https://www.rabbitmq.com/docs/direct-reply-to#overview message settle mode will be auto-settled
//for direct reply to consumers.
// See https://www.rabbitmq.com/docs/direct-reply-to#overview. Message settle mode will be auto-settled
// for direct reply to consumers.
DirectReplyTo
// PreSettled means that the consumer will be created with the pre-settled delivery mode.
// The server settles the deliveries as soon as they are sent to the consumer,
Expand All @@ -136,9 +138,9 @@ type ConsumerOptions struct {
// The id of the consumer
Id string

// Feature represents the feature that should be enabled for the consumer.
// see ConsumerFeature for more details.
Feature ConsumerFeature
// SettleStrategy configures how messages are received and settled.
// See ConsumerSettleStrategy for more details.
SettleStrategy ConsumerSettleStrategy
}

func (aco *ConsumerOptions) linkName() string {
Expand All @@ -159,19 +161,19 @@ func (aco *ConsumerOptions) id() string {

func (aco *ConsumerOptions) validate(available *featuresAvailable) error {
// direct reply to is supported since RabbitMQ 4.2.0
if aco.Feature == DirectReplyTo && !available.is42rMore {
if aco.SettleStrategy == DirectReplyTo && !available.is42rMore {
return fmt.Errorf("direct reply to feature is not supported. You need RabbitMQ 4.2 or later")
}

return nil
}

func (aco *ConsumerOptions) isDirectReplyToEnable() bool {
return aco.Feature == DirectReplyTo
return aco.SettleStrategy == DirectReplyTo
}

func (aco *ConsumerOptions) preSettled() bool {
return aco.Feature == PreSettled
return aco.SettleStrategy == PreSettled
}

type IOffsetSpecification interface {
Expand Down
8 changes: 5 additions & 3 deletions pkg/rabbitmqamqp/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ type RequesterOptions struct {
// Optional. If not set, a default timeout of 30 seconds will be used.
RequestTimeout time.Duration

// If true, the requester will set the 'Direct-Reply-To' feature for RabbitMQ.
// see: https://www.rabbitmq.com/direct-reply-to.html
DirectReplyTo bool
// SettleStrategy configures how the reply consumer receives messages.
// Use ExplicitSettle for a dedicated reply queue (default).
// Use DirectReplyTo to enable RabbitMQ direct-reply-to (no reply queue declared).
Comment on lines +127 to +128
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RequesterOptions.SettleStrategy uses ConsumerSettleStrategy, which also contains PreSettled, but the requester implementation always calls Accept()/Requeue() on replies. Consider documenting which strategies are actually supported for requesters (e.g., ExplicitSettle and DirectReplyTo only) or using a requester-specific enum to prevent unsupported values.

Suggested change
// Use ExplicitSettle for a dedicated reply queue (default).
// Use DirectReplyTo to enable RabbitMQ direct-reply-to (no reply queue declared).
//
// Supported values for requesters:
// - ExplicitSettle (default): use a dedicated reply queue and explicitly settle replies
// via Accept()/Requeue().
// - DirectReplyTo: enable RabbitMQ direct-reply-to (no reply queue declared).
//
// Note: PreSettled is not supported for requesters; replies are always explicitly settled.

Copilot uses AI. Check for mistakes.
// See: https://www.rabbitmq.com/docs/direct-reply-to#overview
SettleStrategy ConsumerSettleStrategy
}

type outstandingRequest struct {
Expand Down