Skip to content

Commit 30187ef

Browse files
committed
Initial Heartbeat patch
1 parent 35284ae commit 30187ef

10 files changed

Lines changed: 224 additions & 146 deletions

File tree

amqpstorm/channel0.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ def on_frame(self, frame_in):
3939
"""
4040
LOGGER.debug('Frame Received: %s', frame_in.name)
4141
if frame_in.name == 'Heartbeat':
42-
self._connection.heartbeat.register_heartbeat()
43-
self._write_frame(Heartbeat())
42+
self.send_heartbeat()
4443
elif frame_in.name == 'Connection.Start':
4544
self.server_properties = frame_in.server_properties
4645
self._send_start_ok_frame(frame_in)
@@ -63,6 +62,13 @@ def on_frame(self, frame_in):
6362
else:
6463
LOGGER.error('[Channel0] Unhandled Frame: %s', frame_in.name)
6564

65+
def send_heartbeat(self):
66+
"""Send Heartbeat frame.
67+
68+
:return:
69+
"""
70+
self._write_frame(Heartbeat())
71+
6672
def send_close_connection_frame(self):
6773
"""Send Connection Close frame.
6874
@@ -107,6 +113,7 @@ def _write_frame(self, frame_out):
107113
:return:
108114
"""
109115
self._connection.write_frame(0, frame_out)
116+
LOGGER.debug('Frame Sent: %s', frame_out.name)
110117

111118
def _send_start_ok_frame(self, frame_in):
112119
"""Send Start OK frame.

amqpstorm/connection.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,12 @@ def __init__(self, hostname, username, password, port=5672, **kwargs):
5656
'ssl_options': kwargs.get('ssl_options', {})
5757
}
5858
self._validate_parameters()
59-
self.heartbeat = Heartbeat(self.parameters['heartbeat'])
60-
self._io = IO(self.parameters, on_read=self._read_buffer)
6159
self._channel0 = Channel0(self)
60+
self.heartbeat = Heartbeat(self.parameters['heartbeat'],
61+
self._channel0.send_heartbeat)
62+
self._io = IO(self.parameters, on_read=self._read_buffer,
63+
on_write=self.heartbeat.register_write)
64+
6265
self._channels = {}
6366
if not kwargs.get('lazy', False):
6467
self.open()
@@ -247,7 +250,7 @@ def _read_buffer(self, buffer):
247250
if frame_in is None:
248251
break
249252

250-
self.heartbeat.register_beat()
253+
self.heartbeat.register_read()
251254
if channel_id == 0:
252255
self._channel0.on_frame(frame_in)
253256
else:
@@ -270,7 +273,7 @@ def _handle_amqp_frame(self, data_in):
270273
pass
271274
except pamqp_spec.AMQPFrameError as why:
272275
LOGGER.error('AMQPFrameError: %r', why, exc_info=True)
273-
except (UnicodeDecodeError, ValueError) as why:
276+
except ValueError as why:
274277
LOGGER.error(why, exc_info=True)
275278
self.exceptions.append(AMQPConnectionError(why))
276279
return data_in, None, None

amqpstorm/heartbeat.py

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import logging
44
import threading
5-
import time
65

76
from amqpstorm.exception import AMQPConnectionError
87

@@ -12,31 +11,30 @@
1211
class Heartbeat(object):
1312
"""AMQP Internal Heartbeat Checker"""
1413

15-
def __init__(self, interval):
14+
def __init__(self, interval, send_heartbeat=None):
1615
self._lock = threading.Lock()
17-
self._stopped = threading.Event()
16+
self._running = threading.Event()
1817
self._timer = None
1918
self._exceptions = None
20-
self._last_heartbeat = 0.0
21-
self._beats_since_check = 0
22-
self._interval = interval + 1
23-
self._threshold = (interval + 1) * 2
19+
self._reads_since_check = 0
20+
self._writes_since_check = 0
21+
self._interval = interval
22+
self._threshold = 0
23+
self.send_heartbeat = send_heartbeat
2424

