Skip to content

Commit a2c186a

Browse files
committed
Add ability to stop logger and add async tests
1 parent 2c70912 commit a2c186a

File tree

2 files changed

+78
-53
lines changed

2 files changed

+78
-53
lines changed

mqlog/__init__.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,39 @@ def __init__(
3737
self.capacity = capacity
3838
self.buffer = []
3939
self.will_flush = asyncio.Event()
40+
self.active = True
4041

41-
def should_flush(self, record):
42+
async def run(self):
43+
"""
44+
Continuously publish log records via MQTT.
45+
This method should be scheduled as an asyncio task.
46+
"""
47+
self.active = True
48+
while self.active:
49+
await self.will_flush.wait()
50+
await self._flush()
51+
52+
def stop(self):
53+
"""Stop the handler from processing further log records."""
54+
self.active = False
55+
56+
# Check if we should publish an MQTT message
57+
def _should_flush(self, record):
4258
return (len(self.buffer) >= self.capacity) or (
4359
record.levelno >= self.flush_level
4460
)
4561

46-
# Called when the logger logs a message
62+
# Called by logging.Handler when the logger logs a message
4763
def emit(self, record):
48-
self.buffer.append(record)
49-
if self.should_flush(record):
64+
self.buffer.append(self.format(record))
65+
if self._should_flush(record):
5066
self.will_flush.set()
5167

52-
# Needs to be run in an event loop once MQTT client is connected
53-
async def run(self):
54-
while True:
55-
await self.will_flush.wait()
56-
await self._flush()
57-
5868
# Named with an underscore to avoid conflict with logging.Handler.flush
5969
async def _flush(self):
6070
if self.buffer:
61-
records = [self.buffer.pop(0) for _ in range(len(self.buffer))]
62-
msg = "\n".join(self.format(record) for record in records)
63-
await self.client.publish(self.topic, msg, qos=self.qos)
71+
msg = ""
72+
while self.buffer:
73+
msg += self.buffer.pop(0) + "\n"
74+
await self.client.publish(self.topic, msg.strip(), qos=self.qos)
6475
self.will_flush.clear()

tests/test_handler.py

Lines changed: 54 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,30 @@
11
import asyncio
22
import logging
3-
from unittest import IsolatedAsyncioTestCase
4-
from unittest.mock import AsyncMock, MagicMock, call
3+
from unittest import TestCase
54

65
from mqlog import MqttHandler
76

87

9-
class TestMqttHandler(IsolatedAsyncioTestCase):
8+
class FakeClient:
9+
"""A fake MQTT client for testing purposes."""
10+
11+
def __init__(self):
12+
self.calls = []
13+
14+
# Store the call like a mock so we can check it later
15+
async def publish(self, topic, payload, qos=0):
16+
self.calls.append((topic, payload, qos))
17+
18+
19+
class TestMqttHandler(TestCase):
1020
def setUp(self):
11-
self.client = MagicMock()
12-
self.client.publish = AsyncMock()
21+
self.client = FakeClient()
1322
self.handler = MqttHandler(self.client, "test_topic")
23+
self.handler.setFormatter(logging.Formatter("%(message)s"))
1424
self.logger = logging.getLogger("test")
1525
self.logger.addHandler(self.handler)
1626
self.logger.setLevel(logging.INFO)
1727

18-
# Run for X seconds, then stop
19-
async def run_with_timeout(self, timeout=1):
20-
async with asyncio.timeout(timeout):
21-
await self.handler.run()
22-
self.handler.close()
23-
24-
# Log a message at a given level after a delay of X seconds
25-
async def log_message(self, msg, level, delay):
26-
await asyncio.sleep(delay)
27-
self.logger.log(level, msg)
28-
2928
def test_no_flush(self):
3029
"""Buffer should not be flushed until capacity/level reached"""
3130
self.handler.flush_level = logging.WARNING
@@ -45,35 +44,50 @@ def test_flush_full_buffer(self):
4544
self.logger.info(f"Test message {i}")
4645
self.assertTrue(self.handler.will_flush.is_set())
4746

48-
async def test_publish(self):
47+
def test_publish(self):
4948
"""Flushing should publish messages to MQTT topic"""
5049
self.handler.flush_level = logging.ERROR
51-
await asyncio.gather(
52-
self.run_with_timeout(1),
53-
self.log_message("Test message 1", logging.INFO, 0.1),
54-
self.log_message("Test message 2", logging.ERROR, 0.2),
55-
return_exceptions=True,
56-
)
57-
self.client.publish.assert_awaited_with(
58-
"test_topic",
59-
"Test message 1\nTest message 2",
60-
qos=0,
50+
51+
async def do_test(handler, logger):
52+
logger.info("Test message 1")
53+
await asyncio.sleep(0.1)
54+
logger.error("Test message 2")
55+
await asyncio.sleep(0.1)
56+
handler.stop()
57+
58+
async def main(handler, logger):
59+
await asyncio.gather(handler.run(), do_test(handler, logger))
60+
61+
asyncio.run(main(self.handler, self.logger))
62+
63+
self.assertEqual(
64+
self.client.calls, [("test_topic", "Test message 1\nTest message 2", 0)]
6165
)
6266

63-
async def test_flush_multiple(self):
67+
def test_flush_multiple(self):
6468
"""Flushing multiple times should publish separate messages"""
6569
self.handler.capacity = 2
66-
await asyncio.gather(
67-
self.run_with_timeout(1),
68-
self.log_message("Test message 1", logging.INFO, 0.1),
69-
self.log_message("Test message 2", logging.INFO, 0.2),
70-
self.log_message("Test message 3", logging.INFO, 0.3),
71-
self.log_message("Test message 4", logging.INFO, 0.4),
72-
return_exceptions=True,
73-
)
74-
self.client.publish.assert_has_awaits(
70+
71+
async def do_test(handler, logger):
72+
logger.info("Test message 1")
73+
await asyncio.sleep(0.1)
74+
logger.info("Test message 2")
75+
await asyncio.sleep(0.1)
76+
logger.info("Test message 3")
77+
await asyncio.sleep(0.1)
78+
logger.info("Test message 4")
79+
await asyncio.sleep(0.1)
80+
handler.stop()
81+
82+
async def main(handler, logger):
83+
await asyncio.gather(handler.run(), do_test(handler, logger))
84+
85+
asyncio.run(main(self.handler, self.logger))
86+
87+
self.assertEqual(
88+
self.client.calls,
7589
[
76-
call("test_topic", "Test message 1\nTest message 2", qos=0),
77-
call("test_topic", "Test message 3\nTest message 4", qos=0),
78-
]
90+
("test_topic", "Test message 1\nTest message 2", 0),
91+
("test_topic", "Test message 3\nTest message 4", 0),
92+
],
7993
)

0 commit comments

Comments
 (0)