Skip to content

Commit f8582a0

Browse files
authored
šŸ› fix(limiter): distinguish TransactionConflict from ConditionalCheckFailed in _commit_initial (#332) (#401)
## Summary - Distinguish `TransactionConflict` from `ConditionalCheckFailed` in `_is_condition_check_failure()` by inspecting `CancellationReasons` codes instead of treating all `TransactionCanceledException` the same - Add `_is_transaction_conflict()` helper and retry loop with exponential backoff (25ms base, 3 retries) for transient contention errors in `_commit_initial()` - Fix `_build_retry_failure_statuses()` to compute `retry_after_seconds` from bucket state via `calculate_retry_after()` instead of hardcoding `0.0` - Apply identical fixes to both async (`lease.py`) and sync (`sync_lease.py`) code paths ## Test plan - [x] Unit tests for `_is_condition_check_failure()`: pure ConditionalCheckFailed, pure TransactionConflict, mixed reasons, no response attribute - [x] Unit tests for `_is_transaction_conflict()`: pure conflict, condition-check-only, unrelated exceptions, ClientError fallback - [x] Unit test: TransactionConflict retries original transaction (not consumption-only path) - [x] Unit test: TransactionConflict exhausts retries and propagates exception - [x] Unit test: ConditionalCheckFailed still enters consumption-only retry path (regression) - [x] Unit test: mixed reasons prefer ConditionalCheckFailed over TransactionConflict - [x] Unit test: cascade TransactionConflict does not raise false RateLimitExceeded (issue scenario) - [x] Unit test: `_build_retry_failure_statuses()` computes non-zero `retry_after_seconds` from deficit - [x] Unit test: `_build_retry_failure_statuses()` returns 0.0 when no deficit - [ ] Run full unit test suite: `uv run pytest tests/unit/ -v` Closes #332 šŸ¤– Generated with [Claude Code](https://claude.ai/code)
2 parents 8e129fe + 25a50bb commit f8582a0

4 files changed

Lines changed: 720 additions & 49 deletions

File tree

ā€Žsrc/zae_limiter/lease.pyā€Ž

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
"""Lease management for rate limit acquisitions."""
22

3+
import asyncio
34
import logging
45
import time
56
from dataclasses import dataclass, field
67
from typing import TYPE_CHECKING, Any
78

8-
from .bucket import calculate_available, force_consume, try_consume
9+
from .bucket import calculate_available, calculate_retry_after, force_consume, try_consume
910
from .exceptions import RateLimitExceeded
1011
from .models import BucketState, Limit, LimitStatus
1112
from .schema import calculate_bucket_ttl_seconds
1213

14+
# TransactionConflict retry constants (Issue #332)
15+
_CONFLICT_MAX_RETRIES = 3
16+
_CONFLICT_BASE_DELAY_S = 0.025 # 25ms, doubles each retry: 25ms, 50ms, 100ms
17+
1318
if TYPE_CHECKING:
1419
from .repository_protocol import RepositoryProtocol
1520

@@ -262,12 +267,34 @@ async def _commit_initial(self) -> None:
262267
self._initial_committed = True
263268
return
264269

265-
try:
266-
await repo.transact_write(items)
267-
except Exception as exc:
268-
if not _is_condition_check_failure(exc):
269-
raise
270+
# Retry loop for TransactionConflict (Issue #332)
271+
condition_failed = False
272+
for attempt in range(_CONFLICT_MAX_RETRIES + 1):
273+
try:
274+
await repo.transact_write(items)
275+
break # success
276+
except Exception as exc:
277+
# Check ConditionalCheckFailed first — it takes priority over
278+
# TransactionConflict because it means the optimistic lock failed,
279+
# requiring the consumption-only retry path.
280+
if _is_condition_check_failure(exc):
281+
condition_failed = True
282+
break
283+
if _is_transaction_conflict(exc):
284+
if attempt < _CONFLICT_MAX_RETRIES:
285+
delay = _CONFLICT_BASE_DELAY_S * (2**attempt)
286+
logger.debug(
287+
"TransactionConflict (attempt %d/%d), retrying in %.3fs",
288+
attempt + 1,
289+
_CONFLICT_MAX_RETRIES,
290+
delay,
291+
)
292+
await asyncio.sleep(delay)
293+
continue
294+
raise # exhausted retries, propagate
295+
raise # other errors propagate unchanged
270296

297+
if condition_failed:
271298
# Retry path: ADD consumption only, CONDITION tk>=consumed per limit
272299
logger.debug("Normal write failed (optimistic lock), retrying consumption-only")
273300
retry_items: list[dict[str, Any]] = []
@@ -402,26 +429,67 @@ async def _rollback(self) -> None:
402429
)
403430

404431

