Skip to content

Commit ef8bc5f

Browse files
committed
Add test for concurrent room initialization
1 parent 3a983e6 commit ef8bc5f

File tree

4 files changed

+170
-106
lines changed

4 files changed

+170
-106
lines changed

jupyter_collaboration/handlers.py

+63-54
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
5656

5757
_message_queue: asyncio.Queue[Any]
5858
_background_tasks: set[asyncio.Task]
59+
_room_locks: dict[str, asyncio.Lock] = {}
60+
61+
def _room_lock(self, room_id: str) -> asyncio.Lock:
62+
if room_id not in self._room_locks:
63+
self._room_locks[room_id] = asyncio.Lock()
64+
return self._room_locks[room_id]
5965

6066
def create_task(self, aw):
6167
task = asyncio.create_task(aw)
@@ -70,38 +76,38 @@ async def prepare(self):
7076
# Get room
7177
self._room_id: str = self.request.path.split("/")[-1]
7278

73-
if self._websocket_server.room_exists(self._room_id):
74-
self.room: YRoom = await self._websocket_server.get_room(self._room_id)
75-
76-
else:
77-
if self._room_id.count(":") >= 2:
78-
# DocumentRoom
79-
file_format, file_type, file_id = decode_file_path(self._room_id)
80-
if file_id in self._file_loaders:
81-
self._emit(
82-
LogLevel.WARNING,
83-
None,
84-
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
79+
async with self._room_lock(self._room_id):
80+
if self._websocket_server.room_exists(self._room_id):
81+
self.room: YRoom = await self._websocket_server.get_room(self._room_id)
82+
else:
83+
if self._room_id.count(":") >= 2:
84+
# DocumentRoom
85+
file_format, file_type, file_id = decode_file_path(self._room_id)
86+
if file_id in self._file_loaders:
87+
self._emit(
88+
LogLevel.WARNING,
89+
None,
90+
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
91+
)
92+
93+
file = self._file_loaders[file_id]
94+
updates_file_path = f".{file_type}:{file_id}.y"
95+
ystore = self._ystore_class(path=updates_file_path, log=self.log)
96+
self.room = DocumentRoom(
97+
self._room_id,
98+
file_format,
99+
file_type,
100+
file,
101+
self.event_logger,
102+
ystore,
103+
self.log,
104+
self._document_save_delay,
85105
)
86106

87-
file = self._file_loaders[file_id]
88-
updates_file_path = f".{file_type}:{file_id}.y"
89-
ystore = self._ystore_class(path=updates_file_path, log=self.log)
90-
self.room = DocumentRoom(
91-
self._room_id,
92-
file_format,
93-
file_type,
94-
file,
95-
self.event_logger,
96-
ystore,
97-
self.log,
98-
self._document_save_delay,
99-
)
100-
101-
else:
102-
# TransientRoom
103-
# it is a transient document (e.g. awareness)
104-
self.room = TransientRoom(self._room_id, self.log)
107+
else:
108+
# TransientRoom
109+
# it is a transient document (e.g. awareness)
110+
self.room = TransientRoom(self._room_id, self.log)
105111

106112
await self._websocket_server.start_room(self.room)
107113
self._websocket_server.add_room(self._room_id, self.room)
@@ -184,7 +190,8 @@ async def open(self, room_id):
184190

185191
try:
186192
# Initialize the room
187-
await self.room.initialize()
193+
async with self._room_lock(self._room_id):
194+
await self.room.initialize()
188195
self._emit_awareness_event(self.current_user.username, "join")
189196
except Exception as e:
190197
_, _, file_id = decode_file_path(self._room_id)
@@ -323,29 +330,31 @@ async def _clean_room(self) -> None:
323330
contains a copy of the document. In addition, we remove the file if there is no rooms
324331
subscribed to it.
325332
"""
326-
assert isinstance(self.room, DocumentRoom)
327-
328-
if self._cleanup_delay is None:
329-
return
330-
331-
await asyncio.sleep(self._cleanup_delay)
332-
333-
# Remove the room from the websocket server
334-
self.log.info("Deleting Y document from memory: %s", self.room.room_id)
335-
self._websocket_server.delete_room(room=self.room)
336-
337-
# Clean room
338-
del self.room
339-
self.log.info("Room %s deleted", self._room_id)
340-
self._emit(LogLevel.INFO, "clean", "Room deleted.")
341-
342-
# Clean the file loader if there are not rooms using it
343-
_, _, file_id = decode_file_path(self._room_id)
344-
file = self._file_loaders[file_id]
345-
if file.number_of_subscriptions == 0:
346-
self.log.info("Deleting file %s", file.path)
347-
await self._file_loaders.remove(file_id)
348-
self._emit(LogLevel.INFO, "clean", "Loader deleted.")
333+
async with self._room_lock(self._room_id):
334+
assert isinstance(self.room, DocumentRoom)
335+
336+
if self._cleanup_delay is None:
337+
return
338+
339+
await asyncio.sleep(self._cleanup_delay)
340+
341+
# Remove the room from the websocket server
342+
self.log.info("Deleting Y document from memory: %s", self.room.room_id)
343+
self._websocket_server.delete_room(room=self.room)
344+
345+
# Clean room
346+
del self.room
347+
self.log.info("Room %s deleted", self._room_id)
348+
self._emit(LogLevel.INFO, "clean", "Room deleted.")
349+
350+
# Clean the file loader if there are not rooms using it
351+
_, _, file_id = decode_file_path(self._room_id)
352+
file = self._file_loaders[file_id]
353+
if file.number_of_subscriptions == 0:
354+
self.log.info("Deleting file %s", file.path)
355+
await self._file_loaders.remove(file_id)
356+
self._emit(LogLevel.INFO, "clean", "Loader deleted.")
357+
del self._room_locks[self._room_id]
349358

350359
def check_origin(self, origin):
351360
"""

jupyter_collaboration/rooms.py

+50-52
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ def __init__(
4444
self._save_delay = save_delay
4545

4646
self._update_lock = asyncio.Lock()
47-
self._initialization_lock = asyncio.Lock()
4847
self._cleaner: asyncio.Task | None = None
4948
self._saving_document: asyncio.Task | None = None
5049
self._messages: dict[str, asyncio.Lock] = {}
@@ -89,64 +88,63 @@ async def initialize(self) -> None:
8988
It is important to set the ready property in the parent class (`self.ready = True`),
9089
this setter will subscribe for updates on the shared document.
9190
"""
92-
async with self._initialization_lock:
93-
if self.ready: # type: ignore[has-type]
94-
return
91+
if self.ready: # type: ignore[has-type]
92+
return
9593

96-
self.log.info("Initializing room %s", self._room_id)
94+
self.log.info("Initializing room %s", self._room_id)
9795

98-
model = await self._file.load_content(self._file_format, self._file_type)
96+
model = await self._file.load_content(self._file_format, self._file_type)
9997

100-
async with self._update_lock:
101-
# try to apply Y updates from the YStore for this document
102-
read_from_source = True
103-
if self.ystore is not None:
104-
try:
105-
await self.ystore.apply_updates(self.ydoc)
106-
self._emit(
107-
LogLevel.INFO,
108-
"load",
109-
"Content loaded from the store {}".format(
110-
self.ystore.__class__.__qualname__
111-
),
112-
)
113-
self.log.info(
114-
"Content in room %s loaded from the ystore %s",
115-
self._room_id,
116-
self.ystore.__class__.__name__,
117-
)
118-
read_from_source = False
119-
except YDocNotFound:
120-
# YDoc not found in the YStore, create the document from the source file (no change history)
121-
pass
122-
123-
if not read_from_source:
124-
# if YStore updates and source file are out-of-sync, resync updates with source
125-
if self._document.source != model["content"]:
126-
# TODO: Delete document from the store.
127-
self._emit(
128-
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
129-
)
130-
self.log.info(
131-
"Content in file %s is out-of-sync with the ystore %s",
132-
self._file.path,
133-
self.ystore.__class__.__name__,
134-
)
135-
read_from_source = True
136-
137-
if read_from_source:
138-
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
98+
async with self._update_lock:
99+
# try to apply Y updates from the YStore for this document
100+
read_from_source = True
101+
if self.ystore is not None:
102+
try:
103+
await self.ystore.apply_updates(self.ydoc)
104+
self._emit(
105+
LogLevel.INFO,
106+
"load",
107+
"Content loaded from the store {}".format(
108+
self.ystore.__class__.__qualname__
109+
),
110+
)
111+
self.log.info(
112+
"Content in room %s loaded from the ystore %s",
113+
self._room_id,
114+
self.ystore.__class__.__name__,
115+
)
116+
read_from_source = False
117+
except YDocNotFound:
118+
# YDoc not found in the YStore, create the document from the source file (no change history)
119+
pass
120+
121+
if not read_from_source:
122+
# if YStore updates and source file are out-of-sync, resync updates with source
123+
if self._document.source != model["content"]:
124+
# TODO: Delete document from the store.
125+
self._emit(
126+
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
127+
)
139128
self.log.info(
140-
"Content in room %s loaded from file %s", self._room_id, self._file.path
129+
"Content in file %s is out-of-sync with the ystore %s",
130+
self._file.path,
131+
self.ystore.__class__.__name__,
141132
)
142-
self._document.source = model["content"]
133+
read_from_source = True
134+
135+
if read_from_source:
136+
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
137+
self.log.info(
138+
"Content in room %s loaded from file %s", self._room_id, self._file.path
139+
)
140+
self._document.source = model["content"]
143141

144-
if self.ystore:
145-
await self.ystore.encode_state_as_update(self.ydoc)
142+
if self.ystore:
143+
await self.ystore.encode_state_as_update(self.ydoc)
146144

147-
self._document.dirty = False
148-
self.ready = True
149-
self._emit(LogLevel.INFO, "initialize", "Room initialized")
145+
self._document.dirty = False
146+
self.ready = True
147+
self._emit(LogLevel.INFO, "initialize", "Room initialized")
150148

151149
def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
152150
data = {"level": level.value, "room": self._room_id, "path": self._file.path}

tests/test_documents.py

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright (c) Jupyter Development Team.
2+
# Distributed under the terms of the Modified BSD License.
3+
4+
from anyio import create_task_group
5+
6+
7+
async def test_room_concurrent_initialization(
    rtc_create_file,
    rtc_connect_doc_client,
):
    """Connect two document clients to the same text file concurrently.

    The file ``dummy.txt`` is created first, then two tasks are started in
    one task group; each opens a client for that file and closes it right
    away. The test contains no assertions: it succeeds when neither
    connection attempt raises.
    """
    doc_args = ("text", "file", "dummy.txt")  # (file_format, file_type, file_path)
    await rtc_create_file(doc_args[-1])

    async def open_and_close(fmt, kind, path):
        # Entering and immediately leaving the client context is all that is
        # exercised here; nothing is sent over the connection.
        async with await rtc_connect_doc_client(fmt, kind, path):
            pass

    async with create_task_group() as group:
        for _ in range(2):
            group.start_soon(open_and_close, *doc_args)

tests/test_random.py

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright (c) Jupyter Development Team.
2+
# Distributed under the terms of the Modified BSD License.
3+
4+
import sys
5+
6+
if sys.version_info < (3, 10):
7+
from importlib_metadata import entry_points
8+
else:
9+
from importlib.metadata import entry_points
10+
11+
from anyio import create_task_group, sleep
12+
from pycrdt_websocket import WebsocketProvider
13+
14+
jupyter_ydocs = {ep.name: ep.load() for ep in entry_points(group="jupyter_ydoc")}
15+
16+
17+
async def test_room_concurrent_initialization(
    rtc_create_file,
    rtc_connect_doc_client,
):
    """Connect two websocket providers to the same notebook concurrently.

    The notebook ``Untitled.ipynb`` is created first, then two tasks are
    started in one task group. Each task opens a document client for that
    notebook, wraps it in a ``WebsocketProvider`` bound to the shared
    ``jupyter_ydoc`` and keeps the connection open for 0.1 s. The test
    contains no assertions: it succeeds when neither task raises.
    """
    file_format = "json"
    file_type = "notebook"
    file_path = "Untitled.ipynb"
    await rtc_create_file(file_path)
    # Instantiate the document class registered for this file type.
    jupyter_ydoc = jupyter_ydocs[file_type]()

    async def connect(file_format, file_type, file_path, jupyter_ydoc):
        async with await rtc_connect_doc_client(file_format, file_type, file_path) as ws:
            async with WebsocketProvider(jupyter_ydoc.ydoc, ws):
                # Keep the provider alive briefly so both connections overlap.
                await sleep(0.1)

    async with create_task_group() as tg:
        tg.start_soon(connect, file_format, file_type, file_path, jupyter_ydoc)
        tg.start_soon(connect, file_format, file_type, file_path, jupyter_ydoc)

0 commit comments

Comments
 (0)