Skip to content

Commit ef37f9d

Browse files
perf: dispatch PubSub notifications concurrently (OpenHands#3162)
Co-authored-by: openhands <openhands@all-hands.dev>
1 parent 7ded28a commit ef37f9d

2 files changed

Lines changed: 58 additions & 6 deletions

File tree

openhands-agent-server/openhands/agent_server/pub_sub.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,26 @@ def unsubscribe(self, subscriber_id: UUID) -> bool:
6262

6363
async def __call__(self, event: T) -> None:
6464
"""Invoke all registered callbacks with the given event.
65-
Each callback is invoked in its own try/catch block to prevent
66-
one failing callback from affecting others.
65+
Subscribers are notified concurrently so a slow client cannot
66+
block delivery to others. Each callback runs in its own
67+
error-handling wrapper to preserve fault isolation.
6768
Args:
6869
event: The event to pass to all callbacks
6970
"""
70-
for subscriber_id, subscriber in list(self._subscribers.items()):
71+
subscribers = list(self._subscribers.items())
72+
if not subscribers:
73+
return
74+
75+
async def _notify(subscriber_id: UUID, subscriber: Subscriber[T]):
7176
try:
7277
await subscriber(event)
7378
except Exception as e:
74-
logger.error(f"Error in subscriber {subscriber_id}: {e}", exc_info=True)
79+
logger.error(
80+
f"Error in subscriber {subscriber_id}: {e}",
81+
exc_info=True,
82+
)
83+
84+
await asyncio.gather(*[_notify(sid, sub) for sid, sub in subscribers])
7585

7686
async def close(self):
7787
await asyncio.gather(

tests/agent_server/test_pub_sub.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,21 @@ def unsubscribe(self, subscriber_id: UUID) -> bool:
8484

8585
async def __call__(self, event) -> None:
8686
"""Invoke all registered callbacks with the given event."""
87-
for subscriber_id, subscriber in list(self._subscribers.items()):
87+
subscribers = list(self._subscribers.items())
88+
if not subscribers:
89+
return
90+
91+
async def _notify(subscriber_id, subscriber):
8892
try:
8993
await subscriber(event)
9094
except Exception as e:
9195
self._logger.error(
92-
f"Error in subscriber {subscriber_id}: {e}", exc_info=True
96+
f"Error in subscriber {subscriber_id}: {e}",
97+
exc_info=True,
9398
)
9499

100+
await asyncio.gather(*[_notify(sid, sub) for sid, sub in subscribers])
101+
95102
async def close(self):
96103
await asyncio.gather(
97104
*[subscriber.close() for subscriber in self._subscribers.values()],
@@ -424,6 +431,41 @@ async def test_call_with_subscriber_error_isolation(
424431
assert pubsub._logger.error_calls[0][1] is True # exc_info=True
425432

426433

434+
class _TimedSubscriber(SubscriberForTesting):
435+
"""Subscriber that records delivery wall-time after an artificial delay."""
436+
437+
def __init__(self, name: str, delay: float, log: list[tuple[str, float]]):
438+
self.name = name
439+
self.delay = delay
440+
self.log = log
441+
442+
async def __call__(self, event):
443+
start = asyncio.get_event_loop().time()
444+
await asyncio.sleep(self.delay)
445+
self.log.append((self.name, asyncio.get_event_loop().time() - start))
446+
447+
448+
class TestPubSubConcurrentDispatch:
449+
"""Test that __call__ dispatches to subscribers concurrently."""
450+
451+
@pytest.mark.asyncio
452+
async def test_slow_subscriber_does_not_block_others(self, pubsub):
453+
"""A slow subscriber must not delay delivery to faster ones."""
454+
delivery_log: list[tuple[str, float]] = []
455+
456+
pubsub.subscribe(_TimedSubscriber("slow", 0.2, delivery_log))
457+
pubsub.subscribe(_TimedSubscriber("fast", 0.0, delivery_log))
458+
459+
start = asyncio.get_event_loop().time()
460+
await pubsub(MockEvent())
461+
elapsed = asyncio.get_event_loop().time() - start
462+
463+
# Both subscribers were called
464+
assert len(delivery_log) == 2
465+
# Wall time ≈ 0.2s (concurrent), not ≈ 0.2s+ (sequential)
466+
assert elapsed < 0.3
467+
468+
427469
class TestPubSubEventIsolation:
428470
"""Test cases ensuring removed subscribers don't receive events."""
429471

0 commit comments

Comments
 (0)