Skip to content

Allow room to sync with provider #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ classifiers = [
]
dependencies = [
"anyio >=3.6.2,<5",
"pycrdt >=0.12.13,<0.13.0",
"pycrdt >=0.12.16,<0.13.0",
"pycrdt-store >=0.1.0,<0.2.0",
]

Expand All @@ -48,6 +48,7 @@ test = [
"hypercorn >=0.16.0",
"trio >=0.25.0",
"sniffio",
"channels",
]
docs = [
"mkdocs",
Expand Down
1 change: 0 additions & 1 deletion src/pycrdt/websocket/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .asgi_server import ASGIServer as ASGIServer
from .websocket_provider import WebsocketProvider as WebsocketProvider
from .websocket_server import WebsocketServer as WebsocketServer
from .websocket_server import exception_logger as exception_logger
from .yroom import YRoom as YRoom
Expand Down
7 changes: 3 additions & 4 deletions src/pycrdt/websocket/django_channels_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@
from logging import getLogger
from typing import TypedDict

from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-not-found]
from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-untyped]

from pycrdt import (
Channel,
Doc,
YMessageType,
YSyncMessageType,
create_sync_message,
handle_sync_message,
)

from .websocket import Websocket

logger = getLogger(__name__)


class _WebsocketShim(Websocket):
class _WebsocketShim(Channel):
def __init__(self, path, send_func) -> None:
self._path = path
self._send_func = send_func
Expand Down
66 changes: 6 additions & 60 deletions src/pycrdt/websocket/websocket.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,13 @@
from typing import Protocol

from anyio import Lock

from pycrdt import Channel

class Websocket(Protocol):
"""WebSocket.

The Websocket instance can receive messages using an async iterator,
until the connection is closed:
```py
async for message in websocket:
...
```
Or directly by calling `recv()`:
```py
message = await websocket.recv()
```
Sending messages is done with `send()`:
```py
await websocket.send(message)
```
"""

@property
def path(self) -> str:
"""The WebSocket path."""
...

def __aiter__(self):
return self
class HttpxWebsocket(Channel):
def __init__(self, websocket, path: str):
self._websocket = websocket
self._path = path
self._send_lock = Lock()

async def __anext__(self) -> bytes:
try:
Expand All @@ -38,43 +17,10 @@ async def __anext__(self) -> bytes:

return message

async def send(self, message: bytes) -> None:
"""Send a message.

Arguments:
message: The message to send.
"""
...

async def recv(self) -> bytes:
"""Receive a message.

Returns:
The received message.
"""
...


class HttpxWebsocket(Websocket):
def __init__(self, websocket, path: str):
self._websocket = websocket
self._path = path
self._send_lock = Lock()

@property
def path(self) -> str:
return self._path

def __aiter__(self):
return self

async def __anext__(self) -> bytes:
try:
message = await self.recv()
except Exception:
raise StopAsyncIteration()
return message

async def send(self, message: bytes):
async with self._send_lock:
await self._websocket.send_bytes(message)
Expand Down
170 changes: 0 additions & 170 deletions src/pycrdt/websocket/websocket_provider.py

This file was deleted.

20 changes: 16 additions & 4 deletions src/pycrdt/websocket/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from anyio import TASK_STATUS_IGNORED, Event, Lock, create_task_group
from anyio.abc import TaskGroup, TaskStatus

from .websocket import Websocket
from .yroom import YRoom
from pycrdt import Channel

from .yroom import ProviderFactory, YRoom


class WebsocketServer:
Expand All @@ -28,6 +29,7 @@ def __init__(
auto_clean_rooms: bool = True,
exception_handler: Callable[[Exception, Logger], bool] | None = None,
log: Logger | None = None,
provider_factory: ProviderFactory | None = None,
) -> None:
"""Initialize the object.

Expand All @@ -50,11 +52,14 @@ def __init__(
exception_handler: An optional callback to call when an exception is raised, that
returns True if the exception was handled.
log: An optional logger.
provider_factory: An optional provider factory used to synchronize the rooms with
external documents.
"""
self.rooms_ready = rooms_ready
self.auto_clean_rooms = auto_clean_rooms
self.exception_handler = exception_handler
self.log = log or getLogger(__name__)
self.provider_factory = provider_factory
self.rooms = {}
self._stopped = Event()

Expand All @@ -81,7 +86,14 @@ async def get_room(self, name: str) -> YRoom:
The room with the given name, or a new one if no room with that name was found.
"""
if name not in self.rooms.keys():
self.rooms[name] = YRoom(ready=self.rooms_ready, log=self.log)
provider_factory = (
partial(self.provider_factory, path=name)
if self.provider_factory is not None
else None
)
self.rooms[name] = YRoom(
ready=self.rooms_ready, log=self.log, provider_factory=provider_factory
)
room = self.rooms[name]
await self.start_room(room)
return room
Expand Down Expand Up @@ -144,7 +156,7 @@ async def delete_room(self, *, name: str | None = None, room: YRoom | None = Non
room = self.rooms.pop(name)
await room.stop()

async def serve(self, websocket: Websocket) -> None:
async def serve(self, websocket: Channel) -> None:
"""Serve a client through a WebSocket.

Arguments:
Expand Down
Loading
Loading