Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions utilities/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler
from typing import Any

from utilities.opendatahub_logger import DuplicateFilter, WrapperLogFormatter, set_human_readable
from utilities.opendatahub_logger import (
DuplicateFilter,
ThirdPartyHumanReadableFormatter,
ThirdPartyJSONFormatter,
WrapperLogFormatter,
set_human_readable,
)

LOGGER = logging.getLogger(__name__)

Expand All @@ -21,6 +27,31 @@ def __repr__(self) -> str:
return "'***REDACTED***'"


class PassthroughQueueHandler(QueueHandler):
"""QueueHandler that preserves raw log records for independent formatting by output handlers."""

def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
# Convert exc_info to text for pickling (traceback objects can't be pickled)
if record.exc_info:
if not record.exc_text:
record.exc_text = logging.Formatter().formatException(record.exc_info)
record.exc_info = None
return record


class DelegatingFormatter(logging.Formatter):
"""Routes formatting to different formatters based on logger name."""

def __init__(self, formatters: dict[str, logging.Formatter], default: logging.Formatter) -> None:
super().__init__()
self._formatters = formatters
self._default = default

def format(self, record: logging.LogRecord) -> str:
formatter = self._formatters.get(record.name, self._default)
return formatter.format(record)


def setup_logging(
log_level: int,
log_file: str = "/tmp/pytest-tests.log",
Expand All @@ -29,21 +60,25 @@ def setup_logging(
human_readable: bool = False,
) -> QueueListener:
"""
Setup basic/root logging using QueueHandler/QueueListener
Setup basic/root logging using PassthroughQueueHandler/QueueListener
to consolidate log messages into a single stream to be written to multiple outputs.

Raw log records flow through the queue unformatted, allowing each output handler
to format independently: text for console, JSON for file.

Args:
log_level (int): log level
log_file (str): logging output file
thread_name (str | None): optional thread_name id prefix, e.g., [gw0]
human_readable (bool): if True, file output is also human-readable text instead of JSON

Returns:
QueueListener: Process monitoring the log Queue

Eg:
root QueueHandler ┐ ┌> StreamHandler
root QueueHandler ┐ ┌> StreamHandler (text)
├> Queue -> QueueListener ┤
basic QueueHandler ┘ └> FileHandler
basic QueueHandler ┘ └> FileHandler (JSON)
"""
basic_fmt_str = "%(message)s"
root_fmt_str = "%(asctime)s %(name)s %(log_color)s%(levelname)s%(reset)s %(message)s"
Expand All @@ -64,8 +99,11 @@ def setup_logging(
},
)

file_formatter = ThirdPartyHumanReadableFormatter() if human_readable else ThirdPartyJSONFormatter()

log_file_handler = RotatingFileHandler(filename=log_file, maxBytes=100 * 1024 * 1024, backupCount=20)
log_file_handler.setLevel(level=log_level) # Set the file handler log level
log_file_handler.setFormatter(fmt=file_formatter)

handlers: list[Any] = [log_file_handler]

