Skip to content

Commit 7b29d9f

Browse files
committed
rename rpcclient to Requester
and rpcServer to Responder Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent a5d9304 commit 7b29d9f

9 files changed

Lines changed: 48 additions & 48 deletions

File tree

docs/examples/rpc_echo_server/main.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,39 @@ import (
1111
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
1212
)
1313

14-
type echoRpcServer struct {
14+
type echoResponder struct {
1515
conn *rabbitmqamqp.AmqpConnection
16-
server rabbitmqamqp.RpcServer
16+
server rabbitmqamqp.Responder
1717
}
1818

19-
func (s *echoRpcServer) stop(ctx context.Context) {
19+
func (s *echoResponder) stop(ctx context.Context) {
2020
s.server.Close(ctx)
2121
s.conn.Close(ctx)
2222
}
2323

24-
func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
24+
func newEchoResponder(conn *rabbitmqamqp.AmqpConnection) *echoResponder {
2525
_, err := conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
26-
Name: rpcServerQueueName,
26+
Name: requestQueue,
2727
})
2828
if err != nil {
2929
panic(err)
3030
}
31-
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
32-
RequestQueue: rpcServerQueueName,
31+
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
32+
RequestQueue: requestQueue,
3333
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
3434
return request, nil
3535
},
3636
})
3737
if err != nil {
3838
panic(err)
3939
}
40-
return &echoRpcServer{
40+
return &echoResponder{
4141
conn: conn,
4242
server: srv,
4343
}
4444
}
4545

46-
const rpcServerQueueName = "rpc-queue"
46+
const requestQueue = "go-amqp1.0-request-queue"
4747

