Skip to content

Commit c4d9953

Browse files
committed
Finalizing Heartbeat bug patch
1 parent 30187ef commit c4d9953

24 files changed

Lines changed: 196 additions & 98 deletions

.travis.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ install:
1313
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi
1414
- pip install -r requirements.txt
1515
- pip install -r test-requirements.txt
16-
- if [[ $TRAVIS_PYTHON_VERSION == '3.2' ]]; then pip install coverage==3.7.1; fi
17-
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm
16+
script: nosetests -v -l DEBUG --logging-level=DEBUG --with-coverage --cover-package=amqpstorm --with-timer --timer-top-n 25
1817
after_success:
1918
- bash <(curl -s https://codecov.io/bash)
2019
services:

CHANGELOG

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
### Version 1.4.0
4+
- All classes are now slotted.
5+
- New improved Heartbeat Monitor.
6+
- If no data has been sent within the Heartbeat interval, the client will now send a Heartbeat to the server. - Thanks David Schneider.
7+
38
### Version 1.3.4
49
- Dropped Python 3.2 Support.
510
- Fixed incorrect SSL warning when adding heartbeat or timeout to uri string (#18) - Thanks Adam Mills.

amqpstorm/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""AMQP-Storm."""
2-
__version__ = '1.3.4' # noqa
2+
__version__ = '1.4.0' # noqa
33
__author__ = 'eandersson' # noqa
44

55
import logging

amqpstorm/base.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
"""AMQP-Storm Base."""
22

3+
import locale
34
import threading
45

6+
AUTH_MECHANISM = 'PLAIN'
57
IDLE_WAIT = 0.01
68
FRAME_MAX = 131072
9+
MAX_CHANNELS = 65535
10+
LOCALE = locale.getdefaultlocale()[0] or 'en_US'
711

812

913
class Stateful(object):
1014
"""Stateful"""
15+
__slots__ = [
16+
'_exceptions', '_lock', '_state'
17+
]
1118
CLOSED = 0
1219
CLOSING = 1
1320
OPENING = 2
@@ -81,6 +88,9 @@ def check_for_errors(self):
8188

8289
class BaseChannel(Stateful):
8390
"""AMQP BaseChannel"""
91+
__slots__ = [
92+
'_channel_id', '_consumer_tags'
93+
]
8494

8595
def __init__(self, channel_id):
8696
super(BaseChannel, self).__init__()
@@ -129,7 +139,9 @@ def remove_consumer_tag(self, tag=None):
129139

130140
class BaseMessage(object):
131141
"""AMQP BaseMessage"""
132-
__slots__ = ['_body', '_channel', '_method', '_properties']
142+
__slots__ = [
143+
'_body', '_channel', '_method', '_properties'
144+
]
133145

134146
def __init__(self, channel, **message):
135147
"""
@@ -169,7 +181,9 @@ def to_tuple(self):
169181

170182
class Handler(object):
171183
"""Operations Handler (e.g. Queue, Exchange)"""
172-
__slots__ = ['_channel']
184+
__slots__ = [
185+
'_channel'
186+
]
173187

174188
def __init__(self, channel):
175189
self._channel = channel

amqpstorm/basic.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
class Basic(Handler):
2121
"""AMQP Channel.basic"""
22+
__slots__ = []
2223

2324
def qos(self, prefetch_count=0, prefetch_size=0, global_=False):
2425
"""Specify quality of service.

amqpstorm/channel.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626

2727
class Channel(BaseChannel):
2828
"""Connection.channel"""
29+
__slots__ = [
30+
'confirming_deliveries', 'consumer_callback', 'rpc', '_basic',
31+
'_connection', '_exchange', '_inbound', '_queue'
32+
]
2933

3034
def __init__(self, channel_id, connection, rpc_timeout):
3135
super(Channel, self).__init__(channel_id)
@@ -34,9 +38,9 @@ def __init__(self, channel_id, connection, rpc_timeout):
3438
self.consumer_callback = None
3539
self._inbound = []
3640
self._connection = connection
37-
self._basic = None
38-
self._exchange = None
39-
self._queue = None
41+
self._basic = Basic(self)
42+
self._exchange = Exchange(self)
43+
self._queue = Queue(self)
4044

4145
def __enter__(self):
4246
return self
@@ -58,23 +62,23 @@ def basic(self):
5862
5963
:rtype: Basic
6064
"""
61-
return self._lazy_load_handler('_basic', Basic)
65+
return self._basic
6266

6367
@property
6468
def exchange(self):
6569
"""RabbitMQ Exchange Operations.
6670
6771
:rtype: Exchange
6872
"""
69-
return self._lazy_load_handler('_exchange', Exchange)
73+
return self._exchange
7074

7175
@property
7276
def queue(self):
7377
"""RabbitMQ Queue Operations.
7478
7579
:rtype: Queue
7680
"""
77-
return self._lazy_load_handler('_queue', Queue)
81+
return self._queue
7882

7983
def open(self):
8084
"""Open Channel.
@@ -340,10 +344,10 @@ def _build_message(self):
340344
with self.lock:
341345
if len(self._inbound) < 2:
342346
return None
343-
result = self._build_message_headers()
344-
if not result:
347+
headers = self._build_message_headers()
348+
if not headers:
345349
return None
346-
basic_deliver, content_header = result
350+
basic_deliver, content_header = headers
347351
body = self._build_message_body(content_header.body_size)
348352

349353
message = Message(channel=self,
@@ -387,16 +391,3 @@ def _build_message_body(self, body_size):
387391
break
388392
body += body_piece.value
389393
return body
390-
391-
def _lazy_load_handler(self, name, handler_class):
392-
"""Lazy load operations (e.g. Queue, Exchange)
393-
394-
:param name:
395-
:param Handler handler_class: Handler (e.g. Queue)
396-
:return:
397-
"""
398-
handler = getattr(self, name)
399-
if not handler:
400-
handler = handler_class(self)
401-
setattr(self, name, handler)
402-
return handler

amqpstorm/channel0.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""AMQP-Storm Connection.Channel0."""
22

3-
import locale
43
import logging
54
import platform
65

@@ -9,15 +8,16 @@
98
from pamqp.specification import Connection as pamqp_connection
109

1110
from amqpstorm import __version__
11+
from amqpstorm.base import AUTH_MECHANISM
1212
from amqpstorm.base import FRAME_MAX
13+
from amqpstorm.base import LOCALE
14+
from amqpstorm.base import MAX_CHANNELS
1315
from amqpstorm.base import Stateful
1416
from amqpstorm.exception import AMQPConnectionError
1517
from amqpstorm.compatibility import try_utf8_decode
1618

17-
AUTH_MECHANISM = 'PLAIN'
18-
LOCALE = locale.getdefaultlocale()[0] or 'en_US'
19+
1920
LOGGER = logging.getLogger(__name__)
20-
MAX_CHANNELS = 65535
2121

2222

2323
class Channel0(object):
@@ -118,7 +118,7 @@ def _write_frame(self, frame_out):
118118
def _send_start_ok_frame(self, frame_in):
119119
"""Send Start OK frame.
120120
121-
:param pamqp_spec.Frame frame_out: Amqp frame.
121+
:param pamqp_spec.Connection.StartOk frame_in: Amqp frame.
122122
:return:
123123
"""
124124
if 'PLAIN' not in try_utf8_decode(frame_in.mechanisms):

amqpstorm/connection.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525

2626
class Connection(Stateful):
2727
"""AMQP Connection"""
28+
__slots__ = [
29+
'heartbeat', 'parameters', '_channel0', '_channels', '_io'
30+
]
2831

2932
def __init__(self, hostname, username, password, port=5672, **kwargs):
3033
"""
@@ -56,13 +59,12 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
5659
'ssl_options': kwargs.get('ssl_options', {})
5760
}
5861
self._validate_parameters()
62+
self._io = IO(self.parameters, exceptions=self._exceptions,
63+
on_read=self._read_buffer)
5964
self._channel0 = Channel0(self)
65+
self._channels = {}
6066
self.heartbeat = Heartbeat(self.parameters['heartbeat'],
6167
self._channel0.send_heartbeat)
62-
self._io = IO(self.parameters, on_read=self._read_buffer,
63-
on_write=self.heartbeat.register_write)
64-
65-
self._channels = {}
6668
if not kwargs.get('lazy', False):
6769
self.open()
6870

@@ -119,7 +121,7 @@ def open(self):
119121
LOGGER.debug('Connection Opening')
120122
self.set_state(self.OPENING)
121123
self._exceptions = []
122-
self._io.open(self._exceptions)
124+
self._io.open()
123125
self._send_handshake()
124126
self._wait_for_connection_to_open()
125127
self.heartbeat.start(self._exceptions)
@@ -183,6 +185,7 @@ def write_frame(self, channel_id, frame_out):
183185
:return:
184186
"""
185187
frame_data = pamqp_frame.marshal(frame_out, channel_id)
188+
self.heartbeat.register_write()
186189
self._io.write_to_socket(frame_data)
187190

188191
def write_frames(self, channel_id, multiple_frames):
@@ -195,6 +198,7 @@ def write_frames(self, channel_id, multiple_frames):
195198
frame_data = EMPTY_BUFFER
196199
for single_frame in multiple_frames:
197200
frame_data += pamqp_frame.marshal(single_frame, channel_id)
201+
self.heartbeat.register_write()
198202
self._io.write_to_socket(frame_data)
199203

200204
def _validate_parameters(self):

amqpstorm/exchange.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
class Exchange(Handler):
1515
"""AMQP Channel.exchange"""
16+
__slots__ = []
1617

1718
def declare(self, exchange='', exchange_type='direct', passive=False,
1819
durable=False, auto_delete=False, arguments=None):

amqpstorm/heartbeat.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class Heartbeat(object):
1212
"""AMQP Internal Heartbeat Checker"""
1313

1414
def __init__(self, interval, send_heartbeat=None):
15+
self.send_heartbeat = send_heartbeat
1516
self._lock = threading.Lock()
1617
self._running = threading.Event()
1718
self._timer = None
@@ -20,7 +21,6 @@ def __init__(self, interval, send_heartbeat=None):
2021
self._writes_since_check = 0
2122
self._interval = interval
2223
self._threshold = 0
23-
self.send_heartbeat = send_heartbeat
2424

2525
def register_read(self):
2626
"""Register that a frame has been received.
@@ -67,12 +67,13 @@ def _check_for_life_signs(self):
6767
First check if any data has been sent, if not send a heartbeat.
6868
6969
If we have not received a heartbeat, or any data what so ever
70-
we should raise an exception so that we can close the connection.
70+
within two intervals, we need to raise an exception so
71+
that we can close the connection.
7172
7273
RabbitMQ may not necessarily send heartbeats if the connection
7374
is busy, so we only raise if no frame has been received.
7475
75-
:return:
76+
:rtype: bool
7677
"""
7778
if not self._running.is_set():
7879
return False
@@ -90,7 +91,7 @@ def _check_for_life_signs(self):
9091
if self._exceptions is None:
9192
raise why
9293
self._exceptions.append(why)
93-
return
94+
return False
9495
else:
9596
self._threshold = 0
9697
finally:

0 commit comments

Comments
 (0)