Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/context/execution_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def user(self) -> Any:
"""Get user object."""
...

def refresh_context_vars(self) -> None:
"""Re-capture current context variables for propagation to worker threads."""
...


@final
class ExecutionContext:
Expand Down Expand Up @@ -93,6 +97,15 @@ def user(self) -> Any:
"""Get captured user object."""
return self._user

def refresh_context_vars(self) -> None:
"""Re-capture current context variables.

Call this after ContextVars have been updated in the current thread
(e.g. by a GraphEngine layer's ``on_graph_start``) so that worker
threads created afterwards receive the updated values.
"""
self._context_vars = contextvars.copy_context()

@contextmanager
def enter(self) -> Generator[None, None, None]:
"""Enter this execution context."""
Expand Down
4 changes: 4 additions & 0 deletions api/context/flask_app_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ def user(self) -> Any:
"""Get user object."""
return self._user

def refresh_context_vars(self) -> None:
"""Re-capture current context variables for propagation to worker threads."""
self._context_vars = contextvars.copy_context()

def __enter__(self) -> "FlaskExecutionContext":
"""Enter the Flask execution context."""
# Restore non-Flask context variables to avoid leaking Flask tokens across threads
Expand Down
2 changes: 2 additions & 0 deletions api/core/app/workflow/layers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Workflow-level GraphEngine layers that depend on outer infrastructure."""

from .llm_quota import LLMQuotaLayer
from .log_context import WorkflowLogContextLayer
from .observability import ObservabilityLayer
from .persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer

__all__ = [
"LLMQuotaLayer",
"ObservabilityLayer",
"PersistenceWorkflowInfo",
"WorkflowLogContextLayer",
"WorkflowPersistenceLayer",
]
58 changes: 58 additions & 0 deletions api/core/app/workflow/layers/log_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""GraphEngine layer that manages node_id log context via ContextVars.

This layer tracks ``node_id`` during node execution (set on
``on_node_run_start``, cleared on ``on_node_run_end``). The
``app_id`` / ``workflow_id`` / ``error_source`` lifecycle is managed by
``WorkflowEntry.run`` directly, which has full control over the try/finally
timing relative to ``logger.exception``.

On ``on_graph_start``, this layer refreshes the ``execution_context``
snapshot so that worker threads inherit the ContextVars that
``WorkflowEntry.run`` set just before starting the graph engine.
"""

from typing import override

from context import IExecutionContext
from core.logging.context import set_node_log_context
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import GraphEngineEvent, GraphNodeEventBase
from graphon.nodes.base.node import Node


class WorkflowLogContextLayer(GraphEngineLayer):
"""Manage node_id log context lifecycle during graph execution."""

def __init__(self, *, execution_context: IExecutionContext | None = None) -> None:
super().__init__()
self._execution_context = execution_context

@override
def on_graph_start(self) -> None:
# Refresh the execution context snapshot so that worker threads
# (started after on_graph_start) inherit the ContextVars that
# WorkflowEntry.run set before starting the graph engine.
# Without this, workers would see stale default values because the
# snapshot was captured in WorkflowEntry.__init__ before run().
if self._execution_context is not None:
self._execution_context.refresh_context_vars()

@override
def on_node_run_start(self, node: Node) -> None:
set_node_log_context(node.id)

@override
def on_node_run_end(
self, node: Node, error: Exception | None, result_event: GraphNodeEventBase | None = None
) -> None:
set_node_log_context("")

@override
def on_event(self, event: GraphEngineEvent) -> None:
_ = event

@override
def on_graph_end(self, error: Exception | None) -> None:
# app_id / workflow_id / error_source are managed by WorkflowEntry.run.
# node_id is cleared in on_node_run_end after each node.
_ = error
27 changes: 26 additions & 1 deletion api/core/logging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
"""Structured logging components for Dify."""

