Skip to content

Commit 0b32bd1

Browse files
committed
rpc example useing direct reply queue
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 8b85ac5 commit 0b32bd1

File tree

2 files changed

+144
-0
lines changed

2 files changed

+144
-0
lines changed

examples/rpc/client.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import time
2+
3+
from rabbitmq_amqp_python_client import (
4+
AddressHelper,
5+
Converter,
6+
DirectReplyToConsumerOptions,
7+
Environment,
8+
Message,
9+
)
10+
11+
12+
class Requester:
13+
def __init__(self, request_queue_name: str, environment: Environment):
14+
self.connection = environment.connection()
15+
self.connection.dial()
16+
self.publisher = self.connection.publisher(AddressHelper.queue_address(request_queue_name))
17+
self.consumer = self.connection.consumer(consumer_options=DirectReplyToConsumerOptions())
18+
print("connected both publisher and consumer")
19+
print("consumer reply address is {}".format(self.consumer.address))
20+
21+
def send_request(self, request_body: str, correlation_id: str) -> Message:
22+
message = Message(body=Converter.string_to_bytes(request_body))
23+
message.reply_to = self.consumer.address
24+
message.correlation_id = correlation_id
25+
self.publisher.publish(message=message)
26+
return self.consumer.consume()
27+
28+
29+
def main() -> None:
30+
print("connection_consumer to amqp server")
31+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
32+
responder = Requester(request_queue_name="rpc_queue", environment=environment)
33+
for i in range(10):
34+
correlation_id = str(i)
35+
request_body = "hello {}".format(i)
36+
print("******************************************************")
37+
print("Sending request: {}".format(request_body))
38+
response_message = responder.send_request(request_body=request_body, correlation_id=correlation_id)
39+
response_body = Converter.bytes_to_string(response_message.body)
40+
print("Received response: {} - correlation_id: {}".format(response_body, response_message.correlation_id))
41+
print("------------------------------------------------------")
42+
time.sleep(1)
43+
44+
45+
if __name__ == "__main__":
46+
main()

examples/rpc/server.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# type: ignore
2+
3+
4+
from rabbitmq_amqp_python_client import (
5+
AddressHelper,
6+
AMQPMessagingHandler,
7+
Connection,
8+
Converter,
9+
Environment,
10+
Event,
11+
Message,
12+
OutcomeState,
13+
QuorumQueueSpecification,
14+
)
15+
16+
MESSAGES_TO_PUBLISH = 200
17+
18+
19+
# create a responder
20+
21+
class Responder:
22+
class ResponderMessageHandler(AMQPMessagingHandler):
23+
24+
def __init__(self):
25+
super().__init__()
26+
self._publisher = None
27+
28+
def set_publisher(self, publisher):
29+
self._publisher = publisher
30+
31+
def on_amqp_message(self, event: Event):
32+
# process the message and create a response
33+
print("******************************************************")
34+
print("received message: {} ".format(Converter.bytes_to_string(event.message.body)))
35+
response_body = Converter.bytes_to_string(
36+
event.message.body) + "-from the server"
37+
response_message = Message(
38+
body=Converter.string_to_bytes(response_body))
39+
# publish response to the reply_to address with the same correlation_id
40+
response_message.correlation_id = event.message.correlation_id
41+
response_message.address = event.message.reply_to
42+
print("sending back: {} ".format(response_body))
43+
status = self._publisher.publish(
44+
message=response_message
45+
)
46+
if status.remote_state == OutcomeState.ACCEPTED:
47+
print("message accepted to {}".format(response_message.address))
48+
elif status.remote_state == OutcomeState.RELEASED:
49+
print("message not routed")
50+
elif status.remote_state == OutcomeState.REJECTED:
51+
print("message not rejected")
52+
53+
self.delivery_context.accept(event)
54+
print("------------------------------------------------------")
55+
56+
def __init__(self, request_queue_name: str, environment: Environment):
57+
self.request_queue_name = request_queue_name
58+
self.connection = None
59+
self.consumer = None
60+
self.publisher = None
61+
self._environment = environment
62+
63+
def start(self):
64+
self.connection = self._environment.connection()
65+
self.connection.dial()
66+
self.connection.management().delete_queue(self.request_queue_name)
67+
self.connection.management().declare_queue(
68+
queue_specification=QuorumQueueSpecification(self.request_queue_name))
69+
self.publisher = self.connection.publisher()
70+
handler = self.ResponderMessageHandler()
71+
handler.set_publisher(self.publisher)
72+
73+
self.consumer = self.connection.consumer(destination=AddressHelper.queue_address(self.request_queue_name),
74+
message_handler=handler
75+
)
76+
addr = self.consumer.address
77+
print("Responder listening on address: {}".format(addr))
78+
try:
79+
self.consumer.run()
80+
except KeyboardInterrupt:
81+
print("Responder stopping...")
82+
83+
84+
def create_connection(environment: Environment) -> Connection:
85+
connection = environment.connection()
86+
connection.dial()
87+
return connection
88+
89+
90+
def main() -> None:
91+
print("connection_consumer to amqp server")
92+
environment = Environment(uri="amqp://guest:guest@localhost:5672/")
93+
responder = Responder(request_queue_name="rpc_queue", environment=environment)
94+
responder.start()
95+
96+
97+
if __name__ == "__main__":
98+
main()

0 commit comments

Comments
 (0)