Skip to content

Commit a5d9304

Browse files
committed
rename rpcclient to Requester
and rpcServer to Responder Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 9b55044 commit a5d9304

7 files changed

Lines changed: 24 additions & 24 deletions

File tree

docs/examples/rpc_echo_server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func main() {
6060
panic(err)
6161
}
6262

63-
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
63+
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
6464
RequestQueueName: rpcServerQueueName,
6565
})
6666
if err != nil {

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,9 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOpti
244244
return server, nil
245245
}
246246

247-
// NewRpcClient creates a new RPC client that sends requests to the specified queue
247+
// NewRequester creates a new RPC client that sends requests to the specified queue
248248
// and receives replies on a dynamically created reply queue.
249-
func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOptions) (RpcClient, error) {
249+
func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (RpcClient, error) {
250250
if options == nil {
251251
return nil, fmt.Errorf("options cannot be nil")
252252
}
@@ -305,7 +305,7 @@ func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOpt
305305
correlationIdExtractor = defaultReplyCorrelationIdExtractor
306306
}
307307

308-
client := &amqpRpcClient{
308+
client := &amqpRequester{
309309
requestQueue: requestQueue,
310310
replyToQueue: &QueueAddress{Queue: replyQueueName},
311311
publisher: publisher,

pkg/rabbitmqamqp/example_rpc_custom_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func Example_customCorrelationId() {
7070
}
7171
defer clientConn.Close(context.Background())
7272

73-
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
73+
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
7474
RequestQueueName: rpcServerQueueNameCustom,
7575
CorrelationIdSupplier: &customCorrelationIDSupplier{},
7676
CorrelationIdExtractor: func(message *amqp.Message) any {

pkg/rabbitmqamqp/example_rpc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func Example() {
5858
panic(err)
5959
}
6060

61-
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
61+
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
6262
RequestQueueName: rpcServerQueueName,
6363
})
6464
if err != nil {
Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,16 @@ var defaultReplyCorrelationIdExtractor CorrelationIdExtractor = func(message *am
8686
// address for the reply queue. The function must return the modified message.
8787
//
8888
// The default `RequestPostProcessor` implementation (used when `RequestPostProcessor`
89-
// is not explicitly set in `RpcClientOptions`) performs the following:
89+
// is not explicitly set in `RequesterOptions`) performs the following:
9090
// - Assigns the `correlationID` to the `MessageID` property of the `amqp.Message`.
9191
// - Sets the `ReplyTo` message property to a client-generated exclusive auto-delete queue.
9292
type RequestPostProcessor func(request *amqp.Message, correlationID any) *amqp.Message
9393

9494
var DefaultRpcRequestTimeout = 30 * time.Second
9595

96-
// RpcClientOptions is a struct that contains the options for the RPC client.
96+
// RequesterOptions is a struct that contains the options for the RPC client.
9797
// It is used to configure the RPC client.
98-
type RpcClientOptions struct {
98+
type RequesterOptions struct {
9999
// The name of the queue to send requests to. This queue must exist.
100100
//
101101
// Mandatory.
@@ -130,7 +130,7 @@ type outstandingRequest struct {
130130
// err error
131131
}
132132

133-
type amqpRpcClient struct {
133+
type amqpRequester struct {
134134
requestQueue ITargetAddress
135135
replyToQueue ITargetAddress
136136
publisher *Publisher
@@ -149,7 +149,7 @@ type amqpRpcClient struct {
149149
// Close shuts down the RPC client, closing its underlying publisher and consumer.
150150
// It ensures that all pending requests are cleaned up by closing their respective
151151
// channels. This method is safe to call multiple times.
152-
func (a *amqpRpcClient) Close(ctx context.Context) error {
152+
func (a *amqpRequester) Close(ctx context.Context) error {
153153
var err error
154154
a.closer.Do(func() {
155155
a.mu.Lock()
@@ -173,7 +173,7 @@ func (a *amqpRpcClient) Close(ctx context.Context) error {
173173
return err
174174
}
175175

176-
func (a *amqpRpcClient) Message(body []byte) *amqp.Message {
176+
func (a *amqpRequester) Message(body []byte) *amqp.Message {
177177
return amqp.NewMessage(body)
178178
}
179179

@@ -184,9 +184,9 @@ func (a *amqpRpcClient) Message(body []byte) *amqp.Message {
184184
// an `outstandingRequest` is created and stored, and a channel is returned
185185
// for the reply. The channel will be closed if the request times out or the
186186
// client is closed before a reply is received.
187-
func (a *amqpRpcClient) Publish(ctx context.Context, message *amqp.Message) (<-chan *amqp.Message, error) {
187+
func (a *amqpRequester) Publish(ctx context.Context, message *amqp.Message) (<-chan *amqp.Message, error) {
188188
if a.isClosed() {
189-
return nil, fmt.Errorf("rpc client is closed")
189+
return nil, fmt.Errorf("requester is closed")
190190
}
191191
replyTo, err := a.replyToQueue.toAddress()
192192
if err != nil {
@@ -220,7 +220,7 @@ func (a *amqpRpcClient) Publish(ctx context.Context, message *amqp.Message) (<-c
220220
return ch, nil
221221
}
222222

223-
func (a *amqpRpcClient) isClosed() bool {
223+
func (a *amqpRequester) isClosed() bool {
224224
a.mu.Lock()
225225
defer a.mu.Unlock()
226226
return a.closed
@@ -231,7 +231,7 @@ func (a *amqpRpcClient) isClosed() bool {
231231
// If a request's `sentAt` timestamp is older than the `requestTimeout`,
232232
// its channel is closed, and the request is removed from `pendingRequests`.
233233
// The goroutine exits when the `done` channel is closed, typically when the client is closed.
234-
func (a *amqpRpcClient) requestTimeoutTask() {
234+
func (a *amqpRequester) requestTimeoutTask() {
235235
t := time.NewTicker(a.requestTimeout)
236236
defer t.Stop()
237237
for {
@@ -260,7 +260,7 @@ func (a *amqpRpcClient) requestTimeoutTask() {
260260
// corresponding request's channel, and the request is removed from `pendingRequests`.
261261
// If no match is found, the message is requeued. The goroutine exits when the `done`
262262
// channel is closed, typically when the client is closed.
263-
func (a *amqpRpcClient) messageReceivedHandler() {
263+
func (a *amqpRequester) messageReceivedHandler() {
264264
for {
265265
select {
266266
case <-a.done:
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ var _ = Describe("RpcClient", func() {
8585
// Server goroutine to handle incoming requests
8686
go pongRpcServer(ctx, publisher, consumer)
8787

88-
client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
88+
client, err := conn.NewRequester(ctx, &RequesterOptions{
8989
RequestQueueName: queueName,
9090
})
9191
Ω(err).ShouldNot(HaveOccurred())
@@ -110,7 +110,7 @@ var _ = Describe("RpcClient", func() {
110110

111111
It("uses a custom correlation id extractor and post processor", func(ctx SpecContext) {
112112
go pongRpcServer(ctx, publisher, consumer)
113-
client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
113+
client, err := conn.NewRequester(ctx, &RequesterOptions{
114114
RequestQueueName: queueName,
115115
CorrelationIdExtractor: func(message *amqp.Message) any {
116116
return message.ApplicationProperties["correlationId"]

pkg/rabbitmqamqp/rpc_e2e_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ var _ = Describe("RPC E2E", Label("e2e"), func() {
6969
m := sync.Mutex{}
7070
messagesReceivedByServer := 0
7171
var err error
72-
rpcClient, err = clientConn.NewRpcClient(ctx, &rabbitmqamqp.RpcClientOptions{
72+
rpcClient, err = clientConn.NewRequester(ctx, &rabbitmqamqp.RequesterOptions{
7373
RequestQueueName: rpcQueueName,
7474
})
7575
Ω(err).ShouldNot(HaveOccurred())
@@ -147,11 +147,11 @@ func ExampleRpcClient() {
147147

148148
// Create RPC client options
149149
// RequestQueueName is mandatory. The queue must exist.
150-
options := rabbitmqamqp.RpcClientOptions{
150+
options := rabbitmqamqp.RequesterOptions{
151151
RequestQueueName: "rpc-queue",
152152
}
153153
// Create a new RPC client
154-
rpcClient, err := conn.NewRpcClient(context.TODO(), &options)
154+
rpcClient, err := conn.NewRequester(context.TODO(), &options)
155155
if err != nil {
156156
panic(err)
157157
}
@@ -200,13 +200,13 @@ func ExampleRpcClient_customCorrelationID() {
200200
defer conn.Close(context.TODO())
201201

202202
// Create RPC client options
203-
options := rabbitmqamqp.RpcClientOptions{
203+
options := rabbitmqamqp.RequesterOptions{
204204
RequestQueueName: "rpc-queue", // the queue must exist
205205
CorrelationIdSupplier: &fooCorrelationIdSupplier{},
206206
}
207207

208208
// Create a new RPC client
209-
rpcClient, _ := conn.NewRpcClient(context.TODO(), &options)
209+
rpcClient, _ := conn.NewRequester(context.TODO(), &options)
210210
pendingRequestCh, _ := rpcClient.Publish(context.TODO(), rpcClient.Message([]byte("hello world")))
211211
replyFromServer := <-pendingRequestCh
212212
fmt.Printf("reply correlation ID: %s\n", replyFromServer.Properties.CorrelationID)

0 commit comments

Comments
 (0)