Skip to content

Commit 84cd2ae

Browse files
committed
implement direct reply to feature
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 44d91d1 commit 84cd2ae

3 files changed

Lines changed: 19 additions & 2 deletions

File tree

docs/examples/rpc_echo_server/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@ func main() {
5252
if err != nil {
5353
panic(err)
5454
}
55+
_ = srvConn.Management().DeleteQueue(context.TODO(), requestQueue)
5556

5657
srv := newEchoResponder(srvConn)
58+
reply, _ := srv.server.GetRequestQueue()
5759

60+
fmt.Printf("request queue %s \n", reply)
5861
// Dial rabbit for RPC client connection
5962
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
6063
if err != nil {
@@ -63,11 +66,15 @@ func main() {
6366

6467
requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
6568
RequestQueueName: requestQueue,
66-
DirectReplyTo: true,
69+
// Enable Direct Reply To feature
70+
// see: https://www.rabbitmq.com/direct-reply-to.html
71+
DirectReplyTo: true,
6772
})
6873
if err != nil {
6974
panic(err)
7075
}
76+
reply, _ = requester.GetReplyQueue()
77+
fmt.Printf("replyTo to %s \n", reply)
7178

7279
// Set up a channel to listen for OS signals
7380
sigs := make(chan os.Signal, 1)
@@ -95,7 +102,6 @@ func main() {
95102
if message == "" {
96103
continue
97104
}
98-
99105
resp, err := requester.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
100106
if err != nil {
101107
fmt.Printf("Error calling RPC: %v\n", err)

pkg/rabbitmqamqp/requester.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Requester interface {
3737
Close(context.Context) error
3838
Message(body []byte) *amqp.Message
3939
Publish(context.Context, *amqp.Message) (<-chan *amqp.Message, error)
40+
GetReplyQueue() (string, error)
4041
}
4142

4243
// CorrelationIdSupplier is an interface for providing correlation IDs for RPC requests.
@@ -224,6 +225,10 @@ func (a *amqpRequester) Publish(ctx context.Context, message *amqp.Message) (<-c
224225
return ch, nil
225226
}
226227

228+
func (a *amqpRequester) GetReplyQueue() (string, error) {
229+
return a.consumer.GetQueue()
230+
}
231+
227232
func (a *amqpRequester) isClosed() bool {
228233
a.mu.Lock()
229234
defer a.mu.Unlock()

pkg/rabbitmqamqp/responder.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ type Responder interface {
6363
Pause()
6464
// Unpause requests to receive messages again.
6565
Unpause() error
66+
67+
GetRequestQueue() (string, error)
6668
}
6769

6870
type ResponderOptions struct {
@@ -190,6 +192,10 @@ func (a *amqpResponder) Unpause() error {
190192
return nil
191193
}
192194

195+
func (a *amqpResponder) GetRequestQueue() (string, error) {
196+
return a.consumer.GetQueue()
197+
}
198+
193199
func (a *amqpResponder) handle() {
194200
/*
195201
The RPC server has the following behavior:

0 commit comments

Comments
 (0)