|
| 1 | +# Copyright (c) Jupyter Development Team. |
| 2 | +# Distributed under the terms of the Modified BSD License. |
| 3 | + |
| 4 | +import sys |
| 5 | +from collections import Counter |
| 6 | +from functools import partial |
| 7 | +from random import randrange, uniform |
| 8 | + |
| 9 | +if sys.version_info < (3, 10): |
| 10 | + from importlib_metadata import entry_points |
| 11 | +else: |
| 12 | + from importlib.metadata import entry_points |
| 13 | + |
| 14 | +from anyio import TASK_STATUS_IGNORED, Event, create_task_group, sleep |
| 15 | +from anyio.abc import TaskStatus |
| 16 | +from pycrdt import Text |
| 17 | +from pycrdt_websocket import WebsocketProvider |
| 18 | + |
| 19 | +jupyter_ydocs = {ep.name: ep.load() for ep in entry_points(group="jupyter_ydoc")} |
| 20 | + |
| 21 | + |
| 22 | +async def test_random( |
| 23 | + rtc_create_file, |
| 24 | + rtc_connect_doc_client, |
| 25 | +): |
| 26 | + test_duration = 10 |
| 27 | + client_nb = 10 |
| 28 | + change_max_delay = 0.5 |
| 29 | + |
| 30 | + file_format = "text" |
| 31 | + file_type = "file" |
| 32 | + file_path = "untitled.txt" |
| 33 | + await rtc_create_file(file_path) |
| 34 | + ref_ydoc = jupyter_ydocs[file_type]() |
| 35 | + ref_ytext = ref_ydoc.ydoc.get("source", type=Text) |
| 36 | + |
| 37 | + async def connect( |
| 38 | + file_format: str, |
| 39 | + file_type: str, |
| 40 | + file_path: str, |
| 41 | + ref_ytext: Text, |
| 42 | + stop_request: Event, |
| 43 | + do_stop: Event, |
| 44 | + *, |
| 45 | + task_status: TaskStatus[None] = TASK_STATUS_IGNORED, |
| 46 | + ) -> None: |
| 47 | + await sleep(uniform(0, 1)) |
| 48 | + async with await rtc_connect_doc_client(file_format, file_type, file_path) as ws: |
| 49 | + jupyter_ydoc = jupyter_ydocs[file_type]() |
| 50 | + ydoc = jupyter_ydoc.ydoc |
| 51 | + ytext = ydoc.get("source", type=Text) |
| 52 | + stop_ready = Event() |
| 53 | + stop_done = Event() |
| 54 | + task_status.started({"ytext": ytext, "stop_ready": stop_ready, "stop_done": stop_done}) |
| 55 | + async with WebsocketProvider(ydoc, ws): |
| 56 | + while True: |
| 57 | + if stop_request.is_set(): |
| 58 | + stop_ready.set() |
| 59 | + await do_stop.wait() |
| 60 | + # allow some time for last messages to arrive through websocket |
| 61 | + # FIXME: how long? |
| 62 | + await sleep(10) |
| 63 | + stop_done.set() |
| 64 | + return |
| 65 | + await sleep(uniform(0, change_max_delay)) |
| 66 | + length = len(ytext) |
| 67 | + index = 0 if length == 0 else randrange(length) |
| 68 | + character = chr(randrange(32, 127)) |
| 69 | + ytext.insert(index, character) |
| 70 | + ref_ytext.insert(index, character) |
| 71 | + |
| 72 | + stop_request = Event() |
| 73 | + do_stop = Event() |
| 74 | + connect = partial(connect, file_format, file_type, file_path, ref_ytext, stop_request, do_stop) |
| 75 | + try: |
| 76 | + async with create_task_group() as tg: |
| 77 | + clients = [await tg.start(connect) for i in range(client_nb)] |
| 78 | + await sleep(test_duration) |
| 79 | + stop_request.set() |
| 80 | + for client in clients: |
| 81 | + await client["stop_ready"].wait() |
| 82 | + do_stop.set() |
| 83 | + for client in clients: |
| 84 | + await client["stop_done"].wait() |
| 85 | + except Exception as e: |
| 86 | + print(f"{e=}") |
| 87 | + |
| 88 | + await sleep(1) |
| 89 | + ref_text = str(ref_ytext) |
| 90 | + text0 = str(clients[0]["ytext"]) |
| 91 | + # check that the first peer has all the changes of the reference |
| 92 | + # (but not necessarily in the same order) |
| 93 | + assert Counter(ref_text) == Counter(text0) |
| 94 | + # check that all peers have the same content |
| 95 | + for client in clients[1:]: |
| 96 | + text = str(client["ytext"]) |
| 97 | + assert text == text0 |
0 commit comments