Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions loguru/_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import threading
from contextlib import contextmanager
from queue import Full
from threading import Thread

from ._colorizer import Colorizer
Expand Down Expand Up @@ -55,7 +56,7 @@ def __init__(
self._filter = filter_
self._colorize = colorize
self._serialize = serialize
self._enqueue = enqueue
self._enqueue = bool(enqueue)
self._multiprocessing_context = multiprocessing_context
self._error_interceptor = error_interceptor
self._exception_formatter = exception_formatter
Expand All @@ -73,6 +74,7 @@ def __init__(
self._queue_lock = None
self._confirmation_event = None
self._confirmation_lock = None
self._enqueue_overflow = "block"
self._owner_process_pid = None
self._thread = None

Expand All @@ -89,12 +91,26 @@ def __init__(
self._decolorized_format = self._formatter.strip()

if self._enqueue:
if enqueue is True:
maxsize = 0
elif hasattr(enqueue, "get"):
maxsize = enqueue.get("size", 0)
self._enqueue_overflow = enqueue.get("overflow", "block")
else:
maxsize = enqueue

if self._multiprocessing_context is None:
self._queue = multiprocessing.SimpleQueue()
if maxsize > 0:
self._queue = multiprocessing.Queue(maxsize=maxsize)
else:
self._queue = multiprocessing.SimpleQueue()
self._confirmation_event = multiprocessing.Event()
self._confirmation_lock = multiprocessing.Lock()
else:
self._queue = self._multiprocessing_context.SimpleQueue()
if maxsize > 0:
self._queue = self._multiprocessing_context.Queue(maxsize=maxsize)
else:
self._queue = self._multiprocessing_context.SimpleQueue()
self._confirmation_event = self._multiprocessing_context.Event()
self._confirmation_lock = self._multiprocessing_context.Lock()
self._queue_lock = create_handler_lock()
Expand Down Expand Up @@ -201,7 +217,13 @@ def emit(self, record, level_id, from_decorator, is_raw, colored_message):
if self._stopped:
return
if self._enqueue:
self._queue.put(str_record)
if self._enqueue_overflow == "drop":
try:
self._queue.put(str_record, block=False)
except Full:
pass
else:
self._queue.put(str_record)
else:
self._sink.write(str_record)
except Exception:
Expand Down
11 changes: 10 additions & 1 deletion loguru/_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,19 @@ def add(
diagnose : |bool|, optional
Whether the exception trace should display the variables values to ease the debugging.
This should be set to ``False`` in production to avoid leaking sensitive data.
enqueue : |bool|, optional
enqueue : |bool|, |int| or |dict|, optional
Whether the messages to be logged should first pass through a multiprocessing-safe queue
before reaching the sink. This is useful while logging to a file through multiple
processes. This also has the advantage of making logging calls non-blocking.

If ``True``, an unbounded queue is used (original behavior). If an ``int`` is passed,
it specifies the maximum queue size (bounded queue). When the queue is full, logging
calls will block until space is available.

A ``dict`` can be passed for more control with the following keys:

- ``"size"`` (int): Maximum queue size. ``0`` means unbounded.
- ``"overflow"`` (str): What to do when the queue is full. ``"block"``
  (default) waits for space, ``"drop"`` silently discards the message.
context : |multiprocessing.Context| or |str|, optional
A context object or name that will be used for all tasks involving internally the
|multiprocessing| module, in particular when ``enqueue=True``. If ``None``, the default
Expand Down
95 changes: 95 additions & 0 deletions tests/test_add_option_enqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,98 @@ def sink(message):
assert type_ is ValueError
assert value is None
assert traceback_ is None


def test_enqueue_bounded_with_int():
    """Passing an ``int`` as ``enqueue`` bounds the queue without losing messages.

    The sink is slowed down so the producer outpaces the consumer; with
    ``enqueue=5`` all five messages still fit in the queue and must all be
    delivered, in emission order.
    """
    received = []

    def sink(message):
        time.sleep(0.05)  # slow consumer so the bounded queue actually fills
        received.append(message)

    logger.add(sink, format="{message}", enqueue=5)
    for i in range(5):
        logger.debug(i)
    logger.complete()  # wait for the worker thread to drain the queue
    logger.remove()

    # Stronger than a bare length check: verify content and ordering
    # (strip the line terminator appended by the handler).
    assert [str(m).strip() for m in received] == ["0", "1", "2", "3", "4"]


def test_enqueue_bounded_with_dict():
    """A ``{"size": ...}`` dict as ``enqueue`` bounds the queue; all messages arrive."""
    collected = []

    def sink(message):
        # Slow consumer: exercises the bounded queue under producer pressure.
        time.sleep(0.05)
        collected.append(message)

    logger.add(sink, format="{message}", enqueue={"size": 5})

    for value in range(5):
        logger.debug(value)

    logger.complete()
    logger.remove()

    assert len(collected) == 5


def test_enqueue_bounded_blocking():
    """With ``overflow="block"`` a full queue makes the producer wait.

    The sink takes ~0.1s per message while the queue holds only 2 items, so
    emitting 5 messages forces the producing side to block; the producer-side
    elapsed time must therefore be measurably non-zero.
    """
    received = []

    def slow_sink(message):
        time.sleep(0.1)
        received.append(message)

    logger.add(slow_sink, format="{message}", enqueue={"size": 2, "overflow": "block"})

    # time.monotonic() is immune to wall-clock adjustments, unlike time.time().
    start = time.monotonic()
    for i in range(5):
        logger.debug(i)
    elapsed = time.monotonic() - start

    logger.complete()
    logger.remove()

    assert len(received) == 5
    # The producer must have blocked while the worker drained the queue. The
    # theoretical minimum is ~0.2s (puts #4 and #5 each wait ~0.1s); assert a
    # margin below it so scheduling jitter cannot make the test flaky.
    assert elapsed >= 0.15


def test_enqueue_bounded_drop():
    """With ``overflow="drop"`` a full queue discards messages instead of blocking."""
    received = []

    def slow_sink(message):
        time.sleep(0.1)
        received.append(message)

    logger.add(slow_sink, format="{message}", enqueue={"size": 2, "overflow": "drop"})

    # time.monotonic() is immune to wall-clock adjustments, unlike time.time().
    start = time.monotonic()
    for i in range(10):
        logger.debug(i)
    elapsed = time.monotonic() - start

    logger.complete()
    logger.remove()

    # Non-blocking puts: the producer loop must finish far faster than the
    # ~1s the sink needs to consume 10 messages. A generous bound avoids
    # flakiness on loaded CI machines while still proving no blocking occurred.
    assert elapsed < 0.5
    # The first put always succeeds (queue starts empty), so at least one
    # message is delivered; with a size-2 queue and a 0.1s sink, some of the
    # 10 messages must have been dropped.
    assert 1 <= len(received) < 10


def test_enqueue_bounded_with_context():
    """A bounded queue also works with an explicit multiprocessing context."""
    import multiprocessing

    collected = []

    def sink(message):
        # Slow consumer: exercises the bounded queue under producer pressure.
        time.sleep(0.05)
        collected.append(message)

    spawn_context = multiprocessing.get_context("spawn")
    logger.add(sink, format="{message}", enqueue={"size": 5}, context=spawn_context)

    for value in range(5):
        logger.debug(value)

    logger.complete()
    logger.remove()

    assert len(collected) == 5
Loading