Skip to content

Commit 21a9b4b

Browse files
authored
Merge pull request #783 from Bogdanp/fix/rabbitmq-pika-logging
Fix/rabbitmq pika logging
2 parents cbc392e + 4e4c877 commit 21a9b4b

File tree

2 files changed

+56
-8
lines changed

2 files changed

+56
-8
lines changed

dramatiq/brokers/rabbitmq.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,16 +208,29 @@ def channel(self):
208208
except Exception:
209209
self.logger.exception("Encountered exception while closing Channel.")
210210

211+
def _ignore_pika_logs(self) -> None:
212+
"""Ensures that pika logs are filtered.
213+
214+
The main thread may keep connections open for a long time
215+
w/o publishing heartbeats, which means that they'll end up
216+
being closed by the time the broker is closed. When that
217+
happens, pika logs a bunch of scary stuff so we want to
218+
filter that out.
219+
"""
220+
221+
logging_filter = _IgnoreScaryLogs()
222+
ignored_loggers = ["pika.adapters.base_connection", "pika.adapters.blocking_connection"]
223+
224+
# Make sure the filter is added only once.
225+
for logger_name in ignored_loggers:
226+
ignored_logger = logging.getLogger(logger_name)
227+
if not any(isinstance(f, _IgnoreScaryLogs) for f in ignored_logger.filters):
228+
ignored_logger.addFilter(logging_filter)
229+
211230
def close(self) -> None:
212231
"""Close all open RabbitMQ connections."""
213-
# The main thread may keep connections open for a long time
214-
# w/o publishing heartbeats, which means that they'll end up
215-
# being closed by the time the broker is closed. When that
216-
# happens, pika logs a bunch of scary stuff so we want to
217-
# filter that out.
218-
logging_filter = _IgnoreScaryLogs()
219-
logging.getLogger("pika.adapters.base_connection").addFilter(logging_filter)
220-
logging.getLogger("pika.adapters.blocking_connection").addFilter(logging_filter)
232+
233+
self._ignore_pika_logs()
221234

222235
self.logger.debug("Closing channels and connections...")
223236
for channel_or_conn in chain(self.channels, self.connections):

tests/test_rabbitmq.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import logging
34
import os
45
import time
56
from threading import Event
@@ -400,6 +401,40 @@ def test_ignore_scary_logs_filter_ignores_logs():
400401
assert log_filter.filter(record)
401402

402403

404+
def test_rabbitmq_close_only_registers_ignore_filter_once():
405+
# Given a RabbitmqBroker
406+
broker = RabbitmqBroker()
407+
408+
base_logger = logging.getLogger("pika.adapters.base_connection")
409+
blocking_logger = logging.getLogger("pika.adapters.blocking_connection")
410+
411+
# And snapshots of the current filters
412+
original_base_filters = list(base_logger.filters)
413+
original_blocking_filters = list(blocking_logger.filters)
414+
415+
try:
416+
# When I close the broker twice
417+
broker.close()
418+
broker.close()
419+
420+
base_filters = [f for f in base_logger.filters if isinstance(f, _IgnoreScaryLogs)]
421+
blocking_filters = [f for f in blocking_logger.filters if isinstance(f, _IgnoreScaryLogs)]
422+
423+
# Then only one ignore filter is registered per logger
424+
assert len(base_filters) == 1
425+
assert len(blocking_filters) == 1
426+
finally:
427+
# And the filters are removed after the test run
428+
# so they don't affect the global state.
429+
for log_filter in list(base_logger.filters):
430+
if log_filter not in original_base_filters:
431+
base_logger.removeFilter(log_filter)
432+
433+
for log_filter in list(blocking_logger.filters):
434+
if log_filter not in original_blocking_filters:
435+
blocking_logger.removeFilter(log_filter)
436+
437+
403438
def test_rabbitmq_broker_can_join_with_timeout(rabbitmq_broker, rabbitmq_worker):
404439
# Given that I have an actor that takes a long time to run
405440
@dramatiq.actor

0 commit comments

Comments
 (0)