Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions libs/core/langchain_core/language_models/chat_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,73 @@ async def _call_async(
msg = "Unexpected generation type"
raise ValueError(msg)

# -- Resource lifecycle ---------------------------------------------------

def close(self) -> None:
"""Release any synchronous resources held by this model.

Default: no-op. Override in subclasses that own HTTP clients,
connection pools, or other resources whose lifetime should be
tied to the model. Idempotent — safe to call multiple times.

Note:
Provider SDKs typically back chat models with httpx pools that
they only close best-effort from `__del__`. In long-lived
workers that construct models per request, calling `close()`
(or `aclose()`) when done prevents pool accumulation.

Example:
```python
with ChatProvider(model="foo") as model:
model.invoke("hi")
# HTTP clients released here.
```
"""

async def aclose(self) -> None:
"""Release any asynchronous resources held by this model.

Default: dispatches to `close()`. Override when async resources
(e.g., `httpx.AsyncClient`, anyio task groups) need an event-loop
-aware teardown. Idempotent — safe to call multiple times.

Example:
```python
async with ChatProvider(model="foo") as model:
await model.ainvoke("hi")
# Async HTTP clients released here.
```
"""
self.close()

def __enter__(self) -> Self:
"""Enter the sync context — returns self."""
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: object,
) -> None:
"""Exit the sync context — calls `close()`."""
del exc_type, exc, tb
self.close()

async def __aenter__(self) -> Self:
"""Enter the async context — returns self."""
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: object,
) -> None:
"""Exit the async context — calls `aclose()`."""
del exc_type, exc, tb
await self.aclose()

@property
@abstractmethod
def _llm_type(self) -> str:
Expand Down
7 changes: 6 additions & 1 deletion libs/core/langchain_core/runnables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@
patch_config,
run_in_executor,
)
from langchain_core.runnables.fallbacks import RunnableWithFallbacks
from langchain_core.runnables.fallbacks import (
FallbackLatch,
RunnableWithFallbacks,
)
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.runnables.passthrough import (
RunnableAssign,
Expand All @@ -66,6 +69,7 @@
"ConfigurableFieldMultiOption",
"ConfigurableFieldSingleOption",
"ConfigurableFieldSpec",
"FallbackLatch",
"RouterInput",
"RouterRunnable",
"Runnable",
Expand Down Expand Up @@ -108,6 +112,7 @@
"get_config_list": "config",
"patch_config": "config",
"run_in_executor": "config",
"FallbackLatch": "fallbacks",
"RunnableWithFallbacks": "fallbacks",
"RunnableWithMessageHistory": "history",
"RunnableAssign": "passthrough",
Expand Down
13 changes: 13 additions & 0 deletions libs/core/langchain_core/runnables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@
CallbackManagerForChainRun,
)
from langchain_core.prompts.base import BasePromptTemplate
from langchain_core.runnables.fallbacks import (
FallbackLatch as FallbackLatchT,
)
from langchain_core.runnables.fallbacks import (
RunnableWithFallbacks as RunnableWithFallbacksT,
)
Expand Down Expand Up @@ -2128,6 +2131,7 @@ def with_fallbacks(
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
latch: FallbackLatchT | None = None,
) -> RunnableWithFallbacksT[Input, Output]:
"""Add fallbacks to a `Runnable`, returning a new `Runnable`.

Expand Down Expand Up @@ -2183,6 +2187,14 @@ def _generate(input: Iterator) -> Iterator[str]:

If used, the base `Runnable` and its fallbacks must accept a
dictionary as input.
latch: Optional shared
[`FallbackLatch`][langchain_core.runnables.fallbacks.FallbackLatch]
used to circuit-break the primary. When the latch is tripped (on
the primary's first handled exception) every subsequent call skips
the primary and starts at the first fallback. Useful when a primary
failure is unlikely to recover within the wrapper's lifetime — e.g.,
a wrong API key, where the unconditional re-try wastes a round-trip
on every call.

Returns:
A new `Runnable` that will try the original `Runnable`, and then each
Expand All @@ -2198,6 +2210,7 @@ def _generate(input: Iterator) -> Iterator[str]:
fallbacks=fallbacks,
exceptions_to_handle=exceptions_to_handle,
exception_key=exception_key,
latch=latch,
)

""" --- Helper methods for Subclasses --- """
Expand Down
Loading
Loading