Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions copaw/src/copaw_worker/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@
_MC_ALIAS = "hiclaw"


def _read_text_with_retry(path: Path, max_retries: int = 5, delay_ms: int = 100) -> Optional[str]:
"""Read text from path with retry on UnicodeDecodeError.

This handles the race condition between mc mirror reporting completion and
the file being fully written to disk. The race can cause partial reads
of multi-byte UTF-8 characters at the end of the file.
"""
for attempt in range(max_retries):
try:
return path.read_text(encoding="utf-8")
except UnicodeDecodeError as e:
if attempt < max_retries - 1:
logger.debug(
"UnicodeDecodeError reading %s (attempt %d/%d): %s — retrying...",
path, attempt + 1, max_retries, e,
)
time.sleep(delay_ms / 1000.0)
else:
logger.warning(
"UnicodeDecodeError reading %s after %d attempts: %s",
path, max_retries, e,
)
raise
return None


def _deep_merge(base: dict, override: dict) -> dict:
"""Deep merge override into base (override wins leaf conflicts)."""
result = dict(base)
Expand Down Expand Up @@ -302,7 +328,7 @@ def _get_team_id(self) -> Optional[str]:
agents_path = self.local_dir / "AGENTS.md"
if agents_path.exists():
try:
content = agents_path.read_text()
content = _read_text_with_retry(agents_path)
import re
m = re.search(r'\*\*Team\*\*:\s*(\S+)', content)
if m:
Expand All @@ -312,7 +338,7 @@ def _get_team_id(self) -> Optional[str]:
config_path = self.local_dir / "openclaw.json"
if config_path.exists():
try:
config = json.loads(config_path.read_text())
config = json.loads(_read_text_with_retry(config_path))
return config.get("team_id") or None
except Exception:
pass
Expand All @@ -323,7 +349,7 @@ def _is_team_leader(self) -> bool:
agents_path = self.local_dir / "AGENTS.md"
if agents_path.exists():
try:
content = agents_path.read_text()
content = _read_text_with_retry(agents_path)
return "Upstream coordinator" in content
except Exception:
pass
Expand Down
49 changes: 44 additions & 5 deletions copaw/src/copaw_worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import platform
import shutil
import stat
import time
from pathlib import Path
from typing import Optional

Expand All @@ -29,6 +30,43 @@
logger = logging.getLogger(__name__)


def _read_text_with_retry(path: Path, max_retries: int = 5, delay_ms: int = 100) -> str:
"""Read text from path with retry on UnicodeDecodeError.

This handles the race condition between mc mirror reporting completion and
the file being fully written to disk. The race can cause partial reads
of multi-byte UTF-8 characters at the end of the file.

Args:
path: Path to read from
max_retries: Maximum number of retry attempts (default 5)
delay_ms: Delay between retries in milliseconds (default 100)

Returns:
The file contents as a string

Raises:
UnicodeDecodeError: If all retries fail
FileNotFoundError: If file does not exist
"""
for attempt in range(max_retries):
try:
return path.read_text(encoding="utf-8")
except UnicodeDecodeError as e:
if attempt < max_retries - 1:
logger.debug(
"UnicodeDecodeError reading %s (attempt %d/%d): %s — retrying...",
path, attempt + 1, max_retries, e,
)
time.sleep(delay_ms / 1000.0)
else:
logger.warning(
"UnicodeDecodeError reading %s after %d attempts: %s",
path, max_retries, e,
)
raise


class Worker:
def __init__(self, config: WorkerConfig) -> None:
self.config = config
Expand Down Expand Up @@ -125,14 +163,14 @@ async def start(self) -> bool:
for name in ("SOUL.md", "AGENTS.md"):
src = self.sync.local_dir / name
if src.exists():
(workspace_dir / name).write_text(src.read_text())
(workspace_dir / name).write_text(_read_text_with_retry(src))
for name in ("HEARTBEAT.md",):
dst = workspace_dir / name
if dst.exists():
continue
src = self.sync.local_dir / name
if src.exists():
dst.write_text(src.read_text())
dst.write_text(_read_text_with_retry(src))

# 5. Bridge openclaw.json -> CoPaw workspaces/default/agent.json + providers.json
# Infer gateway port from FS endpoint so bridge's _port_remap uses
Expand Down Expand Up @@ -510,9 +548,10 @@ async def _on_files_pulled(self, pulled_files: list[str]) -> None:
console.print("[yellow]Config changed, re-bridging...[/yellow]")
try:
openclaw_cfg = self.sync.get_config()
# Use local Worker-managed files; fallback to MinIO for initial bootstrap
soul = (self.sync.local_dir / "SOUL.md").read_text() if (self.sync.local_dir / "SOUL.md").exists() else self.sync.get_soul()
agents = (self.sync.local_dir / "AGENTS.md").read_text() if (self.sync.local_dir / "AGENTS.md").exists() else self.sync.get_agents_md()
soul_path = self.sync.local_dir / "SOUL.md"
soul = _read_text_with_retry(soul_path) if soul_path.exists() else self.sync.get_soul()
agents_path = self.sync.local_dir / "AGENTS.md"
agents = _read_text_with_retry(agents_path) if agents_path.exists() else self.sync.get_agents_md()

if soul:
workspace_dir = self._copaw_working_dir / "workspaces" / "default"
Expand Down
Loading