Expand All @@ -76,23 +114,27 @@ def setup_logging(
if enable_console:
console_handler = logging.StreamHandler()
console_handler.setLevel(level=log_level) # Set the console handler log level
console_handler.setFormatter(
fmt=DelegatingFormatter(
formatters={"basic": basic_log_formatter},
default=root_log_formatter,
)
)
handlers.append(console_handler)

log_queue = multiprocessing.Queue(maxsize=-1) # type: ignore[var-annotated]
log_listener = QueueListener(log_queue, *handlers)

basic_log_queue_handler = QueueHandler(queue=log_queue)
basic_log_queue_handler = PassthroughQueueHandler(queue=log_queue)
basic_log_queue_handler.set_name(name="basic")
basic_log_queue_handler.setFormatter(fmt=basic_log_formatter)

basic_logger = logging.getLogger(name="basic")
basic_logger.setLevel(level=log_level)
basic_logger.handlers.clear()
basic_logger.addHandler(hdlr=basic_log_queue_handler)

root_log_queue_handler = QueueHandler(queue=log_queue)
root_log_queue_handler = PassthroughQueueHandler(queue=log_queue)
root_log_queue_handler.set_name(name="root")
root_log_queue_handler.setFormatter(fmt=root_log_formatter)

root_logger = logging.getLogger(name="root")
root_logger.setLevel(level=log_level)
Expand Down
74 changes: 31 additions & 43 deletions utilities/opendatahub_logger.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""
OpenDataHub logging utilities using structlog with third-party logging integration.
OpenDataHub logging utilities using structlog.

This module provides structured JSON logging using structlog with automatic
third-party library logging integration.
Structlog renders plain text for console-friendly output. JSON formatting for the
log file is handled by ThirdPartyJSONFormatter applied on the file handler in setup_logging().

When --readable-logs is passed, both console and file output are human-readable text.

Example:
from utilities.opendatahub_logger import get_logger

logger = get_logger("myapp")
logger.info("User logged in", user_id=123)
# Output: {"timestamp": "...", "logger": "myapp", "level": "info", "event": "User logged in", "user_id": 123}
# Console: 2024-01-01 myapp INFO User logged in [user_id=123]
# File: {"timestamp": "...", "logger": "myapp", "level": "info", "event": "User logged in [user_id=123]"}
"""

import inspect
Expand Down Expand Up @@ -114,19 +117,21 @@ def format(self, record: logging.LogRecord) -> str:
json.loads(msg)
return msg
except json.JSONDecodeError, TypeError:
return json.dumps({
result = {
"timestamp": datetime.fromtimestamp(record.created, tz=UTC).isoformat(),
"logger": record.name,
"level": record.levelname.lower(),
"event": msg,
"filename": record.pathname.split("/")[-1] if record.pathname else "",
"lineno": str(record.lineno),
})
}
if record.exc_text:
result["traceback"] = record.exc_text
return json.dumps(result)


_initialized = False
_human_readable = False
_original_add_handler = logging.Logger.addHandler


def set_human_readable(enabled: bool) -> None:
Expand Down Expand Up @@ -155,11 +160,26 @@ def format(self, record: logging.LogRecord) -> str:
reset = _RESET if color else ""
filename = record.pathname.rsplit("/", 1)[-1] if record.pathname else ""
msg = record.getMessage()
return f"{timestamp} {record.name} {color}{record.levelname}{reset} {msg} ({filename}:{record.lineno})"
result = f"{timestamp} {record.name} {color}{record.levelname}{reset} {msg} ({filename}:{record.lineno})"
if record.exc_text:
result = f"{result}\n{record.exc_text}"
return result


def _plain_text_renderer(_logger: Any, _method_name: str, event_dict: dict[str, Any]) -> str:
"""Render structlog events as plain text for console-friendly output."""
event = event_dict.pop("event", "")
# Remove fields already present on the log record (added by structlog processors)
for key in ("logger", "level", "timestamp"):
event_dict.pop(key, None)
if event_dict:
extras = " ".join(f"{k}={v!r}" for k, v in event_dict.items())
return f"{event} [{extras}]"
return str(event)


def _initialize() -> None:
"""One-time setup for structlog and third-party logging."""
"""One-time setup for structlog."""
global _initialized
if _initialized:
return
Expand All @@ -173,7 +193,7 @@ def _initialize() -> None:
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="ISO", utc=True),
structlog.processors.JSONRenderer(),
_plain_text_renderer,
]

structlog.configure(
Expand All @@ -183,33 +203,6 @@ def _initialize() -> None:
cache_logger_on_first_use=False,
)

third_party_formatter = ThirdPartyHumanReadableFormatter() if _human_readable else ThirdPartyJSONFormatter()

# Formatters that should not be replaced:
# - JSONOnlyFormatter: used by StructlogWrapper (structlog already formats the message)
# - WrapperLogFormatter: used by setup_logging's QueueHandlers (only protected in HR mode)
_skip_formatters: tuple[type, ...] = (JSONOnlyFormatter,)
if _human_readable:
_skip_formatters = (*_skip_formatters, WrapperLogFormatter)

# Patch addHandler so new loggers (e.g. from ocp_resources/simple_logger)
# get the correct formatter on their handlers.
def patched_add_handler(self: logging.Logger, hdlr: logging.Handler) -> None:
if not isinstance(hdlr.formatter, _skip_formatters):
hdlr.setFormatter(fmt=third_party_formatter)
_original_add_handler(self, hdlr) # noqa: FCN001

logging.Logger.addHandler = patched_add_handler # type: ignore[method-assign]

# Apply formatter to all existing handlers on all loggers
all_loggers = [logging.getLogger()] + [
logger for logger in logging.root.manager.loggerDict.values() if isinstance(logger, logging.Logger)
]
for logger in all_loggers:
for handler in logger.handlers:
if not isinstance(handler.formatter, _skip_formatters):
handler.setFormatter(fmt=third_party_formatter)

_initialized = True


Expand All @@ -221,11 +214,6 @@ def __init__(self, name: str) -> None:
_initialize()
self._logger = structlog.get_logger(name=name)

underlying_logger = logging.getLogger(name)
for handler in underlying_logger.handlers:
if isinstance(handler.formatter, (logging.Formatter, type(None))):
handler.setFormatter(fmt=JSONOnlyFormatter())

def _log(self, level: str, msg: Any, *args: Any, **kwargs: Any) -> None:
msg_str = str(msg)
if args:
Expand All @@ -234,7 +222,7 @@ def _log(self, level: str, msg: Any, *args: Any, **kwargs: Any) -> None:
if _human_readable:
std_logger = logging.getLogger(self.name)
log_method = getattr(std_logger, level.lower())
extra_str = " ".join(f"{k}={v}" for k, v in kwargs.items()) if kwargs else ""
extra_str = " ".join(f"{k}={v!r}" for k, v in kwargs.items()) if kwargs else ""
log_method(f"{msg_str} {extra_str}" if extra_str else msg_str, stacklevel=3) # noqa: FCN001
else:
log_method = getattr(self._logger, level.lower())
Expand Down