from core.logging.context import (
ErrorSource,
clear_error_source,
clear_request_context,
clear_workflow_log_context,
get_app_id,
get_error_source,
get_node_id,
get_request_id,
get_trace_id,
get_workflow_id,
init_request_context,
set_error_source,
set_node_log_context,
set_workflow_log_context,
)
from core.logging.filters import (
IdentityContextFilter,
TraceContextFilter,
WorkflowLogContextFilter,
)
from core.logging.filters import IdentityContextFilter, TraceContextFilter
from core.logging.structured_formatter import StructuredJSONFormatter

__all__ = [
"ErrorSource",
"IdentityContextFilter",
"StructuredJSONFormatter",
"TraceContextFilter",
"WorkflowLogContextFilter",
"clear_error_source",
"clear_request_context",
"clear_workflow_log_context",
"get_app_id",
"get_error_source",
"get_node_id",
"get_request_id",
"get_trace_id",
"get_workflow_id",
"init_request_context",
"set_error_source",
"set_node_log_context",
"set_workflow_log_context",
]
104 changes: 104 additions & 0 deletions api/core/logging/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,32 @@

import uuid
from contextvars import ContextVar
from enum import StrEnum


class ErrorSource(StrEnum):
"""Classification of error sources for structured logging.

Used in the ``error_source`` field of ERROR+ log records to enable
differentiated alerting rules (e.g. workflow errors are user-caused,
system errors trigger on-call alerts).
"""

WORKFLOW = "workflow"
SYSTEM = "system"


_request_id: ContextVar[str] = ContextVar("log_request_id", default="")
_trace_id: ContextVar[str] = ContextVar("log_trace_id", default="")

# Workflow log context
_app_id: ContextVar[str] = ContextVar("log_app_id", default="")
_workflow_id: ContextVar[str] = ContextVar("log_workflow_id", default="")
_node_id: ContextVar[str] = ContextVar("log_node_id", default="")

# Error source context (set by WorkflowEntry.run during workflow execution)
_error_source: ContextVar[ErrorSource] = ContextVar("log_error_source", default=ErrorSource.SYSTEM)


def get_request_id() -> str:
"""Get current request ID (10 hex chars)."""
Expand All @@ -33,3 +55,85 @@ def clear_request_context() -> None:
"""Clear request context. Call at end of request (optional)."""
_request_id.set("")
_trace_id.set("")


# ---------------------------------------------------------------------------
# Workflow log context
# ---------------------------------------------------------------------------


def get_app_id() -> str:
"""Get current workflow app_id for logging."""
return _app_id.get()


def get_workflow_id() -> str:
"""Get current workflow_id for logging."""
return _workflow_id.get()


def get_node_id() -> str:
"""Get current node_id for logging."""
return _node_id.get()


def set_workflow_log_context(app_id: str, workflow_id: str) -> None:
"""Set workflow-level log context (app_id, workflow_id).

Call at graph start. Use ``clear_workflow_log_context`` at graph end.
"""
_app_id.set(app_id)
_workflow_id.set(workflow_id)


def set_node_log_context(node_id: str) -> None:
"""Set or clear node-level log context.

Pass empty string to clear node_id between node executions.
"""
_node_id.set(node_id)


def clear_workflow_log_context() -> None:
"""Clear workflow log context (app_id, workflow_id, node_id).

Call at graph end to ensure no stale context leaks to subsequent logs.

Note: This does **not** reset ``error_source``. When ``on_graph_end``
receives a non-None error, the subsequent ``logger.exception`` call in
``WorkflowEntry.run`` still needs ``error_source == WORKFLOW`` to
correctly classify the error. ``error_source`` is reset separately
via ``clear_error_source`` after the error has been logged.
"""
_app_id.set("")
_workflow_id.set("")
_node_id.set("")


# ---------------------------------------------------------------------------
# Error source context
# ---------------------------------------------------------------------------


def get_error_source() -> ErrorSource:
"""Get current error_source for logging.

Defaults to ``ErrorSource.SYSTEM`` when no execution context is active.
Set to ``ErrorSource.WORKFLOW`` by ``WorkflowEntry.run`` during
workflow graph execution.
"""
return _error_source.get()


