Skip to content

Commit c6329c3

Browse files
committed
refact
1 parent ce3d20e commit c6329c3

File tree

5 files changed

+58
-54
lines changed

5 files changed

+58
-54
lines changed

examples/oauth/oaut.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
OAuth2Options,
1919
OutcomeState,
2020
QuorumQueueSpecification,
21+
Converter
2122
)
2223

2324
MESSAGES_TO_PUBLISH = 100
@@ -30,7 +31,7 @@ def __init__(self):
3031
self._count = 0
3132

3233
def on_amqp_message(self, event: Event):
33-
print("received message: " + str(event.message.body))
34+
print("received message: " + Converter.bytes_to_string(event.message.body))
3435

3536
# accepting
3637
self.delivery_context.accept(event)
@@ -85,7 +86,6 @@ def create_connection(environment: Environment) -> Connection:
8586

8687

8788
def main() -> None:
88-
8989
exchange_name = "test-exchange"
9090
queue_name = "example-queue"
9191
routing_key = "routing-key"
@@ -144,14 +144,13 @@ def main() -> None:
144144

145145
# publish 10 messages
146146
for i in range(MESSAGES_TO_PUBLISH):
147-
print("publishing")
148-
status = publisher.publish(Message(body="test"))
147+
status = publisher.publish(Message(body=Converter.string_to_bytes("test_{}".format(i))))
149148
if status.remote_state == OutcomeState.ACCEPTED:
150-
print("message accepted")
149+
print("message: test_{} accepted".format(i))
151150
elif status.remote_state == OutcomeState.RELEASED:
152-
print("message not routed")
151+
print("message: test_{} not routed".format(i))
153152
elif status.remote_state == OutcomeState.REJECTED:
154-
print("message not rejected")
153+
print("message: test_{} rejected".format(i))
155154

156155
publisher.close()
157156

examples/reconnection/reconnection_example.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99
ExchangeSpecification,
1010
ExchangeToQueueBindingSpecification,
1111
Message,
12-
QuorumQueueSpecification,
12+
QuorumQueueSpecification, Converter,
1313
)
1414

1515
# here we keep track of the objects we need to reconnect
1616
MESSAGES_TO_PUBLISH = 50000
1717

18-
1918
environment = Environment(
2019
uri="amqp://guest:guest@localhost:5672/",
2120
)
@@ -29,7 +28,7 @@ def __init__(self):
2928

3029
def on_message(self, event: Event):
3130
if self._count % 1000 == 0:
32-
print("received 100 message: " + str(event.message.body))
31+
print("received 100 message: " + Converter.bytes_to_string(event.message.body))
3332

3433
# accepting
3534
self.delivery_context.accept(event)
@@ -79,7 +78,6 @@ def create_connection() -> Connection:
7978

8079

8180
def main() -> None:
82-
8381
exchange_name = "test-exchange"
8482
queue_name = "example-queue"
8583
routing_key = "routing-key"
@@ -128,7 +126,7 @@ def main() -> None:
128126
print("published 1000 messages...")
129127
try:
130128
if publisher is not None:
131-
publisher.publish(Message(body="test"))
129+
publisher.publish(Message(body=Converter.string_to_bytes("test")))
132130
except ConnectionClosed:
133131
print("publisher closing exception, resubmitting")
134132
# publisher = connection.publisher(addr)

rabbitmq_amqp_python_client/management.py

+36-35
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,11 @@ def close(self) -> None:
9898
self._receiver.close()
9999

