Skip to content

Commit e004c09

Browse files
authored
fix: use session_key instead of chat_id for adapter interrupt lookups
* fix: use session_key instead of chat_id for adapter interrupt lookups monitor_for_interrupt() in _run_agent was using source.chat_id to query the adapter's has_pending_interrupt() and get_pending_message() methods. But the adapter stores interrupt events under build_session_key(source), which produces a different string (e.g. 'agent:main:telegram:dm' vs '123456'). This key mismatch meant the interrupt was never detected through the adapter path, which is the only active interrupt path for all adapter-based platforms (Telegram, Discord, Slack, etc.). The gateway-level interrupt path (in dispatch_message) is unreachable because the adapter intercepts the 2nd message in handle_message() before it reaches dispatch_message(). Result: sending a new message while subagents were running had no effect — the interrupt was silently lost. Fix: replace all source.chat_id references in the interrupt-related code within _run_agent() with the session_key parameter, which matches the adapter's storage keys. Also adds regression tests verifying session_key vs chat_id consistency. * debug: add file-based logging to CLI interrupt path Temporary instrumentation to diagnose why message-based interrupts don't seem to work during subagent execution. Logs to ~/.hermes/interrupt_debug.log (immune to redirect_stdout). Two log points: 1. When Enter handler puts message into _interrupt_queue 2. When chat() reads it and calls agent.interrupt() This will reveal whether the message reaches the queue and whether the interrupt is actually fired.
1 parent 5c54128 commit e004c09

9 files changed

+1045
-9
lines changed

cli.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3608,6 +3608,19 @@ def run_agent():
36083608
continue
36093609
print(f"\n⚡ New message detected, interrupting...")
36103610
self.agent.interrupt(interrupt_msg)
3611+
# Debug: log to file (stdout may be devnull from redirect_stdout)
3612+
try:
3613+
import pathlib as _pl
3614+
_dbg = _pl.Path.home() / ".hermes" / "interrupt_debug.log"
3615+
with open(_dbg, "a") as _f:
3616+
import time as _t
3617+
_f.write(f"{_t.strftime('%H:%M:%S')} interrupt fired: msg={str(interrupt_msg)[:60]!r}, "
3618+
f"children={len(self.agent._active_children)}, "
3619+
f"parent._interrupt={self.agent._interrupt_requested}\n")
3620+
for _ci, _ch in enumerate(self.agent._active_children):
3621+
_f.write(f" child[{_ci}]._interrupt={_ch._interrupt_requested}\n")
3622+
except Exception:
3623+
pass
36113624
break
36123625
except queue.Empty:
36133626
pass # Queue empty or timeout, continue waiting
@@ -3877,6 +3890,16 @@ def handle_enter(event):
38773890
payload = (text, images) if images else text
38783891
if self._agent_running and not (text and text.startswith("/")):
38793892
self._interrupt_queue.put(payload)
3893+
# Debug: log to file when message enters interrupt queue
3894+
try:
3895+
import pathlib as _pl
3896+
_dbg = _pl.Path.home() / ".hermes" / "interrupt_debug.log"
3897+
with open(_dbg, "a") as _f:
3898+
import time as _t
3899+
_f.write(f"{_t.strftime('%H:%M:%S')} ENTER: queued interrupt msg={str(payload)[:60]!r}, "
3900+
f"agent_running={self._agent_running}\n")
3901+
except Exception:
3902+
pass
38803903
else:
38813904
self._pending_input.put(payload)
38823905
event.app.current_buffer.reset(append_to_history=True)

gateway/run.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3418,17 +3418,19 @@ async def track_agent():
34183418
# Monitor for interrupts from the adapter (new messages arriving)
34193419
async def monitor_for_interrupt():
34203420
adapter = self.adapters.get(source.platform)
3421-
if not adapter:
3421+
if not adapter or not session_key:
34223422
return
34233423

