Skip to content

Commit 37d065e

Browse files
committed
Add bounded queue support for enqueue option
Add support for bounded queues to prevent unbounded memory growth when using enqueue=True with slow sinks. New enqueue options: - enqueue=<int>: bounded queue with specified max size (blocking) - enqueue={"size": N, "overflow": "block|drop"}: configurable overflow
1 parent 764cd30 commit 37d065e

File tree

3 files changed

+110
-5
lines changed

3 files changed

+110
-5
lines changed

loguru/_handler.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import threading
66
from contextlib import contextmanager
7+
from queue import Full
78
from threading import Thread
89

910
from ._colorizer import Colorizer
@@ -55,7 +56,7 @@ def __init__(
5556
self._filter = filter_
5657
self._colorize = colorize
5758
self._serialize = serialize
58-
self._enqueue = enqueue
59+
self._enqueue = bool(enqueue)
5960
self._multiprocessing_context = multiprocessing_context
6061
self._error_interceptor = error_interceptor
6162
self._exception_formatter = exception_formatter
@@ -73,6 +74,7 @@ def __init__(
7374
self._queue_lock = None
7475
self._confirmation_event = None
7576
self._confirmation_lock = None
77+
self._enqueue_overflow = "block"
7678
self._owner_process_pid = None
7779
self._thread = None
7880

@@ -89,12 +91,26 @@ def __init__(
8991
self._decolorized_format = self._formatter.strip()
9092

9193
if self._enqueue:
94+
if enqueue is True:
95+
maxsize = 0
96+
elif hasattr(enqueue, "get"):
97+
maxsize = enqueue.get("size", 0)
98+
self._enqueue_overflow = enqueue.get("overflow", "block")
99+
else:
100+
maxsize = enqueue
101+
92102
if self._multiprocessing_context is None:
93-
self._queue = multiprocessing.SimpleQueue()
103+
if maxsize > 0:
104+
self._queue = multiprocessing.Queue(maxsize=maxsize)
105+
else:
106+
self._queue = multiprocessing.SimpleQueue()
94107
self._confirmation_event = multiprocessing.Event()
95108
self._confirmation_lock = multiprocessing.Lock()
96109
else:
97-
self._queue = self._multiprocessing_context.SimpleQueue()
110+
if maxsize > 0:
111+
self._queue = self._multiprocessing_context.Queue(maxsize=maxsize)
112+
else:
113+
self._queue = self._multiprocessing_context.SimpleQueue()
98114
self._confirmation_event = self._multiprocessing_context.Event()
99115
self._confirmation_lock = self._multiprocessing_context.Lock()
100116
self._queue_lock = create_handler_lock()
@@ -201,7 +217,13 @@ def emit(self, record, level_id, from_decorator, is_raw, colored_message):
201217
if self._stopped:
202218
return
203219
if self._enqueue:
204-
self._queue.put(str_record)
220+
if self._enqueue_overflow == "drop":
221+
try:
222+
self._queue.put(str_record, block=False)
223+
except Full:
224+
pass
225+
else:
226+
self._queue.put(str_record)
205227
else:
206228
self._sink.write(str_record)
207229
except Exception:

loguru/_logger.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,10 +302,19 @@ def add(
302302
diagnose : |bool|, optional
303303
Whether the exception trace should display the variables values to ease the debugging.
304304
This should be set to ``False`` in production to avoid leaking sensitive data.
305-
enqueue : |bool|, optional
305+
enqueue : |bool|, |int| or |dict|, optional
306306
Whether the messages to be logged should first pass through a multiprocessing-safe queue
307307
before reaching the sink. This is useful while logging to a file through multiple
308308
processes. This also has the advantage of making logging calls non-blocking.
309+
310+
If ``True``, an unbounded queue is used (original behavior). If an ``int`` is passed,
311+
it specifies the maximum queue size (bounded queue). When the queue is full, logging
312+
calls will block until space is available.
313+
314+
A ``dict`` can be passed for more control with the following keys:
315+
- ``"size"`` (int): Maximum queue size. ``0`` means unbounded.
316+
- ``"overflow"`` (str): What to do when the queue is full.
317+
``"block"`` (default) waits for space, ``"drop"`` silently discards the message.
309318
context : |multiprocessing.Context| or |str|, optional
310319
A context object or name that will be used for all tasks involving internally the
311320
|multiprocessing| module, in particular when ``enqueue=True``. If ``None``, the default

tests/test_add_option_enqueue.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,77 @@ def sink(message):
292292
assert type_ is ValueError
293293
assert value is None
294294
assert traceback_ is None
295+
296+
297+
def test_enqueue_bounded_with_int():
298+
x = []
299+
300+
def sink(message):
301+
time.sleep(0.05)
302+
x.append(message)
303+
304+
logger.add(sink, format="{message}", enqueue=5)
305+
for i in range(5):
306+
logger.debug(i)
307+
logger.complete()
308+
logger.remove()
309+
310+
assert len(x) == 5
311+
312+
313+
def test_enqueue_bounded_with_dict():
314+
x = []
315+
316+
def sink(message):
317+
time.sleep(0.05)
318+
x.append(message)
319+
320+
logger.add(sink, format="{message}", enqueue={"size": 5})
321+
for i in range(5):
322+
logger.debug(i)
323+
logger.complete()
324+
logger.remove()
325+
326+
assert len(x) == 5
327+
328+
329+
def test_enqueue_bounded_blocking():
330+
x = []
331+
332+
def slow_sink(message):
333+
time.sleep(0.1)
334+
x.append(message)
335+
336+
logger.add(slow_sink, format="{message}", enqueue={"size": 2, "overflow": "block"})
337+
338+
start = time.time()
339+
for i in range(5):
340+
logger.debug(i)
341+
elapsed = time.time() - start
342+
343+
logger.complete()
344+
logger.remove()
345+
346+
assert len(x) == 5
347+
assert elapsed >= 0.2
348+
349+
350+
def test_enqueue_bounded_drop():
351+
x = []
352+
353+
def slow_sink(message):
354+
time.sleep(0.1)
355+
x.append(message)
356+
357+
logger.add(slow_sink, format="{message}", enqueue={"size": 2, "overflow": "drop"})
358+
359+
start = time.time()
360+
for i in range(10):
361+
logger.debug(i)
362+
elapsed = time.time() - start
363+
364+
logger.complete()
365+
logger.remove()
366+
367+
assert elapsed < 0.1
368+
assert len(x) < 10

0 commit comments

Comments
 (0)