@@ -56,6 +56,12 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler):
56
56
57
57
_message_queue : asyncio .Queue [Any ]
58
58
_background_tasks : set [asyncio .Task ]
59
+ _room_locks : dict [str , asyncio .Lock ] = {}
60
+
61
+ def _room_lock (self , room_id : str ) -> asyncio .Lock :
62
+ if room_id not in self ._room_locks :
63
+ self ._room_locks [room_id ] = asyncio .Lock ()
64
+ return self ._room_locks [room_id ]
59
65
60
66
def create_task (self , aw ):
61
67
task = asyncio .create_task (aw )
@@ -70,38 +76,38 @@ async def prepare(self):
70
76
# Get room
71
77
self ._room_id : str = self .request .path .split ("/" )[- 1 ]
72
78
73
- if self ._websocket_server .room_exists (self ._room_id ):
74
- self .room : YRoom = await self ._websocket_server .get_room (self ._room_id )
75
-
76
- else :
77
- if self ._room_id .count (":" ) >= 2 :
78
- # DocumentRoom
79
- file_format , file_type , file_id = decode_file_path (self ._room_id )
80
- if file_id in self ._file_loaders :
81
- self ._emit (
82
- LogLevel .WARNING ,
83
- None ,
84
- "There is another collaborative session accessing the same file.\n The synchronization between rooms is not supported and you might lose some of your changes." ,
79
+ async with self ._room_lock (self ._room_id ):
80
+ if self ._websocket_server .room_exists (self ._room_id ):
81
+ self .room : YRoom = await self ._websocket_server .get_room (self ._room_id )
82
+ else :
83
+ if self ._room_id .count (":" ) >= 2 :
84
+ # DocumentRoom
85
+ file_format , file_type , file_id = decode_file_path (self ._room_id )
86
+ if file_id in self ._file_loaders :
87
+ self ._emit (
88
+ LogLevel .WARNING ,
89
+ None ,
90
+ "There is another collaborative session accessing the same file.\n The synchronization between rooms is not supported and you might lose some of your changes." ,
91
+ )
92
+
93
+ file = self ._file_loaders [file_id ]
94
+ updates_file_path = f".{ file_type } :{ file_id } .y"
95
+ ystore = self ._ystore_class (path = updates_file_path , log = self .log )
96
+ self .room = DocumentRoom (
97
+ self ._room_id ,
98
+ file_format ,
99
+ file_type ,
100
+ file ,
101
+ self .event_logger ,
102
+ ystore ,
103
+ self .log ,
104
+ self ._document_save_delay ,
85
105
)
86
106
87
- file = self ._file_loaders [file_id ]
88
- updates_file_path = f".{ file_type } :{ file_id } .y"
89
- ystore = self ._ystore_class (path = updates_file_path , log = self .log )
90
- self .room = DocumentRoom (
91
- self ._room_id ,
92
- file_format ,
93
- file_type ,
94
- file ,
95
- self .event_logger ,
96
- ystore ,
97
- self .log ,
98
- self ._document_save_delay ,
99
- )
100
-
101
- else :
102
- # TransientRoom
103
- # it is a transient document (e.g. awareness)
104
- self .room = TransientRoom (self ._room_id , self .log )
107
+ else :
108
+ # TransientRoom
109
+ # it is a transient document (e.g. awareness)
110
+ self .room = TransientRoom (self ._room_id , self .log )
105
111
106
112
await self ._websocket_server .start_room (self .room )
107
113
self ._websocket_server .add_room (self ._room_id , self .room )
@@ -184,7 +190,8 @@ async def open(self, room_id):
184
190
185
191
try :
186
192
# Initialize the room
187
- await self .room .initialize ()
193
+ async with self ._room_lock (self ._room_id ):
194
+ await self .room .initialize ()
188
195
self ._emit_awareness_event (self .current_user .username , "join" )
189
196
except Exception as e :
190
197
_ , _ , file_id = decode_file_path (self ._room_id )
@@ -323,29 +330,31 @@ async def _clean_room(self) -> None:
323
330
contains a copy of the document. In addition, we remove the file if there is no rooms
324
331
subscribed to it.
325
332
"""
326
- assert isinstance (self .room , DocumentRoom )
327
-
328
- if self ._cleanup_delay is None :
329
- return
330
-
331
- await asyncio .sleep (self ._cleanup_delay )
332
-
333
- # Remove the room from the websocket server
334
- self .log .info ("Deleting Y document from memory: %s" , self .room .room_id )
335
- self ._websocket_server .delete_room (room = self .room )
336
-
337
- # Clean room
338
- del self .room
339
- self .log .info ("Room %s deleted" , self ._room_id )
340
- self ._emit (LogLevel .INFO , "clean" , "Room deleted." )
341
-
342
- # Clean the file loader if there are not rooms using it
343
- _ , _ , file_id = decode_file_path (self ._room_id )
344
- file = self ._file_loaders [file_id ]
345
- if file .number_of_subscriptions == 0 :
346
- self .log .info ("Deleting file %s" , file .path )
347
- await self ._file_loaders .remove (file_id )
348
- self ._emit (LogLevel .INFO , "clean" , "Loader deleted." )
333
+ async with self ._room_lock (self ._room_id ):
334
+ assert isinstance (self .room , DocumentRoom )
335
+
336
+ if self ._cleanup_delay is None :
337
+ return
338
+
339
+ await asyncio .sleep (self ._cleanup_delay )
340
+
341
+ # Remove the room from the websocket server
342
+ self .log .info ("Deleting Y document from memory: %s" , self .room .room_id )
343
+ self ._websocket_server .delete_room (room = self .room )
344
+
345
+ # Clean room
346
+ del self .room
347
+ self .log .info ("Room %s deleted" , self ._room_id )
348
+ self ._emit (LogLevel .INFO , "clean" , "Room deleted." )
349
+
350
+ # Clean the file loader if there are not rooms using it
351
+ _ , _ , file_id = decode_file_path (self ._room_id )
352
+ file = self ._file_loaders [file_id ]
353
+ if file .number_of_subscriptions == 0 :
354
+ self .log .info ("Deleting file %s" , file .path )
355
+ await self ._file_loaders .remove (file_id )
356
+ self ._emit (LogLevel .INFO , "clean" , "Loader deleted." )
357
+ del self ._room_locks [self ._room_id ]
349
358
350
359
def check_origin (self , origin ):
351
360
"""
0 commit comments