
Commit 7d18595

🐛 fix(repository): restore slow-path bucket read dropped by shard migration (backport to 0.10.x) (#431)
## Summary

Backports the slow-path bucket-read fix from #429 (merged to `main`) to the `release/0.10.x` maintenance branch for a **v0.10.3** patch release.

**Bug:** `Repository.batch_get_entity_and_buckets()` (the `acquire()` slow-path / refill-recovery read) classified `BatchGetItem` response items using the stale `sk.startswith(schema.SK_BUCKET)` (`"#BUCKET#"`) prefix. Bucket state records use `SK == "#STATE"` (`sk_state()`) since the per-shard partition-key migration ([GHSA-76rv-2r9v-c5m6](GHSA-76rv-2r9v-c5m6)), so the filter never matched and every bucket was silently dropped. The slow path then treated existing buckets as new, the conditional write failed, and `acquire()` raised `RateLimitExceeded` with `retry_after=0.0` instead of refilling.

**Effect:** wait-then-acquire refill recovery was broken.

- With `speculative_writes=True`, it breaks the refill fallback (masked in production only when the aggregator Lambda refills out-of-band).
- With `speculative_writes=False` or `--no-aggregator`, it is fully broken.

This bug shipped in **v0.10.1** and is present in **v0.10.2**; this PR delivers the fix as **v0.10.3**.

**Fix:** one line: `elif sk.startswith(schema.SK_BUCKET):` → `elif sk == schema.sk_state():` (async source `repository.py`; regenerated into `sync_repository.py`).

Cherry-picked cleanly from `main` commits `7e61fd3` (fix + unit tests) and `8123fea` (LocalStack tests), with `-x` provenance recorded.

## Test plan

All verified locally on this branch against live LocalStack:

- [x] **unit (moto):** `batch_get_entity_and_buckets` returns existing buckets incl. no-`#META` trigger; wait-then-acquire recovery on speculative + non-speculative paths (async + generated sync): 8 passed.
- [x] **integration/e2e/benchmark (LocalStack, no-aggregator stacks):** direct read coverage, exhaust→wait→acquire recovery, recovery stress loop: 4 passed.
- [x] **sync-generation:** no drift; `ruff` + `mypy` clean.

## References

- Original fix: #429
- Bug report: #428 (already closed on `main`; referenced here as context only)

This PR targets the `0.10.x` maintenance line.

🤖 Generated with [Claude Code](https://claude.ai/code)
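To make the failure mode concrete, here is a minimal sketch of the classification step. The constants and the `classify` helper are illustrative stand-ins, not the library's code: only the `"#BUCKET#"` prefix and the `"#STATE"` sort key come from the description above, and `"#META"` is assumed to be the value returned by `sk_meta()`.

```python
# Illustrative stand-ins; the real values live in zae_limiter's schema module.
SK_META = "#META"              # assumed value of sk_meta()
SK_STATE = "#STATE"            # bucket state sort key after the per-shard migration
SK_BUCKET_PREFIX = "#BUCKET#"  # stale pre-migration prefix


def classify(items, is_bucket):
    """Split BatchGetItem-style items into (entity_item, bucket_items)."""
    entity, buckets = None, []
    for item in items:
        sk = item.get("SK", {}).get("S", "")
        if sk == SK_META:
            entity = item
        elif is_bucket(sk):
            buckets.append(item)
        # Anything else falls through and is silently ignored.
    return entity, buckets


items = [{"SK": {"S": "#META"}}, {"SK": {"S": "#STATE"}}]

# Stale filter: "#STATE" does not start with "#BUCKET#", so the bucket is dropped.
_, stale = classify(items, lambda sk: sk.startswith(SK_BUCKET_PREFIX))
# Fixed filter: an exact match on the state sort key finds it.
_, fixed = classify(items, lambda sk: sk == SK_STATE)

print(len(stale), len(fixed))
```

Because unmatched items fall through without an error, the stale filter produced an empty bucket dict rather than a visible failure, which is consistent with the bug going unnoticed.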
2 parents 4eb1a2b + d2c2325 commit 7d18595

9 files changed

Lines changed: 296 additions & 3 deletions


src/zae_limiter/repository.py

Lines changed: 8 additions & 1 deletion
@@ -449,6 +449,13 @@ async def _get_caller_identity_arn(self) -> str | None:
 
     def _now_ms(self) -> int:
         """Current time in milliseconds."""
+        # TODO(clock-seam): _now_ms() is not the single source of truth for time.
+        # limiter.py and lease.py compute now via inline `int(time.time() * 1000)`
+        # instead of routing through here, so a single acquire() reads the clock
+        # from multiple places and `_now_ms` cannot be monkeypatched to control
+        # time in tests. Make `_now_ms()` the injectable clock seam: add it to
+        # RepositoryProtocol and route limiter.py/lease.py through
+        # `self._repository._now_ms()` (regenerating all sync twins).
         return int(time.time() * 1000)
 
     # -------------------------------------------------------------------------
@@ -1650,7 +1657,7 @@ async def batch_get_entity_and_buckets(
             sk = item.get("SK", {}).get("S", "")
             if sk == schema.sk_meta():
                 entity = self._deserialize_entity(item)
-            elif sk.startswith(schema.SK_BUCKET):
+            elif sk == schema.sk_state():
                 for bucket in self._deserialize_composite_bucket(item):
                     key = (bucket.entity_id, bucket.resource, bucket.limit_name)
                     buckets[key] = bucket
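The TODO in the first hunk describes turning `_now_ms()` into an injectable clock seam. One way that idea could look is sketched below. `Clock`, `SystemClock`, `FakeClock`, and `TinyRepository` are hypothetical names for illustration; none of them exist in the library, and the real change would go through `RepositoryProtocol` as the comment says.

```python
from __future__ import annotations

import time
from typing import Protocol


class Clock(Protocol):
    """Hypothetical clock interface: anything that can report milliseconds."""

    def now_ms(self) -> int: ...


class SystemClock:
    def now_ms(self) -> int:
        return int(time.time() * 1000)


class FakeClock:
    """Test double whose time only moves when advance() is called."""

    def __init__(self, start_ms: int = 0) -> None:
        self._now = start_ms

    def now_ms(self) -> int:
        return self._now

    def advance(self, ms: int) -> None:
        self._now += ms


class TinyRepository:
    """Stand-in repository: every time read goes through _now_ms()."""

    def __init__(self, clock: Clock | None = None) -> None:
        self._clock = clock or SystemClock()

    def _now_ms(self) -> int:
        return self._clock.now_ms()


clock = FakeClock(start_ms=1_000)
repo = TinyRepository(clock)
before = repo._now_ms()
clock.advance(1_100)  # simulate the 1.1 s refill wait without sleeping
after = repo._now_ms()
print(before, after)
```

With a seam like this, the wait-then-acquire regression tests could advance a fake clock instead of calling `time.sleep(1.1)`, provided every caller reads time through the repository.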

src/zae_limiter/sync_repository.py

Lines changed: 1 addition & 1 deletion
@@ -1362,7 +1362,7 @@ def batch_get_entity_and_buckets(
             sk = item.get("SK", {}).get("S", "")
             if sk == schema.sk_meta():
                 entity = self._deserialize_entity(item)
-            elif sk.startswith(schema.SK_BUCKET):
+            elif sk == schema.sk_state():
                 for bucket in self._deserialize_composite_bucket(item):
                     key = (bucket.entity_id, bucket.resource, bucket.limit_name)
                     buckets[key] = bucket

tests/benchmark/test_localstack.py

Lines changed: 51 additions & 1 deletion
@@ -21,7 +21,7 @@
 
 import pytest
 
-from zae_limiter import Limit
+from zae_limiter import Limit, RateLimitExceeded
 
 pytestmark = [pytest.mark.benchmark, pytest.mark.integration]
 
@@ -656,3 +656,53 @@ def operation():
             time.sleep(0.5)  # Final wait for processing
 
         benchmark(operation)
+
+
+class TestLocalStackRefillRecoveryStress:
+    """E2E stress test for client-side refill recovery (regression for #428).
+
+    Repeatedly drives the acquire() slow-path refill-recovery against LocalStack:
+    exhaust -> brief real wait -> re-acquire succeeds. Before the fix this raised
+    RateLimitExceeded(retry_after=0.0) once the bucket was drained, because
+    batch_get_entity_and_buckets() dropped the existing bucket (stale "#BUCKET#"
+    SK filter vs SK=#STATE, GHSA-76rv-2r9v-c5m6) and the slow path treated it as new.
+
+    Uses sync_localstack_limiter (the no-aggregator minimal stack) on purpose: the
+    aggregator would refill buckets out-of-band and mask the client path that broke.
+
+    This is a correctness/stress test, not a micro-benchmark -- it does not use the
+    `benchmark` fixture, so it is skipped under `--benchmark-only` but runs on a
+    normal `pytest tests/benchmark/test_localstack.py` invocation.
+    """
+
+    def test_refill_recovery_stress_loop(self, sync_localstack_limiter):
+        """Exhaust -> wait -> recover, repeated over many entities."""
+        # 100 tokens, refills the full bucket every second.
+        limits = [Limit.custom("rpm", capacity=100, refill_amount=100, refill_period_seconds=1)]
+
+        iterations = 5
+        for i in range(iterations):
+            entity_id = f"ls-recover-{i}"
+
+            # Drain the bucket completely.
+            with sync_localstack_limiter.acquire(
+                entity_id=entity_id, resource="api", limits=limits, consume={"rpm": 100}
+            ):
+                pass
+
+            # Exhausted: rejection must report a real wait, not the buggy 0.0.
+            with pytest.raises(RateLimitExceeded) as exc_info:
+                with sync_localstack_limiter.acquire(
+                    entity_id=entity_id, resource="api", limits=limits, consume={"rpm": 100}
+                ):
+                    pass
+            assert exc_info.value.retry_after_seconds > 0, (
+                f"iteration {i}: rejection should report a real retry_after"
+            )
+
+            # Wait for partial refill, then recover via the slow path.
+            time.sleep(0.8)
+            with sync_localstack_limiter.acquire(
+                entity_id=entity_id, resource="api", limits=limits, consume={"rpm": 50}
+            ) as lease:
+                assert lease.consumed == {"rpm": 50}, f"iteration {i}: recovery acquire failed"
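The timings in this test follow from simple token-bucket arithmetic. The sketch below assumes linear refill capped at capacity; the helper names are hypothetical, and the library's own refill math may round or batch differently.

```python
def refilled_tokens(tokens, capacity, refill_amount, refill_period_ms, elapsed_ms):
    """Tokens available after elapsed_ms, assuming linear refill capped at capacity."""
    gained = elapsed_ms * refill_amount // refill_period_ms
    return min(capacity, tokens + gained)


def retry_after_seconds(tokens, needed, refill_amount, refill_period_ms):
    """Seconds until `needed` tokens are available under the same linear model."""
    deficit = max(0, needed - tokens)
    return deficit * refill_period_ms / refill_amount / 1000


# Limit from the test: capacity=100, refill_amount=100, refill_period_seconds=1.
CAPACITY, REFILL, PERIOD_MS = 100, 100, 1000

# Right after draining all 100 tokens, a request for 100 more has to wait.
retry = retry_after_seconds(0, 100, REFILL, PERIOD_MS)

# After the 0.8 s sleep, enough tokens are back for the 50-token acquire.
after_partial_wait = refilled_tokens(0, CAPACITY, REFILL, PERIOD_MS, 800)

# After a 1.1 s sleep (used by the other tests), refill is capped at capacity.
after_full_wait = refilled_tokens(0, CAPACITY, REFILL, PERIOD_MS, 1100)

print(retry, after_partial_wait, after_full_wait)
```

Under this model a drained bucket should report a positive wait, so a rejection with `retry_after=0.0` signals that the bucket state was never read.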

tests/e2e/test_localstack.py

Lines changed: 34 additions & 0 deletions
@@ -915,6 +915,40 @@ async def test_negative_bucket_handling(self, e2e_limiter_minimal):
         # Consumed 15 tokens with capacity 10, so at least -3 after some refill
         assert available["rpm"] <= -3, "Bucket should still be significantly negative"
 
+    @pytest.mark.asyncio(loop_scope="class")
+    async def test_acquire_recovers_after_refill_wait(self, e2e_limiter_minimal):
+        """An exhausted bucket recovers after enough time passes (regression #428).
+
+        Runs on the no-aggregator stack so the client refill-recovery slow path is
+        what's exercised. On the aggregator stack the Lambda refills the bucket
+        out-of-band and the client path never runs -- which is exactly why this bug
+        (acquire raising RateLimitExceeded with retry_after=0.0 instead of
+        refilling) reached production in v0.10.1 undetected.
+        """
+        # 100 tokens, refills the full bucket every second.
+        limits = [Limit.custom("rpm", capacity=100, refill_amount=100, refill_period_seconds=1)]
+
+        # Drain the bucket completely.
+        async with e2e_limiter_minimal.acquire(
+            entity_id="recover-user", resource="api", limits=limits, consume={"rpm": 100}
+        ):
+            pass
+
+        # Immediately exhausted: the rejection must report a real wait, not 0.0.
+        with pytest.raises(RateLimitExceeded) as exc_info:
+            async with e2e_limiter_minimal.acquire(
+                entity_id="recover-user", resource="api", limits=limits, consume={"rpm": 100}
+            ):
+                pass
+        assert exc_info.value.retry_after_seconds > 0
+
+        # After refilling, the same acquire must succeed (slow-path refill recovery).
+        await asyncio.sleep(1.1)
+        async with e2e_limiter_minimal.acquire(
+            entity_id="recover-user", resource="api", limits=limits, consume={"rpm": 50}
+        ) as lease:
+            assert lease.consumed == {"rpm": 50}
+
 
 class TestE2ECloudFormationStackVariations:
     """E2E tests for CloudFormation stack deployment variations."""

tests/integration/test_repository.py

Lines changed: 48 additions & 0 deletions
@@ -338,6 +338,54 @@ async def test_batch_get_buckets_empty_key_list(self, test_repo):
         result = await test_repo.batch_get_buckets([])
         assert result == {}
 
+    @pytest.mark.asyncio
+    async def test_batch_get_entity_and_buckets_returns_existing_buckets(self, test_repo):
+        """The acquire slow-path read must return buckets that exist (real DynamoDB).
+
+        Regression for #428: batch_get_entity_and_buckets() classified response
+        items with the stale "#BUCKET#" SK prefix, but buckets use SK=#STATE since
+        the per-shard migration (GHSA-76rv-2r9v-c5m6), so every bucket was dropped.
+        The sibling batch_get_buckets() (tested above) had no such filter, which is
+        why this gap survived. This exercises a real BatchGetItem response.
+        """
+        await test_repo.create_entity("bge-entity")
+        limits = [Limit.per_minute("rpm", 100), Limit.per_minute("tpm", 10_000)]
+        now_ms = int(time.time() * 1000)
+        states = [BucketState.from_limit("bge-entity", "gpt-4", limit, now_ms) for limit in limits]
+        await test_repo.transact_write(
+            [test_repo.build_composite_create("bge-entity", "gpt-4", states, now_ms)]
+        )
+
+        entity, buckets = await test_repo.batch_get_entity_and_buckets(
+            "bge-entity", [("bge-entity", "gpt-4")]
+        )
+
+        assert entity is not None
+        assert ("bge-entity", "gpt-4", "rpm") in buckets
+        assert ("bge-entity", "gpt-4", "tpm") in buckets
+
+    @pytest.mark.asyncio
+    async def test_batch_get_entity_and_buckets_finds_bucket_without_meta(self, test_repo):
+        """Buckets must be returned even when the entity has no #META record.
+
+        Real-world trigger: acquire() without a prior create_entity() writes a
+        bucket but no META record. The slow-path read must still find the bucket
+        (entity is None, bucket dict populated). Regression for #428.
+        """
+        limits = [Limit.per_minute("rpm", 100)]
+        now_ms = int(time.time() * 1000)
+        states = [BucketState.from_limit("bge-bare", "gpt-4", limit, now_ms) for limit in limits]
+        await test_repo.transact_write(
+            [test_repo.build_composite_create("bge-bare", "gpt-4", states, now_ms)]
+        )
+
+        entity, buckets = await test_repo.batch_get_entity_and_buckets(
+            "bge-bare", [("bge-bare", "gpt-4")]
+        )
+
+        assert entity is None
+        assert ("bge-bare", "gpt-4", "rpm") in buckets
+
     @pytest.mark.asyncio
     async def test_batch_get_buckets_deduplication(self, test_repo):
         """Should deduplicate duplicate keys in the request."""

tests/unit/test_limiter.py

Lines changed: 48 additions & 0 deletions
@@ -212,6 +212,54 @@ async def test_acquire_fallback_when_batch_not_supported(self, limiter, monkeypa
             assert lease.consumed == {"rpm": 1}
 
 
+class TestRateLimiterRefillRecovery:
+    """Wait-then-acquire: an exhausted bucket recovers after enough time passes.
+
+    Regression for the stale slow-path bucket discriminator (buckets moved to
+    SK=#STATE in the per-shard migration, but batch_get_entity_and_buckets still
+    filtered on the old "#BUCKET#" prefix). With buckets silently dropped, the
+    refill-recovery fallback treated existing buckets as new and the conditional
+    write failed with a bogus retry_after=0.0 instead of refilling.
+    """
+
+    async def test_acquire_succeeds_after_refill_wait(self, limiter):
+        """Exhaust a bucket, wait for refill, and acquire again (speculative on)."""
+        # 100 tokens, refills the full bucket every second.
+        limits = [Limit.custom("rpm", capacity=100, refill_amount=100, refill_period_seconds=1)]
+
+        # Drain the bucket completely.
+        async with limiter.acquire("key-1", "gpt-4", limits=limits, consume={"rpm": 100}):
+            pass
+
+        # Immediately exhausted: rejection must report a real wait, not 0.0.
+        with pytest.raises(RateLimitExceeded) as exc_info:
+            async with limiter.acquire("key-1", "gpt-4", limits=limits, consume={"rpm": 100}):
+                pass
+        assert exc_info.value.retry_after_seconds > 0
+
+        # After refilling, the same acquire must succeed.
+        await asyncio.sleep(1.1)
+        async with limiter.acquire("key-1", "gpt-4", limits=limits, consume={"rpm": 50}) as lease:
+            assert lease.consumed == {"rpm": 50}
+
+    async def test_acquire_succeeds_after_refill_wait_non_speculative(self, limiter):
+        """Same recovery on the pure slow path (speculative writes disabled)."""
+        slow = RateLimiter(repository=limiter._repository, speculative_writes=False)
+        limits = [Limit.custom("rpm", capacity=100, refill_amount=100, refill_period_seconds=1)]
+
+        async with slow.acquire("key-2", "gpt-4", limits=limits, consume={"rpm": 100}):
+            pass
+
+        with pytest.raises(RateLimitExceeded) as exc_info:
+            async with slow.acquire("key-2", "gpt-4", limits=limits, consume={"rpm": 100}):
+                pass
+        assert exc_info.value.retry_after_seconds > 0
+
+        await asyncio.sleep(1.1)
+        async with slow.acquire("key-2", "gpt-4", limits=limits, consume={"rpm": 50}) as lease:
+            assert lease.consumed == {"rpm": 50}
+
+
 class TestRateLimiterLease:
     """Tests for Lease functionality."""
 

tests/unit/test_repository.py

Lines changed: 37 additions & 0 deletions
@@ -330,6 +330,43 @@ async def test_batch_get_buckets_empty_keys(self, repo):
         result = await repo.batch_get_buckets([])
         assert result == {}
 
+    @pytest.mark.asyncio
+    async def test_batch_get_entity_and_buckets_returns_existing_buckets(self, repo_with_buckets):
+        """The acquire slow-path read must return buckets that exist.
+
+        Regression for the stale bucket discriminator: buckets use SK=#STATE
+        since the per-shard migration (GHSA-76rv), but the response filter still
+        checked the pre-shard SK_BUCKET prefix ("#BUCKET#"), silently dropping
+        every bucket. That made the slow path treat existing buckets as new.
+        """
+        entity, buckets = await repo_with_buckets.batch_get_entity_and_buckets(
+            "entity-1", [("entity-1", "gpt-4")]
+        )
+
+        assert entity is not None  # entity-1 was created via create_entity
+        # Bucket must be discovered (the bug returned an empty dict here)
+        assert ("entity-1", "gpt-4", "rpm") in buckets
+        assert ("entity-1", "gpt-4", "tpm") in buckets
+
+    @pytest.mark.asyncio
+    async def test_batch_get_entity_and_buckets_finds_bucket_without_meta(self, repo):
+        """Buckets must be returned even when the entity has no #META record.
+
+        This is the real-world trigger: acquire() on an entity that was never
+        registered via create_entity() creates a bucket but no META record.
+        The slow-path read still must find the bucket (entity is None, but the
+        bucket dict is populated).
+        """
+        limits = [Limit.per_minute("rpm", 100)]
+        now_ms = int(time.time() * 1000)
+        states = [BucketState.from_limit("bare-1", "gpt-4", limit, now_ms) for limit in limits]
+        await repo.transact_write([repo.build_composite_create("bare-1", "gpt-4", states, now_ms)])
+
+        entity, buckets = await repo.batch_get_entity_and_buckets("bare-1", [("bare-1", "gpt-4")])
+
+        assert entity is None  # never created via create_entity
+        assert ("bare-1", "gpt-4", "rpm") in buckets  # bug returned {} here
+
     # -------------------------------------------------------------------------
     # batch_get_configs tests (issue #298)
     # -------------------------------------------------------------------------

tests/unit/test_sync_limiter.py

Lines changed: 38 additions & 0 deletions
@@ -163,6 +163,44 @@ def test_acquire_fallback_when_batch_not_supported(self, sync_limiter, monkeypat
             assert lease.consumed == {"rpm": 1}
 
 
+class TestRateLimiterRefillRecovery:
+    """Wait-then-acquire: an exhausted bucket recovers after enough time passes.
+
+    Regression for the stale slow-path bucket discriminator (buckets moved to
+    SK=#STATE in the per-shard migration, but batch_get_entity_and_buckets still
+    filtered on the old "#BUCKET#" prefix). With buckets silently dropped, the
+    refill-recovery fallback treated existing buckets as new and the conditional
+    write failed with a bogus retry_after=0.0 instead of refilling.
+    """
+
+    def test_acquire_succeeds_after_refill_wait(self, sync_limiter):
+        """Exhaust a bucket, wait for refill, and acquire again (speculative on)."""
+        limits = [Limit.custom("rpm", capacity=100, refill_amount=100, refill_period_seconds=1)]
+        with sync_limiter.acquire("key-1", "gpt-4", limits=limits, consume={"rpm": 100}):
+            pass
+        with pytest.raises(RateLimitExceeded) as exc_info:
+            with sync_limiter.acquire("key-1", "gpt-4", limits=limits, consume={"rpm": 100}):
+                pass
+        assert exc_info.value.retry_after_seconds > 0
+        time.sleep(1.1)
+        with sync_limiter.acquire("key-1", "gpt-4", limits=limits, consume={"rpm": 50}) as lease:
+            assert lease.consumed == {"rpm": 50}
+
+    def test_acquire_succeeds_after_refill_wait_non_speculative(self, sync_limiter):
+        """Same recovery on the pure slow path (speculative writes disabled)."""
+        slow = SyncRateLimiter(repository=sync_limiter._repository, speculative_writes=False)
+        limits = [Limit.custom("rpm", capacity=100, refill_amount=100, refill_period_seconds=1)]
+        with slow.acquire("key-2", "gpt-4", limits=limits, consume={"rpm": 100}):
+            pass
+        with pytest.raises(RateLimitExceeded) as exc_info:
+            with slow.acquire("key-2", "gpt-4", limits=limits, consume={"rpm": 100}):
+                pass
+        assert exc_info.value.retry_after_seconds > 0
+        time.sleep(1.1)
+        with slow.acquire("key-2", "gpt-4", limits=limits, consume={"rpm": 50}) as lease:
+            assert lease.consumed == {"rpm": 50}
+
+
 class TestRateLimiterLease:
     """Tests for SyncLease functionality."""
 

tests/unit/test_sync_repository.py

Lines changed: 31 additions & 0 deletions
@@ -267,6 +267,37 @@ def test_batch_get_buckets_empty_keys(self, repo):
         result = repo.batch_get_buckets([])
         assert result == {}
 
+    def test_batch_get_entity_and_buckets_returns_existing_buckets(self, repo_with_buckets):
+        """The acquire slow-path read must return buckets that exist.
+
+        Regression for the stale bucket discriminator: buckets use SK=#STATE
+        since the per-shard migration (GHSA-76rv), but the response filter still
+        checked the pre-shard SK_BUCKET prefix ("#BUCKET#"), silently dropping
+        every bucket. That made the slow path treat existing buckets as new.
+        """
+        entity, buckets = repo_with_buckets.batch_get_entity_and_buckets(
+            "entity-1", [("entity-1", "gpt-4")]
+        )
+        assert entity is not None
+        assert ("entity-1", "gpt-4", "rpm") in buckets
+        assert ("entity-1", "gpt-4", "tpm") in buckets
+
+    def test_batch_get_entity_and_buckets_finds_bucket_without_meta(self, repo):
+        """Buckets must be returned even when the entity has no #META record.
+
+        This is the real-world trigger: acquire() on an entity that was never
+        registered via create_entity() creates a bucket but no META record.
+        The slow-path read still must find the bucket (entity is None, but the
+        bucket dict is populated).
+        """
+        limits = [Limit.per_minute("rpm", 100)]
+        now_ms = int(time.time() * 1000)
+        states = [BucketState.from_limit("bare-1", "gpt-4", limit, now_ms) for limit in limits]
+        repo.transact_write([repo.build_composite_create("bare-1", "gpt-4", states, now_ms)])
+        entity, buckets = repo.batch_get_entity_and_buckets("bare-1", [("bare-1", "gpt-4")])
+        assert entity is None
+        assert ("bare-1", "gpt-4", "rpm") in buckets
+
     def test_batch_get_configs_empty_keys(self, repo):
         """batch_get_configs should return empty dict for empty keys list."""
         result = repo.batch_get_configs([])
