Skip to content

Commit 6524e9a

Browse files
authored
Merge pull request #24 from eandersson/bug/heartbeat
Bug/heartbeat
2 parents 35284ae + c4d9953 commit 6524e9a

25 files changed

Lines changed: 392 additions & 216 deletions

.travis.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ install:
1313
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi
1414
- pip install -r requirements.txt
1515
- pip install -r test-requirements.txt
16-
- if [[ $TRAVIS_PYTHON_VERSION == '3.2' ]]; then pip install coverage==3.7.1; fi
17-
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm
16+
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm --with-timer --timer-top-n 25
1817
after_success:
1918
- bash <(curl -s https://codecov.io/bash)
2019
services:

CHANGELOG

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
### Version 1.4.0
4+
- All classes are now slotted.
5+
- New improved Heartbeat Monitor.
6+
- If no data has been sent within the Heartbeat interval, the client will now send a Heartbeat to the server. - Thanks David Schneider.
7+
38
### Version 1.3.4
49
- Dropped Python 3.2 Support.
510
- Fixed incorrect SSL warning when adding heartbeat or timeout to uri string (#18) - Thanks Adam Mills.

amqpstorm/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""AMQP-Storm."""
2-
__version__ = '1.3.4' # noqa
2+
__version__ = '1.4.0' # noqa
33
__author__ = 'eandersson' # noqa
44

55
import logging

amqpstorm/base.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
"""AMQP-Storm Base."""
22

3+
import locale
34
import threading
45

6+
AUTH_MECHANISM = 'PLAIN'
57
IDLE_WAIT = 0.01
68
FRAME_MAX = 131072
9+
MAX_CHANNELS = 65535
10+
LOCALE = locale.getdefaultlocale()[0] or 'en_US'
711

812

913
class Stateful(object):
1014
"""Stateful"""
15+
__slots__ = [
16+
'_exceptions', '_lock', '_state'
17+
]
1118
CLOSED = 0
1219
CLOSING = 1
1320
OPENING = 2
@@ -81,6 +88,9 @@ def check_for_errors(self):
8188

8289
class BaseChannel(Stateful):
8390
"""AMQP BaseChannel"""
91+
__slots__ = [
92+
'_channel_id', '_consumer_tags'
93+
]
8494

8595
def __init__(self, channel_id):
8696
super(BaseChannel, self).__init__()
@@ -129,7 +139,9 @@ def remove_consumer_tag(self, tag=None):
129139

130140
class BaseMessage(object):
131141
"""AMQP BaseMessage"""
132-
__slots__ = ['_body', '_channel', '_method', '_properties']
142+
__slots__ = [
143+
'_body', '_channel', '_method', '_properties'
144+
]
133145

134146
def __init__(self, channel, **message):
135147
"""
@@ -169,7 +181,9 @@ def to_tuple(self):
169181

170182
class Handler(object):
171183
"""Operations Handler (e.g. Queue, Exchange)"""
172-
__slots__ = ['_channel']
184+
__slots__ = [
185+
'_channel'
186+
]
173187

174188
def __init__(self, channel):
175189
self._channel = channel

amqpstorm/basic.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
class Basic(Handler):
2121
"""AMQP Channel.basic"""
22+
__slots__ = []
2223

2324
def qos(self, prefetch_count=0, prefetch_size=0, global_=False):
2425
"""Specify quality of service.

amqpstorm/channel.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626

2727
class Channel(BaseChannel):
2828
"""Connection.channel"""
29+
__slots__ = [
30+
'confirming_deliveries', 'consumer_callback', 'rpc', '_basic',
31+
'_connection', '_exchange', '_inbound', '_queue'
32+
]
2933

3034
def __init__(self, channel_id, connection, rpc_timeout):
3135
super(Channel, self).__init__(channel_id)
@@ -34,9 +38,9 @@ def __init__(self, channel_id, connection, rpc_timeout):
3438
self.consumer_callback = None
3539
self._inbound = []
3640
self._connection = connection
37-
self._basic = None
38-
self._exchange = None
39-
self._queue = None
41+
self._basic = Basic(self)
42+
self._exchange = Exchange(self)
43+
self._queue = Queue(self)
4044

4145
def __enter__(self):
4246
return self
@@ -58,23 +62,23 @@ def basic(self):
5862
5963
:rtype: Basic
6064
"""
61-
return self._lazy_load_handler('_basic', Basic)
65+
return self._basic
6266

6367
@property
6468
def exchange(self):
6569
"""RabbitMQ Exchange Operations.
6670
6771
:rtype: Exchange
6872
"""
69-
return self._lazy_load_handler('_exchange', Exchange)
73+
return self._exchange
7074

7175
@property
7276
def queue(self):
7377
"""RabbitMQ Queue Operations.
7478
7579
:rtype: Queue
7680
"""
77-
return self._lazy_load_handler('_queue', Queue)
81+
return self._queue
7882

7983
def open(self):
8084
"""Open Channel.
@@ -340,10 +344,10 @@ def _build_message(self):
340344
with self.lock:
341345
if len(self._inbound) < 2:
342346
return None
343-
result = self._build_message_headers()
344-
if not result:
347+
headers = self._build_message_headers()
348+
if not headers:
345349
return None
346-
basic_deliver, content_header = result
350+
basic_deliver, content_header = headers
347351
body = self._build_message_body(content_header.body_size)
348352

349353
message = Message(channel=self,
@@ -387,16 +391,3 @@ def _build_message_body(self, body_size):
387391
break
388392
body += body_piece.value
389393
return body
390-
391-
def _lazy_load_handler(self, name, handler_class):
392-
"""Lazy load operations (e.g. Queue, Exchange)
393-
394-
:param name:
395-
:param Handler handler_class: Handler (e.g. Queue)
396-
:return:
397-
"""
398-
handler = getattr(self, name)
399-
if not handler:
400-
handler = handler_class(self)
401-
setattr(self, name, handler)
402-
return handler

amqpstorm/channel0.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""AMQP-Storm Connection.Channel0."""
22

3-
import locale
43
import logging
54
import platform
65

@@ -9,15 +8,16 @@
98
from pamqp.specification import Connection as pamqp_connection
109

1110
from amqpstorm import __version__
11+
from amqpstorm.base import AUTH_MECHANISM
1212
from amqpstorm.base import FRAME_MAX
13+
from amqpstorm.base import LOCALE
14+
from amqpstorm.base import MAX_CHANNELS
1315
from amqpstorm.base import Stateful
1416
from amqpstorm.exception import AMQPConnectionError
1517
from amqpstorm.compatibility import try_utf8_decode
1618

17-
AUTH_MECHANISM = 'PLAIN'
18-
LOCALE = locale.getdefaultlocale()[0] or 'en_US'
19+
1920
LOGGER = logging.getLogger(__name__)
20-
MAX_CHANNELS = 65535
2121

2222

2323
class Channel0(object):
@@ -39,8 +39,7 @@ def on_frame(self, frame_in):
3939
"""
4040
LOGGER.debug('Frame Received: %s', frame_in.name)
4141
if frame_in.name == 'Heartbeat':
42-
self._connection.heartbeat.register_heartbeat()
43-
self._write_frame(Heartbeat())
42+
self.send_heartbeat()
4443
elif frame_in.name == 'Connection.Start':
4544
self.server_properties = frame_in.server_properties
4645
self._send_start_ok_frame(frame_in)
@@ -63,6 +62,13 @@ def on_frame(self, frame_in):
6362
else:
6463
LOGGER.error('[Channel0] Unhandled Frame: %s', frame_in.name)
6564

65+
def send_heartbeat(self):
66+
"""Send Heartbeat frame.
67+
68+
:return:
69+
"""
70+
self._write_frame(Heartbeat())
71+
6672
def send_close_connection_frame(self):
6773
"""Send Connection Close frame.
6874
@@ -107,11 +113,12 @@ def _write_frame(self, frame_out):
107113
:return:
108114
"""
109115
self._connection.write_frame(0, frame_out)
116+
LOGGER.debug('Frame Sent: %s', frame_out.name)
110117

111118
def _send_start_ok_frame(self, frame_in):
112119
"""Send Start OK frame.
113120
114-
:param pamqp_spec.Frame frame_out: Amqp frame.
121+
:param pamqp_spec.Connection.StartOk frame_in: Amqp frame.
115122
:return:
116123
"""
117124
if 'PLAIN' not in try_utf8_decode(frame_in.mechanisms):

amqpstorm/connection.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525

2626
class Connection(Stateful):
2727
"""AMQP Connection"""
28+
__slots__ = [
29+
'heartbeat', 'parameters', '_channel0', '_channels', '_io'
30+
]
2831

2932
def __init__(self, hostname, username, password, port=5672, **kwargs):
3033
"""
@@ -56,10 +59,12 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
5659
'ssl_options': kwargs.get('ssl_options', {})
5760
}
5861
self._validate_parameters()
59-
self.heartbeat = Heartbeat(self.parameters['heartbeat'])
60-
self._io = IO(self.parameters, on_read=self._read_buffer)
62+
self._io = IO(self.parameters, exceptions=self._exceptions,
63+
on_read=self._read_buffer)
6164
self._channel0 = Channel0(self)
6265
self._channels = {}
66+
self.heartbeat = Heartbeat(self.parameters['heartbeat'],
67+
self._channel0.send_heartbeat)
6368
if not kwargs.get('lazy', False):
6469
self.open()
6570

@@ -116,7 +121,7 @@ def open(self):
116121
LOGGER.debug('Connection Opening')
117122
self.set_state(self.OPENING)
118123
self._exceptions = []
119-
self._io.open(self._exceptions)
124+
self._io.open()
120125
self._send_handshake()
121126
self._wait_for_connection_to_open()
122127
self.heartbeat.start(self._exceptions)
@@ -180,6 +185,7 @@ def write_frame(self, channel_id, frame_out):
180185
:return:
181186
"""
182187
frame_data = pamqp_frame.marshal(frame_out, channel_id)
188+
self.heartbeat.register_write()
183189
self._io.write_to_socket(frame_data)
184190

185191
def write_frames(self, channel_id, multiple_frames):
@@ -192,6 +198,7 @@ def write_frames(self, channel_id, multiple_frames):
192198
frame_data = EMPTY_BUFFER
193199
for single_frame in multiple_frames:
194200
frame_data += pamqp_frame.marshal(single_frame, channel_id)
201+
self.heartbeat.register_write()
195202
self._io.write_to_socket(frame_data)
196203

197204
def _validate_parameters(self):
@@ -247,7 +254,7 @@ def _read_buffer(self, buffer):
247254
if frame_in is None:
248255
break
249256

250-
self.heartbeat.register_beat()
257+
self.heartbeat.register_read()
251258
if channel_id == 0:
252259
self._channel0.on_frame(frame_in)
253260
else:
@@ -270,7 +277,7 @@ def _handle_amqp_frame(self, data_in):
270277
pass
271278
except pamqp_spec.AMQPFrameError as why:
272279
LOGGER.error('AMQPFrameError: %r', why, exc_info=True)
273-
except (UnicodeDecodeError, ValueError) as why:
280+
except ValueError as why:
274281
LOGGER.error(why, exc_info=True)
275282
self.exceptions.append(AMQPConnectionError(why))
276283
return data_in, None, None

amqpstorm/exchange.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
class Exchange(Handler):
1515
"""AMQP Channel.exchange"""
16+
__slots__ = []
1617

1718
def declare(self, exchange='', exchange_type='direct', passive=False,
1819
durable=False, auto_delete=False, arguments=None):

0 commit comments

Comments
 (0)