Description
Describe the Bug
Race Condition in ConcurrentStorage:
The ConcurrentStorage.save_run_sync() method claims to use file locking for thread-safe writes (as stated in its docstring), but the actual implementation bypasses all locking mechanisms. This creates a race condition when synchronous saves are mixed with asynchronous operations, potentially leading to data corruption or lost writes.
Cache Invalidation Bug:
The cache is updated before the write is flushed to disk, so the in-memory cache and the on-disk state diverge until the batch flush runs: any read that misses or bypasses the cache in that window returns stale data from disk, and a failed flush leaves the cache reporting a write that never happened.
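The ordering problem can be demonstrated in isolation with a toy write-behind store (the names below are illustrative, not taken from ConcurrentStorage): the cache is updated immediately on save, but the "disk" only sees the value at flush time, so the two views diverge in between.

```python
import queue

class WriteBehindCache:
    """Toy write-behind store: cache is updated immediately,
    the simulated disk only on flush, so the two can diverge."""
    def __init__(self):
        self.cache = {}            # in-memory view
        self.disk = {}             # simulated persistent store
        self.pending = queue.Queue()

    def save(self, key, value):
        self.pending.put((key, value))
        self.cache[key] = value    # cache updated BEFORE flush

    def flush(self):
        while not self.pending.empty():
            key, value = self.pending.get()
            self.disk[key] = value # write reaches "disk" only now

store = WriteBehindCache()
store.save("run:1", "v2")
# Divergence window: cache already sees v2, disk does not.
assert store.cache["run:1"] == "v2"
assert "run:1" not in store.disk
store.flush()
assert store.disk["run:1"] == "v2"
```

Any reader that consults the disk inside that window (for example after a cache eviction) observes state older than what the cache promised.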
To Reproduce
Steps to reproduce the behavior:
- Create a ConcurrentStorage instance and start it with await storage.start().
- Call storage.save_run_sync(run1) from one thread.
- Simultaneously call await storage.save_run(run2) for the same run ID (or overlapping file) from an async context.
- Observe potential data corruption or inconsistent state.
Minimal reproduction code:
```python
import asyncio
import threading

from framework.storage.concurrent import ConcurrentStorage
from framework.schemas.run import Run

async def main():
    storage = ConcurrentStorage("/tmp/test_storage")
    await storage.start()
    run = Run(id="test_run", goal_id="goal_1", ...)

    # Start async save (uses locking)
    async_task = asyncio.create_task(storage.save_run(run))

    # Simultaneously do sync save (NO locking!)
    thread = threading.Thread(target=storage.save_run_sync, args=(run,))
    thread.start()

    await async_task
    thread.join()
    # Data may be corrupted or inconsistent
```
Expected Behavior
- The save_run_sync() method should use proper file locking (consistent with the docstring that says "uses base storage directly with lock").
- Cache should only be updated after the write is confirmed/flushed, not before.
- Synchronous and asynchronous operations should be safe to use together.
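One way the last requirement can be met is to have both paths share a single `threading.Lock`, with the async path acquiring it off the event loop. This is a minimal sketch under that assumption, using illustrative names (`SafeStorage`, `save_sync`, `save_async`) that are not part of the real API:

```python
import asyncio
import threading

class SafeStorage:
    """Sketch: one threading.Lock shared by sync and async paths.
    The async path acquires it via asyncio.to_thread so the event
    loop is not blocked while waiting for the lock."""
    def __init__(self):
        self._io_lock = threading.Lock()  # created once, up front
        self.data = {}

    def save_sync(self, key, value):
        with self._io_lock:               # sync callers block here
            self.data[key] = value

    async def save_async(self, key, value):
        # Acquire the same blocking lock on a worker thread.
        await asyncio.to_thread(self._save_locked, key, value)

    def _save_locked(self, key, value):
        with self._io_lock:
            self.data[key] = value

async def main():
    s = SafeStorage()
    t = threading.Thread(target=s.save_sync, args=("run:1", "from-thread"))
    t.start()
    await s.save_async("run:1", "from-async")
    t.join()
    return s.data["run:1"]

result = asyncio.run(main())
# Last writer wins, but there is no torn or partial state.
assert result in ("from-thread", "from-async")
```

Both writes serialize on the same lock, so whichever finishes last wins cleanly instead of interleaving.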
Screenshots
N/A - Code-level bug
Environment
- OS: Any (Linux, macOS, Windows)
- Python version: 3.11+
- Docker version: N/A
Configuration
No special configuration required - this affects the default ConcurrentStorage implementation.
Logs
No error logs produced - this is a silent data corruption issue.
Additional Context
File: core/framework/storage/concurrent.py
Problematic Code (Lines 349-356):
```python
# === SYNC API (for backward compatibility) ===
def save_run_sync(self, run: Run) -> None:
    """Synchronous save (uses base storage directly with lock)."""
    # Docstring says "with lock", but NO LOCK IS USED!
    self._base_storage.save_run(run)

def load_run_sync(self, run_id: str) -> Run | None:
    """Synchronous load (uses base storage directly)."""
    return self._base_storage.load_run(run_id)
```
Cache Issue (Lines 139-141):
```python
async def save_run(self, run: Run, immediate: bool = False) -> None:
    if immediate or not self._running:
        await self._save_run_locked(run)
    else:
        await self._write_queue.put(("run", run))
    # Cache updated BEFORE the write is flushed!
    self._cache[f"run:{run.id}"] = CacheEntry(run, time.time())
```
Suggested Fix:
For the sync API (note: the lock must be created once in `__init__`; creating it lazily inside the method with a `hasattr` check would itself be a check-then-act race between threads):

```python
def save_run_sync(self, run: Run) -> None:
    """Synchronous save (uses base storage directly with lock)."""
    # self._sync_lock is a threading.Lock created in __init__
    with self._sync_lock:
        self._base_storage.save_run(run)
```
For the cache issue:
```python
async def save_run(self, run: Run, immediate: bool = False) -> None:
    if immediate or not self._running:
        await self._save_run_locked(run)
        # Update cache only AFTER the write has succeeded
        self._cache[f"run:{run.id}"] = CacheEntry(run, time.time())
    else:
        await self._write_queue.put(("run", run))
        # Don't update cache until the batch flush confirms the write
```
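For the queued path, the batch worker would then install cache entries only after the underlying write succeeds. The real worker is not shown in the issue, so this is a self-contained toy sketch of that ordering (all names here are illustrative):

```python
import asyncio
import time

class BatchStore:
    """Toy sketch: the flush worker installs cache entries only
    after the simulated disk write succeeds, so the cache never
    claims data that is not durable."""
    def __init__(self):
        self.cache = {}
        self.disk = {}
        self.queue = asyncio.Queue()

    async def save(self, key, value):
        await self.queue.put((key, value))      # no cache update here

    async def worker(self):
        while True:
            key, value = await self.queue.get()
            self.disk[key] = value              # write to "disk" first
            self.cache[key] = (value, time.time())  # then publish to cache
            self.queue.task_done()
            if self.queue.empty():
                break

async def demo():
    s = BatchStore()
    await s.save("run:1", "v1")
    assert "run:1" not in s.cache   # not visible until flushed
    await s.worker()
    return s.cache["run:1"][0], s.disk["run:1"]

cached, on_disk = asyncio.run(demo())
assert cached == on_disk == "v1"
```

The trade-off is that a queued write is not visible via the cache until the flush runs; if read-your-writes semantics are needed before the flush, the entry would have to carry a dirty flag rather than be omitted.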