Skip to content

Commit f8b5f54

Browse files
committed
rpc example useing direct reply queue
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 0b32bd1 commit f8b5f54

File tree

2 files changed

+31
-16
lines changed

2 files changed

+31
-16
lines changed

examples/rpc/client.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ class Requester:
1313
def __init__(self, request_queue_name: str, environment: Environment):
1414
self.connection = environment.connection()
1515
self.connection.dial()
16-
self.publisher = self.connection.publisher(AddressHelper.queue_address(request_queue_name))
17-
self.consumer = self.connection.consumer(consumer_options=DirectReplyToConsumerOptions())
16+
self.publisher = self.connection.publisher(
17+
AddressHelper.queue_address(request_queue_name)
18+
)
19+
self.consumer = self.connection.consumer(
20+
consumer_options=DirectReplyToConsumerOptions()
21+
)
1822
print("connected both publisher and consumer")
1923
print("consumer reply address is {}".format(self.consumer.address))
2024

@@ -35,9 +39,15 @@ def main() -> None:
3539
request_body = "hello {}".format(i)
3640
print("******************************************************")
3741
print("Sending request: {}".format(request_body))
38-
response_message = responder.send_request(request_body=request_body, correlation_id=correlation_id)
42+
response_message = responder.send_request(
43+
request_body=request_body, correlation_id=correlation_id
44+
)
3945
response_body = Converter.bytes_to_string(response_message.body)
40-
print("Received response: {} - correlation_id: {}".format(response_body, response_message.correlation_id))
46+
print(
47+
"Received response: {} - correlation_id: {}".format(
48+
response_body, response_message.correlation_id
49+
)
50+
)
4151
print("------------------------------------------------------")
4252
time.sleep(1)
4353

examples/rpc/server.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
# create a responder
2020

21+
2122
class Responder:
2223
class ResponderMessageHandler(AMQPMessagingHandler):
2324

@@ -31,18 +32,20 @@ def set_publisher(self, publisher):
3132
def on_amqp_message(self, event: Event):
3233
# process the message and create a response
3334
print("******************************************************")
34-
print("received message: {} ".format(Converter.bytes_to_string(event.message.body)))
35-
response_body = Converter.bytes_to_string(
36-
event.message.body) + "-from the server"
37-
response_message = Message(
38-
body=Converter.string_to_bytes(response_body))
35+
print(
36+
"received message: {} ".format(
37+
Converter.bytes_to_string(event.message.body)
38+
)
39+
)
40+
response_body = (
41+
Converter.bytes_to_string(event.message.body) + "-from the server"
42+
)
43+
response_message = Message(body=Converter.string_to_bytes(response_body))
3944
# publish response to the reply_to address with the same correlation_id
4045
response_message.correlation_id = event.message.correlation_id
4146
response_message.address = event.message.reply_to
4247
print("sending back: {} ".format(response_body))
43-
status = self._publisher.publish(
44-
message=response_message
45-
)
48+
status = self._publisher.publish(message=response_message)
4649
if status.remote_state == OutcomeState.ACCEPTED:
4750
print("message accepted to {}".format(response_message.address))
4851
elif status.remote_state == OutcomeState.RELEASED:
@@ -65,14 +68,16 @@ def start(self):
6568
self.connection.dial()
6669
self.connection.management().delete_queue(self.request_queue_name)
6770
self.connection.management().declare_queue(
68-
queue_specification=QuorumQueueSpecification(self.request_queue_name))
71+
queue_specification=QuorumQueueSpecification(self.request_queue_name)
72+
)
6973
self.publisher = self.connection.publisher()
7074
handler = self.ResponderMessageHandler()
7175
handler.set_publisher(self.publisher)
7276

73-
self.consumer = self.connection.consumer(destination=AddressHelper.queue_address(self.request_queue_name),
74-
message_handler=handler
75-
)
77+
self.consumer = self.connection.consumer(
78+
destination=AddressHelper.queue_address(self.request_queue_name),
79+
message_handler=handler,
80+
)
7681
addr = self.consumer.address
7782
print("Responder listening on address: {}".format(addr))
7883
try:

0 commit comments

Comments
 (0)