Skip to content

Commit 77ff2b7

Browse files
authored
User initiated close (#150)
1 parent 0efa133 commit 77ff2b7

5 files changed

Lines changed: 148 additions & 4 deletions

File tree

amqpstorm/channel.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def __init__(
6969
self._exchange = Exchange(self)
7070
self._tx = Tx(self)
7171
self._queue = Queue(self)
72+
self._user_closed: bool = False
7273

7374
def __enter__(self) -> Channel:
7475
return self
@@ -173,7 +174,12 @@ def build_inbound_messages(
173174
174175
:rtype: :py:class:`generator`
175176
"""
176-
self.check_for_errors()
177+
try:
178+
self.check_for_errors()
179+
except (AMQPConnectionError, AMQPChannelError):
180+
if self._user_initiated_close():
181+
return
182+
raise
177183
if message_impl:
178184
if not issubclass(message_impl, BaseMessage):
179185
raise AMQPInvalidArgument(
@@ -182,10 +188,20 @@ def build_inbound_messages(
182188
else:
183189
message_impl = Message
184190
while not self.is_closed:
185-
message = self._build_message(auto_decode=auto_decode,
186-
message_impl=message_impl)
191+
try:
192+
message = self._build_message(auto_decode=auto_decode,
193+
message_impl=message_impl)
194+
except (AMQPConnectionError, AMQPChannelError):
195+
if self._user_initiated_close():
196+
return
197+
raise
187198
if not message:
188-
self.check_for_errors()
199+
try:
200+
self.check_for_errors()
201+
except (AMQPConnectionError, AMQPChannelError):
202+
if self._user_initiated_close():
203+
return
204+
raise
189205
time.sleep(IDLE_WAIT)
190206
if break_on_empty and not self._inbound:
191207
break
@@ -195,6 +211,9 @@ def build_inbound_messages(
195211
continue
196212
yield message
197213

214+
def _user_initiated_close(self) -> bool:
215+
return self._user_closed or self._connection._user_closed
216+
198217
def close(self, reply_code: int = 200, reply_text: str = '') -> None:
199218
"""Close Channel.
200219
@@ -212,6 +231,7 @@ def close(self, reply_code: int = 200, reply_text: str = '') -> None:
212231
raise AMQPInvalidArgument('reply_code should be an integer')
213232
elif not compatibility.is_string(reply_text):
214233
raise AMQPInvalidArgument('reply_text should be a string')
234+
self._user_closed = True
215235
try:
216236
if self._connection.is_closed or not self.is_open:
217237
self.stop_consuming()

amqpstorm/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def __init__(
110110
self._channel0 = Channel0(self, self.parameters['client_properties'])
111111
self._channels: dict[int, Channel] = {}
112112
self._last_channel_id: int | None = None
113+
self._user_closed: bool = False
113114
self.heartbeat = Heartbeat(self.parameters['heartbeat'],
114115
self._channel0.send_heartbeat)
115116
if not kwargs.get('lazy', False):
@@ -240,6 +241,7 @@ def close(self) -> None:
240241
:return:
241242
"""
242243
LOGGER.debug('Connection Closing')
244+
self._user_closed = True
243245
if not self.is_closed:
244246
self.set_state(self.CLOSING)
245247
self.heartbeat.stop()

amqpstorm/tests/unit/channel/test_channel.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,20 @@ def on_close_ok(_, frame_out):
7979
self.assertEqual(channel._state, channel.CLOSED)
8080
self.assertFalse(channel.exceptions)
8181

82+
def test_channel_close_sets_user_closed_flag(self):
83+
def on_close_ok(_, frame_out):
84+
if isinstance(frame_out, commands.Basic.Cancel):
85+
channel.rpc.on_frame(commands.Basic.CancelOk())
86+
return
87+
channel.rpc.on_frame(commands.Channel.CloseOk())
88+
89+
channel = Channel(0, FakeConnection(on_write=on_close_ok), 360)
90+
channel.set_state(channel.OPEN)
91+
92+
self.assertFalse(channel._user_closed)
93+
channel.close()
94+
self.assertTrue(channel._user_closed)
95+
8296
def test_channel_close_gracefully_with_queued_error(self):
8397
def on_close_ok(_, frame_out):
8498
if isinstance(frame_out, commands.Basic.Cancel):

amqpstorm/tests/unit/channel/test_channel_message_handling.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pamqp.body import ContentBody
99

1010
from amqpstorm import AMQPChannelError
11+
from amqpstorm import AMQPConnectionError
1112
from amqpstorm import Channel
1213
from amqpstorm import Message
1314
from amqpstorm.tests.utility import FakeConnection
@@ -338,6 +339,105 @@ def test_channel_build_inbound_messages_as_tuple(self):
338339

339340
self.assertEqual(messages_consumed, 1)
340341

342+
def test_channel_build_inbound_messages_returns_on_user_close(self):
343+
channel = Channel(0, FakeConnection(), 360)
344+
channel.set_state(channel.OPEN)
345+
346+
message = self.message.encode('utf-8')
347+
deliver = commands.Basic.Deliver()
348+
header = ContentHeader(body_size=len(message))
349+
body = ContentBody(value=message)
350+
channel._inbound = collections.deque([deliver, header, body])
351+
352+
consumed = 0
353+
for _ in channel.build_inbound_messages(break_on_empty=False):
354+
consumed += 1
355+
channel._user_closed = True
356+
channel.set_state(channel.CLOSED)
357+
self.assertEqual(consumed, 1)
358+
self.assertFalse(channel.exceptions)
359+
360+
def test_channel_build_inbound_messages_returns_on_connection_user_close(self):
361+
connection = FakeConnection()
362+
channel = Channel(0, connection, 360)
363+
channel.set_state(channel.OPEN)
364+
365+
message = self.message.encode('utf-8')
366+
deliver = commands.Basic.Deliver()
367+
header = ContentHeader(body_size=len(message))
368+
body = ContentBody(value=message)
369+
channel._inbound = collections.deque([deliver, header, body])
370+
371+
consumed = 0
372+
for _ in channel.build_inbound_messages(break_on_empty=False):
373+
consumed += 1
374+
connection._user_closed = True
375+
connection.set_state(connection.CLOSED)
376+
self.assertEqual(consumed, 1)
377+
378+
def test_channel_build_inbound_messages_raises_on_server_close(self):
379+
channel = Channel(0, FakeConnection(), 360)
380+
channel.set_state(channel.OPEN)
381+
382+
message = self.message.encode('utf-8')
383+
deliver = commands.Basic.Deliver()
384+
header = ContentHeader(body_size=len(message))
385+
body = ContentBody(value=message)
386+
channel._inbound = collections.deque([deliver, header, body])
387+
388+
def drive():
389+
for _ in channel.build_inbound_messages(break_on_empty=False):
390+
channel.exceptions.append(
391+
AMQPChannelError('closed by server', reply_code=406)
392+
)
393+
394+
self.assertRaises(AMQPChannelError, drive)
395+
396+
def test_channel_build_inbound_messages_returns_when_already_user_closed(
397+
self,
398+
):
399+
channel = Channel(0, FakeConnection(), 360)
400+
channel._user_closed = True
401+
channel.set_state(channel.CLOSED)
402+
403+
messages = list(
404+
channel.build_inbound_messages(break_on_empty=False),
405+
)
406+
self.assertEqual(messages, [])
407+
408+
def test_channel_build_inbound_messages_raises_when_closed_without_flag(
409+
self,
410+
):
411+
channel = Channel(0, FakeConnection(), 360)
412+
channel.set_state(channel.CLOSED)
413+
414+
self.assertRaises(
415+
AMQPChannelError,
416+
lambda: list(
417+
channel.build_inbound_messages(break_on_empty=False)
418+
),
419+
)
420+
421+
def test_channel_user_initiated_close_helper(self):
422+
connection = FakeConnection()
423+
channel = Channel(0, connection, 360)
424+
425+
self.assertFalse(channel._user_initiated_close())
426+
427+
channel._user_closed = True
428+
self.assertTrue(channel._user_initiated_close())
429+
430+
channel._user_closed = False
431+
connection._user_closed = True
432+
self.assertTrue(channel._user_initiated_close())
433+
434+
def test_channel_check_for_errors_still_raises_after_user_close(self):
435+
connection = FakeConnection(state=FakeConnection.CLOSED)
436+
connection._user_closed = True
437+
channel = Channel(0, connection, 360)
438+
439+
self.assertRaises(AMQPConnectionError, channel.check_for_errors)
440+
341441

342442
class ChannelProcessDataEventTests(TestFramework):
343443
def test_channel_process_data_events(self):

amqpstorm/tests/unit/connection/test_connection.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ def test_connection_close_state(self):
7575

7676
self.assertTrue(connection.is_closed)
7777

78+
def test_connection_close_sets_user_closed_flag(self):
79+
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
80+
connection.set_state(Connection.OPEN)
81+
82+
self.assertFalse(connection._user_closed)
83+
connection.close()
84+
self.assertTrue(connection._user_closed)
85+
7886
def test_connection_open_channel_on_closed_connection(self):
7987
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
8088

0 commit comments

Comments
 (0)