def set_error_source(source: ErrorSource) -> None:
"""Set error_source context.

Typically called by ``WorkflowEntry.run`` with
``ErrorSource.WORKFLOW`` before graph execution starts.
"""
_error_source.set(source)


def clear_error_source() -> None:
"""Reset error_source context to the default (SYSTEM)."""
_error_source.set(ErrorSource.SYSTEM)
24 changes: 23 additions & 1 deletion api/core/logging/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@

import flask

from core.logging.context import get_request_id, get_trace_id
from core.logging.context import (
get_app_id,
get_node_id,
get_request_id,
get_trace_id,
get_workflow_id,
)
from core.logging.structured_formatter import IdentityDict


Expand Down Expand Up @@ -97,3 +103,19 @@ def _extract_identity(self) -> IdentityDict:
return identity
except Exception:
return {}


class WorkflowLogContextFilter(logging.Filter):
"""Inject workflow log context (app_id, workflow_id, node_id) into log records.

Values are read from ContextVars that are managed by ``WorkflowEntry.run``
(app_id / workflow_id / error_source) and ``WorkflowLogContextLayer``
(node_id).
"""

@override
def filter(self, record: logging.LogRecord) -> bool:
record.app_id = get_app_id()
record.workflow_id = get_workflow_id()
record.node_id = get_node_id()
return True
45 changes: 45 additions & 0 deletions api/core/logging/structured_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import orjson

from configs import dify_config
from core.logging.context import get_error_source


class IdentityDict(TypedDict, total=False):
Expand All @@ -16,6 +17,12 @@ class IdentityDict(TypedDict, total=False):
user_type: str


class LogContextDict(TypedDict, total=False):
app_id: str
workflow_id: str
node_id: str


class LogDict(TypedDict):
ts: str
severity: str
Expand All @@ -25,6 +32,8 @@ class LogDict(TypedDict):
trace_id: NotRequired[str]
span_id: NotRequired[str]
identity: NotRequired[IdentityDict]
context: NotRequired[LogContextDict]
error_source: NotRequired[str]
attributes: NotRequired[dict[str, Any]]
stack_trace: NotRequired[str]

Expand Down Expand Up @@ -93,6 +102,15 @@ def _build_log_dict(self, record: logging.LogRecord) -> LogDict:
if identity:
log_dict["identity"] = identity

# Workflow log context (from WorkflowLogContextFilter)
context = self._extract_log_context(record)
if context:
log_dict["context"] = context

# Error source inference (ERROR and above only)
if record.levelno >= logging.ERROR:
log_dict["error_source"] = self._infer_error_source(record)

# Dynamic attributes
attributes = getattr(record, "attributes", None)
if attributes:
Expand Down Expand Up @@ -121,6 +139,33 @@ def _extract_identity(self, record: logging.LogRecord) -> IdentityDict | None:
identity["user_type"] = user_type
return identity

def _extract_log_context(self, record: logging.LogRecord) -> LogContextDict | None:
"""Extract workflow log context (app_id, workflow_id, node_id) from record."""
app_id = getattr(record, "app_id", "") or ""
workflow_id = getattr(record, "workflow_id", "") or ""
node_id = getattr(record, "node_id", "") or ""

if not any([app_id, workflow_id, node_id]):
return None

context: LogContextDict = {}
if app_id:
context["app_id"] = app_id
if workflow_id:
context["workflow_id"] = workflow_id
if node_id:
context["node_id"] = node_id
return context

def _infer_error_source(self, record: logging.LogRecord) -> str:
"""Return the error_source for this ERROR+ log record.

The value comes from the ``_error_source`` ContextVar, which defaults
to ``"system"`` and is set to ``"workflow"`` by ``WorkflowEntry.run``
during workflow graph execution.
"""
return get_error_source().value

def _format_exception(self, exc_info: tuple[Any, ...]) -> str:
if exc_info and exc_info[0] is not None:
return "".join(traceback.format_exception(*exc_info))
Expand Down
Loading
Loading