Skip to content

Commit ba88885

Browse files
DanielePalaiaDanielePalaiaGsantomaggio
authored
Implementing Connection, Management and Publisher modules (#10)
* connection layer implementation * WIP * implementing Declare Exchange/Queue * test for gabriele * body encodiding Signed-off-by: Gabriele Santomaggio <[email protected]> * adding basic tests * implementing bind * adding user-defined exception * Adding debugging info * publisher implementation * adding publisher basic test * improve help_address utility functions * modify example * integrate qpid-proton lib * implementing purge operation * improving url helper * cleaning up qpid unecessary folders and files * some improvements * implementing queue_info * fixing queue arguments management * better management of arguments * improved arguments management during declare_queue * adding purge test * adding fixtures in tests * adding a publisher test * removing useless queue_type parameter * removing receiver from publisher --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: DanielePalaia <daniele985@@gmail.com> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent b6a619c commit ba88885

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+13023
-38
lines changed

.github/workflows/build-test.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ jobs:
3939
- name: poetry install
4040
run: poetry install --no-root
4141
- name: isort check-only
42-
run: poetry run isort --check-only .
42+
run: poetry run isort --skip rabbitmq_amqp_python_client/qpid --check-only .
4343
- name: black check
4444
run: poetry run black --check .
4545
- name: flake8
46-
run: poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503
46+
run: poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
4747
- name: mypy
4848
run: |
49-
poetry run mypy .
49+
poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .
5050
- name: poetry run pytest
5151
run: poetry run pytest

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,17 @@
22

33
This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
44

5-
## How to Run
5+
## How to Build the project and run the tests
6+
7+
- Start a RabbitMQ 4.x broker
8+
- poetry build: build the source project
9+
- poetry install: resolves and install dependencies
10+
- poetry run pytest: run the tests
611

712
## Getting Started
813

14+
An example is provide in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
15+
16+
poetry run python ./examples/getting_started/main.py
17+
918

examples/getting_started/main.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from rabbitmq_amqp_python_client import (
2+
BindingSpecification,
3+
Connection,
4+
ExchangeSpecification,
5+
Message,
6+
QuorumQueueSpecification,
7+
exchange_address,
8+
)
9+
10+
11+
def main() -> None:
12+
exchange_name = "test-exchange"
13+
queue_name = "example-queue"
14+
routing_key = "routing-key"
15+
connection = Connection("amqp://guest:guest@localhost:5672/")
16+
17+
print("connection to amqp server")
18+
connection.dial()
19+
20+
management = connection.management()
21+
22+
print("declaring exchange and queue")
23+
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
24+
25+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
26+
27+
print("binding queue to exchange")
28+
bind_name = management.bind(
29+
BindingSpecification(
30+
source_exchange=exchange_name,
31+
destination_queue=queue_name,
32+
binding_key=routing_key,
33+
)
34+
)
35+
36+
addr = exchange_address(exchange_name, routing_key)
37+
38+
print("create a publisher and publish a test message")
39+
publisher = connection.publisher(addr)
40+
41+
publisher.publish(Message(body="test"))
42+
43+
publisher.close()
44+
45+
print("unbind")
46+
management.unbind(bind_name)
47+
48+
print("purging the queue")
49+
management.purge_queue(queue_name)
50+
51+
print("delete queue")
52+
management.delete_queue(queue_name)
53+
54+
print("delete exchange")
55+
management.delete_exchange(exchange_name)
56+
57+
print("closing connections")
58+
management.close()
59+
connection.close()
60+
61+
62+
if __name__ == "__main__":
63+
main()

poetry.lock

Lines changed: 30 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from importlib import metadata
2+
3+
from .address_helper import exchange_address, queue_address
4+
from .common import QueueType
5+
from .connection import Connection
6+
from .entities import (
7+
BindingSpecification,
8+
ExchangeSpecification,
9+
)
10+
from .management import Management
11+
from .publisher import Publisher
12+
from .qpid.proton._message import Message
13+
from .queues import (
14+
ClassicQueueSpecification,
15+
QuorumQueueSpecification,
16+
StreamSpecification,
17+
)
18+
19+
try:
20+
__version__ = metadata.version(__package__)
21+
__license__ = metadata.metadata(__package__)["license"]
22+
except metadata.PackageNotFoundError:
23+
__version__ = "dev"
24+
__license__ = None
25+
26+
del metadata
27+
28+
__all__ = [
29+
"Connection",
30+
"Management",
31+
"ExchangeSpecification",
32+
"QuorumQueueSpecification",
33+
"ClassicQueueSpecification",
34+
"StreamSpecification",
35+
"BindingSpecification",
36+
"QueueType",
37+
"Publisher",
38+
"exchange_address",
39+
"queue_address",
40+
"Message",
41+
]
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from .entities import BindingSpecification
2+
3+
4+
def is_unreserved(char: str) -> bool:
5+
# According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
6+
return char.isalnum() or char in "-._~"
7+
8+
9+
def encode_path_segment(input_string: str) -> str:
10+
encoded = []
11+
12+
# Iterate over each character in the input string
13+
for char in input_string:
14+
# Check if the character is an unreserved character
15+
if is_unreserved(char):
16+
encoded.append(char) # Append as is
17+
else:
18+
# Encode character to %HH format
19+
encoded.append(f"%{ord(char):02X}")
20+
21+
return "".join(encoded)
22+
23+
24+
def exchange_address(exchange_name: str, routing_key: str = "") -> str:
25+
if routing_key == "":
26+
path = "/exchanges/" + encode_path_segment(exchange_name)
27+
else:
28+
path = (
29+
"/exchanges/"
30+
+ encode_path_segment(exchange_name)
31+
+ "/"
32+
+ encode_path_segment(routing_key)
33+
)
34+
35+
return path
36+
37+
38+
def queue_address(queue_name: str) -> str:
39+
path = "/queues/" + encode_path_segment(queue_name)
40+
41+
return path
42+
43+
44+
def purge_queue_address(queue_name: str) -> str:
45+
path = "/queues/" + encode_path_segment(queue_name) + "/messages"
46+
47+
return path
48+
49+
50+
def path_address() -> str:
51+
path = "/bindings"
52+
53+
return path
54+
55+
56+
def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
57+
binding_path_wth_exchange_queue_key = (
58+
"/bindings"
59+
+ "/"
60+
+ "src="
61+
+ encode_path_segment(bind_specification.source_exchange)
62+
+ ";"
63+
+ "dstq="
64+
+ encode_path_segment(bind_specification.destination_queue)
65+
+ ";key="
66+
+ encode_path_segment(bind_specification.binding_key)
67+
+ ";args="
68+
)
69+
return binding_path_wth_exchange_queue_key

rabbitmq_amqp_python_client/common.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import enum
2+
3+
4+
class CommonValues(enum.Enum):
5+
response_code_200 = 200
6+
response_code_201 = 201
7+
response_code_204 = 204
8+
response_code_404 = 404
9+
response_code_409 = 409
10+
command_put = "PUT"
11+
command_get = "GET"
12+
command_post = "POST"
13+
command_delete = "DELETE"
14+
command_reply_to = "$me"
15+
management_node_address = "/management"
16+
link_pair_name = "management-link-pair"
17+
exchanges = "exchanges"
18+
key = "key"
19+
queue = "queues"
20+
bindings = "bindings"
21+
22+
23+
class ExchangeType(enum.Enum):
24+
direct = "direct"
25+
topic = "topic"
26+
fanout = "fanout"
27+
28+
29+
class QueueType(enum.Enum):
30+
quorum = "quorum"
31+
classic = "classic"
32+
stream = "stream"
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import logging
2+
3+
from .management import Management
4+
from .publisher import Publisher
5+
from .qpid.proton.utils import BlockingConnection
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class Connection:
11+
def __init__(self, addr: str):
12+
self._addr: str = addr
13+
self._conn: BlockingConnection
14+
self._management: Management
15+
16+
def dial(self) -> None:
17+
logger.debug("Establishing a connection to the amqp server")
18+
self._conn = BlockingConnection(self._addr)
19+
self._open()
20+
logger.debug("Connection to the server established")
21+
22+
def _open(self) -> None:
23+
self._management = Management(self._conn)
24+
self._management.open()
25+
26+
def management(self) -> Management:
27+
return self._management
28+
29+
# closes the connection to the AMQP 1.0 server.
30+
def close(self) -> None:
31+
logger.debug("Closing connection")
32+
self._conn.close()
33+
34+
def publisher(self, destination: str) -> Publisher:
35+
publisher = Publisher(self._conn, destination)
36+
return publisher

0 commit comments

Comments
 (0)