Commit d5fdfcb

EngHabu and claude authored and committed
feat(logging): stamp run/action context via LogRecordFactory (#1038)
## Summary

Replaces the per-logger `logging.Filter` approach for context stamping with a global `LogRecordFactory`. The factory runs on every `LogRecord` regardless of which logger created it, so any user-defined logger automatically picks up `run_name`, `action_name`, and `is_flyte_internal` attributes — no per-handler filter wiring required.

This also fixes a latent bug in main: the old `ContextFilter` and `FlyteInternalFilter` mutated `record.msg` in place, so a record passing through multiple handlers could get its `[run][action]` and `[flyte]` prefixes prepended multiple times. Stamping attributes on the record (and rendering them at format time) sidesteps that entirely.

## What changed

- **LogRecordFactory** (`src/flyte/_logging.py`) — stamps the active flyte action context on every record, lazily importing `flyte._context` to avoid import-time cycles.
- **`ContextFormatter`** replaces `ContextFilter` + `FlyteInternalFilter`. The `[flyte]` internal-prefix marker is now a formatter flag (`internal_prefix=True`), so the same record can be formatted by multiple handlers without prefixes compounding.
- **`ContextFormatter(inner=...)`** — optional kwarg to delegate base formatting to an existing `logging.Formatter`. Used to wrap pre-existing root-handler formatters without losing their layout.
- **`initialize_logger(reset_root_logger=False)`** now wraps each existing root-handler formatter with `ContextFormatter(inner=existing)`, preserving main's "third-party log lines through root get `[run][action]`" behavior. Idempotent on repeated init calls. Same temporal limitation as main: handlers added after `flyte.init()` won't be wrapped, but the factory still stamps the attrs so callers can format them themselves.
- **`get_rich_handler(internal_prefix=True)`** — opt-out parameter; the user-logger call site passes `False` so user-emitted lines don't carry the `[flyte]` marker even under rich logging.

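The compounding-prefix bug described above can be reproduced with the standard library alone. The sketch below is illustrative only: `PrefixFilter` and `PrefixFormatter` are hypothetical stand-ins for the old filters and the new formatter, not the classes from this commit. It sends one record through two handlers and compares what each handler writes.

```python
import io
import logging


class PrefixFilter(logging.Filter):
    """Old-style approach (hypothetical stand-in): rewrites record.msg in place."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.msg = f"[flyte] {record.msg}"
        return True


class PrefixFormatter(logging.Formatter):
    """New-style approach (hypothetical stand-in): prefixes at format time only."""

    def format(self, record: logging.LogRecord) -> str:
        return f"[flyte] {super().format(record)}"


def emit(use_filter: bool) -> list[str]:
    """Send one record through two handlers and return what each handler wrote."""
    streams = [io.StringIO(), io.StringIO()]
    logger = logging.Logger("demo")  # standalone logger, not registered globally
    for stream in streams:
        handler = logging.StreamHandler(stream)
        if use_filter:
            handler.addFilter(PrefixFilter())
            handler.setFormatter(logging.Formatter("%(message)s"))
        else:
            handler.setFormatter(PrefixFormatter("%(message)s"))
        logger.addHandler(handler)
    logger.warning("hello")
    return [s.getvalue().strip() for s in streams]


filtered = emit(use_filter=True)    # second handler sees the already-mutated msg
formatted = emit(use_filter=False)  # record is untouched, so both outputs match
print(filtered)
print(formatted)
```

With the filter, the second handler receives a record whose `msg` was already rewritten by the first handler's filter, so the prefix appears twice. With the formatter, the record itself is never modified.
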
## Behavior parity vs main

| | main | this PR |
|---|---|---|
| `[run][action]` on flyte/flyte.user logs | ✅ | ✅ |
| `[run][action]` on third-party logs through root (when handler exists at init) | ✅ (via filter) | ✅ (via formatter wrap) |
| `[run][action]` on third-party logs through root (handler added after init) | ❌ | ❌ — but attrs still on record |
| `[flyte]` marker on flyte internal logs | ✅ | ✅ |
| Compounding prefixes when record passes through multiple handlers | ❌ (latent bug) | ✅ fixed |
| `record.run_name` / `action_name` available everywhere | partial | ✅ universal (factory) |

## Why this matters for users

`flyte.logger` and any child of `flyte.user` continue to work as before. New: anyone using a stdlib logger (`logging.getLogger("myapp")`) outside the flyte namespace also gets `run_name` / `action_name` / `is_flyte_internal` stamped on every record by the factory — they just need a formatter that references them (e.g. `"[%(run_name)s][%(action_name)s] %(message)s"`).

## Example

`examples/basics/reuse_concurrent_logging.py` exercises three logger configs with a reusable container running concurrent tasks, so log output from multiple actions interleaves on a single stderr — each line correctly prefixed by its own `[run][action]`:

1. `flyte.logger` — canonical user-facing logger.
2. `logging.getLogger("flyte.user.myapp")` — child of `flyte.user`, inherits the formatter via propagation.
3. `logging.getLogger("myapp")` — fully independent logger with its own handler that references the factory-stamped attrs in its format string.

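For readers without a cluster at hand, the third variant can be approximated locally. This is a minimal sketch under stated assumptions: a `contextvars.ContextVar` named `current_action` stands in for flyte's real action context, and `stamping_factory` is a simplified, hypothetical version of the factory in this commit. It shows interleaved asyncio workers each getting their own prefix on an independent stdlib logger.

```python
import asyncio
import contextvars
import io
import logging

# Hypothetical stand-in for flyte's per-action context: (run_name, action_name) or None.
current_action = contextvars.ContextVar("current_action", default=None)

_previous_factory = logging.getLogRecordFactory()


def stamping_factory(*args, **kwargs):
    """Create the record as usual, then stamp the active (run, action) pair on it."""
    record = _previous_factory(*args, **kwargs)
    run_name, action_name = current_action.get() or (None, None)
    record.run_name = run_name
    record.action_name = action_name
    return record


async def worker(log, action_name, ticks=2):
    # Each task started by asyncio.gather runs in its own copy of the context,
    # so this set() is invisible to sibling workers.
    current_action.set(("run-1", action_name))
    for i in range(ticks):
        await asyncio.sleep(0)  # yield so the workers interleave
        log.info("tick=%d", i + 1)


async def fan_out(log):
    await asyncio.gather(worker(log, "a0"), worker(log, "a1"))


logging.setLogRecordFactory(stamping_factory)
try:
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[%(run_name)s][%(action_name)s] myapp: %(message)s"))
    log = logging.getLogger("myapp_sketch")
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    log.propagate = False
    asyncio.run(fan_out(log))
    log.info("outside any action")
finally:
    logging.setLogRecordFactory(_previous_factory)  # undo the global change

lines = stream.getvalue().splitlines()
print("\n".join(lines))
```

Note that a record created outside any action renders as `[None][None]` with a plain `%`-style format string, because the attributes are set to `None` rather than omitted. A formatter that should hide the prefix in that case needs its own check, as `ContextFormatter` does.
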
## Test plan

- [x] `pytest tests/user_api/test_logging.py tests/flyte/test_logging.py` passes (23 tests)
- [x] `test_user_logger_no_flyte_prefix` updated to check the formatter flag instead of the removed `FlyteInternalFilter`
- [x] `test_user_logger_no_flyte_prefix_after_rich_init` — regression test for rich-handler internal prefix bleeding into user logger
- [x] `test_initialize_logger_wraps_existing_root_handlers` — covers wrap path + idempotency on repeated init
- [ ] Manual: run `examples/basics/reuse_concurrent_logging.py` against a cluster and confirm all three worker variants emit `[run][action]`-prefixed log lines on the shared reused container

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Samhita Alla <aallasamhita@gmail.com>
1 parent 31a7c93 commit d5fdfcb

3 files changed

Lines changed: 244 additions & 61 deletions

examples/basics/reuse_concurrent_logging.py

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+"""
+Demo: reusable container + concurrent tasks, so log lines from different
+actions are interleaved in the same worker process. Each line should be
+prefixed with its own [run][action] thanks to the LogRecord factory in
+flyte._logging.
+
+Workers exercise three logger configs to confirm context stamping works
+regardless of how the user obtains a logger:
+
+  1. flyte.logger — the canonical user-facing logger
+  2. logging.getLogger("flyte.user.myapp") — child of flyte.user, inherits handler
+  3. logging.getLogger("myapp") — fully independent logger w/ its own handler
+"""
+
+import asyncio
+import logging
+import sys
+
+import flyte
+
+# One worker process, many concurrent invocations on it. With replicas=1 and
+# concurrency=8, all worker tasks below land in the same Python process, so
+# their log output is genuinely interleaved on a single stderr.
+env = flyte.TaskEnvironment(
+    name="reuse_concurrent_logging",
+    resources=flyte.Resources(cpu="1", memory="500Mi"),
+    reusable=flyte.ReusePolicy(
+        replicas=1,
+        concurrency=8,
+        idle_ttl=60,
+        scaledown_ttl=60,
+    ),
+    image=flyte.Image.from_debian_base().with_pip_packages("unionai-reuse"),
+)
+
+# Variant 2: child of flyte.user. No handler/level setup needed — propagation
+# carries records up to flyte.user's handler, so [run][action] prefixing and
+# the user log level both apply automatically.
+inherited_logger = logging.getLogger("flyte.user.myapp")
+
+# Variant 3: independent logger with its own StreamHandler. The flyte
+# LogRecordFactory still stamps run_name/action_name on every record, so the
+# formatter below can reference them via %(run_name)s / %(action_name)s even
+# though this logger lives outside the flyte.* namespace.
+independent_logger = logging.getLogger("myapp")
+if not independent_logger.handlers:
+    _h = logging.StreamHandler(sys.stderr)
+    _h.setFormatter(logging.Formatter("[%(run_name)s][%(action_name)s] myapp: %(message)s"))
+    independent_logger.addHandler(_h)
+    independent_logger.setLevel(logging.INFO)
+    independent_logger.propagate = False
+
+
+@env.task
+async def worker_flyte_logger(label: str, ticks: int = 3) -> str:
+    """Variant 1: the canonical flyte.logger."""
+    flyte.logger.info("starting label=%s", label)
+    for i in range(ticks):
+        await asyncio.sleep(1)
+        flyte.logger.info("label=%s tick=%d/%d", label, i + 1, ticks)
+    flyte.logger.info("done label=%s", label)
+    return label
+
+
+@env.task
+async def worker_inherited_logger(label: str, ticks: int = 3) -> str:
+    """Variant 2: child of flyte.user — formatting is inherited."""
+    inherited_logger.info("starting label=%s", label)
+    for i in range(ticks):
+        await asyncio.sleep(1)
+        inherited_logger.info("label=%s tick=%d/%d", label, i + 1, ticks)
+    inherited_logger.info("done label=%s", label)
+    return label
+
+
+@env.task
+async def worker_independent_logger(label: str, ticks: int = 3) -> str:
+    """Variant 3: a fully independent stdlib logger — record factory still stamps context."""
+    independent_logger.info("starting label=%s", label)
+    for i in range(ticks):
+        await asyncio.sleep(1)
+        independent_logger.info("label=%s tick=%d/%d", label, i + 1, ticks)
+    independent_logger.info("done label=%s", label)
+    return label
+
+
+# The fan-out parent itself doesn't need to share the reuse pool — clone_with
+# turns reuse off for it and depends on the worker env for image + registration.
+@env.clone_with(name="reuse_concurrent_main", reusable=None, depends_on=[env]).task
+async def main(n: int = 2) -> list[str]:
+    """
+    Dispatches a mix of all three worker variants so logs from each logger
+    config interleave on the reused container's stderr.
+    """
+    flyte.logger.info("dispatching %d of each worker variant", n)
+    coros: list = []
+    for i in range(n):
+        coros.append(worker_flyte_logger(label=f"flyte-{i}"))
+        coros.append(worker_inherited_logger(label=f"inherited-{i}"))
+        coros.append(worker_independent_logger(label=f"independent-{i}"))
+    results = await asyncio.gather(*coros)
+    flyte.logger.info("all workers finished: %s", results)
+    return results
+
+
+if __name__ == "__main__":
+    flyte.init_from_config()
+    run = flyte.run(main, n=2)
+    print(run.url)
+    run.wait()

src/flyte/_logging.py

Lines changed: 78 additions & 58 deletions
@@ -3,13 +3,44 @@
 import logging
 import os
 from datetime import datetime
-from typing import Literal, Optional
+from typing import Any, Literal, Optional

 import flyte

 from ._tools import ipython_check

 LogFormat = Literal["console", "json"]
+
+_orig_record_factory = logging.getLogRecordFactory()
+
+
+def _flyte_record_factory(*args: Any, **kwargs: Any) -> logging.LogRecord:
+    record = _orig_record_factory(*args, **kwargs)
+
+    # Stamp the active flyte action context, if any. Imported lazily because
+    # this factory runs on every record, including during flyte's own import.
+    try:
+        from flyte._context import ctx as _flyte_ctx
+
+        c = _flyte_ctx()
+    except Exception:
+        c = None
+    if c is not None:
+        record.run_name = c.action.run_name
+        record.action_name = c.action.name
+    else:
+        record.run_name = None
+        record.action_name = None
+
+    record.is_flyte_internal = record.name == "flyte" or (
+        record.name.startswith("flyte.") and not record.name.startswith("flyte.user")
+    )
+    return record
+
+
+logging.setLogRecordFactory(_flyte_record_factory)
+
+
 _LOG_LEVEL_MAP = {
     "critical": logging.CRITICAL,  # 50
     "error": logging.ERROR,  # 40
@@ -91,7 +122,7 @@ def _get_console():
     return Console(width=width)


-def get_rich_handler(log_level: int) -> Optional[logging.Handler]:
+def get_rich_handler(log_level: int, internal_prefix: bool = True) -> Optional[logging.Handler]:
     """
     Upgrades the global loggers to use Rich logging.
     """
@@ -117,7 +148,7 @@ def get_rich_handler(log_level: int) -> Optional[logging.Handler]:
         markup=True,
     )

-    formatter = logging.Formatter(fmt="%(filename)s:%(lineno)d - %(message)s")
+    formatter = ContextFormatter(fmt="%(filename)s:%(lineno)d - %(message)s", internal_prefix=internal_prefix)
     handler.setFormatter(formatter)
     return handler

@@ -190,9 +221,16 @@ def initialize_logger(
     if reset_root_logger:
         _setup_root_logger(use_json=use_json, use_rich=use_rich, log_level=log_level)
     else:
+        # Wrap each existing root-handler formatter so third-party log lines
+        # routed through root render with [run][action]. Captures handlers
+        # registered before this call; handlers added later won't be wrapped
+        # (factory still stamps the attrs, so callers can format them).
         root_logger = logging.getLogger()
         for h in root_logger.handlers:
-            h.addFilter(ContextFilter())
+            existing = h.formatter
+            if isinstance(existing, ContextFormatter):
+                continue
+            h.setFormatter(ContextFormatter(inner=existing))

     # Set up Flyte logger handler
     flyte_handler: logging.Handler | None = None
@@ -206,12 +244,7 @@ def initialize_logger(
     if flyte_handler is None:
         flyte_handler = logging.StreamHandler()
         flyte_handler.setLevel(log_level)
-        formatter = logging.Formatter(fmt="%(message)s")
-        flyte_handler.setFormatter(formatter)
-
-    # Add both filters to Flyte handler
-    flyte_handler.addFilter(FlyteInternalFilter())
-    flyte_handler.addFilter(ContextFilter())
+        flyte_handler.setFormatter(ContextFormatter(fmt="%(message)s", internal_prefix=True))

     flyte_logger.addHandler(flyte_handler)
     flyte_logger.setLevel(log_level)
@@ -231,17 +264,16 @@ def initialize_logger(
         user_handler.setLevel(user_log_level)
         user_handler.setFormatter(JSONFormatter())
     elif use_rich:
-        rich_handler = get_rich_handler(user_log_level)
+        rich_handler = get_rich_handler(user_log_level, internal_prefix=False)
         user_handler = rich_handler if rich_handler is not None else logging.StreamHandler()
         user_handler.setLevel(user_log_level)
         if not rich_handler:
-            user_handler.setFormatter(logging.Formatter(fmt="%(message)s"))
+            user_handler.setFormatter(ContextFormatter(fmt="%(message)s"))
     else:
         user_handler = logging.StreamHandler()
         user_handler.setLevel(user_log_level)
-        user_handler.setFormatter(logging.Formatter(fmt="%(message)s"))
+        user_handler.setFormatter(ContextFormatter(fmt="%(message)s"))

-    user_handler.addFilter(ContextFilter())
     user_flyte_logger.addHandler(user_handler)
     user_flyte_logger.setLevel(user_log_level)
     user_flyte_logger.propagate = False
@@ -274,42 +306,38 @@ def wrapper(*args, **kwargs):
     return decorator(fn)


-class ContextFilter(logging.Filter):
+class ContextFormatter(logging.Formatter):
     """
-    A logging filter that adds the current action's run name and name to all log records.
-    Applied globally to capture context for both user and Flyte internal logging.
+    Console formatter that prefixes records with action context and an optional
+    [flyte] marker, both pulled from attributes stamped by _flyte_record_factory.
+    Does not mutate record state, so the same record can be formatted by
+    multiple handlers without compounding prefixes.
     """

-    def filter(self, record: logging.LogRecord) -> bool:
-        from flyte._context import ctx
-
-        c = ctx()
-        if c:
-            action = c.action
-            # Add as attributes for structured logging (JSON)
-            record.run_name = action.run_name
-            record.action_name = action.name
-            # Also modify message for console/Rich output
-            record.msg = f"[{action.run_name}][{action.name}] {record.msg}"
-        else:
-            record.run_name = None
-            record.action_name = None
-        return True
+    def __init__(
+        self,
+        fmt: str = "%(message)s",
+        *,
+        internal_prefix: bool = False,
+        inner: logging.Formatter | None = None,
+        **kwargs: Any,
+    ) -> None:
+        super().__init__(fmt=fmt, **kwargs)
+        self._internal_prefix = internal_prefix
+        self._inner = inner

-
-class FlyteInternalFilter(logging.Filter):
-    """
-    A logging filter that adds [flyte] prefix to internal Flyte logging only.
-    """
-
-    def filter(self, record: logging.LogRecord) -> bool:
-        is_internal = record.name.startswith("flyte")
-        # Add as attribute for structured logging (JSON)
-        record.is_flyte_internal = is_internal
-        # Also modify message for console/Rich output
-        if is_internal:
-            record.msg = f"[flyte] {record.msg}"
-        return True
+    def format(self, record: logging.LogRecord) -> str:
+        base = self._inner.format(record) if self._inner is not None else super().format(record)
+        parts: list[str] = []
+        run_name = getattr(record, "run_name", None)
+        action_name = getattr(record, "action_name", None)
+        if run_name and action_name:
+            parts.append(f"[{run_name}][{action_name}]")
+        if self._internal_prefix and getattr(record, "is_flyte_internal", False):
+            parts.append("[flyte]")
+        if not parts:
+            return base
+        return f"{' '.join(parts)} {base}"


 def _setup_root_logger(use_json: bool, use_rich: bool, log_level: int):
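The `inner=` delegation in `ContextFormatter`, together with the idempotent wrap loop in `initialize_logger`, can be tried out in isolation. The following is a simplified sketch, not the class from this commit: `PrefixingFormatter` and `wrap_handlers` are hypothetical names, the `[flyte]` marker is left out, and a standalone logger stands in for root. The context attributes are set by hand through `logging.makeLogRecord`, whereas in the commit the record factory supplies them.

```python
import io
import logging


class PrefixingFormatter(logging.Formatter):
    """Simplified stand-in for ContextFormatter: delegate to `inner`, then prefix."""

    def __init__(self, fmt="%(message)s", *, inner=None):
        super().__init__(fmt=fmt)
        self._inner = inner

    def format(self, record):
        base = self._inner.format(record) if self._inner is not None else super().format(record)
        run_name = getattr(record, "run_name", None)
        action_name = getattr(record, "action_name", None)
        if run_name and action_name:
            return f"[{run_name}][{action_name}] {base}"
        return base


def wrap_handlers(logger):
    """Wrap each handler's formatter once; repeated calls skip wrapped handlers."""
    for h in logger.handlers:
        if isinstance(h.formatter, PrefixingFormatter):
            continue
        h.setFormatter(PrefixingFormatter(inner=h.formatter))


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
third_party = logging.Logger("thirdparty")  # standalone logger standing in for root
third_party.addHandler(handler)

wrap_handlers(third_party)
wrap_handlers(third_party)  # second call changes nothing

# Build a record with the context attributes already present on it.
record = logging.makeLogRecord({
    "name": "thirdparty",
    "levelname": "WARNING",
    "levelno": logging.WARNING,
    "msg": "connected",
    "run_name": "run-1",
    "action_name": "a0",
})
third_party.handle(record)
line = stream.getvalue().strip()
print(line)
```

The handler's original layout (`WARNING thirdparty: connected`) survives because the wrapper formats through the inner formatter first and only then adds the prefix. A handler with no formatter set gets `inner=None`, in which case the wrapper falls back to its own `%(message)s` format.
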
@@ -330,28 +358,25 @@ def _setup_root_logger(use_json: bool, use_rich: bool, log_level: int):
     # get_rich_handler can return None in some environments
     if not root_handler:
         root_handler = logging.StreamHandler()
+        root_handler.setFormatter(ContextFormatter(fmt="%(message)s"))

-    # Add context filter to ALL logging
-    root_handler.addFilter(ContextFilter())
     root_handler.setLevel(log_level)
-
     root.addHandler(root_handler)
     root.setLevel(log_level)


 def _create_user_logger() -> logging.Logger:
     """
     Create the user-facing logger. Defaults to INFO so user logs are visible by default.
-    Does not use FlyteInternalFilter — no [flyte] prefix on user messages.
+    No [flyte] prefix on user messages.
     """
     user_flyte_logger = logging.getLogger("flyte.user")
     user_log_level = get_env_user_log_level()
     user_flyte_logger.setLevel(user_log_level)

     handler = logging.StreamHandler()
     handler.setLevel(user_log_level)
-    handler.addFilter(ContextFilter())
-    handler.setFormatter(logging.Formatter(fmt="%(message)s"))
+    handler.setFormatter(ContextFormatter(fmt="%(message)s"))

     user_flyte_logger.propagate = False
     user_flyte_logger.addHandler(handler)
@@ -366,14 +391,9 @@ def _create_flyte_logger() -> logging.Logger:
     flyte_logger = logging.getLogger("flyte")
     flyte_logger.setLevel(get_env_log_level())

-    # Add a handler specifically for flyte logging with the prefix filter
     handler = logging.StreamHandler()
     handler.setLevel(get_env_log_level())
-    handler.addFilter(FlyteInternalFilter())
-    handler.addFilter(ContextFilter())
-
-    formatter = logging.Formatter(fmt="%(message)s")
-    handler.setFormatter(formatter)
+    handler.setFormatter(ContextFormatter(fmt="%(message)s", internal_prefix=True))

     # Prevent propagation to root to avoid double logging
     flyte_logger.propagate = False
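Whether a record gets the `[flyte]` marker depends on both `internal_prefix=True` on the formatter and the factory's `is_flyte_internal` name check. The sketch below pulls that check out into a standalone function (the function name is hypothetical; the expression is copied from `_flyte_record_factory`) and compares it with the prefix test used by the removed `FlyteInternalFilter`.

```python
def is_flyte_internal(logger_name: str) -> bool:
    """Name check as written in _flyte_record_factory, extracted for illustration."""
    return logger_name == "flyte" or (
        logger_name.startswith("flyte.") and not logger_name.startswith("flyte.user")
    )


def old_is_internal(logger_name: str) -> bool:
    """Name check as written in the removed FlyteInternalFilter."""
    return logger_name.startswith("flyte")


names = ["flyte", "flyte._logging", "flyte.user", "flyte.user.myapp", "flytekit", "myapp"]
new_result = {name: is_flyte_internal(name) for name in names}
old_result = {name: old_is_internal(name) for name in names}

for name in names:
    print(f"{name:18} old={old_result[name]!s:5} new={new_result[name]}")
```

The new check is narrower: `flyte.user`, its children, and unrelated names that merely begin with `flyte` (such as `flytekit`) are no longer classified as internal. Because the exclusion is a plain string-prefix test, any logger name beginning with `flyte.user` is excluded as well, including ones that are not children of `flyte.user`.
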
