Skip to content

Commit 07214c3

Browse files
davidbrochartandrii-i
authored andcommitted
Fix concurrent room initialization (jupyterlab#255)
1 parent 75631be commit 07214c3

File tree

3 files changed

+120
-92
lines changed

3 files changed

+120
-92
lines changed

jupyter_collaboration/handlers.py

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
5757
_serve_task: asyncio.Task | None
5858
_message_queue: asyncio.Queue[Any]
5959
_background_tasks: set[asyncio.Task]
60+
_room_locks: dict[str, asyncio.Lock] = {}
61+
62+
def _room_lock(self, room_id: str) -> asyncio.Lock:
63+
if room_id not in self._room_locks:
64+
self._room_locks[room_id] = asyncio.Lock()
65+
return self._room_locks[room_id]
6066

6167
def create_task(self, aw):
6268
task = asyncio.create_task(aw)
@@ -71,38 +77,38 @@ async def prepare(self):
7177
# Get room
7278
self._room_id: str = self.request.path.split("/")[-1]
7379

74-
if self._websocket_server.room_exists(self._room_id):
75-
self.room: YRoom = await self._websocket_server.get_room(self._room_id)
76-
77-
else:
78-
if self._room_id.count(":") >= 2:
79-
# DocumentRoom
80-
file_format, file_type, file_id = decode_file_path(self._room_id)
81-
if file_id in self._file_loaders:
82-
self._emit(
83-
LogLevel.WARNING,
84-
None,
85-
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
80+
async with self._room_lock(self._room_id):
81+
if self._websocket_server.room_exists(self._room_id):
82+
self.room: YRoom = await self._websocket_server.get_room(self._room_id)
83+
else:
84+
if self._room_id.count(":") >= 2:
85+
# DocumentRoom
86+
file_format, file_type, file_id = decode_file_path(self._room_id)
87+
if file_id in self._file_loaders:
88+
self._emit(
89+
LogLevel.WARNING,
90+
None,
91+
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
92+
)
93+
94+
file = self._file_loaders[file_id]
95+
updates_file_path = f".{file_type}:{file_id}.y"
96+
ystore = self._ystore_class(path=updates_file_path, log=self.log)
97+
self.room = DocumentRoom(
98+
self._room_id,
99+
file_format,
100+
file_type,
101+
file,
102+
self.event_logger,
103+
ystore,
104+
self.log,
105+
self._document_save_delay,
86106
)
87107

88-
file = self._file_loaders[file_id]
89-
updates_file_path = f".{file_type}:{file_id}.y"
90-
ystore = self._ystore_class(path=updates_file_path, log=self.log)
91-
self.room = DocumentRoom(
92-
self._room_id,
93-
file_format,
94-
file_type,
95-
file,
96-
self.event_logger,
97-
ystore,
98-
self.log,
99-
self._document_save_delay,
100-
)
101-
102-
else:
103-
# TransientRoom
104-
# it is a transient document (e.g. awareness)
105-
self.room = TransientRoom(self._room_id, self.log)
108+
else:
109+
# TransientRoom
110+
# it is a transient document (e.g. awareness)
111+
self.room = TransientRoom(self._room_id, self.log)
106112

107113
await self._websocket_server.start_room(self.room)
108114
self._websocket_server.add_room(self._room_id, self.room)
@@ -191,7 +197,8 @@ async def open(self, room_id):
191197

192198
try:
193199
# Initialize the room
194-
await self.room.initialize()
200+
async with self._room_lock(self._room_id):
201+
await self.room.initialize()
195202
self._emit_awareness_event(self.current_user.username, "join")
196203
except Exception as e:
197204
_, _, file_id = decode_file_path(self._room_id)
@@ -319,29 +326,31 @@ async def _clean_room(self) -> None:
319326
contains a copy of the document. In addition, we remove the file if there is no rooms
320327
subscribed to it.
321328
"""
322-
assert isinstance(self.room, DocumentRoom)
329+
async with self._room_lock(self._room_id):
330+
assert isinstance(self.room, DocumentRoom)
323331

324-
if self._cleanup_delay is None:
325-
return
332+
if self._cleanup_delay is None:
333+
return
326334

327-
await asyncio.sleep(self._cleanup_delay)
335+
await asyncio.sleep(self._cleanup_delay)
328336

329-
# Remove the room from the websocket server
330-
self.log.info("Deleting Y document from memory: %s", self.room.room_id)
331-
self._websocket_server.delete_room(room=self.room)
337+
# Remove the room from the websocket server
338+
self.log.info("Deleting Y document from memory: %s", self.room.room_id)
339+
self._websocket_server.delete_room(room=self.room)
332340

333-
# Clean room
334-
del self.room
335-
self.log.info("Room %s deleted", self._room_id)
336-
self._emit(LogLevel.INFO, "clean", "Room deleted.")
341+
# Clean room
342+
del self.room
343+
self.log.info("Room %s deleted", self._room_id)
344+
self._emit(LogLevel.INFO, "clean", "Room deleted.")
337345

338-
# Clean the file loader if there are not rooms using it
339-
_, _, file_id = decode_file_path(self._room_id)
340-
file = self._file_loaders[file_id]
341-
if file.number_of_subscriptions == 0:
342-
self.log.info("Deleting file %s", file.path)
343-
await self._file_loaders.remove(file_id)
344-
self._emit(LogLevel.INFO, "clean", "Loader deleted.")
346+
# Clean the file loader if there are not rooms using it
347+
_, _, file_id = decode_file_path(self._room_id)
348+
file = self._file_loaders[file_id]
349+
if file.number_of_subscriptions == 0:
350+
self.log.info("Deleting file %s", file.path)
351+
await self._file_loaders.remove(file_id)
352+
self._emit(LogLevel.INFO, "clean", "Loader deleted.")
353+
del self._room_locks[self._room_id]
345354

346355
def check_origin(self, origin):
347356
"""

jupyter_collaboration/rooms/document.py

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def __init__(
4343
self._save_delay = save_delay
4444

4545
self._update_lock = asyncio.Lock()
46-
self._initialization_lock = asyncio.Lock()
46+
self._cleaner: asyncio.Task | None = None
4747
self._saving_document: asyncio.Task | None = None
4848
self._messages: dict[str, asyncio.Lock] = {}
4949

@@ -66,23 +66,19 @@ async def initialize(self) -> None:
6666
It is important to set the ready property in the parent class (`self.ready = True`),
6767
this setter will subscribe for updates on the shared document.
6868
"""
69-
async with self._initialization_lock:
70-
if self.ready:
71-
return
69+
if self.ready: # type: ignore[has-type]
70+
return
7271

73-
self.log.info("Initializing room %s", self._room_id)
72+
self.log.info("Initializing room %s", self._room_id)
7473

75-
model = await self._file.load_content(self._file_format, self._file_type)
74+
model = await self._file.load_content(self._file_format, self._file_type)
7675

77-
async with self._update_lock:
78-
# try to apply Y updates from the YStore for this document
79-
if self.ystore is not None and await self.ystore.exists(self._room_id):
80-
# Load the content from the store
81-
doc = await self.ystore.get(self._room_id)
82-
assert doc
83-
self._session_id = doc["session_id"]
84-
85-
await self.ystore.apply_updates(self._room_id, self.ydoc)
76+
async with self._update_lock:
77+
# try to apply Y updates from the YStore for this document
78+
read_from_source = True
79+
if self.ystore is not None:
80+
try:
81+
await self.ystore.apply_updates(self.ydoc)
8682
self._emit(
8783
LogLevel.INFO,
8884
"load",
@@ -95,37 +91,38 @@ async def initialize(self) -> None:
9591
self._room_id,
9692
self.ystore.__class__.__name__,
9793
)
98-
99-
# if YStore updates and source file are out-of-sync, resync updates with source
100-
if self._document.source != model["content"]:
101-
self._emit(
102-
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
103-
)
104-
self.log.info(
105-
"Content in file %s is out-of-sync with the ystore %s",
106-
self._file.path,
107-
self.ystore.__class__.__name__,
108-
)
109-
110-
# Update the content
111-
self._document.source = model["content"]
112-
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)
113-
114-
else:
115-
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
94+
read_from_source = False
95+
except YDocNotFound:
96+
# YDoc not found in the YStore, create the document from the source file (no change history)
97+
pass
98+
99+
if not read_from_source:
100+
# if YStore updates and source file are out-of-sync, resync updates with source
101+
if self._document.source != model["content"]:
102+
# TODO: Delete document from the store.
103+
self._emit(
104+
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
105+
)
116106
self.log.info(
117-
"Content in room %s loaded from file %s", self._room_id, self._file.path
107+
"Content in file %s is out-of-sync with the ystore %s",
108+
self._file.path,
109+
self.ystore.__class__.__name__,
118110
)
119-
self._document.source = model["content"]
111+
read_from_source = True
112+
113+
if read_from_source:
114+
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
115+
self.log.info(
116+
"Content in room %s loaded from file %s", self._room_id, self._file.path
117+
)
118+
self._document.source = model["content"]
120119

121-
if self.ystore is not None:
122-
assert self.session_id
123-
await self.ystore.create(self._room_id, self.session_id)
124-
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)
120+
if self.ystore:
121+
await self.ystore.encode_state_as_update(self.ydoc)
125122

126-
self._document.dirty = False
127-
self.ready = True
128-
self._emit(LogLevel.INFO, "initialize", "Room initialized")
123+
self._document.dirty = False
124+
self.ready = True
125+
self._emit(LogLevel.INFO, "initialize", "Room initialized")
129126

130127
def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
131128
data = {"level": level.value, "room": self._room_id, "path": self._file.path}

tests/test_documents.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
# Distributed under the terms of the Modified BSD License.
33

44
import sys
5+
from time import time
56

67
if sys.version_info < (3, 10):
78
from importlib_metadata import entry_points
89
else:
910
from importlib.metadata import entry_points
1011

1112
import pytest
12-
from anyio import sleep
13+
from anyio import create_task_group, sleep
1314
from pycrdt_websocket import WebsocketProvider
1415

1516
jupyter_ydocs = {ep.name: ep.load() for ep in entry_points(group="jupyter_ydoc")}
@@ -37,3 +38,24 @@ async def test_dirty(
3738
jupyter_ydoc.dirty = True
3839
await sleep(rtc_document_save_delay * 1.5)
3940
assert not jupyter_ydoc.dirty
41+
42+
43+
async def test_room_concurrent_initialization(
44+
rtc_create_file,
45+
rtc_connect_doc_client,
46+
):
47+
file_format = "text"
48+
file_type = "file"
49+
file_path = "dummy.txt"
50+
await rtc_create_file(file_path)
51+
52+
async def connect(file_format, file_type, file_path):
53+
async with await rtc_connect_doc_client(file_format, file_type, file_path) as ws:
54+
pass
55+
56+
t0 = time()
57+
async with create_task_group() as tg:
58+
tg.start_soon(connect, file_format, file_type, file_path)
59+
tg.start_soon(connect, file_format, file_type, file_path)
60+
t1 = time()
61+
assert t1 - t0 < 0.5

0 commit comments

Comments
 (0)