-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathhandlers.py
More file actions
336 lines (283 loc) · 12 KB
/
handlers.py
File metadata and controls
336 lines (283 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
from __future__ import annotations
import json
import logging
import sys
import time
import traceback
import uuid
import warnings
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, Dict, List, TextIO, Type
import pendulum
from rich.console import Console
from rich.highlighter import Highlighter, NullHighlighter
from rich.theme import Theme
from typing_extensions import Self
import prefect.context
from prefect._internal.concurrency.api import create_call, from_sync
from prefect._internal.concurrency.event_loop import get_running_loop
from prefect._internal.concurrency.services import BatchedQueueService
from prefect._internal.concurrency.threads import in_global_loop
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import LogCreate
from prefect.exceptions import MissingContextError
from prefect.logging.highlighters import PrefectConsoleHighlighter
from prefect.settings import (
PREFECT_API_URL,
PREFECT_LOGGING_COLORS,
PREFECT_LOGGING_INTERNAL_LEVEL,
PREFECT_LOGGING_MARKUP,
PREFECT_LOGGING_TO_API_BATCH_INTERVAL,
PREFECT_LOGGING_TO_API_BATCH_SIZE,
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
)
# Base class used for console handlers in this module.
#
# The subscripted form `logging.StreamHandler[TextIO]` is evaluated at runtime
# only on Python 3.12+; on older interpreters it is used solely while type
# checking, and the plain class is used when the module actually runs.
if TYPE_CHECKING or sys.version_info >= (3, 12):
    StreamHandler = logging.StreamHandler[TextIO]
else:
    StreamHandler = logging.StreamHandler
class APILogWorker(BatchedQueueService[Dict[str, Any]]):
    """
    Batched queue service that sends serialized log dicts to the Prefect API.

    Each batch is delivered with the API client's `create_logs` call. The
    batch size and interval are read from the `PREFECT_LOGGING_TO_API_*`
    settings every time the corresponding property is accessed.
    """

    @property
    def _max_batch_size(self):
        """
        Maximum size of a single batch.

        This is the configured batch size minus the maximum size of one log,
        but never less than the maximum size of one log.
        """
        return max(
            PREFECT_LOGGING_TO_API_BATCH_SIZE.value()
            - PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
            PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
        )

    @property
    def _min_interval(self):
        """Batch interval taken from `PREFECT_LOGGING_TO_API_BATCH_INTERVAL`."""
        return PREFECT_LOGGING_TO_API_BATCH_INTERVAL.value()

    async def _handle_batch(self, items: List):
        """
        Send one batch of logs to the API.

        Exceptions raised while sending are not propagated. When
        `logging.raiseExceptions` is true and `sys.stderr` is available they
        are reported on stderr: a full traceback if the internal logging level
        is "DEBUG", otherwise only the exception message.

        Args:
            items: The logs to pass to the client's `create_logs`.
        """
        try:
            await self._client.create_logs(items)
        except Exception as e:
            # Roughly replicate the behavior of the stdlib logger error handling
            if logging.raiseExceptions and sys.stderr:
                sys.stderr.write("--- Error logging to API ---\n")
                if PREFECT_LOGGING_INTERNAL_LEVEL.value() == "DEBUG":
                    traceback.print_exc(file=sys.stderr)
                else:
                    # Only log the exception message in non-DEBUG mode.
                    # Terminate the line so later stderr output does not get
                    # appended to the error message.
                    sys.stderr.write(str(e) + "\n")

    @asynccontextmanager
    async def _lifespan(self):
        """Hold an API client open as `self._client` while the context is active."""
        async with get_client() as self._client:
            yield

    @classmethod
    def instance(cls: Type[Self]) -> Self:
        """
        Retrieve the worker instance for the current logging settings.

        The batch size, API URL and maximum log size are passed to the parent
        `instance` call.
        """
        settings = (
            PREFECT_LOGGING_TO_API_BATCH_SIZE.value(),
            PREFECT_API_URL.value(),
            PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
        )

        # Ensure a unique worker is retrieved per relevant logging settings
        return super().instance(*settings)

    def _get_size(self, item: Dict[str, Any]) -> int:
        """
        Return the size of a queued log.

        The `__payload_size__` entry is removed from `item` and used when it is
        present and truthy; otherwise the size is the byte length of the
        JSON-encoded item.
        """
        return item.pop("__payload_size__", None) or len(json.dumps(item).encode())
class APILogHandler(logging.Handler):
    """
    A logging handler that sends logs to the Prefect API.

    Sends log records to the `APILogWorker` which manages sending batches of logs in
    the background.
    """

    @classmethod
    def flush(cls) -> None:
        """
        Tell the `APILogWorker` to send any currently enqueued logs and block until
        completion.

        Use `aflush` from async contexts instead.

        Raises:
            RuntimeError: If called from the global event loop, where blocking
                would deadlock.
        """
        loop = get_running_loop()
        if loop:
            if in_global_loop():  # Guard against internal misuse
                raise RuntimeError(
                    "Cannot call `APILogHandler.flush` from the global event loop; it"
                    " would block the event loop and cause a deadlock. Use"
                    " `APILogHandler.aflush` instead."
                )

            # Not ideal, but this method is called by the stdlib and cannot return a
            # coroutine so we just schedule the drain in the global loop thread and continue
            from_sync.call_soon_in_loop_thread(create_call(APILogWorker.drain_all))
            return None
        else:
            # We set a timeout of 5s because we don't want to block forever if the worker
            # is stuck. This can occur when the handler is being shutdown and the
            # `logging._lock` is held but the worker is attempting to emit logs resulting
            # in a deadlock.
            return APILogWorker.drain_all(timeout=5)

    @classmethod
    async def aflush(cls) -> bool:
        """
        Tell the `APILogWorker` to send any currently enqueued logs and block until
        completion.
        """
        return await APILogWorker.drain_all()

    def emit(self, record: logging.LogRecord) -> None:
        """
        Send a log to the `APILogWorker`

        Records are skipped when API logging is disabled in the current
        settings or when the record sets `send_to_api` to a falsy value. Any
        exception raised while preparing or enqueueing the log is routed to
        `handleError`.
        """
        try:
            profile = prefect.context.get_settings_context()

            if not profile.settings.logging.to_api.enabled:
                return  # Respect the global settings toggle
            if not getattr(record, "send_to_api", True):
                return  # Do not send records that have opted out
            log = self.prepare(record)
            APILogWorker.instance().send(log)

        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """
        Handle an exception raised while emitting `record`.

        A `MissingContextError` is handled according to
        `PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW`: "warn" emits a warning,
        "ignore" silently drops the record, and any other value re-raises the
        error. All other exceptions are delegated to the stdlib handler.
        """
        _, exc, _ = sys.exc_info()

        if isinstance(exc, MissingContextError):
            log_handling_when_missing_flow = (
                PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW.value()
            )
            if log_handling_when_missing_flow == "warn":
                # Warn when a logger is used outside of a run context, the stack level here
                # gets us to the user logging call
                warnings.warn(
                    f"{exc} Set PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW=ignore to suppress this warning.",
                    stacklevel=8,
                )
                return
            elif log_handling_when_missing_flow == "ignore":
                return
            else:
                raise exc

        # Display a longer traceback for other errors
        return super().handleError(record)

    def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Convert a `logging.LogRecord` to the API `LogCreate` schema and serialize.

        This infers the linked flow or task run from the log record or the current
        run context.

        If a flow run id cannot be found, the log will be dropped.

        Logs exceeding the maximum size will be dropped.

        Returns:
            The JSON-mode dump of the `LogCreate` object, with an extra
            `__payload_size__` entry holding its serialized size in bytes.

        Raises:
            MissingContextError: If the record has no flow run id and there is
                no active run context.
            ValueError: If the run context has neither flow run nor task run
                information, or the serialized log exceeds
                `PREFECT_LOGGING_TO_API_MAX_LOG_SIZE`.
        """
        flow_run_id = getattr(record, "flow_run_id", None)
        task_run_id = getattr(record, "task_run_id", None)
        worker_id = getattr(record, "worker_id", None)

        if not flow_run_id:
            try:
                context = prefect.context.get_run_context()
            except MissingContextError:
                raise MissingContextError(
                    f"Logger {record.name!r} attempted to send logs to the API without"
                    " a flow run id. The API log handler can only send logs within"
                    " flow run contexts unless the flow run id is manually provided."
                ) from None

            if hasattr(context, "flow_run"):
                flow_run_id = context.flow_run.id
            elif hasattr(context, "task_run"):
                flow_run_id = context.task_run.flow_run_id
                task_run_id = task_run_id or context.task_run.id
            else:
                raise ValueError(
                    "Encountered malformed run context. Does not contain flow or task "
                    "run information."
                )

        # Parsing to a `LogCreate` object here gives us nice parsing error messages
        # from the standard lib `handleError` method if something goes wrong and
        # prevents malformed logs from entering the queue
        try:
            # A string that does not parse as a UUID raises ValueError below and
            # the flow run id is then sent as None.
            is_uuid_like = isinstance(flow_run_id, uuid.UUID) or (
                isinstance(flow_run_id, str) and uuid.UUID(flow_run_id)
            )
        except ValueError:
            is_uuid_like = False

        log = LogCreate(
            flow_run_id=flow_run_id if is_uuid_like else None,
            task_run_id=task_run_id,
            worker_id=worker_id,
            name=record.name,
            level=record.levelno,
            timestamp=pendulum.from_timestamp(
                getattr(record, "created", None) or time.time()
            ),
            message=self.format(record),
        ).model_dump(mode="json")

        # The size is stored on the payload itself under `__payload_size__`.
        log_size = log["__payload_size__"] = self._get_payload_size(log)
        if log_size > PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
            raise ValueError(
                f"Log of size {log_size} is greater than the max size of "
                f"{PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()}"
            )

        return log

    def _get_payload_size(self, log: Dict[str, Any]) -> int:
        """Return the byte length of `log` after JSON encoding."""
        return len(json.dumps(log).encode())
class WorkerAPILogHandler(APILogHandler):
    """
    API log handler for worker logs, which are identified by a worker ID
    attached to the log record.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """Send `record` to the API only if it carries a truthy `worker_id`."""
        # Open-source API servers do not currently support worker logs, and
        # worker logs only have an associated worker ID when connected to Cloud,
        # so we won't send worker logs to the API unless they have a worker ID.
        if getattr(record, "worker_id", None):
            super().emit(record)

    def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Build the serialized `LogCreate` payload for a worker log record.

        The record's worker ID is included in the payload, and the payload's
        serialized size is stored under `__payload_size__`.

        Raises:
            ValueError: If the serialized log is larger than
                `PREFECT_LOGGING_TO_API_MAX_LOG_SIZE`.
        """
        # Fall back to the current time when the record has no creation time.
        created_at = getattr(record, "created", None) or time.time()
        fields = {
            "worker_id": getattr(record, "worker_id", None),
            "name": record.name,
            "level": record.levelno,
            "timestamp": pendulum.from_timestamp(created_at),
            "message": self.format(record),
        }
        payload = LogCreate(**fields).model_dump(mode="json")

        payload_size = self._get_payload_size(payload)
        payload["__payload_size__"] = payload_size
        if payload_size <= PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
            return payload

        raise ValueError(
            f"Log of size {payload_size} is greater than the max size of "
            f"{PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()}"
        )
class PrefectConsoleHandler(StreamHandler):
    def __init__(
        self,
        stream: TextIO | None = None,
        highlighter: type[Highlighter] = PrefectConsoleHighlighter,
        styles: dict[str, str] | None = None,
        level: int | str = logging.NOTSET,
    ):
        """
        The default console handler for Prefect, which highlights log levels,
        web and file URLs, flow and task (run) names, and state types in the
        local console (terminal).

        Highlighting can be toggled on/off with the PREFECT_LOGGING_COLORS setting.
        For finer control, use logging.yml to add or remove styles, and/or
        adjust colors.

        Args:
            stream: Stream to write to; passed to the parent handler.
            highlighter: Highlighter class to instantiate when colors are
                enabled.
            styles: Style mapping for the console theme when colors are
                enabled.
            level: Handler level, as an integer or a level name.
        """
        super().__init__(stream=stream)

        styled_console = PREFECT_LOGGING_COLORS.value()
        markup_console = PREFECT_LOGGING_MARKUP.value()

        # Keep the `highlighter` parameter (a class) distinct from the
        # instance that is handed to the console.
        if styled_console:
            console_highlighter = highlighter()
            theme = Theme(styles, inherit=False)
        else:
            console_highlighter = NullHighlighter()
            theme = Theme(inherit=False)

        # Use `setLevel` rather than assigning `self.level` directly so that
        # level names such as "INFO" are converted to their numeric value;
        # a raw string level would break the integer level comparison done
        # when records are dispatched to this handler.
        self.setLevel(level)

        self.console: Console = Console(
            highlighter=console_highlighter,
            theme=theme,
            file=self.stream,
            markup=markup_console,
        )

    def emit(self, record: logging.LogRecord) -> None:
        """
        Format `record` and print it to the console with soft wrapping.

        `RecursionError` is re-raised; any other exception is passed to
        `handleError`.
        """
        try:
            message = self.format(record)
            self.console.print(message, soft_wrap=True)
        except RecursionError:
            # This was copied over from logging.StreamHandler().emit()
            # https://bugs.python.org/issue36272
            raise
        except Exception:
            self.handleError(record)