Skip to content

Commit 414a89e

Browse files
authored
fix(agent-server): propagate subscriber init errors (OpenHands#3200)
1 parent f7171de commit 414a89e

3 files changed

Lines changed: 24 additions & 36 deletions

File tree

openhands-agent-server/openhands/agent_server/conversation_service.py

Lines changed: 23 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -993,28 +993,32 @@ async def _start_event_service(self, stored: StoredConversation) -> EventService
993 993
cipher=self.cipher,
994 994
owner_instance_id=self.owner_instance_id,
995 995
)
996-
# Create subscribers...
997-
await event_service.subscribe_to_events(_EventSubscriber(service=event_service))
998-
if stored.autotitle and stored.title is None:
999-
await event_service.subscribe_to_events(
1000-
AutoTitleSubscriber(service=event_service)
1001-
)
1002-
asyncio.gather(
1003-
*[
1004-
event_service.subscribe_to_events(
1005-
WebhookSubscriber(
1006-
conversation_id=stored.id,
1007-
service=event_service,
1008-
spec=webhook_spec,
1009-
session_api_key=self.session_api_key,
1010-
)
1011-
)
1012-
for webhook_spec in self.webhook_specs
1013-
]
1014-
)
1015 996

1016 997
try:
1017 998
await event_service.start()
999+
# Register subscribers after start() so subscribe_to_events runs
1000+
# its initial-state push synchronously and any failure surfaces to
1001+
# the caller instead of being silently logged on a later publish.
1002+
await event_service.subscribe_to_events(
1003+
_EventSubscriber(service=event_service)
1004+
)
1005+
if stored.autotitle and stored.title is None:
1006+
await event_service.subscribe_to_events(
1007+
AutoTitleSubscriber(service=event_service)
1008+
)
1009+
await asyncio.gather(
1010+
*[
1011+
event_service.subscribe_to_events(
1012+
WebhookSubscriber(
1013+
conversation_id=stored.id,
1014+
service=event_service,
1015+
spec=webhook_spec,
1016+
session_api_key=self.session_api_key,
1017+
)
1018+
)
1019+
for webhook_spec in self.webhook_specs
1020+
]
1021+
)
1018 1022
# Save metadata immediately after successful start to ensure persistence
1019 1023
# even if the system is not shut down gracefully
1020 1024
await event_service.save_meta()

openhands-agent-server/openhands/agent_server/event_service.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -424,10 +424,7 @@ async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID:
424 424
f"Initial state push to subscriber {subscriber_id} timed "
425 425
f"out after {INITIAL_STATE_PUSH_TIMEOUT_SECONDS}s."
426 426
)
427-
except Exception as e:
428-
logger.error(
429-
f"Error sending initial state to subscriber {subscriber_id}: {e}"
430-
)
427+
# Non-timeout errors propagate to caller (e.g. webhook failures).
431 428

432 429
return subscriber_id
433 430

tests/agent_server/test_webhook_subscriber.py

Lines changed: 0 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1304,19 +1304,6 @@ async def test_timer_cancelled_when_buffer_full(
1304 1304
subscriber._post_events.assert_called_once()
1305 1305

1306 1306

1307-
@pytest.mark.xfail(
1308-
strict=True,
1309-
reason=(
1310-
"Subscribe-time errors from a subscriber's initial __call__ never "
1311-
"reach the start_conversation caller. Two swallow sites in series: "
1312-
"event_service.py:411-416 wraps the initial state sync in "
1313-
"try/except + logger.error (no re-raise), and "
1314-
"conversation_service.py:862 fires asyncio.gather() on the webhook "
1315-
"subscribes without awaiting. Both must be fixed for init errors "
1316-
"to surface. "
1317-
"Tracked in https://github.com/OpenHands/software-agent-sdk/issues/3121."
1318-
),
1319-
)
1320 1307
@pytest.mark.timeout(30)
1321 1308
async def test_webhook_subscribe_errors_surface(tmp_path, monkeypatch):
1322 1309
persist = tmp_path / "persist"

0 commit comments

Comments
 (0)