Skip to content

Commit 6c03d23

Browse files
committed
refactor(django_channels): move redis storage to its own file so redis its not required and update imports
1 parent bfd11c4 commit 6c03d23

File tree

4 files changed

+91
-82
lines changed

4 files changed

+91
-82
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1+
from .storage.base_yroom_storage import BaseYRoomStorage as BaseYRoomStorage
12
from .yjs_consumer import YjsConsumer as YjsConsumer
2-
from .yroom_storage import BaseYRoomStorage as BaseYRoomStorage

pycrdt_websocket/django_channels/yroom_storage.py renamed to pycrdt_websocket/django_channels/storage/base_yroom_storage.py

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from abc import ABC, abstractmethod
33
from typing import Optional
44

5-
import redis.asyncio as redis
65
from pycrdt import Doc
76

87

@@ -108,80 +107,3 @@ async def throttled_save_snapshot(self) -> None:
108107
await self.save_snapshot()
109108

110109
self.last_saved_at = time.time()
111-
112-
113-
class RedisYRoomStorage(BaseYRoomStorage):
114-
"""A YRoom storage that uses Redis as main storage, without
115-
persistent storage.
116-
Args:
117-
room_name: The name of the room.
118-
"""
119-
120-
def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None:
121-
super().__init__(room_name, save_throttle_interval)
122-
123-
self.redis_key = f"document:{self.room_name}"
124-
self.redis = self._make_redis()
125-
126-
async def get_document(self) -> Doc:
127-
snapshot = await self.redis.get(self.redis_key)
128-
129-
if not snapshot:
130-
snapshot = await self.load_snapshot()
131-
132-
document = Doc()
133-
134-
if snapshot:
135-
document.apply_update(snapshot)
136-
137-
return document
138-
139-
async def update_document(self, update: bytes):
140-
await self.redis.watch(self.redis_key)
141-
142-
try:
143-
current_document = await self.get_document()
144-
updated_snapshot = self._apply_update_to_document(current_document, update)
145-
146-
async with self.redis.pipeline() as pipe:
147-
while True:
148-
try:
149-
pipe.multi()
150-
pipe.set(self.redis_key, updated_snapshot)
151-
152-
await pipe.execute()
153-
154-
break
155-
except redis.WatchError:
156-
current_document = await self.get_document()
157-
updated_snapshot = self._apply_update_to_document(
158-
current_document,
159-
update,
160-
)
161-
162-
continue
163-
finally:
164-
await self.redis.unwatch()
165-
166-
await self.throttled_save_snapshot()
167-
168-
async def load_snapshot(self) -> Optional[bytes]:
169-
return None
170-
171-
async def save_snapshot(self) -> Optional[bytes]:
172-
return None
173-
174-
async def close(self):
175-
await self.save_snapshot()
176-
await self.redis.close()
177-
178-
def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes:
179-
document.apply_update(update)
180-
181-
return document.get_update()
182-
183-
def _make_redis(self):
184-
"""Makes a Redis client.
185-
Defaults to a local client"""
186-
187-
return redis.Redis(host="localhost", port=6379, db=0)
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from typing import Optional
2+
3+
import redis.asyncio as redis
4+
from pycrdt import Doc
5+
6+
from .base_yroom_storage import BaseYRoomStorage
7+
8+
9+
class RedisYRoomStorage(BaseYRoomStorage):
10+
"""A YRoom storage that uses Redis as main storage, without
11+
persistent storage.
12+
Args:
13+
room_name: The name of the room.
14+
"""
15+
16+
def __init__(self, room_name: str, save_throttle_interval: int | None = None) -> None:
17+
super().__init__(room_name, save_throttle_interval)
18+
19+
self.redis_key = f"document:{self.room_name}"
20+
self.redis = self._make_redis()
21+
22+
async def get_document(self) -> Doc:
23+
snapshot = await self.redis.get(self.redis_key)
24+
25+
if not snapshot:
26+
snapshot = await self.load_snapshot()
27+
28+
document = Doc()
29+
30+
if snapshot:
31+
document.apply_update(snapshot)
32+
33+
return document
34+
35+
async def update_document(self, update: bytes):
36+
await self.redis.watch(self.redis_key)
37+
38+
try:
39+
current_document = await self.get_document()
40+
updated_snapshot = self._apply_update_to_document(current_document, update)
41+
42+
async with self.redis.pipeline() as pipe:
43+
while True:
44+
try:
45+
pipe.multi()
46+
pipe.set(self.redis_key, updated_snapshot)
47+
48+
await pipe.execute()
49+
50+
break
51+
except redis.WatchError:
52+
current_document = await self.get_document()
53+
updated_snapshot = self._apply_update_to_document(
54+
current_document,
55+
update,
56+
)
57+
58+
continue
59+
finally:
60+
await self.redis.unwatch()
61+
62+
await self.throttled_save_snapshot()
63+
64+
async def load_snapshot(self) -> Optional[bytes]:
65+
return None
66+
67+
async def save_snapshot(self) -> Optional[bytes]:
68+
return None
69+
70+
async def close(self):
71+
await self.save_snapshot()
72+
await self.redis.close()
73+
74+
def _apply_update_to_document(self, document: Doc, update: bytes) -> bytes:
75+
document.apply_update(update)
76+
77+
return document.get_update()
78+
79+
def _make_redis(self):
80+
"""Makes a Redis client.
81+
Defaults to a local client"""
82+
83+
return redis.Redis(host="localhost", port=6379, db=0)

pycrdt_websocket/django_channels/yjs_consumer.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from channels.generic.websocket import AsyncWebsocketConsumer # type: ignore[import-not-found]
77
from pycrdt import Doc
88

9-
from pycrdt_websocket.django_channels.yroom_storage import BaseYRoomStorage
9+
from pycrdt_websocket.django_channels.storage.base_yroom_storage import BaseYRoomStorage
1010

1111
from ..websocket import Websocket
1212
from ..yutils import (
@@ -96,13 +96,14 @@ class YjsConsumer(AsyncWebsocketConsumer):
9696
from channels.layers import get_channel_layer
9797
from pycrdt_websocket.django_channels_consumer import YjsConsumer
9898
from pycrdt_websocket.yutils import create_update_message
99+
from pycrdt_websocket.django_channels.storage.redis_yroom_storage import RedisYRoomStorage
99100
100101
101102
class DocConsumer(YjsConsumer):
102103
def make_room_storage(self) -> BaseYRoomStorage:
103104
# Modify the room storage here
104105
105-
return RedisYRoomStorage(self.room_name)
106+
return RedisYRoomStorage(room_name=self.room_name)
106107
107108
def make_room_name(self) -> str:
108109
# Modify the room name here
@@ -147,7 +148,10 @@ def make_room_storage(self) -> BaseYRoomStorage | None:
147148
Defaults to not using any (just broadcast updates between consumers).
148149
149150
Example:
150-
self.room_storage = RedisYRoomStorage(self.room_name)
151+
self.room_storage = YourCustomRedisYRoomStorage(
152+
room_name=self.room_name,
153+
save_throttle_interval=5
154+
)
151155
"""
152156
return None
153157

0 commit comments

Comments
 (0)