25-
def register_beat(self):
25+
def register_read(self):
2626
"""Register that a frame has been received.
2727
2828
:return:
2929
"""
30-
with self._lock:
31-
self._beats_since_check += 1
30+
self._reads_since_check += 1
3231

33-
def register_heartbeat(self):
34-
"""Register a Heartbeat.
32+
def register_write(self):
33+
"""Register that a frame has been sent.
3534
3635
:return:
3736
"""
38-
with self._lock:
39-
self._last_heartbeat = time.time()
37+
self._writes_since_check += 1
4038

4139
def start(self, exceptions):
4240
"""Start the Heartbeat Checker.
@@ -46,9 +44,10 @@ def start(self, exceptions):
4644
"""
4745
LOGGER.debug('Heartbeat Checker Started')
4846
with self._lock:
49-
self._stopped = threading.Event()
50-
self._beats_since_check = 0
51-
self._last_heartbeat = time.time()
47+
self._running.set()
48+
self._threshold = 0
49+
self._reads_since_check = 0
50+
self._writes_since_check = 0
5251
self._exceptions = exceptions
5352
self._start_new_timer()
5453

@@ -57,14 +56,16 @@ def stop(self):
5756
5857
:return:
5958
"""
60-
self._stopped.set()
59+
self._running.clear()
6160
if self._timer:
6261
self._timer.cancel()
6362
self._timer = None
6463

6564
def _check_for_life_signs(self):
6665
"""Check if we have any sign of life.
6766
67+
First check if any data has been sent, if not send a heartbeat.
68+
6869
If we have not received a heartbeat, or any data what so ever
6970
we should raise an exception so that we can close the connection.
7071
@@ -73,21 +74,28 @@ def _check_for_life_signs(self):
7374
7475
:return:
7576
"""
76-
if self._stopped.is_set():
77+
if not self._running.is_set():
7778
return False
7879
self._lock.acquire()
7980
try:
80-
elapsed = time.time() - self._last_heartbeat
81-
if self._beats_since_check == 0 and elapsed > self._threshold:
82-
self._stopped.set()
83-
message = ('Connection dead, no heartbeat or data received '
84-
'in %ds' % round(elapsed, 3))
85-
why = AMQPConnectionError(message)
86-
if self._exceptions is None:
87-
raise why
88-
self._exceptions.append(why)
89-
self._beats_since_check = 0
81+
if self._writes_since_check == 0:
82+
self.send_heartbeat()
83+
if self._reads_since_check == 0:
84+
self._threshold += 1
85+
if self._threshold >= 2:
86+
self._running.set()
87+
message = ('Connection dead, no heartbeat or data received'
88+
' in >= %ds' % (self._interval * 2))
89+
why = AMQPConnectionError(message)
90+
if self._exceptions is None:
91+
raise why
92+
self._exceptions.append(why)
93+
return
94+
else:
95+
self._threshold = 0
9096
finally:
97+
self._reads_since_check = 0
98+
self._writes_since_check = 0
9199
self._lock.release()
92100
self._start_new_timer()
93101
return True
@@ -97,7 +105,7 @@ def _start_new_timer(self):
97105
98106
:return:
99107
"""
100-
if self._stopped.is_set():
108+
if not self._running.is_set():
101109
return
102110
self._timer = threading.Timer(interval=self._interval,
103111
function=self._check_for_life_signs)

amqpstorm/io.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,14 @@ def is_ready(self):
5353
class IO(object):
5454
"""AMQP Connection.io"""
5555

56-
def __init__(self, parameters, on_read=None):
57-
self._inbound_thread = None
56+
def __init__(self, parameters, on_read=None, on_write=None):
57+
self._exceptions = None
5858
self._lock = threading.Lock()
59-
self._stopped = threading.Event()
60-
self._parameters = parameters
59+
self._inbound_thread = None
6160
self._on_read = on_read
62-
self._exceptions = None
61+
self._on_write = on_write
62+
self._running = threading.Event()
63+
self._parameters = parameters
6364
self.buffer = EMPTY_BUFFER
6465
self.poller = None
6566
self.socket = None
@@ -76,7 +77,7 @@ def open(self, exceptions):
7677
self._lock.acquire()
7778
try:
7879
self.buffer = EMPTY_BUFFER
79-
self._stopped = threading.Event()
80+
self._running.set()
8081
self._exceptions = exceptions
8182
sock_addresses = self._get_socket_addresses()
8283
self.socket = self._find_address_and_connect(sock_addresses)
@@ -93,7 +94,7 @@ def close(self):
9394
"""
9495
self._lock.acquire()
9596
try:
96-
self._stopped.set()
97+
self._running.clear()
9798
if self._inbound_thread:
9899
self._inbound_thread.join()
99100
self._inbound_thread = None
@@ -110,12 +111,16 @@ def write_to_socket(self, frame_data):
110111
:param str frame_data:
111112
:return:
112113
"""
114+
if self._on_write:
115+
self._on_write()
113116
self._lock.acquire()
114117
try:
115118
total_bytes_written = 0
116119
bytes_to_send = len(frame_data)
117120
while total_bytes_written < bytes_to_send:
118121
try:
122+
if not self.socket:
123+
raise socket.error('connection/socket error')
119124
bytes_written = \
120125
self.socket.send(frame_data[total_bytes_written:])
121126
if bytes_written == 0:
@@ -210,7 +215,7 @@ def _process_incoming_data(self):
210215
211216
:return:
212217
"""
213-
while not self._stopped.is_set():
218+
while self._running.is_set():
214219
if self.poller.is_ready:
215220
self.buffer += self._receive()
216221
self.buffer = self._on_read(self.buffer)
@@ -231,7 +236,7 @@ def _receive(self):
231236
pass
232237
except (IOError, OSError) as why:
233238
self._exceptions.append(AMQPConnectionError(why))
234-
self._stopped.set()
239+
self._running.clear()
235240
return result
236241