3424-
chat_id = source.chat_id
34253424
while True:
34263425
await asyncio.sleep(0.2) # Check every 200ms
3427-
# Check if adapter has a pending interrupt for this session
3428-
if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(chat_id):
3426+
# Check if adapter has a pending interrupt for this session.
3427+
# Must use session_key (build_session_key output) — NOT
3428+
# source.chat_id — because the adapter stores interrupt events
3429+
# under the full session key.
3430+
if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(session_key):
34293431
agent = agent_holder[0]
34303432
if agent:
3431-
pending_event = adapter.get_pending_message(chat_id)
3433+
pending_event = adapter.get_pending_message(session_key)
34323434
pending_text = pending_event.text if pending_event else None
34333435
logger.debug("Interrupt detected from adapter, signaling agent...")
34343436
agent.interrupt(pending_text)
@@ -3445,10 +3447,11 @@ async def monitor_for_interrupt():
34453447
result = result_holder[0]
34463448
adapter = self.adapters.get(source.platform)
34473449

3448-
# Get pending message from adapter if interrupted
3450+
# Get pending message from adapter if interrupted.
3451+
# Use session_key (not source.chat_id) to match adapter's storage keys.
34493452
pending = None
34503453
if result and result.get("interrupted") and adapter:
3451-
pending_event = adapter.get_pending_message(source.chat_id)
3454+
pending_event = adapter.get_pending_message(session_key) if session_key else None
34523455
if pending_event:
34533456
pending = pending_event.text
34543457
elif result.get("interrupt_message"):
@@ -3460,8 +3463,8 @@ async def monitor_for_interrupt():
34603463
# Clear the adapter's interrupt event so the next _run_agent call
34613464
# doesn't immediately re-trigger the interrupt before the new agent
34623465
# even makes its first API call (this was causing an infinite loop).
3463-
if adapter and hasattr(adapter, '_active_sessions') and source.chat_id in adapter._active_sessions:
3464-
adapter._active_sessions[source.chat_id].clear()
3466+
if adapter and hasattr(adapter, '_active_sessions') and session_key and session_key in adapter._active_sessions:
3467+
adapter._active_sessions[session_key].clear()
34653468

