-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlogging_utils.py
More file actions
77 lines (60 loc) · 2.34 KB
/
logging_utils.py
File metadata and controls
77 lines (60 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
Airflow-aware logging utilities for data pipeline tasks.
This module provides logging that automatically uses Airflow's native task logger when
running in an Airflow context, and falls back to standard Python logging otherwise. This
ensures clean, properly formatted logs in Airflow.
"""
import logging
from typing import Union
from airflow.sdk import get_current_context
class AirflowAwareLogger:
"""
Logger that automatically uses Airflow's task logger when available.
Detects if code is running within an Airflow task context and uses the task instance
logger. Falls back to standard Python logging otherwise.
"""
def __init__(self, name: str):
"""
Initialize the logger.
:param name: Logger name (typically __name__ of the module).
"""
self.name = name
self._fallback_logger = logging.getLogger(name)
def _get_logger(self) -> logging.Logger:
"""
Get the appropriate logger for the current context.
:return: Task logger if in Airflow context, otherwise standard logger.
"""
try:
context = get_current_context()
return context["task_instance"].log
except (RuntimeError, KeyError, AttributeError):
# Not in Airflow context, use fallback logger.
return self._fallback_logger
def debug(self, msg: Union[str, object], *args, **kwargs) -> None:
"""
Log a debug message.
"""
return self._get_logger().debug(msg, *args, **kwargs)
def info(self, msg: Union[str, object], *args, **kwargs) -> None:
"""
Log an info message.
"""
return self._get_logger().info(msg, *args, **kwargs)
def warning(self, msg: Union[str, object], *args, **kwargs) -> None:
"""
Log a warning message.
"""
return self._get_logger().warning(msg, *args, **kwargs)
def error(self, msg: Union[str, object], *args, **kwargs) -> None:
"""
Log an error message.
"""
return self._get_logger().error(msg, *args, **kwargs)
def critical(self, msg: Union[str, object], *args, **kwargs) -> None:
"""
Log a critical message.
"""
return self._get_logger().critical(msg, *args, **kwargs)
# Global logger instance for module-level imports.
LOGGER = AirflowAwareLogger(__name__)