forked from opendatahub-io/opendatahub-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlogger.py
More file actions
179 lines (139 loc) · 6.37 KB
/
logger.py
File metadata and controls
179 lines (139 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import logging
import multiprocessing
import shutil
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler
from typing import Any
from utilities.opendatahub_logger import (
DuplicateFilter,
ThirdPartyHumanReadableFormatter,
ThirdPartyJSONFormatter,
WrapperLogFormatter,
set_human_readable,
)
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
class RedactedString(str):
    """
    Used to redact the representation of a sensitive string.

    Only ``repr()`` is overridden; the instance is otherwise a regular ``str``
    holding the real value, so ``str()``, comparisons and formatting still
    expose the underlying text.
    """

    # The return annotation is quoted on purpose: the class name is not bound
    # yet while the class body executes, so an unquoted reference raises
    # NameError at definition time when annotations are evaluated eagerly.
    def __new__(cls, *, value: object) -> "RedactedString":  # noqa: PYI034
        """
        Build the string from ``value``.

        Args:
            value (object): keyword-only object to wrap; it is converted with
                the regular ``str`` constructor

        Returns:
            RedactedString: new instance holding the converted value
        """
        return super().__new__(cls, value)

    def __repr__(self) -> str:
        """Return a fixed placeholder instead of the real content."""
        return "'***REDACTED***'"
class PassthroughQueueHandler(QueueHandler):
    """QueueHandler that enqueues log records unformatted so every output handler can format them on its own."""

    def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
        """
        Make ``record`` ready for the queue without formatting its message.

        Args:
            record (logging.LogRecord): record about to be enqueued

        Returns:
            logging.LogRecord: the same record object; when it carried exception
                info, that info is rendered into ``exc_text`` (unless already
                set) and ``exc_info`` is cleared
        """
        exc_info = record.exc_info
        if not exc_info:
            return record

        # Traceback objects can't be pickled, so only their text form is kept.
        if not record.exc_text:
            record.exc_text = logging.Formatter().formatException(exc_info)
        record.exc_info = None
        return record
class DelegatingFormatter(logging.Formatter):
    """Formatter that hands each record to another formatter chosen by the record's logger name."""

    def __init__(self, formatters: dict[str, logging.Formatter], default: logging.Formatter) -> None:
        """
        Args:
            formatters (dict[str, logging.Formatter]): formatter to use per exact logger name
            default (logging.Formatter): formatter used when the logger name has no entry
        """
        super().__init__()
        self._default = default
        self._formatters = formatters

    def format(self, record: logging.LogRecord) -> str:
        """
        Format ``record`` with the formatter registered for ``record.name``.

        Args:
            record (logging.LogRecord): record to format

        Returns:
            str: output of the selected formatter (the default one if the name is not registered)
        """
        return self._formatters.get(record.name, self._default).format(record)
def setup_logging(
    log_level: int | str,
    log_file: str = "/tmp/pytest-tests.log",
    thread_name: str | None = None,
    enable_console: bool = True,
    human_readable: bool = False,
) -> QueueListener:
    """
    Setup basic/root logging using PassthroughQueueHandler/QueueListener
    to consolidate log messages into a single stream to be written to multiple outputs.

    Raw log records flow through the queue unformatted, allowing each output handler
    to format independently: text for console, JSON for file.

    Args:
        log_level (int | str): log level; a level name (any case) is resolved against
            the ``logging`` module constants and falls back to ``logging.INFO``
            when it does not name an integer level
        log_file (str): logging output file
        thread_name (str | None): optional thread_name id prefix, e.g., [gw0]
        enable_console (bool): if True, records are also written to a console StreamHandler
        human_readable (bool): if True, file output is also human-readable text instead of JSON

    Returns:
        QueueListener: started listener that drains the log Queue into the output handlers

    Eg:
        root QueueHandler  ┐                         ┌> StreamHandler (text)
                           ├> Queue -> QueueListener ┤
        basic QueueHandler ┘                         └> FileHandler (JSON)
    """
    # Resolve a level name to its numeric value before any handler or logger
    # receives it: setLevel() rejects names it does not know (e.g. lowercase).
    if isinstance(log_level, str):
        resolved_level = getattr(logging, log_level.upper(), None)
        log_level = resolved_level if isinstance(resolved_level, int) else logging.INFO

    basic_fmt_str = "%(message)s"
    root_fmt_str = "%(asctime)s %(name)s %(log_color)s%(levelname)s%(reset)s %(message)s"

    if thread_name:
        basic_fmt_str = f"[{thread_name}] {basic_fmt_str}"
        root_fmt_str = f"[{thread_name}] {root_fmt_str}"

    basic_log_formatter = logging.Formatter(fmt=basic_fmt_str)
    root_log_formatter = WrapperLogFormatter(
        fmt=root_fmt_str,
        log_colors={
            "DEBUG": "cyan",
            "INFO": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red,bg_white",
        },
    )

    # File output: rotating file, JSON unless human-readable text was requested.
    file_formatter = ThirdPartyHumanReadableFormatter() if human_readable else ThirdPartyJSONFormatter()
    log_file_handler = RotatingFileHandler(filename=log_file, maxBytes=100 * 1024 * 1024, backupCount=20)
    log_file_handler.setLevel(level=log_level)
    log_file_handler.setFormatter(fmt=file_formatter)

    handlers: list[logging.Handler] = [log_file_handler]

    # Console output: records from the "basic" logger use the bare message
    # format, every other logger name uses the colored root format.
    if enable_console:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level=log_level)
        console_handler.setFormatter(
            fmt=DelegatingFormatter(
                formatters={"basic": basic_log_formatter},
                default=root_log_formatter,
            )
        )
        handlers.append(console_handler)

    # One shared queue feeds a single listener that fans out to the handlers above.
    log_queue = multiprocessing.Queue(maxsize=-1)  # type: ignore[var-annotated]
    log_listener = QueueListener(log_queue, *handlers)

    basic_log_queue_handler = PassthroughQueueHandler(queue=log_queue)
    basic_log_queue_handler.set_name(name="basic")

    basic_logger = logging.getLogger(name="basic")
    basic_logger.setLevel(level=log_level)
    basic_logger.handlers.clear()
    basic_logger.addHandler(hdlr=basic_log_queue_handler)

    root_log_queue_handler = PassthroughQueueHandler(queue=log_queue)
    root_log_queue_handler.set_name(name="root")

    root_logger = logging.getLogger(name="root")
    root_logger.setLevel(level=log_level)
    root_logger.handlers.clear()
    root_logger.addHandler(hdlr=root_log_queue_handler)
    root_logger.addFilter(filter=DuplicateFilter())

    root_logger.propagate = False
    basic_logger.propagate = False

    # Always configure all loggers to use our queue system
    # This ensures test loggers and third-party loggers respect our console setting
    for name, logger in logging.root.manager.loggerDict.items():
        if isinstance(logger, logging.Logger) and (name not in ("root", "basic")):
            logger.handlers.clear()
            logger.addHandler(hdlr=root_log_queue_handler)
            logger.propagate = False

    # Configure the root logger to catch any new loggers that inherit from it.
    # Clearing first guarantees the queue handler is the only handler attached.
    logging.root.handlers.clear()
    logging.root.setLevel(level=log_level)
    logging.root.addHandler(hdlr=root_log_queue_handler)

    set_human_readable(enabled=human_readable)

    log_listener.start()
    return log_listener
def separator(symbol_: str, val: str | None = None) -> str:
terminal_width = shutil.get_terminal_size(fallback=(120, 40))[0]
if not val:
return f"{symbol_ * terminal_width}"
sepa = int((terminal_width - len(val) - 2) // 2)
return f"{symbol_ * sepa} {val} {symbol_ * sepa}"