Skip to content

Commit 20bea9c

Browse files
Merge pull request #2273 from krish341360/fix/concurrent-storage-race-condition
fix: race condition in ConcurrentStorage and cache invalidation bug
2 parents 92d0b6a + a7709d4 commit 20bea9c

File tree

2 files changed

+173
-3
lines changed

2 files changed

+173
-3
lines changed

core/framework/storage/concurrent.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,18 @@ async def save_run(self, run: Run, immediate: bool = False) -> None:
167167
run: Run to save
168168
immediate: If True, save immediately (bypasses batching)
169169
"""
170+
# Invalidate summary cache since the run data is changing
171+
# This ensures load_summary() fetches fresh data after the save
172+
self._cache.pop(f"summary:{run.id}", None)
173+
170174
if immediate or not self._running:
171175
await self._save_run_locked(run)
176+
# Update cache only after successful immediate write
177+
self._cache[f"run:{run.id}"] = CacheEntry(run, time.time())
172178
else:
179+
# For batched writes, cache will be updated in _flush_batch after successful write
173180
await self._write_queue.put(("run", run))
174181

175-
# Update cache
176-
self._cache[f"run:{run.id}"] = CacheEntry(run, time.time())
177-
178182
async def _save_run_locked(self, run: Run) -> None:
179183
"""Save a run with file locking, including index locks."""
180184
lock_key = f"run:{run.id}"
@@ -363,8 +367,12 @@ async def _flush_batch(self, batch: list[tuple[str, Any]]) -> None:
363367
try:
364368
if item_type == "run":
365369
await self._save_run_locked(item)
370+
# Update cache only after successful batched write
371+
# This fixes the race condition where cache was updated before write completed
372+
self._cache[f"run:{item.id}"] = CacheEntry(item, time.time())
366373
except Exception as e:
367374
logger.error(f"Failed to save {item_type}: {e}")
375+
# Cache is NOT updated on failure - prevents stale/inconsistent cache state
368376

369377
async def _flush_pending(self) -> None:
370378
"""Flush all pending writes."""
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
"""Tests for ConcurrentStorage race condition and cache invalidation fixes."""
2+
3+
import asyncio
4+
from pathlib import Path
5+
6+
import pytest
7+
8+
from framework.schemas.run import Run, RunMetrics, RunStatus
9+
from framework.storage.concurrent import ConcurrentStorage
10+
11+
12+
def create_test_run(
13+
run_id: str, goal_id: str = "test-goal", status: RunStatus = RunStatus.RUNNING
14+
) -> Run:
15+
"""Create a minimal test Run object."""
16+
return Run(
17+
id=run_id,
18+
goal_id=goal_id,
19+
status=status,
20+
narrative="Test run",
21+
metrics=RunMetrics(
22+
nodes_executed=[],
23+
),
24+
decisions=[],
25+
problems=[],
26+
)
27+
28+
29+
@pytest.mark.asyncio
30+
async def test_cache_invalidation_on_save(tmp_path: Path):
31+
"""Test that summary cache is invalidated when a run is saved.
32+
33+
This tests the fix for the cache invalidation bug where load_summary()
34+
would return stale data after a run was updated.
35+
"""
36+
storage = ConcurrentStorage(tmp_path)
37+
await storage.start()
38+
39+
try:
40+
run_id = "test-run-1"
41+
42+
# Create and save initial run
43+
run = create_test_run(run_id, status=RunStatus.RUNNING)
44+
await storage.save_run(run, immediate=True)
45+
46+
# Load summary to populate the cache
47+
summary = await storage.load_summary(run_id)
48+
assert summary is not None
49+
assert summary.status == RunStatus.RUNNING
50+
51+
# Update run with new status
52+
run.status = RunStatus.COMPLETED
53+
await storage.save_run(run, immediate=True)
54+
55+
# Load summary again - should get fresh data, not cached stale data
56+
summary = await storage.load_summary(run_id)
57+
assert summary is not None
58+
assert summary.status == RunStatus.COMPLETED, (
59+
"Summary cache should be invalidated on save - got stale data"
60+
)
61+
finally:
62+
await storage.stop()
63+
64+
65+
@pytest.mark.asyncio
66+
async def test_batched_write_cache_consistency(tmp_path: Path):
67+
"""Test that cache is only updated after successful batched write.
68+
69+
This tests the fix for the race condition where cache was updated
70+
before the batched write completed.
71+
"""
72+
storage = ConcurrentStorage(tmp_path, batch_interval=0.05)
73+
await storage.start()
74+
75+
try:
76+
run_id = "test-run-2"
77+
78+
# Save via batching (immediate=False)
79+
run = create_test_run(run_id, status=RunStatus.RUNNING)
80+
await storage.save_run(run, immediate=False)
81+
82+
# Before batch flush, cache should NOT contain the run
83+
# (This is the fix - previously cache was updated immediately)
84+
cache_key = f"run:{run_id}"
85+
assert cache_key not in storage._cache, (
86+
"Cache should not be updated before batch is flushed"
87+
)
88+
89+
# Wait for batch to flush
90+
await asyncio.sleep(0.1)
91+
92+
# After batch flush, cache should contain the run
93+
assert cache_key in storage._cache, "Cache should be updated after batch flush"
94+
95+
# Verify data on disk matches cache
96+
loaded_run = await storage.load_run(run_id, use_cache=False)
97+
assert loaded_run is not None
98+
assert loaded_run.id == run_id
99+
assert loaded_run.status == RunStatus.RUNNING
100+
finally:
101+
await storage.stop()
102+
103+
104+
@pytest.mark.asyncio
105+
async def test_immediate_write_updates_cache(tmp_path: Path):
106+
"""Test that immediate writes still update cache correctly."""
107+
storage = ConcurrentStorage(tmp_path)
108+
await storage.start()
109+
110+
try:
111+
run_id = "test-run-3"
112+
113+
# Save with immediate=True
114+
run = create_test_run(run_id, status=RunStatus.COMPLETED)
115+
await storage.save_run(run, immediate=True)
116+
117+
# Cache should be updated immediately for immediate writes
118+
cache_key = f"run:{run_id}"
119+
assert cache_key in storage._cache, "Cache should be updated after immediate write"
120+
121+
# Verify cached value is correct
122+
cached_run = storage._cache[cache_key].value
123+
assert cached_run.id == run_id
124+
assert cached_run.status == RunStatus.COMPLETED
125+
finally:
126+
await storage.stop()
127+
128+
129+
@pytest.mark.asyncio
130+
async def test_summary_cache_invalidated_on_multiple_saves(tmp_path: Path):
131+
"""Test that summary cache is invalidated on each save, not just the first."""
132+
storage = ConcurrentStorage(tmp_path)
133+
await storage.start()
134+
135+
try:
136+
run_id = "test-run-4"
137+
138+
# First save
139+
run = create_test_run(run_id, status=RunStatus.RUNNING)
140+
await storage.save_run(run, immediate=True)
141+
142+
# Load summary to cache it
143+
summary1 = await storage.load_summary(run_id)
144+
assert summary1.status == RunStatus.RUNNING
145+
146+
# Second save with new status
147+
run.status = RunStatus.RUNNING
148+
await storage.save_run(run, immediate=True)
149+
150+
# Load summary - should be fresh
151+
summary2 = await storage.load_summary(run_id)
152+
assert summary2.status == RunStatus.RUNNING
153+
154+
# Third save with final status
155+
run.status = RunStatus.COMPLETED
156+
await storage.save_run(run, immediate=True)
157+
158+
# Load summary - should be fresh again
159+
summary3 = await storage.load_summary(run_id)
160+
assert summary3.status == RunStatus.COMPLETED
161+
finally:
162+
await storage.stop()

0 commit comments

Comments
 (0)