Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions examples/rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
RPC example
===

This example demonstrates how to set up a simple RPC (Remote Procedure Call) server and client using [Direct reply to feature](https://www.rabbitmq.com/docs/direct-reply-to).
The example is very basic the correlation id is set but not used to match responses to requests.

Setup
---

To run this example, you need to have RabbitMQ >=4.2 server running locally.
Then run the python scripts in separate terminal windows.
```bash
$ python3 server.py
Connecting consumer to AMQP server
Responder listening on address: /queues/rpc_queue
connected both publisher and consumer
consumer reply address is /queues/amq.rabbitmq.reply-to.g1h2AA5yZXBseUA2ODc4MTMzNAAAcEoAAAAAaS8eQg%3D%
```

The `rpc_queue` is the queue where the server listens for incoming RPC requests.</br>
The `amq.rabbitmq.reply-to.g1h2AA...` is a special direct-reply-to queue used by the client to receive responses.

Use standard queues for reply
===

If you want to use standard queues for replies instead of the direct-reply-to feature is enough change the consumer declaration:

```python
queue_name = "rpc_reply_queue"
management.declare_queue(QuorumQueueSpecification(name=queue_name))

consumer = await connection_consumer.consumer(
destination=AddressHelper.queue_address(queue_name))
```

You should use [Classic Queues](https://www.rabbitmq.com/docs/classic-queues) or [Quorum Queues](https://www.rabbitmq.com/docs/quorum-queues).
56 changes: 56 additions & 0 deletions examples/rpc/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import time

from rabbitmq_amqp_python_client import (
AddressHelper,
Converter,
DirectReplyToConsumerOptions,
Environment,
Message,
)


class Requester:
def __init__(self, request_queue_name: str, environment: Environment):
self.connection = environment.connection()
self.connection.dial()
self.publisher = self.connection.publisher(
AddressHelper.queue_address(request_queue_name)
)
self.consumer = self.connection.consumer(
consumer_options=DirectReplyToConsumerOptions()
)
print("connected both publisher and consumer")
print("consumer reply address is {}".format(self.consumer.address))

def send_request(self, request_body: str, correlation_id: str) -> Message:
message = Message(body=Converter.string_to_bytes(request_body))
message.reply_to = self.consumer.address
message.correlation_id = correlation_id
self.publisher.publish(message=message)
return self.consumer.consume()


def main() -> None:
print("Connecting to AMQP server")
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
requester = Requester(request_queue_name="rpc_queue", environment=environment)
for i in range(10):
correlation_id = str(i)
request_body = "hello {}".format(i)
print("******************************************************")
print("Sending request: {}".format(request_body))
response_message = requester.send_request(
request_body=request_body, correlation_id=correlation_id
)
response_body = Converter.bytes_to_string(response_message.body)
print(
"Received response: {} - correlation_id: {}".format(
response_body, response_message.correlation_id
)
)
print("------------------------------------------------------")
time.sleep(1)


if __name__ == "__main__":
main()
100 changes: 100 additions & 0 deletions examples/rpc/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# type: ignore


from rabbitmq_amqp_python_client import (
AddressHelper,
AMQPMessagingHandler,
Connection,
Converter,
Environment,
Event,
Message,
OutcomeState,
QuorumQueueSpecification,
)

# create a responder


class Responder:
class ResponderMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._publisher = None

def set_publisher(self, publisher):
self._publisher = publisher

def on_amqp_message(self, event: Event):
# process the message and create a response
print("******************************************************")
print(
"received message: {} ".format(
Converter.bytes_to_string(event.message.body)
)
)
response_body = (
Converter.bytes_to_string(event.message.body) + "-from the server"
)
response_message = Message(body=Converter.string_to_bytes(response_body))
# publish response to the reply_to address with the same correlation_id
response_message.correlation_id = event.message.correlation_id
response_message.address = event.message.reply_to
print("sending back: {} ".format(response_body))
status = self._publisher.publish(message=response_message)
if status.remote_state == OutcomeState.ACCEPTED:
print("message accepted to {}".format(response_message.address))
elif status.remote_state == OutcomeState.RELEASED:
print("message not routed")
elif status.remote_state == OutcomeState.REJECTED:
print("message rejected")

self.delivery_context.accept(event)
print("------------------------------------------------------")

def __init__(self, request_queue_name: str, environment: Environment):
self.request_queue_name = request_queue_name
self.connection = None
self.consumer = None
self.publisher = None
self._environment = environment

def start(self):
self.connection = self._environment.connection()
self.connection.dial()
self.connection.management().delete_queue(self.request_queue_name)
self.connection.management().declare_queue(
queue_specification=QuorumQueueSpecification(self.request_queue_name)
)
self.publisher = self.connection.publisher()
handler = self.ResponderMessageHandler()
handler.set_publisher(self.publisher)

self.consumer = self.connection.consumer(
destination=AddressHelper.queue_address(self.request_queue_name),
message_handler=handler,
)
addr = self.consumer.address
print("Responder listening on address: {}".format(addr))
try:
self.consumer.run()
except KeyboardInterrupt:
print("Responder stopping...")


def create_connection(environment: Environment) -> Connection:
connection = environment.connection()
connection.dial()
return connection
Comment on lines +86 to +89
Copy link

Copilot AI Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The create_connection function is defined but never used in this file. Consider removing it or using it in the Responder.start() method to avoid code duplication.

Copilot uses AI. Check for mistakes.


def main() -> None:
print("Connecting consumer to AMQP server")
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
responder = Responder(request_queue_name="rpc_queue", environment=environment)
responder.start()


if __name__ == "__main__":
main()