Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions docs/examples/rpc_echo_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,39 @@ import (
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

type echoRpcServer struct {
type echoResponder struct {
conn *rabbitmqamqp.AmqpConnection
server rabbitmqamqp.RpcServer
server rabbitmqamqp.Responder
}

func (s *echoRpcServer) stop(ctx context.Context) {
func (s *echoResponder) stop(ctx context.Context) {
s.server.Close(ctx)
s.conn.Close(ctx)
}

func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
func newEchoResponder(conn *rabbitmqamqp.AmqpConnection) *echoResponder {
_, err := conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
Name: rpcServerQueueName,
Name: requestQueue,
})
if err != nil {
panic(err)
}
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
RequestQueue: rpcServerQueueName,
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
RequestQueue: requestQueue,
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
return request, nil
},
})
if err != nil {
panic(err)
}
return &echoRpcServer{
return &echoResponder{
conn: conn,
server: srv,
}
}

const rpcServerQueueName = "rpc-queue"
const requestQueue = "go-amqp1.0-request-queue"

func main() {
// Dial rabbit for RPC server connection
Expand All @@ -52,16 +52,16 @@ func main() {
panic(err)
}

srv := newEchoRpcServer(srvConn)
srv := newEchoResponder(srvConn)

// Dial rabbit for RPC client connection
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
if err != nil {
panic(err)
}

rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
RequestQueueName: rpcServerQueueName,
requester, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
RequestQueueName: requestQueue,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -94,7 +94,7 @@ func main() {
continue
}

resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
resp, err := requester.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
if err != nil {
fmt.Printf("Error calling RPC: %v\n", err)
continue
Expand Down
14 changes: 7 additions & 7 deletions pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti
return newConsumer(ctx, a, destinationAdd, options)
}

// NewRpcServer creates a new RPC server that processes requests from the
// NewResponder creates a new RPC server that processes requests from the
// specified queue. The requestQueue in options is mandatory, while other
// fields are optional and will use defaults if not provided.
func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOptions) (RpcServer, error) {
func (a *AmqpConnection) NewResponder(ctx context.Context, options ResponderOptions) (Responder, error) {
if err := options.validate(); err != nil {
return nil, fmt.Errorf("rpc server options validation: %w", err)
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOpti
replyPostProcessor = defaultReplyPostProcessor
}

server := &amqpRpcServer{
server := &amqpResponder{
requestHandler: handler,
requestQueue: options.RequestQueue,
publisher: publisher,
Expand All @@ -244,14 +244,14 @@ func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOpti
return server, nil
}

// NewRpcClient creates a new RPC client that sends requests to the specified queue
// NewRequester creates a new RPC client that sends requests to the specified queue
// and receives replies on a dynamically created reply queue.
func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOptions) (RpcClient, error) {
func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (Requester, error) {
if options == nil {
return nil, fmt.Errorf("options cannot be nil")
}
if options.RequestQueueName == "" {
return nil, fmt.Errorf("requestQueueName is mandatory")
return nil, fmt.Errorf("request QueueName is mandatory")
}

// Create publisher for sending requests
Expand Down Expand Up @@ -305,7 +305,7 @@ func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOpt
correlationIdExtractor = defaultReplyCorrelationIdExtractor
}

client := &amqpRpcClient{
client := &amqpRequester{
requestQueue: requestQueue,
replyToQueue: &QueueAddress{Queue: replyQueueName},
publisher: publisher,
Expand Down
4 changes: 2 additions & 2 deletions pkg/rabbitmqamqp/example_rpc_custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Example_customCorrelationId() {
panic(err)
}

server, err := srvConn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
server, err := srvConn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
RequestQueue: rpcServerQueueNameCustom,
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
fmt.Printf("Received: %s\n", request.GetData())
Expand Down Expand Up @@ -70,7 +70,7 @@ func Example_customCorrelationId() {
}
defer clientConn.Close(context.Background())

rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
RequestQueueName: rpcServerQueueNameCustom,
CorrelationIdSupplier: &customCorrelationIDSupplier{},
CorrelationIdExtractor: func(message *amqp.Message) any {
Expand Down
6 changes: 3 additions & 3 deletions pkg/rabbitmqamqp/example_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

type echoRpcServer struct {
conn *rabbitmqamqp.AmqpConnection
server rabbitmqamqp.RpcServer
server rabbitmqamqp.Responder
}

func (s *echoRpcServer) stop(ctx context.Context) {
Expand All @@ -25,7 +25,7 @@ func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
Name: rpcServerQueueName,
})
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
RequestQueue: rpcServerQueueName,
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
fmt.Printf("echo: %s\n", request.GetData())
Expand Down Expand Up @@ -58,7 +58,7 @@ func Example() {
panic(err)
}

rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
RequestQueueName: rpcServerQueueName,
})
if err != nil {
Expand Down
26 changes: 13 additions & 13 deletions pkg/rabbitmqamqp/rpc_client.go → pkg/rabbitmqamqp/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/google/uuid"
)

// RpcClient is an interface for making RPC (Remote Procedure Call) requests over AMQP.
// Requester is an interface for making RPC (Remote Procedure Call) requests over AMQP.
// Implementations of this interface should handle the sending of requests and
// the receiving of corresponding replies, managing correlation IDs and timeouts.
//
Expand All @@ -33,7 +33,7 @@ import (
// - `Message` provides a basic AMQP message structure for RPC requests.
// - `Publish` sends the request message and returns a channel that will receive
// the reply message, or be closed if a timeout occurs or the client is closed.
type RpcClient interface {
type Requester interface {
Close(context.Context) error
Message(body []byte) *amqp.Message
Publish(context.Context, *amqp.Message) (<-chan *amqp.Message, error)
Expand Down Expand Up @@ -86,16 +86,16 @@ var defaultReplyCorrelationIdExtractor CorrelationIdExtractor = func(message *am
// address for the reply queue. The function must return the modified message.
//
// The default `RequestPostProcessor` implementation (used when `RequestPostProcessor`
// is not explicitly set in `RpcClientOptions`) performs the following:
// is not explicitly set in `RequesterOptions`) performs the following:
// - Assigns the `correlationID` to the `MessageID` property of the `amqp.Message`.
// - Sets the `ReplyTo` message property to a client-generated exclusive auto-delete queue.
type RequestPostProcessor func(request *amqp.Message, correlationID any) *amqp.Message

var DefaultRpcRequestTimeout = 30 * time.Second

// RpcClientOptions is a struct that contains the options for the RPC client.
// RequesterOptions is a struct that contains the options for the RPC client.
// It is used to configure the RPC client.
type RpcClientOptions struct {
type RequesterOptions struct {
// The name of the queue to send requests to. This queue must exist.
//
// Mandatory.
Expand Down Expand Up @@ -130,7 +130,7 @@ type outstandingRequest struct {
// err error
}

type amqpRpcClient struct {
type amqpRequester struct {
requestQueue ITargetAddress
replyToQueue ITargetAddress
publisher *Publisher
Expand All @@ -149,7 +149,7 @@ type amqpRpcClient struct {
// Close shuts down the RPC client, closing its underlying publisher and consumer.
// It ensures that all pending requests are cleaned up by closing their respective
// channels. This method is safe to call multiple times.
func (a *amqpRpcClient) Close(ctx context.Context) error {
func (a *amqpRequester) Close(ctx context.Context) error {
var err error
a.closer.Do(func() {
a.mu.Lock()
Expand All @@ -173,7 +173,7 @@ func (a *amqpRpcClient) Close(ctx context.Context) error {
return err
}

func (a *amqpRpcClient) Message(body []byte) *amqp.Message {
func (a *amqpRequester) Message(body []byte) *amqp.Message {
return amqp.NewMessage(body)
}

Expand All @@ -184,9 +184,9 @@ func (a *amqpRpcClient) Message(body []byte) *amqp.Message {
// an `outstandingRequest` is created and stored, and a channel is returned
// for the reply. The channel will be closed if the request times out or the
// client is closed before a reply is received.
func (a *amqpRpcClient) Publish(ctx context.Context, message *amqp.Message) (<-chan *amqp.Message, error) {
func (a *amqpRequester) Publish(ctx context.Context, message *amqp.Message) (<-chan *amqp.Message, error) {
if a.isClosed() {
return nil, fmt.Errorf("rpc client is closed")
return nil, fmt.Errorf("requester is closed")
}
replyTo, err := a.replyToQueue.toAddress()
if err != nil {
Expand Down Expand Up @@ -220,7 +220,7 @@ func (a *amqpRpcClient) Publish(ctx context.Context, message *amqp.Message) (<-c
return ch, nil
}

func (a *amqpRpcClient) isClosed() bool {
func (a *amqpRequester) isClosed() bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.closed
Expand All @@ -231,7 +231,7 @@ func (a *amqpRpcClient) isClosed() bool {
// If a request's `sentAt` timestamp is older than the `requestTimeout`,
// its channel is closed, and the request is removed from `pendingRequests`.
// The goroutine exits when the `done` channel is closed, typically when the client is closed.
func (a *amqpRpcClient) requestTimeoutTask() {
func (a *amqpRequester) requestTimeoutTask() {
t := time.NewTicker(a.requestTimeout)
defer t.Stop()
for {
Expand Down Expand Up @@ -260,7 +260,7 @@ func (a *amqpRpcClient) requestTimeoutTask() {
// corresponding request's channel, and the request is removed from `pendingRequests`.
// If no match is found, the message is requeued. The goroutine exits when the `done`
// channel is closed, typically when the client is closed.
func (a *amqpRpcClient) messageReceivedHandler() {
func (a *amqpRequester) messageReceivedHandler() {
for {
select {
case <-a.done:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
. "github.com/onsi/gomega"
)

var _ = Describe("RpcClient", func() {
var _ = Describe("Requester", func() {
var (
conn *AmqpConnection
queueName string
Expand Down Expand Up @@ -85,7 +85,7 @@ var _ = Describe("RpcClient", func() {
// Server goroutine to handle incoming requests
go pongRpcServer(ctx, publisher, consumer)

client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
client, err := conn.NewRequester(ctx, &RequesterOptions{
RequestQueueName: queueName,
})
Ω(err).ShouldNot(HaveOccurred())
Expand All @@ -110,7 +110,7 @@ var _ = Describe("RpcClient", func() {

It("uses a custom correlation id extractor and post processor", func(ctx SpecContext) {
go pongRpcServer(ctx, publisher, consumer)
client, err := conn.NewRpcClient(ctx, &RpcClientOptions{
client, err := conn.NewRequester(ctx, &RequesterOptions{
RequestQueueName: queueName,
CorrelationIdExtractor: func(message *amqp.Message) any {
return message.ApplicationProperties["correlationId"]
Expand Down
Loading