Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions libs/core/langchain_core/runnables/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@

not_set: list[Output] = []
result = not_set
last_remaining_indices: list[int] = list(range(len(inputs)))
try:
for attempt in self._sync_retrying():
with attempt:
Expand All @@ -245,6 +246,7 @@
]
if not remaining_indices:
break
last_remaining_indices = remaining_indices
pending_inputs = [inputs[i] for i in remaining_indices]
pending_configs = [config[i] for i in remaining_indices]
pending_run_managers = [run_manager[i] for i in remaining_indices]
Expand Down Expand Up @@ -279,12 +281,22 @@
if result is not_set:
result = cast("list[Output]", [e] * len(inputs))

# Map last retry results back to original indices so that
# successfully-retried values don't overwrite unrelated positions.
# Note: if all items succeed on the very first attempt, remaining_indices
# becomes empty on the second iteration → break before updating
# last_remaining_indices. In that case last_remaining_indices ==
# range(len(inputs)) and result == the full first-attempt output, so the
# zip still pairs correctly. However, every idx will already be in
# results_map, so last_result_map is never actually consulted.
last_result_map = dict(zip(last_remaining_indices, result))

Check failure on line 292 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

ruff (B905)

langchain_core/runnables/retry.py:292:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 292 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

ruff (B905)

langchain_core/runnables/retry.py:292:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 292 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

ruff (B905)

langchain_core/runnables/retry.py:292:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 292 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

ruff (B905)

langchain_core/runnables/retry.py:292:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 292 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

ruff (B905)

langchain_core/runnables/retry.py:292:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

outputs: list[Output | Exception] = []
for idx in range(len(inputs)):
if idx in results_map:
outputs.append(results_map[idx])
else:
outputs.append(result.pop(0))
outputs.append(last_result_map[idx])
return outputs

@override
Expand All @@ -311,6 +323,7 @@

not_set: list[Output] = []
result = not_set
last_remaining_indices: list[int] = list(range(len(inputs)))
try:
async for attempt in self._async_retrying():
with attempt:
Expand All @@ -321,6 +334,7 @@
]
if not remaining_indices:
break
last_remaining_indices = remaining_indices
pending_inputs = [inputs[i] for i in remaining_indices]
pending_configs = [config[i] for i in remaining_indices]
pending_run_managers = [run_manager[i] for i in remaining_indices]
Expand Down Expand Up @@ -354,12 +368,22 @@
if result is not_set:
result = cast("list[Output]", [e] * len(inputs))

# Map last retry results back to original indices so that
# successfully-retried values don't overwrite unrelated positions.
# Note: if all items succeed on the very first attempt, remaining_indices
# becomes empty on the second iteration → break before updating
# last_remaining_indices. In that case last_remaining_indices ==
# range(len(inputs)) and result == the full first-attempt output, so the
# zip still pairs correctly. However, every idx will already be in
# results_map, so last_result_map is never actually consulted.
last_result_map = dict(zip(last_remaining_indices, result))

Check failure on line 379 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.14) / Python 3.14

ruff (B905)

langchain_core/runnables/retry.py:379:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 379 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.10) / Python 3.10

ruff (B905)

langchain_core/runnables/retry.py:379:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 379 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.13) / Python 3.13

ruff (B905)

langchain_core/runnables/retry.py:379:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 379 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.11) / Python 3.11

ruff (B905)

langchain_core/runnables/retry.py:379:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

Check failure on line 379 in libs/core/langchain_core/runnables/retry.py

View workflow job for this annotation

GitHub Actions / lint (libs/core, 3.12) / Python 3.12

ruff (B905)

langchain_core/runnables/retry.py:379:32: B905 `zip()` without an explicit `strict=` parameter help: Add explicit value for parameter `strict=`

outputs: list[Output | Exception] = []
for idx in range(len(inputs)):
if idx in results_map:
outputs.append(results_map[idx])
else:
outputs.append(result.pop(0))
outputs.append(last_result_map[idx])
return outputs

@override
Expand Down
71 changes: 71 additions & 0 deletions libs/core/tests/unit_tests/runnables/test_runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3979,6 +3979,77 @@ def sometimes_fail(x: int) -> int: # pragma: no cover - trivial
assert results == [0, 1, 2]


def test_retry_batch_no_corruption_on_partial_retry() -> None:
"""Regression: items that still fail after retries must stay as exceptions.

When one item succeeds on retry while another permanently fails, the stale
index mapping caused the permanent failure to be replaced by the retried
success value (GH-35475).
"""
failed_once = False

def process(name: str) -> str:
nonlocal failed_once
if name == "ok":
return "ok-result"
if name == "retry_then_ok":
if not failed_once:
failed_once = True
msg = "transient"
raise ValueError(msg)
return "retry-result"
msg = "permanent"
raise ValueError(msg)

runnable = RunnableLambda(process).with_retry(
stop_after_attempt=2,
wait_exponential_jitter=False,
retry_if_exception_type=(ValueError,),
)

result = runnable.batch(
["ok", "retry_then_ok", "always_fail"],
return_exceptions=True,
)

assert result[0] == "ok-result"
assert result[1] == "retry-result"
assert isinstance(result[2], Exception)


async def test_async_retry_batch_no_corruption_on_partial_retry() -> None:
"""Async variant of the partial-retry corruption regression test."""
failed_once = False

def process(name: str) -> str:
nonlocal failed_once
if name == "ok":
return "ok-result"
if name == "retry_then_ok":
if not failed_once:
failed_once = True
msg = "transient"
raise ValueError(msg)
return "retry-result"
msg = "permanent"
raise ValueError(msg)

runnable = RunnableLambda(process).with_retry(
stop_after_attempt=2,
wait_exponential_jitter=False,
retry_if_exception_type=(ValueError,),
)

result = await runnable.abatch(
["ok", "retry_then_ok", "always_fail"],
return_exceptions=True,
)

assert result[0] == "ok-result"
assert result[1] == "retry-result"
assert isinstance(result[2], Exception)


async def test_async_retrying(mocker: MockerFixture) -> None:
def _lambda(x: int) -> int:
if x == 1:
Expand Down
Loading