2020from collections .abc import Callable , Iterable
2121
2222from connectrpc .interceptor import Interceptor
23- from rigging .timing import ExponentialBackoff
23+ from rigging .timing import ExponentialBackoff , RateLimiter
2424
2525from iris .logging import str_to_log_level
2626from iris .rpc import logging_pb2
2727from iris .rpc .errors import is_retryable_error
2828from iris .rpc .logging_connect import LogServiceClientSync
2929
30+
3031# Detached from the root logger: ``RemoteLogHandler`` lives on the root
3132# logger and calls ``LogPusher.push``, so if our own diagnostics reached
3233# the root they'd be enqueued right back into the pusher — a re-entrant
3334# loop that silently amplifies during failure storms. We send to stderr
3435# directly and set ``propagate = False`` so nothing here can feed the
3536# handler we serve.
37+ class _QuietStreamHandler (logging .StreamHandler ):
38+ """StreamHandler that drops emit failures silently.
39+
40+ This logger only carries LogPusher's own diagnostics. The drain thread
41+ is a daemon that outlives pytest's stderr capture (and interpreter
42+ shutdown), so any emit failure is a dead-stream symptom of teardown,
43+ not a LogPusher bug we could react to. Swallowing avoids the cascade
44+ of "--- Logging error ---" tracebacks during test teardown.
45+ """
46+
47+ def handleError (self , record : logging .LogRecord ) -> None :
48+ pass
49+
50+
# Private diagnostics logger: writes straight to stderr via the quiet
# handler and never propagates, so nothing emitted here can re-enter the
# remote log pipeline this module serves.
logger = logging.getLogger(__name__)
logger.propagate = False
if not logger.handlers:
    _stderr_handler = _QuietStreamHandler(sys.stderr)
    _stderr_handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    )
    logger.addHandler(_stderr_handler)
    if logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)
4459
4560
61+ def _format_exc_summary (exc : BaseException ) -> str :
62+ """Collapse a ConnectError-style exception to ``ClassName(CODE)``.
63+
64+ The raw str(ConnectError) repeats the endpoint URL that's already
65+ visible from configuration and log context; a short summary keeps the
66+ drain-thread diagnostics readable during failure storms.
67+ """
68+ code = getattr (exc , "code" , None )
69+ code_name = getattr (code , "name" , None ) or getattr (code , "value" , None )
70+ if code_name is not None :
71+ return f"{ type (exc ).__name__ } ({ code_name } )"
72+ return f"{ type (exc ).__name__ } : { exc } "
73+
74+
4675MAX_LOG_BUFFER_SIZE = 10_000
4776"""Global cap on buffered entries across all keys. Older entries are
4877dropped first when the cap is exceeded."""
5382_BACKOFF_INITIAL_SEC = 0.5
5483_BACKOFF_MAX_SEC = 30.0
5584
85+ # Minimum seconds between overflow warnings. Without throttling, every push
86+ # to a full buffer emits its own warning — with the RemoteLogHandler pushing
87+ # one entry per record, that is one stderr line per log record indefinitely.
88+ _OVERFLOW_LOG_INTERVAL_SEC = 5.0
89+
5690
5791class LogPusher :
5892 """Buffered client for pushing log entries to a remote LogService.
@@ -113,6 +147,12 @@ def __init__(
113147 # Owned by the drain thread; reset after any successful send.
114148 self ._backoff = ExponentialBackoff (initial = _BACKOFF_INITIAL_SEC , maximum = _BACKOFF_MAX_SEC , factor = 2.0 )
115149
150+ # Overflow-warning throttle state (guarded by _cond). Accumulates
151+ # dropped counts and flushes a single aggregated warning at most
152+ # once per _OVERFLOW_LOG_INTERVAL_SEC.
153+ self ._overflow_dropped_pending = 0
154+ self ._overflow_log_limiter = RateLimiter (interval_seconds = _OVERFLOW_LOG_INTERVAL_SEC )
155+
116156 self ._thread = threading .Thread (target = self ._run , name = "log-pusher" , daemon = True )
117157 self ._thread .start ()
118158
@@ -202,11 +242,14 @@ def _trim_oldest_locked(self) -> None:
202242 max_dropped_seq = seq
203243 dropped += 1
204244 if dropped :
205- logger .warning (
206- "LogPusher buffer overflow: dropped %d oldest entries (cap=%d)" ,
207- dropped ,
208- self ._max_buffer_size ,
209- )
245+ self ._overflow_dropped_pending += dropped
246+ if self ._overflow_log_limiter .should_run ():
247+ logger .warning (
248+ "LogPusher buffer overflow: dropped %d oldest entries (cap=%d)" ,
249+ self ._overflow_dropped_pending ,
250+ self ._max_buffer_size ,
251+ )
252+ self ._overflow_dropped_pending = 0
210253 if max_dropped_seq > self ._processed_seq :
211254 self ._processed_seq = max_dropped_seq
212255 self ._cond .notify_all ()
@@ -291,7 +334,7 @@ def _send_items(
291334 try :
292335 client = self ._get_client ()
293336 except Exception as exc :
294- logger .warning ("LogPusher: endpoint resolution failed: %s" , exc )
337+ logger .warning ("LogPusher: endpoint resolution failed: %s" , _format_exc_summary ( exc ) )
295338 return max_sent_seq , [p for p in items if p [1 ] not in sent_keys ]
296339 try :
297340 entries = [e for _s , e in seq_entries ]
@@ -302,15 +345,16 @@ def _send_items(
302345 max_sent_seq = seq
303346 except Exception as exc :
304347 retryable = is_retryable_error (exc )
348+ summary = _format_exc_summary (exc )
305349 logger .warning (
306350 "LogPusher: send failure for key=%s (%d entries, retryable=%s): %s" ,
307351 key ,
308352 len (seq_entries ),
309353 retryable ,
310- exc ,
354+ summary ,
311355 )
312356 if retryable :
313- self ._invalidate (str ( exc ) )
357+ self ._invalidate (summary )
314358 return max_sent_seq , [p for p in items if p [1 ] not in sent_keys ]
315359 return max_sent_seq , []
316360
@@ -416,7 +460,7 @@ def emit(self, record: logging.LogRecord) -> None:
416460 self .handleError (record )
417461
418462 def flush (self ) -> None :
419- self ._pusher .flush ()
463+ self ._pusher .flush (timeout = 0.5 )
420464
421465 def close (self ) -> None :
422466 self ._closed = True
0 commit comments