432+
def _get_cancellation_reason_codes(exc: Exception) -> list[str] | None:
433+
"""Extract CancellationReasons codes from a TransactionCanceledException.
434+
435+
Returns a list of reason codes (e.g. ["ConditionalCheckFailed", "None"]),
436+
or None if the exception is not a TransactionCanceledException or has no reasons.
437+
"""
438+
response = getattr(exc, "response", None)
439+
if response is None:
440+
return None
441+
error_code = response.get("Error", {}).get("Code", "")
442+
exc_name = type(exc).__name__
443+
if exc_name != "TransactionCanceledException" and error_code != "TransactionCanceledException":
444+
return None
445+
reasons = response.get("CancellationReasons", [])
446+
return [r.get("Code", "None") for r in reasons]
447+
448+
405449
def _is_condition_check_failure(exc: Exception) -> bool:
406-
"""Check if an exception is a DynamoDB ConditionalCheckFailedException."""
450+
"""Check if an exception is a DynamoDB ConditionalCheckFailedException.
451+
452+
For TransactionCanceledException, inspects CancellationReasons to distinguish
453+
ConditionalCheckFailed (returns True) from TransactionConflict (returns False).
454+
"""
407455
exc_name = type(exc).__name__
408-
if exc_name in ("ConditionalCheckFailedException", "TransactionCanceledException"):
456+
if exc_name == "ConditionalCheckFailedException":
409457
return True
410-
# botocore wraps it in ClientError
458+
# Check CancellationReasons for TransactionCanceledException
459+
reason_codes = _get_cancellation_reason_codes(exc)
460+
if reason_codes is not None:
461+
return "ConditionalCheckFailed" in reason_codes
462+
# botocore ClientError fallback (non-transaction)
411463
if hasattr(exc, "response"):
412464
error_code = getattr(exc, "response", {}).get("Error", {}).get("Code", "")
413-
if error_code in (
414-
"ConditionalCheckFailedException",
415-
"TransactionCanceledException",
416-
):
465+
if error_code == "ConditionalCheckFailedException":
417466
return True
418467
return False
419468

420469

470+
def _is_transaction_conflict(exc: Exception) -> bool:
471+
"""Check if an exception is a DynamoDB TransactionConflict.
472+
473+
TransactionConflict occurs when concurrent transactions touch the same items.
474+
Unlike ConditionalCheckFailed, this indicates transient contention that should
475+
be retried as-is (not via the consumption-only retry path).
476+
"""
477+
reason_codes = _get_cancellation_reason_codes(exc)
478+
if reason_codes is not None:
479+
return "TransactionConflict" in reason_codes
480+
return False
481+
482+
421483
def _build_retry_failure_statuses(entries: list[LeaseEntry]) -> list[LimitStatus]:
422484
"""Build LimitStatus list for a retry failure (rate limit exceeded)."""
423485
statuses: list[LimitStatus] = []
424486
for entry in entries:
487+
deficit_milli = max(0, entry.consumed * 1000 - entry.state.tokens_milli)
488+
retry_after = calculate_retry_after(
489+
deficit_milli=deficit_milli,
490+
refill_amount_milli=entry.limit.refill_amount * 1000,
491+
refill_period_ms=entry.limit.refill_period_seconds * 1000,
492+
)
425493
statuses.append(
426494
LimitStatus(
427495
entity_id=entry.entity_id,
@@ -431,7 +499,7 @@ def _build_retry_failure_statuses(entries: list[LeaseEntry]) -> list[LimitStatus
431499
available=entry.state.tokens_milli // 1000,
432500
requested=entry.consumed,
433501
exceeded=entry.consumed > 0,
434-
retry_after_seconds=0.0,
502+
retry_after_seconds=retry_after,
435503
)
436504
)
437505
return statuses

ā€Žsrc/zae_limiter/sync_lease.pyā€Ž

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
from dataclasses import dataclass, field
1212
from typing import TYPE_CHECKING, Any
1313

14-
from .bucket import calculate_available, force_consume, try_consume
14+
from .bucket import calculate_available, calculate_retry_after, force_consume, try_consume
1515
from .exceptions import RateLimitExceeded
1616
from .models import BucketState, Limit, LimitStatus
1717
from .schema import calculate_bucket_ttl_seconds
1818

19+
_CONFLICT_MAX_RETRIES = 3
20+
_CONFLICT_BASE_DELAY_S = 0.025
1921
if TYPE_CHECKING:
2022
from .sync_repository_protocol import SyncRepositoryProtocol
2123
logger = logging.getLogger(__name__)
@@ -232,11 +234,29 @@ def _commit_initial(self) -> None:
232234
if not items:
233235
self._initial_committed = True
234236
return
235-
try:
236-
repo.transact_write(items)
237-
except Exception as exc:
238-
if not _is_condition_check_failure(exc):
237+
condition_failed = False
238+
for attempt in range(_CONFLICT_MAX_RETRIES + 1):
239+
try:
240+
repo.transact_write(items)
241+
break
242+
except Exception as exc:
243+
if _is_condition_check_failure(exc):
244+
condition_failed = True
245+
break
246+
if _is_transaction_conflict(exc):
247+
if attempt < _CONFLICT_MAX_RETRIES:
248+
delay = _CONFLICT_BASE_DELAY_S * 2**attempt
249+
logger.debug(
250+
"TransactionConflict (attempt %d/%d), retrying in %.3fs",
251+
attempt + 1,
252+
_CONFLICT_MAX_RETRIES,
253+
delay,
254+
)
255+
time.sleep(delay)
256+
continue
257+
raise
239258
raise
259+
if condition_failed:
240260
logger.debug("Normal write failed (optimistic lock), retrying consumption-only")
241261
retry_items: list[dict[str, Any]] = []
242262
for (entity_id, resource), group_entries in groups.items():
@@ -339,22 +359,65 @@ def _rollback(self) -> None:
339359
)
340360

