Skip to content

Commit c05c606

Browse files
committed
Merge PR #298: Make process_registry checkpoint writes atomic
Authored by aydnOktay. Companion to PR #297 (batch_runner). Applies the same atomic write pattern (temp file + fsync + os.replace) to both _write_checkpoint() and recover_from_checkpoint() in process_registry.py. Prevents checkpoint corruption on gateway crashes. Also improves error handling: bare 'pass' replaced with logger.debug(..., exc_info=True) for better debugging.
2 parents b4873a5 + 5fa3e24 commit c05c606

File tree

1 file changed

+42
-7
lines changed

1 file changed

+42
-7
lines changed

tools/process_registry.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import shutil
3838
import signal
3939
import subprocess
40+
import tempfile
4041
import threading
4142
import time
4243
import uuid
@@ -690,7 +691,7 @@ def cleanup_expired(self):
690691
# ----- Checkpoint (crash recovery) -----
691692

692693
def _write_checkpoint(self):
693-
"""Write running process metadata to checkpoint file."""
694+
"""Write running process metadata to checkpoint file atomically."""
694695
try:
695696
with self._lock:
696697
entries = []
@@ -705,12 +706,28 @@ def _write_checkpoint(self):
705706
"task_id": s.task_id,
706707
"session_key": s.session_key,
707708
})
709+
710+
# Atomic write: temp file + os.replace to avoid corruption on crash
708711
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
709-
CHECKPOINT_PATH.write_text(
710-
json.dumps(entries, indent=2), encoding="utf-8"
712+
fd, tmp_path = tempfile.mkstemp(
713+
dir=str(CHECKPOINT_PATH.parent),
714+
prefix='.checkpoint_',
715+
suffix='.tmp',
711716
)
712-
except Exception:
713-
pass # Best-effort
717+
try:
718+
with os.fdopen(fd, 'w', encoding='utf-8') as f:
719+
json.dump(entries, f, indent=2, ensure_ascii=False)
720+
f.flush()
721+
os.fsync(f.fileno())
722+
os.replace(tmp_path, CHECKPOINT_PATH)
723+
except BaseException:
724+
try:
725+
os.unlink(tmp_path)
726+
except OSError:
727+
pass
728+
raise
729+
except Exception as e:
730+
logger.debug("Failed to write checkpoint file: %s", e, exc_info=True)
714731

715732
def recover_from_checkpoint(self) -> int:
716733
"""
@@ -757,10 +774,28 @@ def recover_from_checkpoint(self) -> int:
757774
logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)
758775

759776
# Clear the checkpoint (will be rewritten as processes finish)
777+
# Use atomic write to avoid corruption
760778
try:
761-
CHECKPOINT_PATH.write_text("[]", encoding="utf-8")
779+
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
780+
fd, tmp_path = tempfile.mkstemp(
781+
dir=str(CHECKPOINT_PATH.parent),
782+
prefix='.checkpoint_',
783+
suffix='.tmp',
784+
)
785+
try:
786+
with os.fdopen(fd, 'w', encoding='utf-8') as f:
787+
f.write("[]")
788+
f.flush()
789+
os.fsync(f.fileno())
790+
os.replace(tmp_path, CHECKPOINT_PATH)
791+
except BaseException:
792+
try:
793+
os.unlink(tmp_path)
794+
except OSError:
795+
pass
796+
raise
762797
except Exception as e:
763-
logger.debug("Could not write checkpoint file: %s", e)
798+
logger.debug("Could not clear checkpoint file: %s", e, exc_info=True)
764799

765800
return recovered
766801

0 commit comments

Comments
 (0)