forked from LangQi99/NeoFish
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage_center.py
More file actions
130 lines (102 loc) · 4.18 KB
/
message_center.py
File metadata and controls
130 lines (102 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
message_center.py - Lightweight message bus and message center for NeoFish.
Provides:
1. In-process async publish/subscribe bus scoped by session_id
2. MessageCenter helper to publish typed events from core logic
"""
from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Awaitable, Callable
def _now_iso() -> str:
    """Return the current naive local time formatted as an ISO 8601 string."""
    return datetime.now().isoformat()


@dataclass
class BusEvent:
    """A single message travelling over the bus.

    Attributes are documented inline below; ``timestamp`` is filled in
    automatically at construction time unless the caller supplies one.
    """

    # Session the event belongs to; the bus routes by this value.
    session_id: str
    # Free-form event kind chosen by the publisher.
    event_type: str
    # Arbitrary event data.
    payload: dict[str, Any]
    # Creation time as an ISO 8601 string (naive local time by default).
    timestamp: str = field(default_factory=_now_iso)
# Signature every subscriber must satisfy: an async callable that receives one
# BusEvent and returns nothing.
EventHandler = Callable[[BusEvent], Awaitable[None]]
class InMemoryMessageBus:
    """In-process pub/sub bus with per-session producer/consumer workers.

    Every session that has at least one subscriber owns one ``asyncio.Queue``
    and one worker task that drains it and fans each event out to the
    session's handlers in subscription order. The queue and the worker are
    created together and retired together, so a subscription that arrives
    while an old worker is still shutting down always gets a fresh pair.
    """

    def __init__(self) -> None:
        self._subscribers: dict[str, list[EventHandler]] = {}
        self._queues: dict[str, asyncio.Queue] = {}
        self._workers: dict[str, asyncio.Task] = {}
        # Guards the three registries above.
        self._lock = asyncio.Lock()
        # Unique marker put on a queue to tell its worker to exit.
        self._stop_sentinel = object()

    async def _ensure_worker_locked(self, session_id: str) -> None:
        """Start a worker (and queue) for ``session_id`` if none is running.

        The caller must already hold ``self._lock``.
        """
        worker = self._workers.get(session_id)
        if worker is not None and not worker.done():
            return
        queue = self._queues.get(session_id)
        if queue is None:
            queue = asyncio.Queue()
            self._queues[session_id] = queue
        # Hand the queue to the worker directly so it never depends on a
        # later registry lookup that could return a different queue.
        self._workers[session_id] = asyncio.create_task(
            self._consume_loop(session_id, queue), name=f"msg-bus-{session_id}"
        )

    async def _consume_loop(
        self, session_id: str, queue: asyncio.Queue | None = None
    ) -> None:
        """Drain ``queue`` and dispatch each event until the stop sentinel.

        Args:
            session_id: Session whose subscribers receive the events.
            queue: Queue to drain. When omitted, the queue currently
                registered for ``session_id`` is used; if there is none the
                loop returns immediately.
        """
        if queue is None:
            queue = self._queues.get(session_id)
            if queue is None:
                return
        while True:
            item = await queue.get()
            if item is self._stop_sentinel:
                break
            event: BusEvent = item
            # Snapshot under the lock, dispatch outside it, so handlers may
            # call subscribe/unsubscribe/publish without deadlocking.
            async with self._lock:
                handlers = list(self._subscribers.get(session_id, []))
            for handler in handlers:
                try:
                    await handler(event)
                except Exception:
                    # Keep bus resilient: one bad subscriber must not block
                    # others. Log it so the failure is not invisible.
                    logging.getLogger(__name__).exception(
                        "Subscriber %r raised while handling an event for session %r",
                        handler,
                        session_id,
                    )

    async def subscribe(self, session_id: str, handler: EventHandler) -> None:
        """Register ``handler`` for ``session_id`` (no-op if already present)."""
        async with self._lock:
            handlers = self._subscribers.setdefault(session_id, [])
            if handler not in handlers:
                handlers.append(handler)
            await self._ensure_worker_locked(session_id)

    async def unsubscribe(self, session_id: str, handler: EventHandler) -> None:
        """Remove ``handler``; stop the session's worker when none remain.

        Unknown sessions and unknown handlers are ignored. When the last
        handler is removed this waits for the worker to finish, unless it is
        called from inside that worker (i.e. from a handler).
        """
        worker: asyncio.Task | None = None
        queue: asyncio.Queue | None = None
        async with self._lock:
            handlers = self._subscribers.get(session_id)
            if handlers is None:
                return
            if handler in handlers:
                handlers.remove(handler)
            if handlers:
                return
            del self._subscribers[session_id]
            # Retire the worker AND its queue atomically. Leaving the queue
            # registered would let a concurrent subscribe() attach a new
            # worker to a queue that is about to be discarded, after which
            # publish() would silently drop every event for the session.
            worker = self._workers.pop(session_id, None)
            queue = self._queues.pop(session_id, None)
        if queue is not None:
            await queue.put(self._stop_sentinel)
        if worker is None or worker is asyncio.current_task():
            # A task cannot await itself; the sentinel already queued will
            # end the loop once the calling handler returns.
            return
        try:
            await worker
        except Exception:
            logging.getLogger(__name__).exception(
                "Worker for session %r ended with an error", session_id
            )

    async def publish(self, event: BusEvent) -> None:
        """Queue ``event`` for its session; dropped if nobody is subscribed."""
        async with self._lock:
            if not self._subscribers.get(event.session_id):
                return
            await self._ensure_worker_locked(event.session_id)
            queue = self._queues.get(event.session_id)
        if queue is None:
            return
        await queue.put(event)
# Module-level bus instance; MessageCenter falls back to it when no bus is passed.
message_bus = InMemoryMessageBus()
class MessageCenter:
    """Session-scoped message publisher facade.

    Binds a session id to a bus so callers only supply the event type and
    payload when publishing.
    """

    def __init__(self, session_id: str, bus: InMemoryMessageBus | None = None) -> None:
        """Remember the session and the bus to publish on.

        Args:
            session_id: Session stamped on every published event.
            bus: Bus to publish through; the module-level ``message_bus`` is
                used when this is omitted (or otherwise falsy).
        """
        self.session_id = session_id
        self.bus = bus if bus else message_bus

    async def publish(self, event_type: str, payload: dict[str, Any]) -> None:
        """Wrap the arguments in a ``BusEvent`` for this session and publish it."""
        await self.bus.publish(
            BusEvent(
                session_id=self.session_id,
                event_type=event_type,
                payload=payload,
            )
        )