237242
def _read_from_socket(self):

amqpstorm/tests/channel0_tests.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -177,30 +177,6 @@ def test_channel0_unblocked(self):
177177
self.assertEqual(self.logging_handler.messages['info'][0],
178178
'Connection is no longer blocked by remote server')
179179

180-
def test_channel0_on_hearbeat_registers_heartbeat(self):
181-
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
182-
lazy=True)
183-
last_heartbeat = connection.heartbeat._last_heartbeat
184-
start_time = time.time()
185-
channel = Channel0(connection)
186-
187-
time.sleep(0.1)
188-
189-
def fake(*_):
190-
pass
191-
192-
# Don't try to write to socket during test.
193-
channel._write_frame = fake
194-
195-
# As the heartbeat timer was never started, it should be 0.
196-
self.assertEqual(connection.heartbeat._last_heartbeat, 0.0)
197-
198-
channel.on_frame(Heartbeat())
199-
200-
self.assertNotEqual(connection.heartbeat._last_heartbeat,
201-
last_heartbeat)
202-
self.assertGreater(connection.heartbeat._last_heartbeat, start_time)
203-
204180
def test_channel0_unhandled_frame(self):
205181
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
206182
lazy=True)

amqpstorm/tests/connection_tests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,11 +253,11 @@ def test_connection_heartbeat_stopped_on_close(self):
253253
connection.heartbeat.start(connection.exceptions)
254254
connection.exceptions.append(AMQPConnectionError('error'))
255255

256-
self.assertFalse(connection.heartbeat._stopped.is_set())
256+
self.assertTrue(connection.heartbeat._running.is_set())
257257

258258
self.assertRaises(AMQPConnectionError, connection.check_for_errors)
259259

260-
self.assertTrue(connection.heartbeat._stopped.is_set())
260+
self.assertFalse(connection.heartbeat._running.is_set())
261261

262262

263263
class ConnectionParameterTests(unittest.TestCase):

0 commit comments

Comments
 (0)