diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml
index 88ffc94..d4c98e4 100644
--- a/.github/workflows/build-test.yaml
+++ b/.github/workflows/build-test.yaml
@@ -39,13 +39,13 @@ jobs:
- name: poetry install
run: poetry install --no-root
- name: isort check-only
- run: poetry run isort --check-only .
+ run: poetry run isort --skip rabbitmq_amqp_python_client/qpid --check-only .
- name: black check
run: poetry run black --check .
- name: flake8
- run: poetry run flake8 --exclude=venv,local_tests,docs/examples --max-line-length=120 --ignore=E203,W503
+ run: poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503
- name: mypy
run: |
- poetry run mypy .
+ poetry run mypy --exclude=rabbitmq_amqp_python_client/qpid .
- name: poetry run pytest
run: poetry run pytest
\ No newline at end of file
diff --git a/README.md b/README.md
index 6b98afb..98e0029 100644
--- a/README.md
+++ b/README.md
@@ -2,8 +2,17 @@
This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.
-## How to Run
+## How to Build the project and run the tests
+
+- Start a RabbitMQ 4.x broker
+- poetry build: build the source project
+- poetry install: resolves and install dependencies
+- poetry run pytest: run the tests
## Getting Started
+An example is provide in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
+
+poetry run python ./examples/getting_started/main.py
+
diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py
new file mode 100644
index 0000000..43938fb
--- /dev/null
+++ b/examples/getting_started/main.py
@@ -0,0 +1,63 @@
+from rabbitmq_amqp_python_client import (
+ BindingSpecification,
+ Connection,
+ ExchangeSpecification,
+ Message,
+ QuorumQueueSpecification,
+ exchange_address,
+)
+
+
+def main() -> None:
+ exchange_name = "test-exchange"
+ queue_name = "example-queue"
+ routing_key = "routing-key"
+ connection = Connection("amqp://guest:guest@localhost:5672/")
+
+ print("connection to amqp server")
+ connection.dial()
+
+ management = connection.management()
+
+ print("declaring exchange and queue")
+ management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
+
+ management.declare_queue(QuorumQueueSpecification(name=queue_name))
+
+ print("binding queue to exchange")
+ bind_name = management.bind(
+ BindingSpecification(
+ source_exchange=exchange_name,
+ destination_queue=queue_name,
+ binding_key=routing_key,
+ )
+ )
+
+ addr = exchange_address(exchange_name, routing_key)
+
+ print("create a publisher and publish a test message")
+ publisher = connection.publisher(addr)
+
+ publisher.publish(Message(body="test"))
+
+ publisher.close()
+
+ print("unbind")
+ management.unbind(bind_name)
+
+ print("purging the queue")
+ management.purge_queue(queue_name)
+
+ print("delete queue")
+ management.delete_queue(queue_name)
+
+ print("delete exchange")
+ management.delete_exchange(exchange_name)
+
+ print("closing connections")
+ management.close()
+ connection.close()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/poetry.lock b/poetry.lock
index 48d59dc..72ed139 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2,33 +2,33 @@
[[package]]
name = "black"
-version = "24.3.0"
+version = "24.10.0"
description = "The uncompromising code formatter."
optional = false
-python-versions = ">=3.8"
+python-versions = ">=3.9"
files = [
- {file = "black-24.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7d5e026f8da0322b5662fa7a8e752b3fa2dac1c1cbc213c3d7ff9bdd0ab12395"},
- {file = "black-24.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9f50ea1132e2189d8dff0115ab75b65590a3e97de1e143795adb4ce317934995"},
- {file = "black-24.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e2af80566f43c85f5797365077fb64a393861a3730bd110971ab7a0c94e873e7"},
- {file = "black-24.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:4be5bb28e090456adfc1255e03967fb67ca846a03be7aadf6249096100ee32d0"},
- {file = "black-24.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4f1373a7808a8f135b774039f61d59e4be7eb56b2513d3d2f02a8b9365b8a8a9"},
- {file = "black-24.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:aadf7a02d947936ee418777e0247ea114f78aff0d0959461057cae8a04f20597"},
- {file = "black-24.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65c02e4ea2ae09d16314d30912a58ada9a5c4fdfedf9512d23326128ac08ac3d"},
- {file = "black-24.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:bf21b7b230718a5f08bd32d5e4f1db7fc8788345c8aea1d155fc17852b3410f5"},
- {file = "black-24.3.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:2818cf72dfd5d289e48f37ccfa08b460bf469e67fb7c4abb07edc2e9f16fb63f"},
- {file = "black-24.3.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:4acf672def7eb1725f41f38bf6bf425c8237248bb0804faa3965c036f7672d11"},
- {file = "black-24.3.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c7ed6668cbbfcd231fa0dc1b137d3e40c04c7f786e626b405c62bcd5db5857e4"},
- {file = "black-24.3.0-cp312-cp312-win_amd64.whl", hash = "sha256:56f52cfbd3dabe2798d76dbdd299faa046a901041faf2cf33288bc4e6dae57b5"},
- {file = "black-24.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:79dcf34b33e38ed1b17434693763301d7ccbd1c5860674a8f871bd15139e7837"},
- {file = "black-24.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e19cb1c6365fd6dc38a6eae2dcb691d7d83935c10215aef8e6c38edee3f77abd"},
- {file = "black-24.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65b76c275e4c1c5ce6e9870911384bff5ca31ab63d19c76811cb1fb162678213"},
- {file = "black-24.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:b5991d523eee14756f3c8d5df5231550ae8993e2286b8014e2fdea7156ed0959"},
- {file = "black-24.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c45f8dff244b3c431b36e3224b6be4a127c6aca780853574c00faf99258041eb"},
- {file = "black-24.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6905238a754ceb7788a73f02b45637d820b2f5478b20fec82ea865e4f5d4d9f7"},
- {file = "black-24.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7de8d330763c66663661a1ffd432274a2f92f07feeddd89ffd085b5744f85e7"},
- {file = "black-24.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:7bb041dca0d784697af4646d3b62ba4a6b028276ae878e53f6b4f74ddd6db99f"},
- {file = "black-24.3.0-py3-none-any.whl", hash = "sha256:41622020d7120e01d377f74249e677039d20e6344ff5851de8a10f11f513bf93"},
- {file = "black-24.3.0.tar.gz", hash = "sha256:a0c9c4a0771afc6919578cec71ce82a3e31e054904e7197deacbc9382671c41f"},
+ {file = "black-24.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e6668650ea4b685440857138e5fe40cde4d652633b1bdffc62933d0db4ed9812"},
+ {file = "black-24.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1c536fcf674217e87b8cc3657b81809d3c085d7bf3ef262ead700da345bfa6ea"},
+ {file = "black-24.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:649fff99a20bd06c6f727d2a27f401331dc0cc861fb69cde910fe95b01b5928f"},
+ {file = "black-24.10.0-cp310-cp310-win_amd64.whl", hash = "sha256:fe4d6476887de70546212c99ac9bd803d90b42fc4767f058a0baa895013fbb3e"},
+ {file = "black-24.10.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:5a2221696a8224e335c28816a9d331a6c2ae15a2ee34ec857dcf3e45dbfa99ad"},
+ {file = "black-24.10.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f9da3333530dbcecc1be13e69c250ed8dfa67f43c4005fb537bb426e19200d50"},
+ {file = "black-24.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4007b1393d902b48b36958a216c20c4482f601569d19ed1df294a496eb366392"},
+ {file = "black-24.10.0-cp311-cp311-win_amd64.whl", hash = "sha256:394d4ddc64782e51153eadcaaca95144ac4c35e27ef9b0a42e121ae7e57a9175"},
+ {file = "black-24.10.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b5e39e0fae001df40f95bd8cc36b9165c5e2ea88900167bddf258bacef9bbdc3"},
+ {file = "black-24.10.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:d37d422772111794b26757c5b55a3eade028aa3fde43121ab7b673d050949d65"},
+ {file = "black-24.10.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:14b3502784f09ce2443830e3133dacf2c0110d45191ed470ecb04d0f5f6fcb0f"},
+ {file = "black-24.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:30d2c30dc5139211dda799758559d1b049f7f14c580c409d6ad925b74a4208a8"},
+ {file = "black-24.10.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:1cbacacb19e922a1d75ef2b6ccaefcd6e93a2c05ede32f06a21386a04cedb981"},
+ {file = "black-24.10.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1f93102e0c5bb3907451063e08b9876dbeac810e7da5a8bfb7aeb5a9ef89066b"},
+ {file = "black-24.10.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ddacb691cdcdf77b96f549cf9591701d8db36b2f19519373d60d31746068dbf2"},
+ {file = "black-24.10.0-cp313-cp313-win_amd64.whl", hash = "sha256:680359d932801c76d2e9c9068d05c6b107f2584b2a5b88831c83962eb9984c1b"},
+ {file = "black-24.10.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:17374989640fbca88b6a448129cd1745c5eb8d9547b464f281b251dd00155ccd"},
+ {file = "black-24.10.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:63f626344343083322233f175aaf372d326de8436f5928c042639a4afbbf1d3f"},
+ {file = "black-24.10.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ccfa1d0cb6200857f1923b602f978386a3a2758a65b52e0950299ea014be6800"},
+ {file = "black-24.10.0-cp39-cp39-win_amd64.whl", hash = "sha256:2cd9c95431d94adc56600710f8813ee27eea544dd118d45896bb734e9d7a0dc7"},
+ {file = "black-24.10.0-py3-none-any.whl", hash = "sha256:3bb2b7a1f7b685f85b11fed1ef10f8a9148bceb49853e47a294a3dd963c1dd7d"},
+ {file = "black-24.10.0.tar.gz", hash = "sha256:846ea64c97afe3bc677b761787993be4991810ecc7a4a937816dd6bddedc4875"},
]
[package.dependencies]
@@ -42,7 +42,7 @@ typing-extensions = {version = ">=4.0.1", markers = "python_version < \"3.11\""}
[package.extras]
colorama = ["colorama (>=0.4.3)"]
-d = ["aiohttp (>=3.7.4)", "aiohttp (>=3.7.4,!=3.9.0)"]
+d = ["aiohttp (>=3.10)"]
jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"]
uvloop = ["uvloop (>=0.15.2)"]
@@ -127,13 +127,13 @@ pycparser = "*"
[[package]]
name = "click"
-version = "8.1.7"
+version = "8.1.8"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
- {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"},
- {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"},
+ {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"},
+ {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"},
]
[package.dependencies]
@@ -458,5 +458,5 @@ files = [
[metadata]
lock-version = "2.0"
-python-versions = "^3.9"
-content-hash = "bfe651ba3823b09c6d96c9e066187567a750a502a186c9a95955a6535a7134a1"
+python-versions = "^3.10"
+content-hash = "1d6eaec017e031690d3de190f4e59fbac5ad528724b05086f347758ec991b020"
diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py
index e69de29..91bf819 100644
--- a/rabbitmq_amqp_python_client/__init__.py
+++ b/rabbitmq_amqp_python_client/__init__.py
@@ -0,0 +1,41 @@
+from importlib import metadata
+
+from .address_helper import exchange_address, queue_address
+from .common import QueueType
+from .connection import Connection
+from .entities import (
+ BindingSpecification,
+ ExchangeSpecification,
+)
+from .management import Management
+from .publisher import Publisher
+from .qpid.proton._message import Message
+from .queues import (
+ ClassicQueueSpecification,
+ QuorumQueueSpecification,
+ StreamSpecification,
+)
+
+try:
+ __version__ = metadata.version(__package__)
+ __license__ = metadata.metadata(__package__)["license"]
+except metadata.PackageNotFoundError:
+ __version__ = "dev"
+ __license__ = None
+
+del metadata
+
+__all__ = [
+ "Connection",
+ "Management",
+ "ExchangeSpecification",
+ "QuorumQueueSpecification",
+ "ClassicQueueSpecification",
+ "StreamSpecification",
+ "BindingSpecification",
+ "QueueType",
+ "Publisher",
+ "exchange_address",
+ "queue_address",
+ "Message",
+]
diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py
new file mode 100644
index 0000000..68a0ef6
--- /dev/null
+++ b/rabbitmq_amqp_python_client/address_helper.py
@@ -0,0 +1,69 @@
+from .entities import BindingSpecification
+
+
+def is_unreserved(char: str) -> bool:
+ # According to RFC 3986, unreserved characters are A-Z, a-z, 0-9, '-', '.', '_', and '~'
+ return char.isalnum() or char in "-._~"
+
+
+def encode_path_segment(input_string: str) -> str:
+ encoded = []
+
+ # Iterate over each character in the input string
+ for char in input_string:
+ # Check if the character is an unreserved character
+ if is_unreserved(char):
+ encoded.append(char) # Append as is
+ else:
+ # Encode character to %HH format
+ encoded.append(f"%{ord(char):02X}")
+
+ return "".join(encoded)
+
+
+def exchange_address(exchange_name: str, routing_key: str = "") -> str:
+ if routing_key == "":
+ path = "/exchanges/" + encode_path_segment(exchange_name)
+ else:
+ path = (
+ "/exchanges/"
+ + encode_path_segment(exchange_name)
+ + "/"
+ + encode_path_segment(routing_key)
+ )
+
+ return path
+
+
+def queue_address(queue_name: str) -> str:
+ path = "/queues/" + encode_path_segment(queue_name)
+
+ return path
+
+
+def purge_queue_address(queue_name: str) -> str:
+ path = "/queues/" + encode_path_segment(queue_name) + "/messages"
+
+ return path
+
+
+def path_address() -> str:
+ path = "/bindings"
+
+ return path
+
+
+def binding_path_with_exchange_queue(bind_specification: BindingSpecification) -> str:
+ binding_path_wth_exchange_queue_key = (
+ "/bindings"
+ + "/"
+ + "src="
+ + encode_path_segment(bind_specification.source_exchange)
+ + ";"
+ + "dstq="
+ + encode_path_segment(bind_specification.destination_queue)
+ + ";key="
+ + encode_path_segment(bind_specification.binding_key)
+ + ";args="
+ )
+ return binding_path_wth_exchange_queue_key
diff --git a/rabbitmq_amqp_python_client/common.py b/rabbitmq_amqp_python_client/common.py
new file mode 100644
index 0000000..eeef54f
--- /dev/null
+++ b/rabbitmq_amqp_python_client/common.py
@@ -0,0 +1,32 @@
+import enum
+
+
+class CommonValues(enum.Enum):
+ response_code_200 = 200
+ response_code_201 = 201
+ response_code_204 = 204
+ response_code_404 = 404
+ response_code_409 = 409
+ command_put = "PUT"
+ command_get = "GET"
+ command_post = "POST"
+ command_delete = "DELETE"
+ command_reply_to = "$me"
+ management_node_address = "/management"
+ link_pair_name = "management-link-pair"
+ exchanges = "exchanges"
+ key = "key"
+ queue = "queues"
+ bindings = "bindings"
+
+
+class ExchangeType(enum.Enum):
+ direct = "direct"
+ topic = "topic"
+ fanout = "fanout"
+
+
+class QueueType(enum.Enum):
+ quorum = "quorum"
+ classic = "classic"
+ stream = "stream"
diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py
new file mode 100644
index 0000000..b9a9b96
--- /dev/null
+++ b/rabbitmq_amqp_python_client/connection.py
@@ -0,0 +1,36 @@
+import logging
+
+from .management import Management
+from .publisher import Publisher
+from .qpid.proton.utils import BlockingConnection
+
+logger = logging.getLogger(__name__)
+
+
+class Connection:
+ def __init__(self, addr: str):
+ self._addr: str = addr
+ self._conn: BlockingConnection
+ self._management: Management
+
+ def dial(self) -> None:
+ logger.debug("Establishing a connection to the amqp server")
+ self._conn = BlockingConnection(self._addr)
+ self._open()
+ logger.debug("Connection to the server established")
+
+ def _open(self) -> None:
+ self._management = Management(self._conn)
+ self._management.open()
+
+ def management(self) -> Management:
+ return self._management
+
+ # closes the connection to the AMQP 1.0 server.
+ def close(self) -> None:
+ logger.debug("Closing connection")
+ self._conn.close()
+
+ def publisher(self, destination: str) -> Publisher:
+ publisher = Publisher(self._conn, destination)
+ return publisher
diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py
new file mode 100644
index 0000000..bed44b1
--- /dev/null
+++ b/rabbitmq_amqp_python_client/entities.py
@@ -0,0 +1,35 @@
+from dataclasses import dataclass
+from typing import Any, Optional
+
+from .common import ExchangeType, QueueType
+
+
+@dataclass
+class ExchangeSpecification:
+ name: str
+ arguments: dict[str, str]
+ exchange_type: ExchangeType = ExchangeType.direct
+ is_auto_delete: bool = False
+ is_internal: bool = False
+ is_durable: bool = True
+
+
+@dataclass
+class QueueInfo:
+ name: str
+ arguments: dict[str, Any]
+ queue_type: QueueType = QueueType.quorum
+ is_exclusive: Optional[bool] = None
+ is_auto_delete: bool = False
+ is_durable: bool = True
+ leader: str = ""
+ members: str = ""
+ message_count: int = 0
+ consumer_count: int = 0
+
+
+@dataclass
+class BindingSpecification:
+ source_exchange: str
+ destination_queue: str
+ binding_key: str
diff --git a/rabbitmq_amqp_python_client/exceptions.py b/rabbitmq_amqp_python_client/exceptions.py
new file mode 100644
index 0000000..141e73f
--- /dev/null
+++ b/rabbitmq_amqp_python_client/exceptions.py
@@ -0,0 +1,7 @@
+class ValidationCodeException(BaseException):
+ # Constructor or Initializer
+ def __init__(self, msg: str):
+ self.msg = msg
+
+ def __str__(self) -> str:
+ return repr(self.msg)
diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py
new file mode 100644
index 0000000..23d3067
--- /dev/null
+++ b/rabbitmq_amqp_python_client/management.py
@@ -0,0 +1,376 @@
+import logging
+import uuid
+from typing import Any, Optional, Union
+
+from .address_helper import (
+ binding_path_with_exchange_queue,
+ exchange_address,
+ path_address,
+ purge_queue_address,
+ queue_address,
+)
+from .common import CommonValues, QueueType
+from .entities import (
+ BindingSpecification,
+ ExchangeSpecification,
+ QueueInfo,
+)
+from .exceptions import ValidationCodeException
+from .options import ReceiverOption, SenderOption
+from .qpid.proton._message import Message
+from .qpid.proton.utils import (
+ BlockingConnection,
+ BlockingReceiver,
+ BlockingSender,
+)
+from .queues import (
+ ClassicQueueSpecification,
+ QuorumQueueSpecification,
+ StreamSpecification,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class Management:
+ def __init__(self, conn: BlockingConnection):
+ self._sender: Optional[BlockingSender] = None
+ self._receiver: Optional[BlockingReceiver] = None
+ self._conn = conn
+
+ def open(self) -> None:
+ if self._sender is None:
+ logger.debug("Creating Sender")
+ self._sender = self._create_sender(
+ CommonValues.management_node_address.value
+ )
+ if self._receiver is None:
+ logger.debug("Creating Receiver")
+ self._receiver = self._create_receiver(
+ CommonValues.management_node_address.value,
+ )
+
+ def _create_sender(self, addr: str) -> BlockingSender:
+ return self._conn.create_sender(addr, options=SenderOption(addr))
+
+ def _create_receiver(self, addr: str) -> BlockingReceiver:
+ return self._conn.create_receiver(addr, options=ReceiverOption(addr))
+
+ # closes the connection to the AMQP 1.0 server.
+ def close(self) -> None:
+ logger.debug("Closing Sender and Receiver")
+ if self._sender is not None:
+ self._sender.close()
+ if self._receiver is not None:
+ self._receiver.close()
+
+ def request(
+ self,
+ body: Any,
+ path: str,
+ method: str,
+ expected_response_codes: list[int],
+ ) -> Message:
+ return self._request(
+ str(uuid.uuid4()), body, path, method, expected_response_codes
+ )
+
+ def _request(
+ self,
+ id: str,
+ body: Any,
+ path: str,
+ method: str,
+ expected_response_codes: list[int],
+ ) -> Message:
+ amq_message = Message(
+ id=id,
+ body=body,
+ reply_to="$me",
+ address=path,
+ subject=method,
+ )
+
+ if self._sender is not None:
+ logger.debug("Sending message: " + str(amq_message))
+ self._sender.send(amq_message)
+
+ if self._receiver is not None:
+ msg = self._receiver.receive()
+ logger.debug("Received message: " + str(msg))
+
+ self._validate_reponse_code(int(msg.subject), expected_response_codes)
+ return msg
+
+ def declare_exchange(
+ self, exchange_specification: ExchangeSpecification
+ ) -> ExchangeSpecification:
+ logger.debug("declare_exchange operation called")
+ body = {}
+ body["auto_delete"] = exchange_specification.is_auto_delete
+ body["durable"] = exchange_specification.is_durable
+ body["type"] = exchange_specification.exchange_type.value # type: ignore
+ body["internal"] = exchange_specification.is_internal
+ body["arguments"] = exchange_specification.arguments # type: ignore
+
+ path = exchange_address(exchange_specification.name)
+
+ self.request(
+ body,
+ path,
+ CommonValues.command_put.value,
+ [
+ CommonValues.response_code_201.value,
+ CommonValues.response_code_204.value,
+ CommonValues.response_code_409.value,
+ ],
+ )
+
+ return exchange_specification
+
+ def declare_queue(
+ self,
+ queue_specification: Union[
+ ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
+ ],
+ ) -> Union[
+ ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
+ ]:
+ logger.debug("declare_queue operation called")
+
+ if isinstance(queue_specification, ClassicQueueSpecification) or isinstance(
+ queue_specification, QuorumQueueSpecification
+ ):
+ body = self._declare_queue(queue_specification)
+
+ elif isinstance(queue_specification, StreamSpecification):
+ body = self._declare_stream(queue_specification)
+
+ path = queue_address(queue_specification.name)
+
+ self.request(
+ body,
+ path,
+ CommonValues.command_put.value,
+ [
+ CommonValues.response_code_200.value,
+ CommonValues.response_code_201.value,
+ CommonValues.response_code_409.value,
+ ],
+ )
+
+ return queue_specification
+
+ def _declare_queue(
+ self,
+ queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification],
+ ) -> dict[str, Any]:
+
+ body = {}
+ args: dict[str, Any] = {}
+
+ body["auto_delete"] = queue_specification.is_auto_delete
+ body["durable"] = queue_specification.is_durable
+
+ if queue_specification.dead_letter_exchange is not None:
+ args["x-dead-letter-exchange"] = queue_specification.dead_letter_exchange
+ if queue_specification.dead_letter_routing_key is not None:
+ args["x-dead-letter-routing-key"] = (
+ queue_specification.dead_letter_routing_key
+ )
+ if queue_specification.overflow is not None:
+ args["x-overflow"] = queue_specification.overflow
+ if queue_specification.max_len is not None:
+ args["x-max-length"] = queue_specification.max_len
+ if queue_specification.max_len_bytes is not None:
+ args["x-max-length-bytes"] = queue_specification.max_len_bytes
+ if queue_specification.message_ttl is not None:
+ args["x-message-ttl"] = queue_specification.message_ttl
+ if queue_specification.expires is not None:
+ args["x-expires"] = queue_specification.expires
+ if queue_specification.single_active_consumer is not None:
+ args["x-single-active-consumer"] = (
+ queue_specification.single_active_consumer
+ )
+
+ if isinstance(queue_specification, ClassicQueueSpecification):
+ args["x-queue-type"] = QueueType.classic.value
+ if queue_specification.maximum_priority is not None:
+ args["x-maximum-priority"] = queue_specification.maximum_priority
+
+ if isinstance(queue_specification, QuorumQueueSpecification):
+ args["x-queue-type"] = QueueType.quorum.value
+ if queue_specification.deliver_limit is not None:
+ args["x-deliver-limit"] = queue_specification.deliver_limit
+
+ if queue_specification.dead_letter_strategy is not None:
+ args["x-dead-letter-strategy"] = (
+ queue_specification.dead_letter_strategy
+ )
+
+ if queue_specification.quorum_initial_group_size is not None:
+ args["x-initial-quorum-group-size"] = (
+ queue_specification.quorum_initial_group_size
+ )
+
+ if queue_specification.cluster_target_size is not None:
+ args["cluster_target_size"] = queue_specification.cluster_target_size
+
+ body["arguments"] = args # type: ignore
+
+ return body
+
+ def _declare_stream(
+ self, stream_specification: StreamSpecification
+ ) -> dict[str, Any]:
+
+ body = {}
+ args: dict[str, Any] = {}
+
+ args["x-queue-type"] = QueueType.stream.value
+
+ if stream_specification.max_len_bytes is not None:
+ args["x-max-length-bytes"] = stream_specification.max_len_bytes
+
+ if stream_specification.max_time_retention is not None:
+ args["x-max-time-retention"] = stream_specification.max_time_retention
+
+ if stream_specification.max_segment_size_in_bytes is not None:
+ args["x-max-segment-size-in-bytes"] = (
+ stream_specification.max_segment_size_in_bytes
+ )
+
+ if stream_specification.filter_size is not None:
+ args["x-filter-size"] = stream_specification.filter_size
+
+ if stream_specification.initial_group_size is not None:
+ args["x-initial-group-size"] = stream_specification.initial_group_size
+
+ if stream_specification.leader_locator is not None:
+ args["x-leader-locator"] = stream_specification.leader_locator
+
+ body["arguments"] = args
+
+ return body
+
+ def delete_exchange(self, exchange_name: str) -> None:
+ logger.debug("delete_exchange operation called")
+ path = exchange_address(exchange_name)
+
+ self.request(
+ None,
+ path,
+ CommonValues.command_delete.value,
+ [
+ CommonValues.response_code_204.value,
+ ],
+ )
+
+ def delete_queue(self, queue_name: str) -> None:
+ logger.debug("delete_queue operation called")
+ path = queue_address(queue_name)
+
+ self.request(
+ None,
+ path,
+ CommonValues.command_delete.value,
+ [
+ CommonValues.response_code_200.value,
+ ],
+ )
+
+ def _validate_reponse_code(
+ self, response_code: int, expected_response_codes: list[int]
+ ) -> None:
+ logger.debug("response_code received: " + str(response_code))
+ if response_code == CommonValues.response_code_409.value:
+ raise ValidationCodeException("ErrPreconditionFailed")
+
+ for code in expected_response_codes:
+ if code == response_code:
+ return None
+
+ raise ValidationCodeException(
+ "wrong response code received: " + str(response_code)
+ )
+
+ def bind(self, bind_specification: BindingSpecification) -> str:
+ logger.debug("Bind Operation called")
+ body = {}
+ body["binding_key"] = bind_specification.binding_key
+ body["source"] = bind_specification.source_exchange
+ body["destination_queue"] = bind_specification.destination_queue
+ body["arguments"] = {} # type: ignore
+
+ path = path_address()
+
+ self.request(
+ body,
+ path,
+ CommonValues.command_post.value,
+ [
+ CommonValues.response_code_204.value,
+ ],
+ )
+
+ binding_path_with_queue = binding_path_with_exchange_queue(bind_specification)
+ return binding_path_with_queue
+
+ def unbind(self, binding_exchange_queue_path: str) -> None:
+ logger.debug("UnBind Operation called")
+ self.request(
+ None,
+ binding_exchange_queue_path,
+ CommonValues.command_delete.value,
+ [
+ CommonValues.response_code_204.value,
+ ],
+ )
+
+ def purge_queue(self, queue_name: str) -> int:
+ logger.debug("purge_queue operation called")
+ path = purge_queue_address(queue_name)
+
+ response = self.request(
+ None,
+ path,
+ CommonValues.command_delete.value,
+ [
+ CommonValues.response_code_200.value,
+ ],
+ )
+
+ return int(response.body["message_count"])
+
+ def queue_info(self, queue_name: str) -> QueueInfo:
+ logger.debug("queue_info operation called")
+ path = queue_address(queue_name)
+
+ message = self.request(
+ None,
+ path,
+ CommonValues.command_get.value,
+ [
+ CommonValues.response_code_200.value,
+ ],
+ )
+
+ queue_info: dict[str, Any] = message.body
+
+ if queue_info["type"] == "quorum":
+ queue_type = QueueType.quorum
+ elif queue_info["type"] == "stream":
+ queue_type = QueueType.stream
+ else:
+ queue_type = QueueType.classic
+
+ return QueueInfo(
+ name=queue_info["name"],
+ is_durable=queue_info["durable"],
+ is_auto_delete=queue_info["auto_delete"],
+ is_exclusive=queue_info["exclusive"],
+ queue_type=queue_type,
+ leader=queue_info["leader"],
+ members=queue_info["replicas"],
+ arguments=queue_info["arguments"],
+ )
diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py
new file mode 100644
index 0000000..17398c8
--- /dev/null
+++ b/rabbitmq_amqp_python_client/options.py
@@ -0,0 +1,36 @@
+from .qpid.proton._data import ( # noqa: E402
+ PropertyDict,
+ symbol,
+)
+from .qpid.proton._endpoints import Link # noqa: E402
+from .qpid.proton.reactor import LinkOption # noqa: E402
+
+
+class SenderOption(LinkOption): # type: ignore
+ def __init__(self, addr: str):
+ self._addr = addr
+
+ def apply(self, link: Link) -> None:
+ link.source.address = self._addr
+ link.snd_settle_mode = Link.SND_SETTLED
+ link.rcv_settle_mode = Link.RCV_FIRST
+ link.properties = PropertyDict({symbol("paired"): True})
+ link.source.dynamic = False
+
+ def test(self, link: Link) -> bool:
+ return bool(link.is_sender)
+
+
+class ReceiverOption(LinkOption): # type: ignore
+ def __init__(self, addr: str):
+ self._addr = addr
+
+ def apply(self, link: Link) -> None:
+ link.target.address = self._addr
+ link.snd_settle_mode = Link.SND_SETTLED
+ link.rcv_settle_mode = Link.RCV_FIRST
+ link.properties = PropertyDict({symbol("paired"): True})
+ link.source.dynamic = False
+
+ def test(self, link: Link) -> bool:
+ return bool(link.is_receiver)
diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py
new file mode 100644
index 0000000..25b6a69
--- /dev/null
+++ b/rabbitmq_amqp_python_client/publisher.py
@@ -0,0 +1,36 @@
+import logging
+from typing import Optional
+
+from .options import SenderOption
+from .qpid.proton._message import Message
+from .qpid.proton.utils import (
+ BlockingConnection,
+ BlockingSender,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class Publisher:
+ def __init__(self, conn: BlockingConnection, addr: str):
+ self._sender: Optional[BlockingSender] = None
+ self._conn = conn
+ self._addr = addr
+ self._open()
+
+ def _open(self) -> None:
+ if self._sender is None:
+ logger.debug("Creating Sender")
+ self._sender = self._create_sender(self._addr)
+
+ def publish(self, message: Message) -> None:
+ if self._sender is not None:
+ self._sender.send(message)
+
+ def close(self) -> None:
+ logger.debug("Closing Sender")
+ if self._sender is not None:
+ self._sender.close()
+
+ def _create_sender(self, addr: str) -> BlockingSender:
+ return self._conn.create_sender(addr, options=SenderOption(addr))
diff --git a/rabbitmq_amqp_python_client/qpid/proton/__init__.py b/rabbitmq_amqp_python_client/qpid/proton/__init__.py
new file mode 100644
index 0000000..c731f41
--- /dev/null
+++ b/rabbitmq_amqp_python_client/qpid/proton/__init__.py
@@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+The proton module defines a suite of APIs that implement the AMQP 1.0
+protocol.
+
+The proton APIs consist of the following classes:
+
+ - :class:`Message` -- A class for creating and/or accessing AMQP message content.
+ - :class:`Data` -- A class for creating and/or accessing arbitrary AMQP encoded data.
+"""
+
+import logging
+import logging.config
+import os
+
+from cproton import (
+ PN_VERSION_MAJOR,
+ PN_VERSION_MINOR,
+ PN_VERSION_POINT,
+)
+
+from ._condition import Condition
+from ._data import (
+ UNDESCRIBED,
+ AnnotationDict,
+ Array,
+ Data,
+ Described,
+ PropertyDict,
+ SymbolList,
+ byte,
+ char,
+ decimal32,
+ decimal64,
+ decimal128,
+ float32,
+ int32,
+ short,
+ symbol,
+ timestamp,
+ ubyte,
+ uint,
+ ulong,
+ ushort,
+)
+from ._delivery import Delivery, Disposition
+from ._endpoints import (
+ Connection,
+ Endpoint,
+ Link,
+ Receiver,
+ Sender,
+ Session,
+ Terminus,
+)
+from ._events import Collector, Event, EventType
+from ._exceptions import (
+ ConnectionException,
+ DataException,
+ Interrupt,
+ LinkException,
+ MessageException,
+ ProtonException,
+ SessionException,
+ SSLException,
+ SSLUnavailable,
+ Timeout,
+ TransportException,
+)
+from ._handler import Handler
+from ._message import Message
+from ._transport import (
+ SASL,
+ SSL,
+ SSLDomain,
+ SSLSessionDetails,
+ Transport,
+)
+from ._url import Url
+
+__all__ = [
+ "API_LANGUAGE",
+ "IMPLEMENTATION_LANGUAGE",
+ "UNDESCRIBED",
+ "AnnotationDict",
+ "Array",
+ "Collector",
+ "Condition",
+ "Connection",
+ "ConnectionException",
+ "Data",
+ "DataException",
+ "Delivery",
+ "Disposition",
+ "Described",
+ "Endpoint",
+ "Event",
+ "EventType",
+ "Handler",
+ "Link",
+ "LinkException",
+ "Message",
+ "MessageException",
+ "PropertyDict",
+ "ProtonException",
+ "VERSION_MAJOR",
+ "VERSION_MINOR",
+ "Receiver",
+ "SASL",
+ "Sender",
+ "Session",
+ "SessionException",
+ "SSL",
+ "SSLDomain",
+ "SSLSessionDetails",
+ "SSLUnavailable",
+ "SSLException",
+ "SymbolList",
+ "Terminus",
+ "Timeout",
+ "Interrupt",
+ "Transport",
+ "TransportException",
+ "Url",
+ "char",
+ "symbol",
+ "timestamp",
+ "ulong",
+ "byte",
+ "short",
+ "int32",
+ "ubyte",
+ "ushort",
+ "uint",
+ "float32",
+ "decimal32",
+ "decimal64",
+ "decimal128",
+]
+
+VERSION_MAJOR = PN_VERSION_MAJOR
+VERSION_MINOR = PN_VERSION_MINOR
+VERSION_POINT = PN_VERSION_POINT
+VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
+API_LANGUAGE = "C"
+IMPLEMENTATION_LANGUAGE = "C"
+
+
+handler = logging.NullHandler()
+
+logconfigfile = os.getenv("PNPY_LOGGER_CONFIG", None)
+if logconfigfile:
+ logging.config.fileConfig(logconfigfile, None, False)
+else:
+ log = logging.getLogger("proton")
+ log.addHandler(handler)
diff --git a/rabbitmq_amqp_python_client/qpid/proton/_common.py b/rabbitmq_amqp_python_client/qpid/proton/_common.py
new file mode 100644
index 0000000..dde07a2
--- /dev/null
+++ b/rabbitmq_amqp_python_client/qpid/proton/_common.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from typing import Union
+
+
+class Constant(object):
+ def __init__(self, name: str) -> None:
+ self.name = name
+
+ def __repr__(self) -> str:
+ return self.name
+
+
+def secs2millis(secs: Union[float, int]) -> int:
+ return int(secs * 1000)
+
+
+def millis2secs(millis: int) -> float:
+ return float(millis) / 1000.0
diff --git a/rabbitmq_amqp_python_client/qpid/proton/_condition.py b/rabbitmq_amqp_python_client/qpid/proton/_condition.py
new file mode 100644
index 0000000..ac9504e
--- /dev/null
+++ b/rabbitmq_amqp_python_client/qpid/proton/_condition.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from typing import TYPE_CHECKING, Optional
+
+from cproton import (
+ pn_condition_clear,
+ pn_condition_get_description,
+ pn_condition_get_name,
+ pn_condition_info,
+ pn_condition_is_set,
+ pn_condition_set_description,
+ pn_condition_set_name,
+)
+
+from ._data import Data, dat2obj
+
+if TYPE_CHECKING:
+ from ._data import PythonAMQPData
+
+
+class Condition:
+ """
+ An AMQP Condition object. Conditions hold exception information
+ pertaining to the closing of an AMQP endpoint such as a :class:`Connection`,
+ :class:`Session`, or :class:`Link`. Conditions also hold similar information
+ pertaining to deliveries that have reached terminal states.
+ Connections, Sessions, Links, and Deliveries may all have local and
+ remote conditions associated with them.
+
+ The local condition may be modified by the local endpoint to signal
+ a particular condition to the remote peer. The remote condition may
+ be examined by the local endpoint to detect whatever condition the
+ remote peer may be signaling. Although often conditions are used to
+ indicate errors, not all conditions are errors *per/se*, e.g.
+ conditions may be used to redirect a connection from one host to
+ another.
+
+ Every condition has a short symbolic name, a longer description,
+ and an additional info map associated with it. The name identifies
+ the formally defined condition, and the map contains additional
+ information relevant to the identified condition.
+
+ :ivar ~.name: The name of the condition.
+ :ivar ~.description: A description of the condition.
+ :ivar ~.info: A data object that holds the additional information associated
+ with the condition. The data object may be used both to access and to
+ modify the additional information associated with the condition.
+ """
+
+ def __init__(
+ self,
+ name: str,
+ description: Optional[str] = None,
+ info: Optional["PythonAMQPData"] = None,
+ ) -> None:
+ self.name = name
+ self.description = description
+ self.info = info
+
+ def __repr__(self) -> str:
+ return "Condition(%s)" % ", ".join(
+ [repr(x) for x in (self.name, self.description, self.info) if x]
+ )
+
+ def __eq__(self, o: "Condition") -> bool:
+ if not isinstance(o, Condition):
+ return False
+ return (
+ self.name == o.name
+ and self.description == o.description
+ and self.info == o.info
+ )
+
+
+def obj2cond(obj, cond: Condition) -> None:
+ pn_condition_clear(cond)
+ if obj:
+ pn_condition_set_name(cond, obj.name)
+ pn_condition_set_description(cond, obj.description)
+ info = Data(pn_condition_info(cond))
+ if obj.info:
+ info.put_object(obj.info)
+
+
+def cond2obj(cond) -> Optional[Condition]:
+ if pn_condition_is_set(cond):
+ return Condition(
+ pn_condition_get_name(cond),
+ pn_condition_get_description(cond),
+ dat2obj(pn_condition_info(cond)),
+ )
+ else:
+ return None
diff --git a/rabbitmq_amqp_python_client/qpid/proton/_data.py b/rabbitmq_amqp_python_client/qpid/proton/_data.py
new file mode 100644
index 0000000..dbfa377
--- /dev/null
+++ b/rabbitmq_amqp_python_client/qpid/proton/_data.py
@@ -0,0 +1,1771 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import uuid
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+ Union,
+)
+
+try:
+ from typing import Literal
+except ImportError:
+ # https://www.python.org/dev/peps/pep-0560/#class-getitem
+ class GenericMeta(type):
+ def __getitem__(self, item):
+ pass
+
+ class Literal(metaclass=GenericMeta): # type: ignore[no-redef]
+ pass
+
+
+from cproton import (
+ PN_ARRAY,
+ PN_BINARY,
+ PN_BOOL,
+ PN_BYTE,
+ PN_CHAR,
+ PN_DECIMAL32,
+ PN_DECIMAL64,
+ PN_DECIMAL128,
+ PN_DESCRIBED,
+ PN_DOUBLE,
+ PN_FLOAT,
+ PN_INT,
+ PN_LIST,
+ PN_LONG,
+ PN_MAP,
+ PN_NULL,
+ PN_OVERFLOW,
+ PN_SHORT,
+ PN_STRING,
+ PN_SYMBOL,
+ PN_TIMESTAMP,
+ PN_UBYTE,
+ PN_UINT,
+ PN_ULONG,
+ PN_USHORT,
+ PN_UUID,
+ pn_data,
+ pn_data_clear,
+ pn_data_copy,
+ pn_data_decode,
+ pn_data_dump,
+ pn_data_encode,
+ pn_data_encoded_size,
+ pn_data_enter,
+ pn_data_error,
+ pn_data_exit,
+ pn_data_format,
+ pn_data_free,
+ pn_data_get_array,
+ pn_data_get_array_type,
+ pn_data_get_binary,
+ pn_data_get_bool,
+ pn_data_get_byte,
+ pn_data_get_char,
+ pn_data_get_decimal32,
+ pn_data_get_decimal64,
+ pn_data_get_decimal128,
+ pn_data_get_double,
+ pn_data_get_float,
+ pn_data_get_int,
+ pn_data_get_list,
+ pn_data_get_long,
+ pn_data_get_map,
+ pn_data_get_short,
+ pn_data_get_string,
+ pn_data_get_symbol,
+ pn_data_get_timestamp,
+ pn_data_get_ubyte,
+ pn_data_get_uint,
+ pn_data_get_ulong,
+ pn_data_get_ushort,
+ pn_data_get_uuid,
+ pn_data_is_array_described,
+ pn_data_is_described,
+ pn_data_is_null,
+ pn_data_lookup,
+ pn_data_narrow,
+ pn_data_next,
+ pn_data_prev,
+ pn_data_put_array,
+ pn_data_put_binary,
+ pn_data_put_bool,
+ pn_data_put_byte,
+ pn_data_put_char,
+ pn_data_put_decimal32,
+ pn_data_put_decimal64,
+ pn_data_put_decimal128,
+ pn_data_put_described,
+ pn_data_put_double,
+ pn_data_put_float,
+ pn_data_put_int,
+ pn_data_put_list,
+ pn_data_put_long,
+ pn_data_put_map,
+ pn_data_put_null,
+ pn_data_put_short,
+ pn_data_put_string,
+ pn_data_put_symbol,
+ pn_data_put_timestamp,
+ pn_data_put_ubyte,
+ pn_data_put_uint,
+ pn_data_put_ulong,
+ pn_data_put_ushort,
+ pn_data_put_uuid,
+ pn_data_rewind,
+ pn_data_type,
+ pn_data_widen,
+ pn_error_text,
+)
+
+from ._common import Constant
+from ._exceptions import EXCEPTIONS, DataException
+
+long = int
+unicode = str
+
+_T = TypeVar("_T")
+
+PythonAMQPData = Union[
+ Dict["PythonAMQPData", "PythonAMQPData"],
+ List["PythonAMQPData"],
+ "Described",
+ "Array",
+ int,
+ str,
+ "symbol",
+ bytes,
+ float,
+ None,
+]
+"""This type annotation represents Python data structures that can be encoded as AMQP Data"""
+
+
+class UnmappedType:
+ def __init__(self, msg: str) -> None:
+ self.msg = msg
+
+ def __repr__(self) -> str:
+ return "UnmappedType(%s)" % self.msg
+
+
+class ulong(long):
+ """
+ The ulong AMQP type.
+
+ An unsigned 64 bit integer in the range :math:`0` to :math:`2^{64} - 1` inclusive.
+ """
+
+ def __init__(self, u64: int) -> None:
+ if u64 < 0:
+ raise AssertionError("initializing ulong with negative value")
+ super(ulong, self).__new__(ulong, u64)
+
+ def __repr__(self) -> str:
+ return "ulong(%s)" % long.__repr__(self)
+
+
+class timestamp(long):
+ """
+ The timestamp AMQP type.
+
+ An absolute point in time, represented by a signed 64 bit value measuring
+ milliseconds since the epoch. This value is encoded using the Unix ``time_t``
+ [IEEE1003] encoding of UTC, but with a precision of milliseconds. For
+ example, ``1311704463521`` represents the moment ``2011-07-26T18:21:03.521Z``.
+ """
+
+ def __repr__(self) -> str:
+ return "timestamp(%s)" % long.__repr__(self)
+
+
+class symbol(unicode):
+ """
+ The symbol AMQP type.
+
+ Symbolic values from a constrained domain, represented by a sequence of ASCII characters.
+ """
+
+ def __repr__(self) -> str:
+ return "symbol(%s)" % unicode.__repr__(self)
+
+
+class char(unicode):
+ """
+ The char AMQP type.
+
+ A 32 bit UTF-32BE encoded Unicode character.
+ """
+
+ def __repr__(self) -> str:
+ return "char(%s)" % unicode.__repr__(self)
+
+
+class byte(int):
+ """
+ The byte AMQP type.
+
+ An 8 bit signed integer in the range :math:`-(2^7)` to :math:`2^7 - 1` inclusive.
+ """
+
+ def __repr__(self) -> str:
+ return "byte(%s)" % int.__repr__(self)
+
+
+class short(int):
+ """
+ The short AMQP type.
+
+ A 16 bit signed integer in the range :math:`-(2^{15})` to :math:`2^{15} - 1` inclusive.
+ """
+
+ def __repr__(self) -> str:
+ return "short(%s)" % int.__repr__(self)
+
+
+class int32(int):
+ """
+ The signed int AMQP type.
+
+ A 32 bit signed integer in the range :math:`-(2^{31})` to :math:`2^{31} - 1` inclusive.
+ """
+
+ def __repr__(self) -> str:
+ return "int32(%s)" % int.__repr__(self)
+
+
+class ubyte(int):
+ """
+ The unsigned byte AMQP type.
+
+ An 8 bit unsigned integer in the range :math:`0` to :math:`2^8 - 1` inclusive.
+ """
+
+ def __init__(self, i: int) -> None:
+ if i < 0:
+ raise AssertionError("initializing ubyte with negative value")
+ super(ubyte, self).__new__(ubyte, i)
+
+ def __repr__(self) -> str:
+ return "ubyte(%s)" % int.__repr__(self)
+
+
+class ushort(int):
+ """
+ The unsigned short AMQP type.
+
+ A 16 bit unsigned integer in the range :math:`0` to :math:`2^{16} - 1` inclusive.
+ """
+
+ def __init__(self, i: int) -> None:
+ if i < 0:
+ raise AssertionError("initializing ushort with negative value")
+ super(ushort, self).__new__(ushort, i)
+
+ def __repr__(self) -> str:
+ return "ushort(%s)" % int.__repr__(self)
+
+
+class uint(long):
+ """
+ The unsigned int AMQP type.
+
+ A 32 bit unsigned integer in the range :math:`0` to :math:`2^{32} - 1` inclusive.
+ """
+
+ def __init__(self, u32: int) -> None:
+ if u32 < 0:
+ raise AssertionError("initializing uint with negative value")
+ super(uint, self).__new__(uint, u32)
+
+ def __repr__(self) -> str:
+ return "uint(%s)" % long.__repr__(self)
+
+
+class float32(float):
+ """
+ The float AMQP type.
+
+ A 32 bit floating point number (IEEE 754-2008 binary32).
+ """
+
+ def __repr__(self) -> str:
+ return "float32(%s)" % float.__repr__(self)
+
+
+class decimal32(int):
+ """
+ The decimal32 AMQP type.
+
+ A 32 bit decimal floating point number (IEEE 754-2008 decimal32).
+ """
+
+ def __repr__(self) -> str:
+ return "decimal32(%s)" % int.__repr__(self)
+
+
+class decimal64(long):
+ """
+ The decimal64 AMQP type.
+
+ A 64 bit decimal floating point number (IEEE 754-2008 decimal64).
+ """
+
+ def __repr__(self) -> str:
+ return "decimal64(%s)" % long.__repr__(self)
+
+
+class decimal128(bytes):
+ """
+ The decimal128 AMQP type.
+
+ A 128-bit decimal floating-point number (IEEE 754-2008 decimal128).
+ """
+
+ def __repr__(self) -> str:
+ return "decimal128(%s)" % bytes.__repr__(self)
+
+
+class Described(object):
+ """
+ A described AMQP type.
+
+ :ivar descriptor: Any AMQP value can be a descriptor
+ :ivar value: The described value
+ """
+
+ def __init__(
+ self,
+ descriptor: PythonAMQPData,
+ value: PythonAMQPData,
+ ) -> None:
+ self.descriptor = descriptor
+ self.value = value
+
+ def __repr__(self) -> str:
+ return "Described(%r, %r)" % (self.descriptor, self.value)
+
+ def __eq__(self, o: Any) -> bool:
+ if isinstance(o, Described):
+ return self.descriptor == o.descriptor and self.value == o.value
+ else:
+ return False
+
+
+UNDESCRIBED = Constant("UNDESCRIBED")
+
+
+class Array(object):
+ """
+ An AMQP array, a sequence of AMQP values of a single type.
+
+ This class provides a convenient way to handle AMQP arrays when used with
+ convenience methods :func:`Data.get_py_array` and :func:`Data.put_py_array`.
+
+ :ivar descriptor: Optional descriptor if the array is to be described, otherwise ``None``
+ :ivar type: Array element type, as an integer. The :class:`Data` class has constants defined
+ for all the valid AMQP types. For example, for an array of double values, use
+ :const:`Data.DOUBLE`, which has integer value 14.
+ :ivar elements: A Python list of elements of the appropriate type.
+ """
+
+ def __init__(self, descriptor: PythonAMQPData, type: int, *elements) -> None:
+ self.descriptor = descriptor
+ self.type = type
+ self.elements = elements
+
+ def __iter__(self):
+ return iter(self.elements)
+
+ def __repr__(self) -> str:
+ if self.elements:
+ els = ", %s" % (", ".join(map(repr, self.elements)))
+ else:
+ els = ""
+ return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
+
+ def __eq__(self, o: Any) -> bool:
+ if isinstance(o, Array):
+ return (
+ self.descriptor == o.descriptor
+ and self.type == o.type
+ and self.elements == o.elements
+ )
+ else:
+ return False
+
+
+def _check_type(
+ s: _T, allow_ulong: bool = False, raise_on_error: bool = True
+) -> Union[symbol, ulong, _T]:
+ if isinstance(s, symbol):
+ return s
+ if allow_ulong and isinstance(s, ulong):
+ return s
+ if isinstance(s, str):
+ return symbol(s)
+ if raise_on_error:
+ raise TypeError("Non-symbol type %s: %s" % (type(s), s))
+ return s
+
+
+def _check_is_symbol(s: _T, raise_on_error: bool = True) -> Union[symbol, ulong, _T]:
+ return _check_type(s, allow_ulong=False, raise_on_error=raise_on_error)
+
+
+def _check_is_symbol_or_ulong(
+ s: _T, raise_on_error: bool = True
+) -> Union[symbol, ulong, _T]:
+ return _check_type(s, allow_ulong=True, raise_on_error=raise_on_error)
+
+
+class RestrictedKeyDict(dict):
+ """Parent class for :class:`PropertyDict` and :class:`AnnotationDict`"""
+
+ def __init__(
+ self,
+ validation_fn: Callable[[_T, bool], _T],
+ e: Optional[Any] = None,
+ raise_on_error: bool = True,
+ **kwargs
+ ) -> None:
+ super(RestrictedKeyDict, self).__init__()
+ self.validation_fn = validation_fn
+ self.raise_on_error = raise_on_error
+ self.update(e, **kwargs)
+
+ def __setitem__(self, key: Union[symbol, str], value: Any) -> None:
+ """Checks if the key is a :class:`symbol` type before setting the value"""
+ try:
+ return super(RestrictedKeyDict, self).__setitem__(
+ self.validation_fn(key, self.raise_on_error), value
+ )
+ except TypeError:
+ pass
+ # __setitem__() must raise a KeyError, not TypeError
+ raise KeyError("invalid non-symbol key: %s: %s" % (type(key), key))
+
+ def update(self, e: Optional[Any] = None, **kwargs) -> None:
+ """
+ Equivalent to dict.update(), but it was needed to call :meth:`__setitem__()`
+ instead of ``dict.__setitem__()``.
+ """
+ if e:
+ try:
+ for k in e:
+ self.__setitem__(k, e[k])
+ except TypeError:
+ self.__setitem__(k[0], k[1]) # use tuple consumed from from zip
+ for k, v in e:
+ self.__setitem__(k, v)
+ for k in kwargs:
+ self.__setitem__(k, kwargs[k])
+
+
+class PropertyDict(RestrictedKeyDict):
+ """
+ A dictionary that only takes :class:`symbol` types as a key.
+ However, if a string key is provided, it will be silently converted
+ into a symbol key.
+
+ >>> from proton import symbol, ulong, PropertyDict
+ >>> a = PropertyDict(one=1, two=2)
+ >>> b = PropertyDict({'one':1, symbol('two'):2})
+ >>> c = PropertyDict(zip(['one', symbol('two')], [1, 2]))
+ >>> d = PropertyDict([(symbol('one'), 1), ('two', 2)])
+ >>> e = PropertyDict(a)
+ >>> a == b == c == d == e
+ True
+
+ By default, non-string and non-symbol keys cause a ``KeyError`` to be raised:
+
+ >>> PropertyDict({'one':1, 2:'two'})
+ ...
+ KeyError: "invalid non-symbol key: : 2"
+
+ but by setting ``raise_on_error=False``, non-string and non-symbol keys will be ignored:
+
+ >>> PropertyDict({'one':1, 2:'two'}, raise_on_error=False)
+ PropertyDict({2: 'two', symbol(u'one'): 1})
+
+ :param e: Initialization for ``dict``
+ :type e: ``dict`` or ``list`` of ``tuple`` or ``zip`` object
+ :param raise_on_error: If ``True``, will raise an ``KeyError`` if a non-string or non-symbol
+ is encountered as a key in the initialization, or in a subsequent operation which
+ adds such an key. If ``False``, non-strings and non-symbols will be added as keys
+ to the dictionary without an error.
+ :param kwargs: Keyword args for initializing a ``dict`` of the form key1=val1, key2=val2, ...
+ """
+
+ def __init__(
+ self, e: Optional[Any] = None, raise_on_error: bool = True, **kwargs
+ ) -> None:
+ super(PropertyDict, self).__init__(
+ _check_is_symbol, e, raise_on_error, **kwargs
+ )
+
+ def __repr__(self):
+ """Representation of PropertyDict"""
+ return "PropertyDict(%s)" % super(PropertyDict, self).__repr__()
+
+
+class AnnotationDict(RestrictedKeyDict):
+ """
+ A dictionary that only takes :class:`symbol` or :class:`ulong` types
+ as a key. However, if a string key is provided, it will be silently
+ converted into a symbol key.
+
+ >>> from proton import symbol, ulong, AnnotationDict
+ >>> a = AnnotationDict(one=1, two=2)
+ >>> a[ulong(3)] = 'three'
+ >>> b = AnnotationDict({'one':1, symbol('two'):2, ulong(3):'three'})
+ >>> c = AnnotationDict(zip([symbol('one'), 'two', ulong(3)], [1, 2, 'three']))
+ >>> d = AnnotationDict([('one', 1), (symbol('two'), 2), (ulong(3), 'three')])
+ >>> e = AnnotationDict(a)
+ >>> a == b == c == d == e
+ True
+
+ By default, non-string, non-symbol and non-ulong keys cause a ``KeyError`` to be raised:
+
+ >>> AnnotationDict({'one': 1, 2: 'two'})
+ ...
+ KeyError: "invalid non-symbol key: : 2"
+
+ but by setting ``raise_on_error=False``, non-string, non-symbol and non-ulong keys will be ignored:
+
+ >>> AnnotationDict({'one': 1, 2: 'two'}, raise_on_error=False)
+ AnnotationDict({2: 'two', symbol(u'one'): 1})
+
+ :param e: Initializer for ``dict``: a ``dict`` or ``list`` of ``tuple`` or ``zip`` object
+ :param raise_on_error: If ``True``, will raise an ``KeyError`` if a non-string, non-symbol or
+ non-ulong is encountered as a key in the initialization, or in a subsequent
+ operation which adds such an key. If ``False``, non-strings, non-ulongs and non-symbols
+ will be added as keys to the dictionary without an error.
+ :param kwargs: Keyword args for initializing a ``dict`` of the form key1=val1, key2=val2, ...
+ """
+
+ def __init__(
+ self,
+ e: Optional[Union[Dict, List, Tuple, Iterable]] = None,
+ raise_on_error: bool = True,
+ **kwargs
+ ) -> None:
+ super(AnnotationDict, self).__init__(
+ _check_is_symbol_or_ulong, e, raise_on_error, **kwargs
+ )
+
+ def __repr__(self):
+ """Representation of AnnotationDict"""
+ return "AnnotationDict(%s)" % super(AnnotationDict, self).__repr__()
+
+
+class SymbolList(list):
+ """
+ A list that can only hold :class:`symbol` elements. However, if any string elements
+ are present, they will be converted to symbols.
+
+ >>> a = SymbolList(['one', symbol('two'), 'three'])
+ >>> b = SymbolList([symbol('one'), 'two', symbol('three')])
+ >>> c = SymbolList(a)
+ >>> a == b == c
+ True
+
+ By default, using any key other than a symbol or string will result in a ``TypeError``:
+
+ >>> SymbolList(['one', symbol('two'), 3])
+ ...
+ TypeError: Non-symbol type : 3
+
+ but by setting ``raise_on_error=False``, non-symbol and non-string keys will be ignored:
+
+ >>> SymbolList(['one', symbol('two'), 3], raise_on_error=False)
+ SymbolList([symbol(u'one'), symbol(u'two'), 3])
+
+ :param t: Initializer for list
+ :param raise_on_error: If ``True``, will raise an ``TypeError`` if a non-string or non-symbol is
+ encountered in the initialization list, or in a subsequent operation which adds such
+ an element. If ``False``, non-strings and non-symbols will be added to the list without
+ an error.
+ """
+
+ def __init__(
+ self, t: Optional[List[Any]] = None, raise_on_error: bool = True
+ ) -> None:
+ super(SymbolList, self).__init__()
+ self.raise_on_error = raise_on_error
+ if isinstance(t, (str, symbol)):
+ self.append(t)
+ else:
+ self.extend(t)
+
+ def _check_list(self, t: Iterable[Any]) -> List[Any]:
+ """Check all items in list are :class:`symbol`s (or are converted to symbols)."""
+ item = []
+ if t:
+ for v in t:
+ item.append(_check_is_symbol(v, self.raise_on_error))
+ return item
+
+ def to_array(self):
+ return Array(UNDESCRIBED, PN_SYMBOL, *self)
+
+ def append(self, v: str) -> None:
+ """Add a single value v to the end of the list"""
+ return super(SymbolList, self).append(_check_is_symbol(v, self.raise_on_error))
+
+ def extend(self, t: Iterable[str]) -> None:
+ """Add all elements of an iterable t to the end of the list"""
+ return super(SymbolList, self).extend(self._check_list(t))
+
+ def insert(self, i: int, v: str) -> None:
+ """Insert a value v at index i"""
+ return super(SymbolList, self).insert(
+ i, _check_is_symbol(v, self.raise_on_error)
+ )
+
+ def __add__(self, t: Iterable[Any]) -> "SymbolList":
+ """Handles list1 + list2"""
+ return SymbolList(
+ super(SymbolList, self).__add__(self._check_list(t)),
+ raise_on_error=self.raise_on_error,
+ )
+
+ def __iadd__(self, t):
+ """Handles list1 += list2"""
+ return super(SymbolList, self).__iadd__(self._check_list(t))
+
+ def __eq__(self, other):
+ """Handles list1 == list2"""
+ return super().__eq__(SymbolList(other, raise_on_error=False))
+
+ def __setitem__(self, i: int, t: Any) -> None:
+ """Handles list[i] = v"""
+ return super(SymbolList, self).__setitem__(
+ i, _check_is_symbol(t, self.raise_on_error)
+ )
+
+ def __repr__(self) -> str:
+ """Representation of SymbolList"""
+ return "SymbolList(%s)" % super(SymbolList, self).__repr__()
+
+
+class Data:
+ """
+ The :class:`Data` class provides an interface for decoding, extracting,
+ creating, and encoding arbitrary AMQP data. A :class:`Data` object
+ contains a tree of AMQP values. Leaf nodes in this tree correspond
+ to scalars in the AMQP type system such as :const:`ints ` or
+ :const:`strings `. Non-leaf nodes in this tree correspond to
+ compound values in the AMQP type system such as :const:`lists `,
+ :const:`maps