Skip to content

Commit 3f02324

Browse files
committed
implement direct reply to feature
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 9202744 commit 3f02324

3 files changed

Lines changed: 10 additions & 11 deletions

File tree

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,16 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti
184184

185185
if options != nil && options.isDirectReplyToEnable() {
186186
return newConsumer(ctx, a, "", options)
187-
} else {
188-
destination := &QueueAddress{
189-
Queue: queueName,
190-
}
191-
destinationAdd, err := destination.toAddress()
192-
if err != nil {
193-
return nil, err
194-
}
195-
return newConsumer(ctx, a, destinationAdd, options)
196-
197187
}
198188

189+
destination := &QueueAddress{
190+
Queue: queueName,
191+
}
192+
destinationAdd, err := destination.toAddress()
193+
if err != nil {
194+
return nil, err
195+
}
196+
return newConsumer(ctx, a, destinationAdd, options)
199197
}
200198

201199
// NewResponder creates a new RPC server that processes requests from the

pkg/rabbitmqamqp/amqp_consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (c *Consumer) Id() string {
101101

102102
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options IConsumerOptions) (*Consumer, error) {
103103
id := fmt.Sprintf("consumer-%s", uuid.New().String())
104-
if options != nil && options.id() != "" {
104+
if options != nil && len(options.id()) > 0 {
105105
id = options.id()
106106
}
107107

pkg/rabbitmqamqp/requester.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
// - `Message` provides a basic AMQP message structure for RPC requests.
3434
// - `Publish` sends the request message and returns a channel that will receive
3535
// the reply message, or be closed if a timeout occurs or the client is closed.
36+
// - `GetReplyQueue` returns the address of the reply queue used by the requester.
3637
type Requester interface {
3738
Close(context.Context) error
3839
Message(body []byte) *amqp.Message

0 commit comments

Comments
 (0)