100100
def request(
101-
self,
102-
body: Any,
103-
path: str,
104-
method: str,
105-
expected_response_codes: list[int],
101+
self,
102+
body: Any,
103+
path: str,
104+
method: str,
105+
expected_response_codes: list[int],
106106
) -> Message:
107107
"""
108108
Send a management request with a new UUID.
@@ -124,16 +124,17 @@ def request(
124124
)
125125

126126
def _request(
127-
self,
128-
id: str,
129-
body: Any,
130-
path: str,
131-
method: str,
132-
expected_response_codes: list[int],
127+
self,
128+
id: str,
129+
body: Any,
130+
path: str,
131+
method: str,
132+
expected_response_codes: list[int],
133133
) -> Message:
134134
amq_message = Message(
135135
id=id,
136136
body=body,
137+
inferred=False,
137138
reply_to="$me",
138139
address=path,
139140
subject=method,
@@ -151,10 +152,10 @@ def _request(
151152
return msg
152153

153154
def declare_exchange(
154-
self,
155-
exchange_specification: Union[
156-
ExchangeSpecification, ExchangeCustomSpecification
157-
],
155+
self,
156+
exchange_specification: Union[
157+
ExchangeSpecification, ExchangeCustomSpecification
158+
],
158159
) -> Union[ExchangeSpecification, ExchangeCustomSpecification]:
159160
"""
160161
Declare a new exchange in RabbitMQ.
@@ -195,10 +196,10 @@ def declare_exchange(
195196
return exchange_specification
196197

197198
def declare_queue(
198-
self,
199-
queue_specification: Union[
200-
ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
201-
],
199+
self,
200+
queue_specification: Union[
201+
ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
202+
],
202203
) -> Union[
203204
ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification
204205
]:
@@ -219,7 +220,7 @@ def declare_queue(
219220
logger.debug("declare_queue operation called")
220221

221222
if isinstance(queue_specification, ClassicQueueSpecification) or isinstance(
222-
queue_specification, QuorumQueueSpecification
223+
queue_specification, QuorumQueueSpecification
223224
):
224225
body = self._declare_queue(queue_specification)
225226

@@ -242,8 +243,8 @@ def declare_queue(
242243
return queue_specification
243244

244245
def _declare_queue(
245-
self,
246-
queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification],
246+
self,
247+
queue_specification: Union[ClassicQueueSpecification, QuorumQueueSpecification],
247248
) -> dict[str, Any]:
248249

249250
body = {}
@@ -311,7 +312,7 @@ def _declare_queue(
311312
return body
312313

313314
def _declare_stream(
314-
self, stream_specification: StreamSpecification
315+
self, stream_specification: StreamSpecification
315316
) -> dict[str, Any]:
316317

317318
body = {}
@@ -324,7 +325,7 @@ def _declare_stream(
324325

325326
if stream_specification.max_age is not None:
326327
args["x-max-age"] = (
327-
str(int(stream_specification.max_age.total_seconds())) + "s"
328+
str(int(stream_specification.max_age.total_seconds())) + "s"
328329
)
329330

330331
if stream_specification.stream_max_segment_size_bytes is not None:
@@ -392,7 +393,7 @@ def delete_queue(self, name: str) -> None:
392393
)
393394

394395
def _validate_reponse_code(
395-
self, response_code: int, expected_response_codes: list[int]
396+
self, response_code: int, expected_response_codes: list[int]
396397
) -> None:
397398
if response_code == CommonValues.response_code_409.value:
398399
raise ValidationCodeException("ErrPreconditionFailed")
@@ -406,10 +407,10 @@ def _validate_reponse_code(
406407
)
407408

408409
def bind(
409-
self,
410-
bind_specification: Union[
411-
ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification
412-
],
410+
self,
411+
bind_specification: Union[
412+
ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification
413+
],
413414
) -> str:
414415
"""
415416
Create a binding between exchanges or between an exchange and a queue.
@@ -462,12 +463,12 @@ def bind(
462463
return binding_path
463464

464465
def unbind(
465-
self,
466-
bind_specification: Union[
467-
str,
468-
ExchangeToQueueBindingSpecification,
469-
ExchangeToExchangeBindingSpecification,
470-
],
466+
self,
467+
bind_specification: Union[
468+
str,
469+
ExchangeToQueueBindingSpecification,
470+
ExchangeToExchangeBindingSpecification,
471+
],
471472
) -> None:
472473
"""
473474
Remove a binding between exchanges or between an exchange and a queue.

rabbitmq_amqp_python_client/publisher.py

+10
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,16 @@ def publish(self, message: Message) -> Delivery:
8080
"address specified in both message and publisher"
8181
)
8282

83+
if not isinstance(message.body, (bytes, type(None))):
84+
raise ArgumentOutOfRangeException(
85+
"Message body must be of type bytes or None"
86+
)
87+
88+
if not message.inferred:
89+
raise ArgumentOutOfRangeException(
90+
"Message inferred must be True"
91+
)
92+
8393
if self._addr != "":
8494
if self._sender is not None:
8595
return self._sender.send(message)

rabbitmq_amqp_python_client/qpid/proton/_message.py

+3-7
Original file line numberDiff line numberDiff line change
@@ -113,20 +113,16 @@ class Message(object):
113113
DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
114114
""" Default AMQP message priority"""
115115

116-
def __init__(self, body: Union[bytes, dict, None] = None, **kwargs) -> None:
116+
def __init__(self, body: Union[str, bytes, dict, None] = None, inferred = True, **kwargs) -> None:
117117
# validate the types
118118

119-
if not isinstance(body, (bytes, dict, type(None))):
120-
raise ArgumentOutOfRangeException(
121-
"Message body must be of type bytes, dict or None"
122-
)
123-
124119
self._msg = pn_message()
125120
self.instructions = None
126121
self.annotations = None
127122
self.properties = None
128123
self.body = body
129-
self.inferred = True
124+
self.inferred = inferred
125+
130126

131127
for k, v in kwargs.items():
132128
getattr(self, k) # Raise exception if it's not a valid attribute.

0 commit comments

Comments
 (0)