Skip to content

Commit e7dd43e

Browse files
author
coke
committed
feature: add call
1 parent 9b1a189 commit e7dd43e

7 files changed

Lines changed: 682 additions & 15 deletions

File tree

fastapi_sio_di/__init__.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
from .async_server import AsyncServer
22
from .docs import EventDoc
3-
from .exceptions import SocketIOValidationError
3+
from .exceptions import CallError, SocketIOValidationError
4+
from .manager import AsyncRedisCallManager
45
from .params import SID, Environ
56

6-
7-
__all__ = ["AsyncServer", "EventDoc", "SID", "Environ", "SocketIOValidationError"]
7+
__all__ = [
8+
"AsyncServer",
9+
"AsyncRedisCallManager",
10+
"CallError",
11+
"EventDoc",
12+
"SID",
13+
"Environ",
14+
"SocketIOValidationError",
15+
]

fastapi_sio_di/async_server.py

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import asyncio
22
from functools import wraps
3-
from typing import Any, Callable, Optional, Union, overload, TypeVar
3+
from typing import Any, Callable, Optional, TypeVar, Union, overload
4+
45
from pydantic import BaseModel
6+
from socketio import AsyncServer as SocketIOAsyncServer
57

68
from .dependencies import Dependant, LifespanContext, solve_dependant
79
from .docs import EventDoc
10+
from .manager import AsyncRedisCallManager
811
from .params import Environ
9-
from socketio import AsyncServer as SocketIOAsyncServer
1012

1113
T = TypeVar("T")
1214