4848
func main() {
4949
// Dial rabbit for RPC server connection
@@ -52,16 +52,16 @@ func main() {
5252
panic(err)
5353
}
5454

55-
srv := newEchoRpcServer(srvConn)
55+
srv := newEchoResponder(srvConn)
5656

5757
// Dial rabbit for RPC client connection
5858
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
5959
if err != nil {
6060
panic(err)
6161
}
6262

63-
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
64-
RequestQueueName: rpcServerQueueName,
63+
requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
64+
RequestQueueName: requestQueue,
6565
})
6666
if err != nil {
6767
panic(err)
@@ -94,7 +94,7 @@ func main() {
9494
continue
9595
}
9696

97-
resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
97+
resp, err := requester.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
9898
if err != nil {
9999
fmt.Printf("Error calling RPC: %v\n", err)
100100
continue

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,10 +192,10 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti
192192
return newConsumer(ctx, a, destinationAdd, options)
193193
}
194194

195-
// NewRpcServer creates a new RPC server that processes requests from the
195+
// NewResponder creates a new RPC server that processes requests from the
196196
// specified queue. The requestQueue in options is mandatory, while other
197197
// fields are optional and will use defaults if not provided.
198-
func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOptions) (RpcServer, error) {
198+
func (a *AmqpConnection) NewResponder(ctx context.Context, options ResponderOptions) (Responder, error) {
199199
if err := options.validate(); err != nil {
200200
return nil, fmt.Errorf("rpc server options validation: %w", err)
201201
}
@@ -231,7 +231,7 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOpti
231231
replyPostProcessor = defaultReplyPostProcessor
232232
}
233233

234-
server := &amqpRpcServer{
234+
server := &amqpResponder{
235235
requestHandler: handler,
236236
requestQueue: options.RequestQueue,
237237
publisher: publisher,
@@ -246,12 +246,12 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOpti
246246

247247
// NewRequester creates a new RPC client that sends requests to the specified queue
248248
// and receives replies on a dynamically created reply queue.
249-
func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (RpcClient, error) {
249+
func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (Requester, error) {
250250
if options == nil {
251251
return nil, fmt.Errorf("options cannot be nil")
252252
}
253253
if options.RequestQueueName == "" {
254-
return nil, fmt.Errorf("requestQueueName is mandatory")
254+
return nil, fmt.Errorf("request QueueName is mandatory")
255255
}
256256

257257
// Create publisher for sending requests

pkg/rabbitmqamqp/example_rpc_custom_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func Example_customCorrelationId() {
3838
panic(err)
3939
}
4040

41-
server, err := srvConn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
41+
server, err := srvConn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
4242
RequestQueue: rpcServerQueueNameCustom,
4343
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
4444
fmt.Printf("Received: %s\n", request.GetData())

pkg/rabbitmqamqp/example_rpc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
type echoRpcServer struct {
1515
conn *rabbitmqamqp.AmqpConnection
16-
server rabbitmqamqp.RpcServer
16+
server rabbitmqamqp.Responder
1717
}
1818

1919
func (s *echoRpcServer) stop(ctx context.Context) {
@@ -25,7 +25,7 @@ func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
2525
conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
2626
Name: rpcServerQueueName,
2727
})
28-
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
28+
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
2929
RequestQueue: rpcServerQueueName,
3030
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
3131
fmt.Printf("echo: %s\n", request.GetData())

pkg/rabbitmqamqp/requester.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/google/uuid"
1313
)
1414

15-
// RpcClient is an interface for making RPC (Remote Procedure Call) requests over AMQP.
15+
// Requester is an interface for making RPC (Remote Procedure Call) requests over AMQP.
1616
// Implementations of this interface should handle the sending of requests and
1717
// the receiving of corresponding replies, managing correlation IDs and timeouts.
1818
//
@@ -33,7 +33,7 @@ import (
3333
// - `Message` provides a basic AMQP message structure for RPC requests.
3434
// - `Publish` sends the request message and returns a channel that will receive
3535
// the reply message, or be closed if a timeout occurs or the client is closed.
36-
type RpcClient interface {
36+
type Requester interface {
3737
Close(context.Context) error
3838
Message(body []byte) *amqp.Message
3939
Publish(context.Context, *amqp.Message) (<-chan *amqp.Message, error)

pkg/rabbitmqamqp/requester_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
. "github.com/onsi/gomega"
1111
)
1212

13-
var _ = Describe("RpcClient", func() {
13+
var _ = Describe("Requester", func() {
1414
var (
1515
conn *AmqpConnection
1616
queueName string
Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66
"sync"
77
"time"
88

9-
amqp "github.com/Azure/go-amqp"
9+
"github.com/Azure/go-amqp"
1010
)
1111

12-
// RpcServerHandler is a function that processes a request message and returns a response message.
12+
// ResponderHandler is a function that processes a request message and returns a response message.
1313
// If the server wants to send a response to the client, it must return a response message.
1414
// If the function returns nil, the server will not send a response.
1515
// If the server does not send a response message, this high level RPC server doesn't make much sense,
@@ -22,9 +22,9 @@ import (
2222
// func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
2323
// return amqp.NewMessage([]byte(fmt.Sprintf("Pong: %s", request.GetData()))), nil
2424
// }
25-
type RpcServerHandler func(ctx context.Context, request *amqp.Message) (*amqp.Message, error)
25+
type ResponderHandler func(ctx context.Context, request *amqp.Message) (*amqp.Message, error)
2626

27-
var noOpHandler RpcServerHandler = func(_ context.Context, _ *amqp.Message) (*amqp.Message, error) {
27+
var noOpHandler ResponderHandler = func(_ context.Context, _ *amqp.Message) (*amqp.Message, error) {
2828
return nil, nil
2929
}
3030

@@ -54,9 +54,9 @@ var defaultReplyPostProcessor ReplyPostProcessor = func(reply *amqp.Message, cor
5454
return reply
5555
}
5656

57-
// RpcServer is Remote Procedure Call server that receives a message, process them,
57+
// Responder is Remote Procedure Call server that receives a message, process them,
5858
// and sends a response.
59-
type RpcServer interface {
59+
type Responder interface {
6060
// Close the RPC server and its underlying resources.
6161
Close(context.Context) error
6262
// Pause the server to stop receiving messages.
@@ -65,7 +65,7 @@ type RpcServer interface {
6565
Unpause() error
6666
}
6767

68-
type RpcServerOptions struct {
68+
type ResponderOptions struct {
6969
// RequestQueue is the name of the queue to subscribe to. This queue must be pre-declared.
7070
// The RPC server does not declare the queue, it is the responsibility of the caller to declare the queue.
7171
//
@@ -86,7 +86,7 @@ type RpcServerOptions struct {
8686
// }
8787
//
8888
// Mandatory.
89-
Handler RpcServerHandler
89+
Handler ResponderHandler
9090
// CorrectionIdExtractor is a function that extracts a correction ID from the request message.
9191
// The returned value should be an AMQP type that can be binary encoded.
9292
//
@@ -121,17 +121,17 @@ type RpcServerOptions struct {
121121
ReplyPostProcessor ReplyPostProcessor
122122
}
123123

124-
func (r *RpcServerOptions) validate() error {
124+
func (r *ResponderOptions) validate() error {
125125
if r.RequestQueue == "" {
126126
return fmt.Errorf("requestQueue is mandatory")
127127
}
128128
return nil
129129
}
130130

131-
type amqpRpcServer struct {
131+
type amqpResponder struct {
132132
// TODO: handle state changes for reconnections
133133
mu sync.Mutex
134-
requestHandler RpcServerHandler
134+
requestHandler ResponderHandler
135135
requestQueue string
136136
publisher *Publisher
137137
consumer *Consumer
@@ -145,7 +145,7 @@ type amqpRpcServer struct {
145145
// are closed gracefully and only once, even if Close is called multiple times.
146146
// The provided context (ctx) controls the timeout for the close operation, ensuring the operation
147147
// does not exceed the context's deadline.
148-
func (a *amqpRpcServer) Close(ctx context.Context) error {
148+
func (a *amqpResponder) Close(ctx context.Context) error {
149149
// TODO: wait for unsettled messages
150150
a.closer.Do(func() {
151151
a.mu.Lock()
@@ -168,14 +168,14 @@ func (a *amqpRpcServer) Close(ctx context.Context) error {
168168
return nil
169169
}
170170

171-
func (a *amqpRpcServer) Pause() {
171+
func (a *amqpResponder) Pause() {
172172
err := a.consumer.pause(context.Background())
173173
if err != nil {
174174
Warn("Did not pause consumer", "error", err)
175175
}
176176
}
177177

178-
func (a *amqpRpcServer) Unpause() error {
178+
func (a *amqpResponder) Unpause() error {
179179
a.mu.Lock()
180180
if a.closed {
181181
a.mu.Unlock()
@@ -190,7 +190,7 @@ func (a *amqpRpcServer) Unpause() error {
190190
return nil
191191
}
192192

193-
func (a *amqpRpcServer) handle() {
193+
func (a *amqpResponder) handle() {
194194
/*
195195
The RPC server has the following behavior:
196196
- when receiving a message request:
@@ -259,12 +259,12 @@ func (a *amqpRpcServer) handle() {
259259
}
260260
}
261261

262-
func (a *amqpRpcServer) isClosed() bool {
262+
func (a *amqpResponder) isClosed() bool {
263263
a.mu.Lock()
264264
defer a.mu.Unlock()
265265
return a.closed
266266
}
267267

268-
func (a *amqpRpcServer) issueCredits(credits uint32) error {
268+
func (a *amqpResponder) issueCredits(credits uint32) error {
269269
return a.consumer.issueCredits(credits)
270270
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/onsi/gomega/gbytes"
1313
)
1414

15-
var _ = Describe("RpcServer", func() {
15+
var _ = Describe("Responder", func() {
1616
var (
1717
conn *AmqpConnection
1818
requestQueue string
@@ -48,7 +48,7 @@ var _ = Describe("RpcServer", func() {
4848
requestPublisher.Close(ctx)
4949
})
5050

51-
server, err := conn.NewRpcServer(context.Background(), RpcServerOptions{
51+
server, err := conn.NewResponder(context.Background(), ResponderOptions{
5252
RequestQueue: requestQueue,
5353
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
5454
if request.Properties == nil {
@@ -92,7 +92,7 @@ var _ = Describe("RpcServer", func() {
9292

9393
It("stops the handler when the RPC server closes", func(ctx SpecContext) {
9494
// setup
95-
server, err := conn.NewRpcServer(context.Background(), RpcServerOptions{
95+
server, err := conn.NewResponder(context.Background(), ResponderOptions{
9696
RequestQueue: requestQueue,
9797
})
9898
Expect(err).ToNot(HaveOccurred())
@@ -137,7 +137,7 @@ var _ = Describe("RpcServer", func() {
137137
reply.ApplicationProperties["test"] = "success"
138138
return reply
139139
}
140-
server, err := conn.NewRpcServer(context.Background(), RpcServerOptions{
140+
server, err := conn.NewResponder(context.Background(), ResponderOptions{
141141
RequestQueue: requestQueue,
142142
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
143143
m := amqp.NewMessage(request.GetData())

pkg/rabbitmqamqp/rpc_e2e_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ var _ = Describe("RPC E2E", Label("e2e"), func() {
3030
var (
3131
clientConn *rabbitmqamqp.AmqpConnection
3232
serverConn *rabbitmqamqp.AmqpConnection
33-
rpcClient rabbitmqamqp.RpcClient
34-
rpcServer rabbitmqamqp.RpcServer
33+
rpcClient rabbitmqamqp.Requester
34+
rpcServer rabbitmqamqp.Responder
3535
rpcQueueName string
3636
)
3737

@@ -73,7 +73,7 @@ var _ = Describe("RPC E2E", Label("e2e"), func() {
7373
RequestQueueName: rpcQueueName,
7474
})
7575
Ω(err).ShouldNot(HaveOccurred())
76-
rpcServer, err = serverConn.NewRpcServer(ctx, rabbitmqamqp.RpcServerOptions{
76+
rpcServer, err = serverConn.NewResponder(ctx, rabbitmqamqp.ResponderOptions{
7777
RequestQueue: rpcQueueName,
7878
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
7979
m.Lock()

0 commit comments

Comments
 (0)