-
Notifications
You must be signed in to change notification settings - Fork 23.2k
Expand file tree
/
Copy pathstructured_formatter.py
More file actions
172 lines (139 loc) · 5.39 KB
/
Copy pathstructured_formatter.py
File metadata and controls
172 lines (139 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""Structured JSON log formatter for Dify."""
import logging
import traceback
from datetime import UTC, datetime
from typing import Any, NotRequired, TypedDict, override
import orjson
from configs import dify_config
from core.logging.context import get_error_source
class IdentityDict(TypedDict, total=False):
    """Shape of the optional ``identity`` object in a structured log entry.

    ``total=False`` makes every key optional; the formatter only adds the
    keys for which the log record carries a truthy value.
    """

    # Tenant the request belongs to.
    tenant_id: str
    # Identifier of the acting user.
    user_id: str
    # Kind of user the identifier refers to.
    user_type: str
class LogContextDict(TypedDict, total=False):
    """Shape of the optional ``context`` object in a structured log entry.

    Holds the workflow-related identifiers attached to a log record.
    ``total=False`` makes every key optional; the formatter only adds the
    keys for which the log record carries a non-empty value.
    """

    # Application the log entry relates to.
    app_id: str
    # Workflow the log entry relates to.
    workflow_id: str
    # Workflow node the log entry relates to.
    node_id: str
class LogDict(TypedDict):
    """Top-level shape of one structured JSON log entry.

    The first five keys are always present; every ``NotRequired`` key is
    added by the formatter only when the corresponding data is available
    on the log record.
    """

    # --- Always present -------------------------------------------------
    # Timestamp string (ISO 8601, UTC).
    ts: str
    # Severity label, e.g. "INFO" or "ERROR".
    severity: str
    # Name of the emitting service.
    service: str
    # Call site in "file:line" form.
    caller: str
    # Fully interpolated log message.
    message: str

    # --- Present only when available ------------------------------------
    # Tracing identifiers.
    trace_id: NotRequired[str]
    span_id: NotRequired[str]
    # Who triggered the logged operation.
    identity: NotRequired[IdentityDict]
    # Workflow identifiers (app / workflow / node).
    context: NotRequired[LogContextDict]
    # Origin classification of an error; set for ERROR-and-above records.
    error_source: NotRequired[str]
    # Free-form extra data supplied with the log call.
    attributes: NotRequired[dict[str, Any]]
    # Formatted traceback for records that carry exception info.
    stack_trace: NotRequired[str]
class StructuredJSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects.

    Emitted schema (keys other than ts/severity/service/caller/message are
    included only when a value is available on the record):
    {
        "ts": "ISO 8601 UTC",
        "severity": "INFO|ERROR|WARN|DEBUG",
        "service": "service name",
        "caller": "file:line",
        "message": "log message",
        "trace_id": "hex 32",
        "span_id": "hex 16",
        "identity": { "tenant_id", "user_id", "user_type" },
        "context": { "app_id", "workflow_id", "node_id" },
        "error_source": "...",
        "attributes": { ... },
        "stack_trace": "..."
    }

    ``error_source`` and ``stack_trace`` are only emitted for records at
    ``logging.ERROR`` or above.
    """

    # Exact mapping for the standard logging levels. CRITICAL is folded into
    # "ERROR" because the schema has no separate critical severity.
    SEVERITY_MAP: dict[int, str] = {
        logging.DEBUG: "DEBUG",
        logging.INFO: "INFO",
        logging.WARNING: "WARN",
        logging.ERROR: "ERROR",
        logging.CRITICAL: "ERROR",
    }

    def __init__(self, service_name: str | None = None):
        """Create the formatter.

        Args:
            service_name: Value for the ``service`` field. When omitted or
                empty, ``dify_config.APPLICATION_NAME`` is used.
        """
        super().__init__()
        self._service_name = service_name or dify_config.APPLICATION_NAME

    @override
    def format(self, record: logging.LogRecord) -> str:
        """Serialize ``record`` to a JSON string.

        ``orjson`` is tried first. If it rejects a value with ``TypeError``
        (orjson documents its encode error as a ``TypeError`` subclass), the
        entry is re-serialized with the stdlib encoder, stringifying
        anything it cannot handle natively.
        """
        log_dict = self._build_log_dict(record)
        try:
            return orjson.dumps(log_dict).decode("utf-8")
        except TypeError:
            # Fallback: convert non-serializable objects to string
            import json

            return json.dumps(log_dict, default=str, ensure_ascii=False)

    def _severity_for(self, levelno: int) -> str:
        """Map a numeric logging level to a schema severity label.

        Standard levels are looked up in ``SEVERITY_MAP``. Any other level
        (custom levels such as 35 or 45) is bucketed by threshold so that the
        label agrees with the ``levelno >= logging.ERROR`` checks used for
        ``error_source`` and ``stack_trace`` instead of silently becoming
        "INFO".
        """
        exact = self.SEVERITY_MAP.get(levelno)
        if exact is not None:
            return exact
        if levelno >= logging.ERROR:
            return "ERROR"
        if levelno >= logging.WARNING:
            return "WARN"
        if levelno >= logging.INFO:
            return "INFO"
        return "DEBUG"

    def _build_log_dict(self, record: logging.LogRecord) -> LogDict:
        """Assemble the JSON-ready dictionary for ``record``.

        Returns:
            A ``LogDict`` with the core fields plus whichever optional
            sections the record provides data for.
        """
        # Use the record's own creation time rather than "now": the record may
        # be formatted later (e.g. by a queue handler) or by several handlers,
        # and every rendering must carry the same timestamp.
        created_at = datetime.fromtimestamp(record.created, UTC)

        # Core fields
        log_dict: LogDict = {
            "ts": created_at.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
            "severity": self._severity_for(record.levelno),
            "service": self._service_name,
            "caller": f"{record.filename}:{record.lineno}",
            "message": record.getMessage(),
        }

        # Trace context (from TraceContextFilter)
        trace_id = getattr(record, "trace_id", "")
        span_id = getattr(record, "span_id", "")
        if trace_id:
            log_dict["trace_id"] = trace_id
        if span_id:
            log_dict["span_id"] = span_id

        # Identity context (from IdentityContextFilter)
        identity = self._extract_identity(record)
        if identity:
            log_dict["identity"] = identity

        # Workflow log context (from WorkflowLogContextFilter)
        context = self._extract_log_context(record)
        if context:
            log_dict["context"] = context

        is_error = record.levelno >= logging.ERROR

        # Error source inference (ERROR and above only)
        if is_error:
            log_dict["error_source"] = self._infer_error_source(record)

        # Dynamic attributes
        attributes = getattr(record, "attributes", None)
        if attributes:
            log_dict["attributes"] = attributes

        # Stack trace for errors with exceptions
        if record.exc_info and is_error:
            log_dict["stack_trace"] = self._format_exception(record.exc_info)

        return log_dict

    def _extract_identity(self, record: logging.LogRecord) -> IdentityDict | None:
        """Collect the truthy identity attributes found on ``record``.

        Returns:
            A dict holding only the populated keys, or ``None`` when none of
            ``tenant_id``, ``user_id`` or ``user_type`` is set.
        """
        identity: IdentityDict = {}

        tenant_id = getattr(record, "tenant_id", None)
        if tenant_id:
            identity["tenant_id"] = tenant_id

        user_id = getattr(record, "user_id", None)
        if user_id:
            identity["user_id"] = user_id

        user_type = getattr(record, "user_type", None)
        if user_type:
            identity["user_type"] = user_type

        # An empty dict means nothing was set; report that as None.
        return identity or None

    def _extract_log_context(self, record: logging.LogRecord) -> LogContextDict | None:
        """Extract workflow log context (app_id, workflow_id, node_id) from record.

        Returns:
            A dict holding only the non-empty keys, or ``None`` when the
            record carries none of them.
        """
        context: LogContextDict = {}

        app_id = getattr(record, "app_id", "")
        if app_id:
            context["app_id"] = app_id

        workflow_id = getattr(record, "workflow_id", "")
        if workflow_id:
            context["workflow_id"] = workflow_id

        node_id = getattr(record, "node_id", "")
        if node_id:
            context["node_id"] = node_id

        # An empty dict means nothing was set; report that as None.
        return context or None

    def _infer_error_source(self, record: logging.LogRecord) -> str:
        """Return the error_source for this ERROR+ log record.

        The value comes from the ``_error_source`` ContextVar, which defaults
        to ``"system"`` and is set to ``"workflow"`` by ``WorkflowEntry.run``
        during workflow graph execution.
        """
        return get_error_source().value

    def _format_exception(self, exc_info: tuple[Any, ...]) -> str:
        """Render ``exc_info`` as a traceback string.

        Returns:
            The formatted traceback, or ``""`` when ``exc_info`` is empty or
            its exception type slot is ``None`` (no active exception).
        """
        if exc_info and exc_info[0] is not None:
            return "".join(traceback.format_exception(*exc_info))
        return ""