@@ -164,17 +166,49 @@ async def call(
164166
timeout: int = 60,
165167
ignore_queue: bool = False,
166168
) -> Any:
167-
"""Emit a custom event to a client and wait for the response."""
168-
return await super().call(
169-
event=event,
170-
data=data,
171-
to=to,
172-
sid=sid,
173-
namespace=namespace,
174-
timeout=timeout,
175-
ignore_queue=ignore_queue,
169+
"""Emit a custom event to a client and wait for the response.
170+
171+
Supports cross-instance calls when using AsyncRedisCallManager.
172+
If the target client is on another instance, the call is routed
173+
through Redis (BLPOP) for the response.
174+
"""
175+
data = self._pydantic_model_to_dict(data)
176+
target = to or sid
177+
namespace = namespace or '/'
178+
179+
# If not using cross-instance manager, fall back to parent
180+
if not isinstance(self.manager, AsyncRedisCallManager):
181+
return await super().call(
182+
event=event, data=data, to=to, sid=sid,
183+
namespace=namespace, timeout=timeout,
184+
ignore_queue=ignore_queue,
185+
)
186+
187+
# Local short-circuit: target is on this instance
188+
if self.manager.is_connected(target, namespace):
189+
return await super().call(
190+
event=event, data=data, to=target, sid=None,
191+
namespace=namespace, timeout=timeout,
192+
ignore_queue=True, # local, no need for queue
193+
)
194+
195+
# Cross-instance path
196+
call_id = self.manager._generate_call_id()
197+
key = f"sio:call:{call_id}"
198+
199+
await self.manager._emit_with_call_id(
200+
event=event, data=data, namespace=namespace,
201+
room=target, call_id=call_id, timeout=timeout,
176202
)
177203

204+
result = await self.manager.redis.blpop(key, timeout=timeout)
205+
if result is None:
206+
raise TimeoutError(
207+
f"call({event}) to {target} timed out after {timeout}s"
208+
)
209+
210+
return self.manager._unpack_result(result[1])
211+
178212
async def enter_room(
179213
self, sid: str, room: str, namespace: Optional[str] = None
180214
) -> None:

fastapi_sio_di/exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,12 @@ def __init__(self, errors: list, model_name: str):
55
self.errors = errors
66
self.model_name = model_name
77
super().__init__(f"Validation error for {model_name}: {errors}")
8+
9+
10+
class CallError(Exception):
11+
"""Raised when a cross-instance call fails."""
12+
13+
def __init__(self, code: str, message: str):
14+
self.code = code
15+
self.message = message
16+
super().__init__(f"CallError({code}): {message}")

fastapi_sio_di/manager.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
"""Cross-instance call manager using Redis List + BLPOP."""
2+
import asyncio
3+
4+
import msgpack
5+
from socketio import AsyncRedisManager
6+
7+
from .exceptions import CallError
8+
9+
10+
class AsyncRedisCallManager(AsyncRedisManager):
11+
"""Redis manager that supports cross-instance call() via BLPOP.
12+
13+
Usage::
14+
15+
sio = AsyncServer(
16+
client_manager=AsyncRedisCallManager('redis://localhost:6379/0')
17+
)
18+
"""
19+
20+
name = 'aioredis-call'
21+
22+
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
23+
write_only=False, logger=None, json=None, redis_options=None):
24+
super().__init__(url=url, channel=channel, write_only=write_only,
25+
logger=logger, json=json, redis_options=redis_options)
26+
self._call_counter = 0
27+
28+
def _generate_call_id(self) -> str:
29+
"""Generate a unique call ID: {host_id}:{counter}."""
30+
self._call_counter += 1
31+
return f"{self.host_id}:{self._call_counter}"
32+
33+
def _pack_result(self, status: str, *, data=None, code=None, message=None) -> bytes:
34+
"""Serialize a call result to msgpack bytes for RPUSH."""
35+
if status == "ok":
36+
payload = {"status": "ok", "data": list(data) if data else []}
37+
else:
38+
payload = {"status": "error", "code": code, "message": message}
39+
return msgpack.packb(payload, use_bin_type=True)
40+
41+
def _unpack_result(self, raw: bytes):
42+
"""Deserialize a call result from msgpack bytes.
43+
44+
Returns the ACK args as a tuple, or raises CallError on error.
45+
"""
46+
result = msgpack.unpackb(raw, raw=False)
47+
if result["status"] == "error":
48+
raise CallError(result["code"], result["message"])
49+
data = result["data"]
50+
if not data:
51+
return None
52+
if len(data) == 1:
53+
return (data[0],)
54+
return tuple(data)
55+
56+
async def _emit_with_call_id(self, event: str, data, namespace: str,
57+
room: str, call_id: str, timeout: int):
58+
"""Publish a call_emit message to the pub/sub channel."""
59+
if isinstance(data, tuple):
60+
data = list(data)
61+
elif not isinstance(data, list):
62+
data = [data]
63+
message = {
64+
'method': 'call_emit',
65+
'event': event,
66+
'data': data,
67+
'namespace': namespace,
68+
'room': room,
69+
'call_id': call_id,
70+
'timeout': timeout,
71+
'host_id': self.host_id,
72+
}
73+
await self.redis.publish(self.channel, self.json.dumps(message))
74+
75+
async def _handle_call_emit(self, message):
76+
"""Handle a call_emit message from another instance.
77+
78+
If the target sid is connected locally, emit the event and register
79+
a callback that writes the ACK result to Redis.
80+
"""
81+
room = message['room']
82+
namespace = message.get('namespace', '/')
83+
84+
if not self.is_connected(room, namespace):
85+
return
86+
87+
call_id = message['call_id']
88+
timeout = message['timeout']
89+
event = message['event']
90+
data = message['data']
91+
92+
# Unwrap data list
93+
if isinstance(data, list):
94+
if len(data) == 1:
95+
data = data[0]
96+
else:
97+
data = tuple(data)
98+
99+
async def redis_callback(*args):
100+
key = f"sio:call:{call_id}"
101+
payload = self._pack_result("ok", data=args)
102+
await self.redis.rpush(key, payload)
103+
await self.redis.expire(key, timeout + 5)
104+
105+
await self._local_emit(event, data, namespace=namespace,
106+
room=room, callback=redis_callback)
107+
108+
async def _local_emit(self, event, data, namespace=None, room=None,
109+
skip_sid=None, callback=None):
110+
"""Emit directly to local clients, bypassing pub/sub broadcast."""
111+
from socketio.async_manager import AsyncManager
112+
await AsyncManager.emit(self, event, data, namespace=namespace,
113+
room=room, skip_sid=skip_sid, callback=callback)
114+
115+
async def _thread(self):
116+
"""Override parent _thread to handle call_emit messages."""
117+
while True:
118+
try:
119+
async for message in self._listen():
120+
data = None
121+
if isinstance(message, dict):
122+
data = message
123+
else:
124+
try:
125+
data = self.json.loads(message)
126+
except Exception:
127+
pass
128+
if data and 'method' in data:
129+
self._get_logger().debug(
130+
'pubsub message: {}'.format(data['method']))
131+
try:
132+
if data['method'] == 'call_emit':
133+
await self._handle_call_emit(data)
134+
elif data['method'] == 'callback':
135+
await self._handle_callback(data)
136+
elif data.get('host_id') != self.host_id:
137+
if data['method'] == 'emit':
138+
await self._handle_emit(data)
139+
elif data['method'] == 'disconnect':
140+
await self._handle_disconnect(data)
141+
elif data['method'] == 'enter_room':
142+
await self._handle_enter_room(data)
143+
elif data['method'] == 'leave_room':
144+
await self._handle_leave_room(data)
145+
elif data['method'] == 'close_room':
146+
await self._handle_close_room(data)
147+
except asyncio.CancelledError:
148+
raise
149+
except Exception:
150+
self.server.logger.exception(
151+
'Handler error in pubsub listening thread')
152+
self.server.logger.error('pubsub listen() exited unexpectedly')
153+
break
154+
except asyncio.CancelledError:
155+
break
156+
except Exception:
157+
self.server.logger.exception(
158+
'Unexpected Error in pubsub listening thread')

pyproject.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
[project]
22
name = "fastapi-sio-di"
3-
version = "0.5.4"
3+
version = "0.6.0"
44
description = "FastAPI-SIO-DI is a library tailored for integrating Socket.IO with FastAPI. It allows you to develop real-time WebSocket applications using the familiar FastAPI style (Dependency Injection, Pydantic models)."
55
requires-python = ">=3.10"
66
dependencies = [
77
"python-socketio>=5.16.1",
88
]
99

10+
[project.optional-dependencies]
11+
redis = [
12+
"redis>=4.0.0",
13+
"msgpack>=1.0.0",
14+
]
15+
1016
[dependency-groups]
1117
dev = [
1218
"fastapi>=0.128.0",
@@ -17,6 +23,8 @@ dev = [
1723
"pytest-asyncio>=1.3.0",
1824
"starlette>=0.45.0",
1925
"httpx>=0.28.0",
26+
"redis>=4.0.0",
27+
"msgpack>=1.0.0",
2028
]
2129

2230
[build-system]

0 commit comments

Comments
 (0)