Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion libs/core/langchain_core/callbacks/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2480,7 +2480,12 @@ def _configure(
run_tree.trace_id,
run_tree.dotted_order,
)
handler.run_map[str(run_tree.id)] = run_tree
run_id_str = str(run_tree.id)
if run_id_str not in handler.run_map:
handler.run_map[run_id_str] = run_tree
handler._external_run_ids.setdefault( # noqa: SLF001
run_id_str, 0
)
Comment on lines +2483 to +2488
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hard to follow what's going on here with variable names are not self-explnatory

for var, inheritable, handler_class, env_var in _configure_hooks:
create_one = (
env_var is not None
Expand Down
18 changes: 18 additions & 0 deletions libs/core/langchain_core/tracers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ def _end_trace(self, run: Run) -> None:
if not run.parent_run_id:
self._persist_run(run)
self.run_map.pop(str(run.id))
# If this run's parent was injected from an external tracing context
# (e.g. a langsmith @traceable), decrement its child refcount and
# remove it from run_map once the last child is done.
parent_id = str(run.parent_run_id) if run.parent_run_id else None
if parent_id and parent_id in self._external_run_ids:
self._external_run_ids[parent_id] -= 1
if self._external_run_ids[parent_id] <= 0:
self.run_map.pop(parent_id, None)
del self._external_run_ids[parent_id]
self._on_run_update(run)

def on_chat_model_start(
Expand Down Expand Up @@ -568,6 +577,15 @@ async def _end_trace(self, run: Run) -> None:
if not run.parent_run_id:
await self._persist_run(run)
self.run_map.pop(str(run.id))
# If this run's parent was injected from an external tracing context
# (e.g. a langsmith @traceable), decrement its child refcount and
# remove it from run_map once the last child is done.
parent_id = str(run.parent_run_id) if run.parent_run_id else None
if parent_id and parent_id in self._external_run_ids:
self._external_run_ids[parent_id] -= 1
if self._external_run_ids[parent_id] <= 0:
self.run_map.pop(parent_id, None)
del self._external_run_ids[parent_id]
await self._on_run_update(run)

@override
Expand Down
15 changes: 15 additions & 0 deletions libs/core/langchain_core/tracers/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
] = "original",
run_map: dict[str, Run] | None = None,
order_map: dict[UUID, tuple[UUID, str]] | None = None,
_external_run_ids: dict[str, int] | None = None,
**kwargs: Any,
) -> None:
"""Initialize the tracer.
Expand All @@ -74,6 +75,7 @@ def __init__(
it does NOT raise an attribute error `on_chat_model_start`
run_map: Optional shared map of run ID to run.
order_map: Optional shared map of run ID to trace ordering data.
_external_run_ids: Optional shared set of externally injected run IDs.
**kwargs: Additional keyword arguments that will be passed to the
superclass.
"""
Expand All @@ -87,6 +89,16 @@ def __init__(
self.order_map = order_map if order_map is not None else {}
"""Map of run ID to (trace_id, dotted_order). Cleared when tracer GCed."""

self._external_run_ids: dict[str, int] = (
_external_run_ids if _external_run_ids is not None else {}
)
"""Refcount of active children per externally-injected run ID.

These runs are added to `run_map` so child runs can find their parent,
but they are not managed by the tracer's callback lifecycle. When
the last child finishes the entry is evicted to avoid memory leaks.
"""

@abstractmethod
def _persist_run(self, run: Run) -> Coroutine[Any, Any, None] | None:
"""Persist a run."""
Expand Down Expand Up @@ -117,6 +129,9 @@ def _start_trace(self, run: Run) -> Coroutine[Any, Any, None] | None: # type: i
run.dotted_order += "." + current_dotted_order
if parent_run := self.run_map.get(str(run.parent_run_id)):
self._add_child_run(parent_run, run)
parent_key = str(run.parent_run_id)
if parent_key in self._external_run_ids:
self._external_run_ids[parent_key] += 1
else:
if self.log_missing_parent:
logger.debug(
Expand Down
1 change: 1 addition & 0 deletions libs/core/langchain_core/tracers/langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def copy_with_metadata_defaults(
metadata=merged_metadata,
run_map=self.run_map,
order_map=self.order_map,
_external_run_ids=self._external_run_ids,
)

def _start_trace(self, run: Run) -> None:
Expand Down
106 changes: 106 additions & 0 deletions libs/core/tests/unit_tests/runnables/test_tracing_interops.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,112 @@ def parent(_: Any) -> str:
assert kitten_run.dotted_order.startswith(grandchild_run.dotted_order)


def test_traceable_parent_run_map_cleanup() -> None:
"""External RunTree injected into run_map is cleaned up when its child ends.

When a `@traceable` function invokes a LangChain `Runnable`, the
`RunTree` is added to the tracer's `run_map` so child runs can
reference it. Previously the entry was never removed, causing a
memory leak that grew with every call.

Uses an explicit tracer so we can inspect `run_map` directly after
the call — the `_configure` insertion path is identical regardless
of whether the tracer was created internally or passed in.
"""
tracer = _create_tracer_with_mocked_client()

@RunnableLambda
def child(x: str) -> str:
return x

with tracing_context(client=tracer.client, enabled=True):

@traceable
def parent(x: str) -> str:
return child.invoke(x, config={"callbacks": [tracer]})

parent("hello")

assert tracer.run_map == {}, (
f"run_map should be empty but contains: "
f"{[getattr(v, 'name', k) for k, v in tracer.run_map.items()]}"
)


def test_traceable_parent_run_map_cleanup_with_sibling_children() -> None:
"""External parent survives in run_map until ALL its children finish.

When a `@traceable` function invokes a chain with multiple steps
(e.g. prompt | llm), each step is a sibling child of the same
intermediate run. The external parent must stay in `run_map` until
the last child completes, not be removed when the first child ends.
"""
from langchain_core.language_models.fake_chat_models import ( # noqa: PLC0415
FakeListChatModel,
)
from langchain_core.prompts import ChatPromptTemplate # noqa: PLC0415

tracer = _create_tracer_with_mocked_client()

prompt = ChatPromptTemplate.from_messages([("system", "bot"), ("human", "{input}")])
llm = FakeListChatModel(responses=["hi"])
chain = prompt | llm

with tracing_context(client=tracer.client, enabled=True):

@traceable
def parent(x: dict) -> Any:
return chain.invoke(x, config={"callbacks": [tracer]})

result = parent({"input": "hello"})

assert result is not None
assert tracer.run_map == {}, (
f"run_map should be empty but contains: "
f"{[getattr(v, 'name', k) for k, v in tracer.run_map.items()]}"
)


def test_traceable_parent_run_map_no_runttree_accumulation() -> None:
"""RunTree objects reachable from run_map must not grow across calls.

This is the memory-level regression test: a long-lived tracer is
reused across many @traceable → Runnable invocations. Without the
fix, each call leaves a RunTree (plus its child tree) in run_map,
causing unbounded growth. With the fix, run_map is empty after
every call, so the count stays flat.
"""
import gc # noqa: PLC0415

tracer = _create_tracer_with_mocked_client()

@RunnableLambda
def child(x: str) -> str:
return x

counts: list[int] = []
with tracing_context(client=tracer.client, enabled=True):

@traceable
def parent(x: str) -> str:
return child.invoke(x, config={"callbacks": [tracer]})

for _ in range(5):
parent("hello")
gc.collect()
# Count RunTree objects reachable from the tracer's run_map.
run_map_runtrees = sum(
1 + len(v.child_runs) for v in tracer.run_map.values()
)
counts.append(run_map_runtrees)

# With the fix every call cleans up → counts are all 0.
# Without the fix they grow: [1, 2, 3, 4, 5] (or more with children).
assert counts == [0, 0, 0, 0, 0], (
f"RunTree objects in run_map should not accumulate, got counts: {counts}"
)


class TestTracerMetadataThroughInvoke:
"""Tests for tracer metadata merging through invoke calls."""

Expand Down
Loading