Skip to content

Commit 9676541

Browse files
perf: throttle _checkpoint_state to avoid per-entry disk writes (#1086)
CHECKPOINT_INTERVAL_SECONDS (120s) was defined but not applied to the per-entry checkpoint calls inside _process_entry. This caused up to 250,000 atomic file writes for a 250k-entry mass-eval run. Structural boundaries (file completion, retries, init, end-of-run) still force-write immediately. Per-entry calls are now skipped if fewer than 120s have elapsed since the last write. Co-authored-by: florath-ai-assistant[bot] <Andreas.Florath@telekom.de>
1 parent c92c613 commit 9676541

File tree

1 file changed

+21

-11

lines changed

src/aletheia_probe/cli_logic/mass_eval.py

Lines changed: 21 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -198,6 +198,8 @@ def __init__(self, state_path: Path, mode: str, input_path: Path):
198198
self.written_records: int = 0
199199
self.retry_count: int = 0
200200
self.collect_cache_hits: int = 0
201+
# Runtime-only: not persisted to disk
202+
self._last_checkpoint_time: float = 0.0
201203

202204
def to_dict(self) -> dict[str, Any]:
203205
"""Serialize state to dictionary."""
@@ -250,15 +252,23 @@ def _utc_now() -> str:
250252
return datetime.now(timezone.utc).isoformat()
251253

252254

253-
def _checkpoint_state(state: MassEvalState, *, force: bool = False) -> None:
    """Atomically persist the checkpoint state to disk.

    Writes are throttled: unless ``force`` is set, the call is a no-op when
    fewer than CHECKPOINT_INTERVAL_SECONDS have elapsed (monotonic clock)
    since the last successful write. Pass ``force=True`` at structural
    boundaries (file completion, retries, init, end-of-run) where the state
    must always hit disk.
    """
    now = time.monotonic()
    elapsed = now - state._last_checkpoint_time
    # Skip the write unless forced or the throttle interval has elapsed.
    if not (force or elapsed >= CHECKPOINT_INTERVAL_SECONDS):
        return
    state.updated_at = _utc_now()
    payload = state.to_dict()
    # Write to a sibling ".tmp" file, then rename over the target so a
    # crash mid-write never leaves a truncated checkpoint behind.
    target = state.state_path
    tmp_path = target.with_suffix(target.suffix + ".tmp")
    tmp_path.parent.mkdir(parents=True, exist_ok=True)
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, indent=2)
    tmp_path.replace(target)
    # Record the write time only after it actually succeeded.
    state._last_checkpoint_time = now
262272

263273

264274
def _load_or_init_state(
@@ -296,7 +306,7 @@ def _load_or_init_state(
296306
return state
297307

298308
state = MassEvalState(state_path=state_path, mode=mode, input_path=input_path)
299-
_checkpoint_state(state)
309+
_checkpoint_state(state, force=True)
300310
return state
301311

302312

@@ -454,7 +464,7 @@ async def _assess_with_retry(
454464
attempt_number = await on_retry()
455465
else:
456466
state.retry_count += 1
457-
_checkpoint_state(state)
467+
_checkpoint_state(state, force=True)
458468
attempt_number = state.retry_count
459469

460470
sleep_seconds = min(retry_delay, RETRY_MAX_SECONDS)
@@ -532,7 +542,7 @@ async def _collect_with_retry(
532542
attempt_number = await on_retry()
533543
else:
534544
state.retry_count += 1
535-
_checkpoint_state(state)
545+
_checkpoint_state(state, force=True)
536546
attempt_number = state.retry_count
537547

538548
sleep_seconds = min(retry_delay, RETRY_MAX_SECONDS)
@@ -694,7 +704,7 @@ async def _log_file_completion(summary_status: str) -> None:
694704
state.failed_files.pop(file_key, None)
695705
progress["completed_entry_indices"] = []
696706
progress["last_error"] = None
697-
_checkpoint_state(state)
707+
_checkpoint_state(state, force=True)
698708
await _log_file_completion("already_complete")
699709
return
700710

@@ -709,7 +719,7 @@ async def _log_file_completion(summary_status: str) -> None:
709719
state.completed_files.append(file_key)
710720
state.failed_files.pop(file_key, None)
711721
progress["last_error"] = None
712-
_checkpoint_state(state)
722+
_checkpoint_state(state, force=True)
713723
await _log_file_completion("already_complete_sparse")
714724
return
715725

@@ -723,7 +733,7 @@ async def _log_file_completion(summary_status: str) -> None:
723733
async def _reserve_retry_attempt() -> int:
724734
async with state_lock:
725735
state.retry_count += 1
726-
_checkpoint_state(state)
736+
_checkpoint_state(state, force=True)
727737
return state.retry_count
728738

729739
async def _process_entry(entry_index: int) -> None:
@@ -899,7 +909,7 @@ async def _worker() -> None:
899909
state.failed_files.pop(file_key, None)
900910
progress["completed_entry_indices"] = []
901911
progress["last_error"] = None
902-
_checkpoint_state(state)
912+
_checkpoint_state(state, force=True)
903913
await _log_file_completion("processed")
904914

905915

@@ -1014,13 +1024,13 @@ async def _async_mass_eval_main(
10141024

10151025
now = time.time()
10161026
if now - last_checkpoint_at >= checkpoint_interval_seconds:
1017-
_checkpoint_state(state)
1027+
_checkpoint_state(state, force=True)
10181028
if collect_dedupe_cache is not None:
10191029
await collect_dedupe_cache.flush()
10201030
last_checkpoint_at = now
10211031

10221032
state.current_file = None
1023-
_checkpoint_state(state)
1033+
_checkpoint_state(state, force=True)
10241034
if collect_dedupe_cache is not None:
10251035
await collect_dedupe_cache.flush(force=True)
10261036

0 commit comments

Comments
 (0)