34663469
# Don't send the interrupted response to the user — it's just noise
34673470
# like "Operation interrupted." They already know they sent a new
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""Tests verifying interrupt key consistency between adapter and gateway.
2+
3+
Regression test for a bug where monitor_for_interrupt() in _run_agent used
4+
source.chat_id to query the adapter, but the adapter stores interrupts under
5+
the full session key (build_session_key output). This mismatch meant
6+
interrupts were never detected, causing subagents to ignore new messages.
7+
"""
8+
9+
import asyncio
10+
11+
import pytest
12+
13+
from gateway.config import Platform, PlatformConfig
14+
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
15+
from gateway.session import SessionSource, build_session_key
16+
17+
18+
class StubAdapter(BasePlatformAdapter):
19+
"""Minimal adapter for interrupt tests."""
20+
21+
def __init__(self):
22+
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
23+
24+
async def connect(self):
25+
return True
26+
27+
async def disconnect(self):
28+
pass
29+
30+
async def send(self, chat_id, content, reply_to=None, metadata=None):
31+
return SendResult(success=True, message_id="1")
32+
33+
async def send_typing(self, chat_id, metadata=None):
34+
pass
35+
36+
async def get_chat_info(self, chat_id):
37+
return {"id": chat_id}
38+
39+
40+
def _source(chat_id="123456", chat_type="dm", thread_id=None):
41+
return SessionSource(
42+
platform=Platform.TELEGRAM,
43+
chat_id=chat_id,
44+
chat_type=chat_type,
45+
thread_id=thread_id,
46+
)
47+
48+
49+
class TestInterruptKeyConsistency:
50+
"""Ensure adapter interrupt methods are queried with session_key, not chat_id."""
51+
52+
def test_session_key_differs_from_chat_id_for_dm(self):
53+
"""Session key for a DM is NOT the same as chat_id."""
54+
source = _source("123456", "dm")
55+
session_key = build_session_key(source)
56+
assert session_key != source.chat_id
57+
assert session_key == "agent:main:telegram:dm"
58+
59+
def test_session_key_differs_from_chat_id_for_group(self):
60+
"""Session key for a group chat includes prefix, unlike raw chat_id."""
61+
source = _source("-1001234", "group")
62+
session_key = build_session_key(source)
63+
assert session_key != source.chat_id
64+
assert "agent:main:" in session_key
65+
assert source.chat_id in session_key
66+
67+
@pytest.mark.asyncio
68+
async def test_has_pending_interrupt_requires_session_key(self):
69+
"""has_pending_interrupt returns True only when queried with session_key."""
70+
adapter = StubAdapter()
71+
source = _source("123456", "dm")
72+
session_key = build_session_key(source)
73+
74+
# Simulate adapter storing interrupt under session_key
75+
interrupt_event = asyncio.Event()
76+
adapter._active_sessions[session_key] = interrupt_event
77+
interrupt_event.set()
78+
79+
# Using session_key → found
80+
assert adapter.has_pending_interrupt(session_key) is True
81+
82+
# Using chat_id → NOT found (this was the bug)
83+
assert adapter.has_pending_interrupt(source.chat_id) is False
84+
85+
@pytest.mark.asyncio
86+
async def test_get_pending_message_requires_session_key(self):
87+
"""get_pending_message returns the event only with session_key."""
88+
adapter = StubAdapter()
89+
source = _source("123456", "dm")
90+
session_key = build_session_key(source)
91+
92+
event = MessageEvent(text="hello", source=source, message_id="42")
93+
adapter._pending_messages[session_key] = event
94+
95+
# Using chat_id → None (the bug)
96+
assert adapter.get_pending_message(source.chat_id) is None
97+
98+
# Using session_key → found
99+
result = adapter.get_pending_message(session_key)
100+
assert result is event
101+
102+
@pytest.mark.asyncio
103+
async def test_handle_message_stores_under_session_key(self):
104+
"""handle_message stores pending messages under session_key, not chat_id."""
105+
adapter = StubAdapter()
106+
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
107+
108+
source = _source("-1001234", "group")
109+
session_key = build_session_key(source)
110+
111+
# Mark session as active
112+
adapter._active_sessions[session_key] = asyncio.Event()
113+
114+
# Send a second message while session is active
115+
event = MessageEvent(text="interrupt!", source=source, message_id="2")
116+
await adapter.handle_message(event)
117+
118+
# Stored under session_key
119+
assert session_key in adapter._pending_messages
120+
# NOT stored under chat_id
121+
assert source.chat_id not in adapter._pending_messages
122+
123+
# Interrupt event was set
124+
assert adapter._active_sessions[session_key].is_set()

tests/run_interrupt_test.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
#!/usr/bin/env python3
2+
"""Run a real interrupt test with actual AIAgent + delegate child.
3+
4+
Not a pytest test — runs directly as a script for live testing.
5+
"""
6+
7+
import threading
8+
import time
9+
import sys
10+
import os
11+
12+
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
13+
14+
from unittest.mock import MagicMock, patch
15+
from run_agent import AIAgent, IterationBudget
16+
from tools.delegate_tool import _run_single_child
17+
from tools.interrupt import set_interrupt, is_interrupted
18+
19+
set_interrupt(False)
20+
21+
# Create parent agent (minimal)
22+
parent = AIAgent.__new__(AIAgent)
23+
parent._interrupt_requested = False
24+
parent._interrupt_message = None
25+
parent._active_children = []
26+
parent.quiet_mode = True
27+
parent.model = "test/model"
28+
parent.base_url = "http://localhost:1"
29+
parent.api_key = "test"
30+
parent.provider = "test"
31+
parent.api_mode = "chat_completions"
32+
parent.platform = "cli"
33+
parent.enabled_toolsets = ["terminal", "file"]
34+
parent.providers_allowed = None
35+
parent.providers_ignored = None
36+
parent.providers_order = None
37+
parent.provider_sort = None
38+
parent.max_tokens = None
39+
parent.reasoning_config = None
40+
parent.prefill_messages = None
41+
parent._session_db = None
42+
parent._delegate_depth = 0
43+
parent._delegate_spinner = None
44+
parent.tool_progress_callback = None
45+
parent.iteration_budget = IterationBudget(max_total=100)
46+
parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
47+
48+
child_started = threading.Event()
49+
result_holder = [None]
50+
51+
52+
def run_delegate():
53+
with patch("run_agent.OpenAI") as MockOpenAI:
54+
mock_client = MagicMock()
55+
56+
def slow_create(**kwargs):
57+
time.sleep(3)
58+
resp = MagicMock()
59+
resp.choices = [MagicMock()]
60+
resp.choices[0].message.content = "Done"
61+
resp.choices[0].message.tool_calls = None
62+
resp.choices[0].message.refusal = None
63+
resp.choices[0].finish_reason = "stop"
64+
resp.usage.prompt_tokens = 100
65+
resp.usage.completion_tokens = 10
66+
resp.usage.total_tokens = 110
67+
resp.usage.prompt_tokens_details = None
68+
return resp
69+
70+
mock_client.chat.completions.create = slow_create
71+
mock_client.close = MagicMock()
72+
MockOpenAI.return_value = mock_client
73+
74+
original_init = AIAgent.__init__
75+
76+
def patched_init(self_agent, *a, **kw):
77+
original_init(self_agent, *a, **kw)
78+
child_started.set()
79+
80+
with patch.object(AIAgent, "__init__", patched_init):
81+
try:
82+
result = _run_single_child(
83+
task_index=0,
84+
goal="Test slow task",
85+
context=None,
86+
toolsets=["terminal"],
87+
model="test/model",
88+
max_iterations=5,
89+
parent_agent=parent,
90+
task_count=1,
91+
override_provider="test",
92+
override_base_url="http://localhost:1",
93+
override_api_key="test",
94+
override_api_mode="chat_completions",
95+
)
96+
result_holder[0] = result
97+
except Exception as e:
98+
print(f"ERROR in delegate: {e}")
99+
import traceback
100+
traceback.print_exc()
101+
102+
103+
print("Starting agent thread...")
104+
agent_thread = threading.Thread(target=run_delegate, daemon=True)
105+
agent_thread.start()
106+
107+
started = child_started.wait(timeout=10)
108+
if not started:
109+
print("ERROR: Child never started")
110+
sys.exit(1)
111+
112+
time.sleep(0.5)
113+
114+
print(f"Active children: {len(parent._active_children)}")
115+
for i, c in enumerate(parent._active_children):
116+
print(f" Child {i}: _interrupt_requested={c._interrupt_requested}")
117+
118+
t0 = time.monotonic()
119+
parent.interrupt("User typed a new message")
120+
print(f"Called parent.interrupt()")
121+
122+
for i, c in enumerate(parent._active_children):
123+
print(f" Child {i} after interrupt: _interrupt_requested={c._interrupt_requested}")
124+
print(f"Global is_interrupted: {is_interrupted()}")
125+
126+
agent_thread.join(timeout=10)
127+
elapsed = time.monotonic() - t0
128+
print(f"Agent thread finished in {elapsed:.2f}s")
129+
130+
result = result_holder[0]
131+
if result:
132+
print(f"Status: {result['status']}")
133+
print(f"Duration: {result['duration_seconds']}s")
134+
if elapsed < 2.0:
135+
print("✅ PASS: Interrupt detected quickly!")
136+
else:
137+
print(f"❌ FAIL: Took {elapsed:.2f}s — interrupt was too slow or not detected")
138+
else:
139+
print("❌ FAIL: No result!")
140+
141+
set_interrupt(False)

0 commit comments

Comments
 (0)