diff --git a/loguru/_handler.py b/loguru/_handler.py
index 81a3dca0..a517a299 100644
--- a/loguru/_handler.py
+++ b/loguru/_handler.py
@@ -4,6 +4,7 @@
 import os
 import threading
 from contextlib import contextmanager
+from queue import Full
 from threading import Thread
 
 from ._colorizer import Colorizer
@@ -55,7 +56,7 @@ def __init__(
         self._filter = filter_
         self._colorize = colorize
         self._serialize = serialize
-        self._enqueue = enqueue
+        self._enqueue = bool(enqueue)
         self._multiprocessing_context = multiprocessing_context
         self._error_interceptor = error_interceptor
         self._exception_formatter = exception_formatter
@@ -73,6 +74,7 @@ def __init__(
         self._queue_lock = None
         self._confirmation_event = None
         self._confirmation_lock = None
+        self._enqueue_overflow = "block"
         self._owner_process_pid = None
         self._thread = None
 
@@ -89,12 +91,26 @@ def __init__(
                 self._decolorized_format = self._formatter.strip()
 
         if self._enqueue:
+            if enqueue is True:
+                maxsize = 0
+            elif hasattr(enqueue, "get"):
+                maxsize = enqueue.get("size", 0)
+                self._enqueue_overflow = enqueue.get("overflow", "block")
+            else:
+                maxsize = enqueue
+
             if self._multiprocessing_context is None:
-                self._queue = multiprocessing.SimpleQueue()
+                if maxsize > 0 or self._enqueue_overflow == "drop":
+                    self._queue = multiprocessing.Queue(maxsize=maxsize)
+                else:
+                    self._queue = multiprocessing.SimpleQueue()
                 self._confirmation_event = multiprocessing.Event()
                 self._confirmation_lock = multiprocessing.Lock()
             else:
-                self._queue = self._multiprocessing_context.SimpleQueue()
+                if maxsize > 0 or self._enqueue_overflow == "drop":
+                    self._queue = self._multiprocessing_context.Queue(maxsize=maxsize)
+                else:
+                    self._queue = self._multiprocessing_context.SimpleQueue()
                 self._confirmation_event = self._multiprocessing_context.Event()
                 self._confirmation_lock = self._multiprocessing_context.Lock()
             self._queue_lock = create_handler_lock()
@@ -201,7 +217,13 @@ def emit(self, record, level_id, from_decorator, is_raw, colored_message):
                 if self._stopped:
                     return
                 if self._enqueue:
-                    self._queue.put(str_record)
+                    if self._enqueue_overflow == "drop":
+                        try:
+                            self._queue.put(str_record, block=False)
+                        except Full:
+                            pass
+                    else:
+                        self._queue.put(str_record)
                 else:
                     self._sink.write(str_record)
         except Exception:
diff --git a/loguru/_logger.py b/loguru/_logger.py
index 6c23d7b7..a561c0b0 100644
--- a/loguru/_logger.py
+++ b/loguru/_logger.py
@@ -302,10 +302,21 @@ def add(
         diagnose : |bool|, optional
             Whether the exception trace should display the variables values to ease the debugging.
             This should be set to ``False`` in production to avoid leaking sensitive data.
-        enqueue : |bool|, optional
+        enqueue : |bool|, |int| or |dict|, optional
             Whether the messages to be logged should first pass through a multiprocessing-safe queue
             before reaching the sink. This is useful while logging to a file through multiple
             processes. This also has the advantage of making logging calls non-blocking.
+
+            If ``True``, an unbounded queue is used (the original behavior). If an |int| is
+            passed, it specifies the maximum size of the queue; when the queue is full, logging
+            calls block until space is available.
+
+            A |dict| can be passed for finer control, using the following keys:
+
+            - ``"size"`` (|int|): the maximum queue size; ``0`` means unbounded.
+            - ``"overflow"`` (|str|): what to do when the queue is full; ``"block"`` (default)
+              waits for space, while ``"drop"`` silently discards the message.
+
         context : |multiprocessing.Context| or |str|, optional
             A context object or name that will be used for all tasks involving internally the
             |multiprocessing| module, in particular when ``enqueue=True``. If ``None``, the default
diff --git a/tests/test_add_option_enqueue.py b/tests/test_add_option_enqueue.py
index b393f3dc..072db84e 100644
--- a/tests/test_add_option_enqueue.py
+++ b/tests/test_add_option_enqueue.py
@@ -292,3 +292,98 @@ def sink(message):
     assert type_ is ValueError
     assert value is None
     assert traceback_ is None
+
+
+def test_enqueue_bounded_with_int():
+    x = []
+
+    def sink(message):
+        time.sleep(0.05)
+        x.append(message)
+
+    logger.add(sink, format="{message}", enqueue=5)
+    for i in range(5):
+        logger.debug(i)
+    logger.complete()
+    logger.remove()
+
+    assert len(x) == 5
+
+
+def test_enqueue_bounded_with_dict():
+    x = []
+
+    def sink(message):
+        time.sleep(0.05)
+        x.append(message)
+
+    logger.add(sink, format="{message}", enqueue={"size": 5})
+    for i in range(5):
+        logger.debug(i)
+    logger.complete()
+    logger.remove()
+
+    assert len(x) == 5
+
+
+def test_enqueue_bounded_blocking():
+    x = []
+
+    def slow_sink(message):
+        time.sleep(0.1)
+        x.append(message)
+
+    logger.add(slow_sink, format="{message}", enqueue={"size": 2, "overflow": "block"})
+
+    start = time.time()
+    for i in range(5):
+        logger.debug(i)
+    elapsed = time.time() - start
+
+    logger.complete()
+    logger.remove()
+
+    assert len(x) == 5
+    assert elapsed >= 0.2
+
+
+def test_enqueue_bounded_drop():
+    x = []
+
+    def slow_sink(message):
+        time.sleep(0.1)
+        x.append(message)
+
+    logger.add(slow_sink, format="{message}", enqueue={"size": 2, "overflow": "drop"})
+
+    start = time.time()
+    for i in range(10):
+        logger.debug(i)
+    elapsed = time.time() - start
+
+    logger.complete()
+    logger.remove()
+
+    assert elapsed < 0.1
+    assert len(x) < 10
+
+
+def test_enqueue_bounded_with_context():
+    import multiprocessing
+
+    x = []
+
+    def sink(message):
+        time.sleep(0.05)
+        x.append(message)
+
+    context = multiprocessing.get_context("spawn")
+    logger.add(sink, format="{message}", enqueue={"size": 5}, context=context)
+
+    for i in range(5):
+        logger.debug(i)
+
+    logger.complete()
+    logger.remove()
+
+    assert len(x) == 5
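
For reference, a quick usage sketch of the `enqueue` forms introduced by this patch (the file sink `"app.log"` is purely illustrative; the semantics follow the docstring and handler changes above):

```python
from loguru import logger

# Unbounded queue, identical to the previous behavior of enqueue=True.
logger.add("app.log", enqueue=True)

# Bounded queue holding at most 1000 records; when it is full,
# logging calls block until the worker thread frees a slot.
logger.add("app.log", enqueue=1000)

# Bounded queue that silently drops records instead of blocking
# whenever the sink cannot keep up.
logger.add("app.log", enqueue={"size": 1000, "overflow": "drop"})
```

In short, `"block"` preserves every record at the cost of back-pressure on the logging calls, while `"drop"` keeps them non-blocking at the cost of losing records under load.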