diff --git a/copaw/src/copaw_worker/sync.py b/copaw/src/copaw_worker/sync.py index 9d763ac70..49950d9c8 100644 --- a/copaw/src/copaw_worker/sync.py +++ b/copaw/src/copaw_worker/sync.py @@ -36,6 +36,32 @@ _MC_ALIAS = "hiclaw" +def _read_text_with_retry(path: Path, max_retries: int = 5, delay_ms: int = 100) -> Optional[str]: + """Read text from path with retry on UnicodeDecodeError. + + This handles the race condition between mc mirror reporting completion and + the file being fully written to disk. The race can cause partial reads + of multi-byte UTF-8 characters at the end of the file. + """ + for attempt in range(max_retries): + try: + return path.read_text(encoding="utf-8") + except UnicodeDecodeError as e: + if attempt < max_retries - 1: + logger.debug( + "UnicodeDecodeError reading %s (attempt %d/%d): %s — retrying...", + path, attempt + 1, max_retries, e, + ) + time.sleep(delay_ms / 1000.0) + else: + logger.warning( + "UnicodeDecodeError reading %s after %d attempts: %s", + path, max_retries, e, + ) + raise + return None + + def _deep_merge(base: dict, override: dict) -> dict: """Deep merge override into base (override wins leaf conflicts).""" result = dict(base) @@ -302,7 +328,7 @@ def _get_team_id(self) -> Optional[str]: agents_path = self.local_dir / "AGENTS.md" if agents_path.exists(): try: - content = agents_path.read_text() + content = _read_text_with_retry(agents_path) import re m = re.search(r'\*\*Team\*\*:\s*(\S+)', content) if m: @@ -312,7 +338,7 @@ def _get_team_id(self) -> Optional[str]: config_path = self.local_dir / "openclaw.json" if config_path.exists(): try: - config = json.loads(config_path.read_text()) + config = json.loads(_read_text_with_retry(config_path)) return config.get("team_id") or None except Exception: pass @@ -323,7 +349,7 @@ def _is_team_leader(self) -> bool: agents_path = self.local_dir / "AGENTS.md" if agents_path.exists(): try: - content = agents_path.read_text() + content = _read_text_with_retry(agents_path) return "Upstream coordinator" in content except Exception: pass diff --git a/copaw/src/copaw_worker/worker.py b/copaw/src/copaw_worker/worker.py index d6271c275..b8edd2787 100644 --- a/copaw/src/copaw_worker/worker.py +++ b/copaw/src/copaw_worker/worker.py @@ -15,6 +15,7 @@ import platform import shutil import stat +import time from pathlib import Path from typing import Optional @@ -29,6 +30,43 @@ logger = logging.getLogger(__name__) +def _read_text_with_retry(path: Path, max_retries: int = 5, delay_ms: int = 100) -> str: + """Read text from path with retry on UnicodeDecodeError. + + This handles the race condition between mc mirror reporting completion and + the file being fully written to disk. The race can cause partial reads + of multi-byte UTF-8 characters at the end of the file. + + Args: + path: Path to read from + max_retries: Maximum number of retry attempts (default 5) + delay_ms: Delay between retries in milliseconds (default 100) + + Returns: + The file contents as a string + + Raises: + UnicodeDecodeError: If all retries fail + FileNotFoundError: If file does not exist + """ + for attempt in range(max_retries): + try: + return path.read_text(encoding="utf-8") + except UnicodeDecodeError as e: + if attempt < max_retries - 1: + logger.debug( + "UnicodeDecodeError reading %s (attempt %d/%d): %s — retrying...", + path, attempt + 1, max_retries, e, + ) + time.sleep(delay_ms / 1000.0) + else: + logger.warning( + "UnicodeDecodeError reading %s after %d attempts: %s", + path, max_retries, e, + ) + raise + + class Worker: def __init__(self, config: WorkerConfig) -> None: self.config = config @@ -125,14 +163,14 @@ async def start(self) -> bool: for name in ("SOUL.md", "AGENTS.md"): src = self.sync.local_dir / name if src.exists(): - (workspace_dir / name).write_text(src.read_text()) + (workspace_dir / name).write_text(_read_text_with_retry(src)) for name in ("HEARTBEAT.md",): dst = workspace_dir / name if dst.exists(): continue src = self.sync.local_dir / name if src.exists(): - dst.write_text(src.read_text()) + dst.write_text(_read_text_with_retry(src)) # 5. Bridge openclaw.json -> CoPaw workspaces/default/agent.json + providers.json # Infer gateway port from FS endpoint so bridge's _port_remap uses @@ -510,9 +548,10 @@ async def _on_files_pulled(self, pulled_files: list[str]) -> None: console.print("[yellow]Config changed, re-bridging...[/yellow]") try: openclaw_cfg = self.sync.get_config() - # Use local Worker-managed files; fallback to MinIO for initial bootstrap - soul = (self.sync.local_dir / "SOUL.md").read_text() if (self.sync.local_dir / "SOUL.md").exists() else self.sync.get_soul() - agents = (self.sync.local_dir / "AGENTS.md").read_text() if (self.sync.local_dir / "AGENTS.md").exists() else self.sync.get_agents_md() + soul_path = self.sync.local_dir / "SOUL.md" + soul = _read_text_with_retry(soul_path) if soul_path.exists() else self.sync.get_soul() + agents_path = self.sync.local_dir / "AGENTS.md" + agents = _read_text_with_retry(agents_path) if agents_path.exists() else self.sync.get_agents_md() if soul: workspace_dir = self._copaw_working_dir / "workspaces" / "default"