|
21 | 21 | class DocumentRoom(YRoom):
|
22 | 22 | """A Y room for a possibly stored document (e.g. a notebook)."""
|
23 | 23 |
|
| 24 | + _background_tasks: set[asyncio.Task] |
| 25 | + |
24 | 26 | def __init__(
|
25 | 27 | self,
|
26 | 28 | room_id: str,
|
@@ -48,6 +50,7 @@ def __init__(
|
48 | 50 | self._cleaner: asyncio.Task | None = None
|
49 | 51 | self._saving_document: asyncio.Task | None = None
|
50 | 52 | self._messages: dict[str, asyncio.Lock] = {}
|
| 53 | + self._background_tasks = set() |
51 | 54 |
|
52 | 55 | # Listen for document changes
|
53 | 56 | self._document.observe(self._on_document_change)
|
@@ -100,6 +103,10 @@ async def initialize(self) -> None:
|
100 | 103 | # try to apply Y updates from the YStore for this document
|
101 | 104 | read_from_source = True
|
102 | 105 | if self.ystore is not None:
|
| 106 | + async with self.ystore.start_lock: |
| 107 | + if not self.ystore.started.is_set(): |
| 108 | + self.create_task(self.ystore.start()) |
| 109 | + await self.ystore.started.wait() |
103 | 110 | try:
|
104 | 111 | await self.ystore.apply_updates(self.ydoc)
|
105 | 112 | self._emit(
|
@@ -177,6 +184,11 @@ async def stop(self) -> None:
|
177 | 184 | self._document.unobserve()
|
178 | 185 | self._file.unobserve(self.room_id)
|
179 | 186 |
|
| 187 | + def create_task(self, aw): |
| 188 | + task = asyncio.create_task(aw) |
| 189 | + self._background_tasks.add(task) |
| 190 | + task.add_done_callback(self._background_tasks.discard) |
| 191 | + |
180 | 192 | async def _broadcast_updates(self):
|
181 | 193 | # FIXME should be upstreamed
|
182 | 194 | try:
|
|
0 commit comments