Welcome to beacon-skill! Thanks for your first pull request. Quick checklist:

Bounty tiers: Micro (1-10 RTC) | Standard (20-50) | Major (75-100)

A maintainer will review your PR soon!
Pull request overview
Adds a direct “store-and-forward” path for relay agents by persisting per-target pending messages on disk and returning them on the target’s next heartbeat.
Changes:
- Added a file-backed per-agent JSONL queue (`relay_messages/<agent_id>.jsonl`) and surfaced pending messages in `RelayManager.heartbeat()`.
- Updated `RelayManager.forward_message()` to enqueue messages when `envelope["to"]` matches a registered relay agent.
- Added a unit test covering the direct messaging + heartbeat flush behavior; updated BCOS documentation content.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `beacon_skill/relay.py` | Implements disk-backed pending message queue and returns queued messages on heartbeat. |
| `tests/test_relay_direct_messaging.py` | New test validating enqueue-on-forward + dequeue-on-heartbeat behavior. |
| `BCOS.md` | Updates BCOS program wording/links and certification details. |
```python
target_agent = envelope.get("to")
if target_agent and target_agent in self._load_agents():
    self._push_message(target_agent, envelope)
```
The new relay_messages file queue introduces an additional file I/O failure mode before the inbox append. If writing to the per-agent queue fails (permissions/disk full), forward_message will raise and the message won't even be recorded in inbox.jsonl. If direct delivery is best-effort, wrap _push_message in a narrow exception handler that logs the failure and continues with the existing inbox persistence (or return a structured error if you want to make delivery guarantees explicit).
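A minimal self-contained sketch of the best-effort variant, assuming a narrow `OSError` handler around the queue write; names like `RelayManagerSketch` and the inbox-append step are illustrative, not the PR's actual API:

```python
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

class RelayManagerSketch:
    """Illustrative only: queue push is best-effort, inbox append always runs."""

    def __init__(self, root: Path):
        self.root = root
        self.inbox = root / "inbox.jsonl"
        (root / "relay_messages").mkdir(parents=True, exist_ok=True)

    def _push_message(self, agent_id: str, envelope: dict) -> None:
        path = self.root / "relay_messages" / f"{agent_id}.jsonl"
        with path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(envelope) + "\n")

    def forward_message(self, envelope: dict, known_agents: set) -> None:
        target = envelope.get("to")
        if target and target in known_agents:
            try:
                # Narrow handler: only expected I/O failures are caught,
                # so programming errors still surface.
                self._push_message(target, envelope)
            except OSError as e:
                logger.warning("direct delivery to %s failed: %s", target, e)
        # Inbox persistence always runs, even if the queue write failed.
        with self.inbox.open("a", encoding="utf-8") as f:
            f.write(json.dumps(envelope) + "\n")
```

This keeps the existing delivery guarantee for `inbox.jsonl` while treating the direct queue as opportunistic.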
```diff
@@ -0,0 +1,40 @@
+import pytest
```
Unused import: pytest is not referenced in this test module (tmp_path fixture works without importing pytest). Removing it avoids lint noise in environments that check unused imports.
Suggested change:

```diff
-import pytest
```
```markdown
# BCOS — Beacon Certified Open Source

[](https://github.com/nicholaelaw/awesome-bcos)
[](https://rustchain.org/bcos/)

## What is BCOS?

This repository is certified under the **Beacon Certified Open Source (BCOS)** program by [Elyan Labs](https://elyanlabs.ai).

**Blockchain Certified Open Source (BCOS)** is a human-review certification for open source repositories. It means:

## Verification
```
BCOS.md is substantially rewritten (badge/link/spec content) but the PR description is focused on relay direct messaging. This looks like an unrelated documentation change and makes the PR harder to review/trace to issue Scottcjn#4; consider moving the BCOS update into a separate PR (or update the PR description to explicitly include this change).
```python
msgs = []
try:
    for line in path.read_text(encoding="utf-8").splitlines():
        if line.strip():
            msgs.append(json.loads(line))
    path.unlink()
except Exception:
    pass
```
_pop_messages swallows all exceptions and may leave the message file in place when a single line is malformed (e.g., partial write). In that case, already-delivered messages can be re-delivered on every subsequent heartbeat, and parsing failures become invisible. Handle JSON decode errors per-line and ensure the file is atomically cleared/rotated (e.g., rename+truncate/unlink in a finally), and log failures via _log (or at least preserve the bad file for inspection).
Suggested change:

```python
msgs: List[Dict[str, Any]] = []
tmp_path = path.with_suffix(path.suffix + ".tmp")
had_error = False
# First, atomically rotate the file so we don't repeatedly deliver the
# same messages if something goes wrong while reading.
try:
    path.replace(tmp_path)
except FileNotFoundError:
    # File disappeared between exists() check and replace(); nothing to do.
    return []
except OSError as e:
    # Failed to rotate the file; log and bail out without consuming.
    self._log(
        {
            "type": "relay_message_rotate_error",
            "agent_id": agent_id,
            "path": str(path),
            "error": str(e),
            "ts": time.time(),
        }
    )
    return []
try:
    # Read and parse line-by-line so a single bad line doesn't discard all.
    with tmp_path.open("r", encoding="utf-8") as f:
        for line_no, raw_line in enumerate(f, 1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                msgs.append(json.loads(line))
            except json.JSONDecodeError as e:
                had_error = True
                # Log decode error but continue processing other lines.
                self._log(
                    {
                        "type": "relay_message_decode_error",
                        "agent_id": agent_id,
                        "path": str(tmp_path),
                        "line_number": line_no,
                        "error": str(e),
                        "raw_line": line,
                        "ts": time.time(),
                    }
                )
except Exception as e:
    # Some unexpected I/O or other error while reading.
    had_error = True
    self._log(
        {
            "type": "relay_message_read_error",
            "agent_id": agent_id,
            "path": str(tmp_path),
            "error": str(e),
            "ts": time.time(),
        }
    )
finally:
    # Cleanup: if we had any errors, try to preserve the bad file;
    # otherwise, delete it so messages are not re-delivered.
    try:
        if tmp_path.exists():
            if had_error:
                bad_path = tmp_path.with_suffix(tmp_path.suffix + ".bad")
                try:
                    # If renaming fails, leave tmp_path in place for inspection.
                    tmp_path.replace(bad_path)
                except OSError:
                    pass
            else:
                tmp_path.unlink()
    except Exception:
        # Never let cleanup failures propagate from here.
        pass
```
```python
if target_agent and target_agent in self._load_agents():
    self._push_message(target_agent, envelope)
```
forward_message calls _load_agents() again solely to check whether the target exists, which adds an extra disk read per message (on top of authenticate()). Consider reusing the already-loaded agents state (e.g., load once at the start of the method or have authenticate return the underlying agent data) to avoid unnecessary I/O on hot paths.
Suggested change:

```python
if target_agent:
    # Use a simple in-memory cache to avoid re-loading agents
    agents = getattr(self, "_agents_cache", None)
    if agents is None:
        agents = self._load_agents()
        self._agents_cache = agents
    if target_agent in agents:
        self._push_message(target_agent, envelope)
```
I have added the required

Re-review complete: I spotted and proactively fixed a Time-Of-Check to Time-Of-Use (TOCTOU) race condition in
Resolves Scottcjn#4. This introduces a file-backed store-and-forward path: when a message targets a registered relay agent, it is appended to that agent's JSONL queue file and flushed when the target checks in via heartbeat.
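For reviewers, a minimal self-contained sketch of the store-and-forward flow described above; the class and method names (`StoreAndForwardSketch`, `enqueue`, `heartbeat_flush`) are illustrative, not the PR's actual API:

```python
import json
from pathlib import Path

class StoreAndForwardSketch:
    """Illustrative model of the PR's queue: enqueue on forward,
    drain on the target's next heartbeat."""

    def __init__(self, root: Path):
        self.queue_dir = root / "relay_messages"
        self.queue_dir.mkdir(parents=True, exist_ok=True)

    def enqueue(self, agent_id: str, envelope: dict) -> None:
        # One JSONL file per target agent; appending preserves order.
        path = self.queue_dir / f"{agent_id}.jsonl"
        with path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(envelope) + "\n")

    def heartbeat_flush(self, agent_id: str) -> list:
        # Drain all pending messages for the agent, then clear the file
        # so they are not delivered twice.
        path = self.queue_dir / f"{agent_id}.jsonl"
        if not path.exists():
            return []
        lines = path.read_text(encoding="utf-8").splitlines()
        msgs = [json.loads(line) for line in lines if line.strip()]
        path.unlink()
        return msgs
```

A usage round-trip: `enqueue("a1", msg)` followed by `heartbeat_flush("a1")` returns the queued messages in order, and a second flush returns an empty list.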