Skip to content

Commit fe81ac1

Browse files
committed
Ensure buffer can't overflow; update uptime check
This takes advantage of amqc's MQTTClient.up Event flag to wait until the connection is good before trying to flush. Also truncates the buffer while the connection is down so that it doesn't grow indefinitely.
1 parent 9d2244f commit fe81ac1

File tree

3 files changed

+65
-21
lines changed

3 files changed

+65
-21
lines changed

mqlog/__init__.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ async def run(self):
4747
This method should be scheduled as an asyncio task.
4848
"""
4949
while True:
50+
await self.client.up.wait()
5051
await self.will_flush.wait()
5152
await self._flush()
5253

@@ -59,6 +60,11 @@ def _should_flush(self, record):
5960
# Called by logging.Handler when the logger logs a message
6061
def emit(self, record):
6162
self.buffer.append(self.format(record))
63+
64+
# Prevent buffer from growing indefinitely
65+
if self.buffer and len(self.buffer) > self.capacity:
66+
self.buffer = self.buffer[-self.capacity :]
67+
6268
if self._should_flush(record):
6369
self.will_flush.set()
6470

@@ -73,15 +79,11 @@ async def _flush(self):
7379
# Try to publish, but if we fail, just log the error – don't
7480
# want to cause cascading errors. Something else is responsible
7581
# for bringing the connection back up.
82+
msg = "\n".join([line for line in self.buffer])
7683
try:
77-
msg = "\n".join([line for line in self.buffer])
7884
await self.client.publish(self.topic, msg, qos=self.qos)
7985
self.buffer.clear()
8086
except Exception as e:
8187
self._logger.error(f"Failed to publish logs via MQTT: {e}")
82-
self.will_flush.clear()
83-
84-
# If we were supposed to flush but couldn't, truncate the buffer
85-
# to prevent it from growing indefinitely
86-
if self.buffer and len(self.buffer) >= self.capacity:
87-
self.buffer = self.buffer[-self.capacity :]
88+
finally:
89+
self.will_flush.clear()

tests/test_handler.py

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import unittest
34
from unittest import TestCase
45

56
from mqlog import MqttHandler
@@ -9,23 +10,25 @@
910
class FakeClient:
1011
"""A fake MQTT client for testing purposes."""
1112

12-
publish: None
13+
def __init__(self):
14+
self.publish = AsyncMock()
15+
self.up = asyncio.Event()
1316

1417

1518
class TestMqttHandler(TestCase):
1619
def setUp(self):
1720
self.client = FakeClient()
18-
self.client.publish = AsyncMock()
1921
self.handler = MqttHandler(self.client, "test_topic")
20-
self.handler.setFormatter(logging.Formatter("%(message)s"))
2122
self.logger = logging.getLogger("test")
2223
self.logger.addHandler(self.handler)
2324
self.logger.setLevel(logging.INFO)
25+
self.client.up.set() # Simulate client being ready
2426

2527
def tearDown(self):
2628
"""Clean up after each test"""
2729
self.handler.buffer.clear()
2830
self.handler.will_flush.clear()
31+
self.client.up.clear()
2932

3033
def test_default_formatter(self):
3134
"""Handler should use default formatter if none is set"""
@@ -54,7 +57,7 @@ def test_flush_full_buffer(self):
5457
def test_flush_fail(self):
5558
"""Flushing should log an error if publish fails"""
5659
self.handler._logger.error = Mock()
57-
self.client.publish = AsyncMock(side_effect=Exception("Publish failed"))
60+
self.client.publish.side_effect = Exception("Publish failed")
5861

5962
async def do_test(handler: logging.Handler, logger: logging.Logger):
6063
asyncio.create_task(handler.run())
@@ -81,7 +84,7 @@ async def do_test(handler: logging.Handler, logger: logging.Logger):
8184
asyncio.run(do_test(self.handler, self.logger))
8285

8386
self.client.publish.assert_called_with(
84-
"test_topic", "Test message 1\nTest message 2", qos=0
87+
"test_topic", "INFO:test:Test message 1\nERROR:test:Test message 2", qos=0
8588
)
8689

8790
def test_flush_multiple(self):
@@ -104,15 +107,23 @@ async def do_test(handler: logging.Handler, logger: logging.Logger):
104107

105108
self.client.publish.assert_has_calls(
106109
[
107-
call("test_topic", "Test message 1\nTest message 2", qos=0),
108-
call("test_topic", "Test message 3\nTest message 4", qos=0),
110+
call(
111+
"test_topic",
112+
"INFO:test:Test message 1\nINFO:test:Test message 2",
113+
qos=0,
114+
),
115+
call(
116+
"test_topic",
117+
"INFO:test:Test message 3\nINFO:test:Test message 4",
118+
qos=0,
119+
),
109120
]
110121
)
111122

112123
def test_buffer_overflow(self):
113124
"""Buffer should get truncated if it exceeds capacity"""
114125
self.handler.capacity = 3
115-
self.client.publish = AsyncMock(side_effect=Exception("Publish failed"))
126+
self.client.up.clear() # Simulate client being disconnected
116127

117128
async def do_test(handler: logging.Handler, logger: logging.Logger):
118129
asyncio.create_task(handler.run())
@@ -129,5 +140,36 @@ async def do_test(handler: logging.Handler, logger: logging.Logger):
129140
asyncio.run(do_test(self.handler, self.logger))
130141

131142
self.assertEqual(
132-
self.handler.buffer, ["Test message 2", "Test message 3", "Test message 4"]
143+
self.handler.buffer,
144+
[
145+
"INFO:test:Test message 2",
146+
"INFO:test:Test message 3",
147+
"INFO:test:Test message 4",
148+
],
149+
)
150+
151+
def test_reconnect(self):
152+
"""Handler should reconnect and publish messages after a disconnect"""
153+
self.client.up.clear() # Simulate client being disconnected
154+
155+
async def do_test(
156+
handler: logging.Handler, logger: logging.Logger, client: FakeClient
157+
):
158+
asyncio.create_task(handler.run())
159+
await asyncio.sleep(0.1)
160+
logger.error("Test message 1")
161+
await asyncio.sleep(0.1)
162+
163+
# Simulate reconnect
164+
client.up.set()
165+
await asyncio.sleep(0.1)
166+
167+
asyncio.run(do_test(self.handler, self.logger, self.client))
168+
169+
self.client.publish.assert_called_with(
170+
"test_topic", "ERROR:test:Test message 1", qos=0
133171
)
172+
173+
174+
if __name__ == "__main__":
175+
unittest.main()

tests/utils.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ class Mock:
66
"""A mock callable object that stores its calls."""
77

88
def __init__(self, return_value=None, side_effect=None):
9-
self._return_value = return_value
10-
self._side_effect = side_effect
9+
self.return_value = return_value
10+
self.side_effect = side_effect
1111
self._calls = []
1212

1313
def __call__(self, *args, **kwargs):
1414
"""Call the mock and store the call details."""
1515
self._calls.append(call(*args[1:], **kwargs)) # Skip the first argument (self)
16-
if self._side_effect:
17-
raise self._side_effect
18-
return self._return_value
16+
if self.side_effect:
17+
raise self.side_effect
18+
return self.return_value
1919

2020
def assert_called(self):
2121
"""Assert that the mock was called at least once."""

0 commit comments

Comments
 (0)