From a8409a161f1a7ba500a4110817b98459bc2146fe Mon Sep 17 00:00:00 2001 From: teknium1 Date: Wed, 11 Mar 2026 09:19:10 -0700 Subject: [PATCH] fix: guard all print() calls against OSError with _SafeWriter When hermes-agent runs as a systemd service, Docker container, or headless daemon, the stdout pipe can become unavailable (idle timeout, buffer exhaustion, socket reset). Any print() call then raises OSError: [Errno 5] Input/output error, crashing run_conversation() and causing cron jobs to fail. Rather than wrapping individual print() calls (68 in run_conversation alone), this adds a transparent _SafeWriter wrapper installed once at the start of run_conversation(). It delegates all writes to the real stdout and silently catches OSError. Zero overhead on the happy path, comprehensive coverage of all print calls including future ones. Fixes #845 Co-authored-by: J0hnLawMississippi --- run_agent.py | 50 ++++++++++++++++++++++++++ tests/test_run_agent.py | 80 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/run_agent.py b/run_agent.py index e98863f5e..db35d85fd 100644 --- a/run_agent.py +++ b/run_agent.py @@ -99,6 +99,51 @@ ) +class _SafeWriter: + """Transparent stdout wrapper that catches OSError from broken pipes. + + When hermes-agent runs as a systemd service, Docker container, or headless + daemon, the stdout pipe can become unavailable (idle timeout, buffer + exhaustion, socket reset). Any print() call then raises + ``OSError: [Errno 5] Input/output error``, which can crash + run_conversation() — especially via double-fault when the except handler + also tries to print. + + This wrapper delegates all writes to the underlying stream and silently + catches OSError. It is installed once at the start of run_conversation() + and is transparent when stdout is healthy (zero overhead on the happy path). + """ + + __slots__ = ("_inner",) + + def __init__(self, inner): + object.__setattr__(self, "_inner", inner) + + def write(self, data): + try: + return self._inner.write(data) + except OSError: + return len(data) if isinstance(data, str) else 0 + + def flush(self): + try: + self._inner.flush() + except OSError: + pass + + def fileno(self): + return self._inner.fileno() + + def isatty(self): + try: + return self._inner.isatty() + except OSError: + return False + + def __getattr__(self, name): + return getattr(self._inner, name) + + class IterationBudget: """Thread-safe shared iteration counter for parent and child agents. @@ -3157,6 +3202,11 @@ def run_conversation( Returns: Dict: Complete conversation result with final response and message history """ + # Guard stdout against OSError from broken pipes (systemd/headless/daemon). + # Installed once, transparent when stdout is healthy, prevents crash on write. + if not isinstance(sys.stdout, _SafeWriter): + sys.stdout = _SafeWriter(sys.stdout) + # Generate unique task_id if not provided to isolate VMs between concurrent tasks effective_task_id = task_id or str(uuid.uuid4()) diff --git a/tests/test_run_agent.py b/tests/test_run_agent.py index 283498ebf..a3a822832 100644 --- a/tests/test_run_agent.py +++ b/tests/test_run_agent.py @@ -1283,3 +1283,83 @@ def test_appends_to_non_json_tool_result(self, agent): messages[-1]["content"] = last_content + f"\n\n{warning}" assert "plain text result" in messages[-1]["content"] assert "BUDGET WARNING" in messages[-1]["content"] + + +class TestSafeWriter: + """Verify _SafeWriter guards stdout against OSError (broken pipes).""" + + def test_write_delegates_normally(self): + """When stdout is healthy, _SafeWriter is transparent.""" + from run_agent import _SafeWriter + from io import StringIO + inner = StringIO() + writer = _SafeWriter(inner) + writer.write("hello") + assert inner.getvalue() == "hello" + + def test_write_catches_oserror(self): + """OSError on write is silently caught, returns len(data).""" + from run_agent import _SafeWriter + from unittest.mock import MagicMock + inner = MagicMock() + inner.write.side_effect = OSError(5, "Input/output error") + writer = _SafeWriter(inner) + result = writer.write("hello") + assert result == 5 # len("hello") + + def test_flush_catches_oserror(self): + """OSError on flush is silently caught.""" + from run_agent import _SafeWriter + from unittest.mock import MagicMock + inner = MagicMock() + inner.flush.side_effect = OSError(5, "Input/output error") + writer = _SafeWriter(inner) + writer.flush() # should not raise + + def test_print_survives_broken_stdout(self, monkeypatch): + """print() through _SafeWriter doesn't crash on broken pipe.""" + import sys + from run_agent import _SafeWriter + from unittest.mock import MagicMock + broken = MagicMock() + broken.write.side_effect = OSError(5, "Input/output error") + original = sys.stdout + sys.stdout = _SafeWriter(broken) + try: + print("this should not crash") # would raise without _SafeWriter + finally: + sys.stdout = original + + def test_installed_in_run_conversation(self, agent): + """run_conversation installs _SafeWriter on sys.stdout.""" + import sys + from run_agent import _SafeWriter + resp = _mock_response(content="Done", finish_reason="stop") + agent.client.chat.completions.create.return_value = resp + original = sys.stdout + try: + with ( + patch.object(agent, "_persist_session"), + patch.object(agent, "_save_trajectory"), + patch.object(agent, "_cleanup_task_resources"), + ): + agent.run_conversation("test") + assert isinstance(sys.stdout, _SafeWriter) + finally: + sys.stdout = original + + def test_double_wrap_prevented(self): + """Wrapping an already-wrapped stream doesn't add layers.""" + import sys + from run_agent import _SafeWriter + from io import StringIO + inner = StringIO() + wrapped = _SafeWriter(inner) + # isinstance check should prevent double-wrapping + assert isinstance(wrapped, _SafeWriter) + # The guard in run_conversation checks isinstance before wrapping + if not isinstance(wrapped, _SafeWriter): + wrapped = _SafeWriter(wrapped) + # Still just one layer + wrapped.write("test") + assert inner.getvalue() == "test"