Skip to content

Commit 445ae8e

Browse files
authored
Add asynchronous interface (#86)
* Add asyncio wrapper classes * Closes #49
1 parent c9fc44e commit 445ae8e

26 files changed

+5330
-7
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,48 @@ To run TLS you need to:
3535

3636
Read more about the issue [here](https://stackoverflow.com/questions/44979947/python-qpid-proton-for-mac-using-amqps)
3737

38+
### SSL Problems in local environment
3839

40+
If when running tests, this exceptions is raised by the proton library: `SSLUnavailable`:
41+
``` bash
42+
pip uninstall python-qpid-proton -y
43+
44+
sudo apt-get update
45+
sudo apt-get install -y swig cmake build-essential libssl-dev pkg-config
46+
47+
export PKG_CONFIG_PATH=/usr/lib/x86_64-linux-gnu/pkgconfig
48+
export CFLAGS="-I/usr/include/openssl"
49+
export LDFLAGS="-L/usr/lib/x86_64-linux-gnu"
50+
51+
pip install "python-qpid-proton>=0.39.0,<0.40.0" --no-binary python-qpid-proton --verbose --no-cache-dir
52+
```
53+
54+
### Async Interface (Experimental)
55+
56+
The client provides an async interface via the `rabbitmq_amqp_python_client.asyncio` module. The async classes act as facades that:
57+
58+
- Wrap the corresponding synchronous classes
59+
- Execute blocking operations in a thread pool executor using `run_in_executor`
60+
- Coordinate concurrent access using `asyncio.Lock`
61+
- Implement proper async context managers (`async with`) for resource management
62+
- Maintain API compatibility with the synchronous version
63+
64+
**Key differences from the synchronous interface:**
65+
66+
1. Use `AsyncEnvironment` instead of `Environment`
67+
2. All operations must be awaited with `await`
68+
3. Use `async with` for resource management (connections, publishers, consumers, management)
69+
4. Consumer signal handling uses `asyncio.Event` and `loop.add_signal_handler`
70+
71+
For a complete example showing proper consumer termination and signal handling, refer to:
3972

73+
- [examples/getting_started/getting_started_async.py](./examples/getting_started/getting_started_async.py)
4074

75+
Additional async examples are available in the [examples](./examples) folder:
4176

77+
- OAuth2: [examples/oauth/oAuth2_async.py](./examples/oauth/oAuth2_async.py)
78+
- Reconnection: [examples/reconnection/reconnection_example_async.py](./examples/reconnection/reconnection_example_async.py)
79+
- Streams: [examples/streams/example_with_streams_async.py](./examples/streams/example_with_streams_async.py)
80+
- TLS: [examples/tls/tls_example_async.py](./examples/tls/tls_example_async.py)
4281

4382

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# type: ignore
2+
3+
import asyncio
4+
import signal
5+
6+
from rabbitmq_amqp_python_client import (
7+
AddressHelper,
8+
AMQPMessagingHandler,
9+
AsyncEnvironment,
10+
Converter,
11+
Event,
12+
ExchangeSpecification,
13+
ExchangeToQueueBindingSpecification,
14+
Message,
15+
OutcomeState,
16+
QuorumQueueSpecification,
17+
)
18+
19+
MESSAGES_TO_PUBLISH = 100
20+
21+
22+
class StopConsumerException(Exception):
23+
"""Exception to signal consumer should stop"""
24+
25+
pass
26+
27+
28+
class MyMessageHandler(AMQPMessagingHandler):
29+
def __init__(self):
30+
super().__init__()
31+
self._count = 0
32+
33+
def on_amqp_message(self, event: Event):
34+
print(
35+
"received message: {} ".format(
36+
Converter.bytes_to_string(event.message.body)
37+
)
38+
)
39+
40+
self.delivery_context.accept(event)
41+
self._count = self._count + 1
42+
print("count " + str(self._count))
43+
44+
def on_connection_closed(self, event: Event):
45+
print("connection closed")
46+
47+
def on_link_closed(self, event: Event) -> None:
48+
print("link closed")
49+
50+
51+
async def main():
52+
exchange_name = "test-exchange"
53+
queue_name = "example-queue"
54+
routing_key = "routing-key"
55+
56+
print("connection to amqp server")
57+
async with AsyncEnvironment(
58+
uri="amqp://guest:guest@localhost:5672/"
59+
) as environment:
60+
async with await environment.connection() as connection:
61+
async with await connection.management() as management:
62+
print("declaring exchange and queue")
63+
await management.declare_exchange(
64+
ExchangeSpecification(name=exchange_name)
65+
)
66+
await management.declare_queue(
67+
QuorumQueueSpecification(name=queue_name)
68+
)
69+
70+
print("binding queue to exchange")
71+
bind_name = await management.bind(
72+
ExchangeToQueueBindingSpecification(
73+
source_exchange=exchange_name,
74+
destination_queue=queue_name,
75+
binding_key=routing_key,
76+
)
77+
)
78+
79+
addr = AddressHelper.exchange_address(exchange_name, routing_key)
80+
addr_queue = AddressHelper.queue_address(queue_name)
81+
82+
print("create a publisher and publish a test message")
83+
async with await connection.publisher(addr) as publisher:
84+
print("purging the queue")
85+
messages_purged = await management.purge_queue(queue_name)
86+
print("messages purged: " + str(messages_purged))
87+
88+
# publish messages
89+
for i in range(MESSAGES_TO_PUBLISH):
90+
status = await publisher.publish(
91+
Message(
92+
body=Converter.string_to_bytes(
93+
"test message {} ".format(i)
94+
)
95+
)
96+
)
97+
if status.remote_state == OutcomeState.ACCEPTED:
98+
print("message accepted")
99+
100+
print(
101+
"create a consumer and consume the test message - press control + c to terminate"
102+
)
103+
handler = MyMessageHandler()
104+
async with await connection.consumer(
105+
addr_queue, message_handler=handler
106+
) as consumer:
107+
# Create stop event and signal handler
108+
stop_event = asyncio.Event()
109+
110+
def handle_sigint():
111+
print("\nCtrl+C detected, stopping consumer gracefully...")
112+
stop_event.set()
113+
114+
# Register signal handler
115+
loop = asyncio.get_running_loop()
116+
loop.add_signal_handler(signal.SIGINT, handle_sigint)
117+
118+
try:
119+
# Run consumer in background
120+
consumer_task = asyncio.create_task(consumer.run())
121+
122+
# Wait for stop signal
123+
await stop_event.wait()
124+
125+
# Stop consumer gracefully
126+
print("Stopping consumer...")
127+
await consumer.stop_processing()
128+
129+
# Wait for task to complete
130+
try:
131+
await asyncio.wait_for(consumer_task, timeout=3.0)
132+
except asyncio.TimeoutError:
133+
print("Consumer task timed out")
134+
135+
finally:
136+
loop.remove_signal_handler(signal.SIGINT)
137+
138+
print("unbind")
139+
await management.unbind(bind_name)
140+
141+
print("delete queue")
142+
await management.delete_queue(queue_name)
143+
144+
print("delete exchange")
145+
await management.delete_exchange(exchange_name)
146+
147+
148+
if __name__ == "__main__":
149+
asyncio.run(main())

0 commit comments

Comments
 (0)