341361

362+
def _get_cancellation_reason_codes(exc: Exception) -> list[str] | None:
363+
"""Extract CancellationReasons codes from a TransactionCanceledException.
364+
365+
Returns a list of reason codes (e.g. ["ConditionalCheckFailed", "None"]),
366+
or None if the exception is not a TransactionCanceledException or has no reasons.
367+
"""
368+
response = getattr(exc, "response", None)
369+
if response is None:
370+
return None
371+
error_code = response.get("Error", {}).get("Code", "")
372+
exc_name = type(exc).__name__
373+
if exc_name != "TransactionCanceledException" and error_code != "TransactionCanceledException":
374+
return None
375+
reasons = response.get("CancellationReasons", [])
376+
return [r.get("Code", "None") for r in reasons]
377+
378+
342379
def _is_condition_check_failure(exc: Exception) -> bool:
343-
"""Check if an exception is a DynamoDB ConditionalCheckFailedException."""
380+
"""Check if an exception is a DynamoDB ConditionalCheckFailedException.
381+
382+
For TransactionCanceledException, inspects CancellationReasons to distinguish
383+
ConditionalCheckFailed (returns True) from TransactionConflict (returns False).
384+
"""
344385
exc_name = type(exc).__name__
345-
if exc_name in ("ConditionalCheckFailedException", "TransactionCanceledException"):
386+
if exc_name == "ConditionalCheckFailedException":
346387
return True
388+
reason_codes = _get_cancellation_reason_codes(exc)
389+
if reason_codes is not None:
390+
return "ConditionalCheckFailed" in reason_codes
347391
if hasattr(exc, "response"):
348392
error_code = getattr(exc, "response", {}).get("Error", {}).get("Code", "")
349-
if error_code in ("ConditionalCheckFailedException", "TransactionCanceledException"):
393+
if error_code == "ConditionalCheckFailedException":
350394
return True
351395
return False
352396

353397

398+
def _is_transaction_conflict(exc: Exception) -> bool:
399+
"""Check if an exception is a DynamoDB TransactionConflict.
400+
401+
TransactionConflict occurs when concurrent transactions touch the same items.
402+
Unlike ConditionalCheckFailed, this indicates transient contention that should
403+
be retried as-is (not via the consumption-only retry path).
404+
"""
405+
reason_codes = _get_cancellation_reason_codes(exc)
406+
if reason_codes is not None:
407+
return "TransactionConflict" in reason_codes
408+
return False
409+
410+
354411
def _build_retry_failure_statuses(entries: list[LeaseEntry]) -> list[LimitStatus]:
355412
"""Build LimitStatus list for a retry failure (rate limit exceeded)."""
356413
statuses: list[LimitStatus] = []
357414
for entry in entries:
415+
deficit_milli = max(0, entry.consumed * 1000 - entry.state.tokens_milli)
416+
retry_after = calculate_retry_after(
417+
deficit_milli=deficit_milli,
418+
refill_amount_milli=entry.limit.refill_amount * 1000,
419+
refill_period_ms=entry.limit.refill_period_seconds * 1000,
420+
)
358421
statuses.append(
359422
LimitStatus(
360423
entity_id=entry.entity_id,
@@ -364,7 +427,7 @@ def _build_retry_failure_statuses(entries: list[LeaseEntry]) -> list[LimitStatus
364427
available=entry.state.tokens_milli // 1000,
365428
requested=entry.consumed,
366429
exceeded=entry.consumed > 0,
367-
retry_after_seconds=0.0,
430+
retry_after_seconds=retry_after,
368431
)
369432
)
370433
return statuses

0 commit comments

Comments
Ā (0)