Skip to content

Commit dbabe1b

Browse files
fix: prevent write_file from failing when streaming data. (#42)
We need to send writeFile data concurrently with rpc call, like in the JS API
1 parent a08157c commit dbabe1b

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

src/deno_sandbox/fs.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from re import Pattern
1919

2020
from .process import AbortSignal
21-
from .stream import stream_data
21+
from .stream import complete_stream, start_stream
2222
from .utils import convert_to_camel_case, convert_to_snake_case
2323

2424
if TYPE_CHECKING:
@@ -317,13 +317,13 @@ async def write_file(
317317
mode: Set the file permission mode.
318318
"""
319319
if isinstance(data, bytes):
320-
# Stream bytes as a single chunk
321-
content_stream_id = await stream_data(self._rpc, iter([data]))
320+
streamable = iter([data])
322321
else:
323-
# Stream data from iterable/file object
324-
content_stream_id = await stream_data(self._rpc, data)
322+
streamable = data
325323

326-
params: dict[str, Any] = {"path": path, "contentStreamId": content_stream_id}
324+
stream_id, writer = await start_stream(self._rpc)
325+
326+
params: dict[str, Any] = {"path": path, "contentStreamId": stream_id}
327327
options: dict[str, Any] = {}
328328
if create is not None:
329329
options["create"] = create
@@ -335,7 +335,20 @@ async def write_file(
335335
options["mode"] = mode
336336
if options:
337337
params["options"] = convert_to_camel_case(options)
338-
await self._rpc.call("writeFile", params)
338+
339+
# Send data concurrently with the RPC call to avoid deadlock:
340+
# the server waits for stream data before responding to writeFile.
341+
async def _send() -> None:
342+
try:
343+
await complete_stream(writer, streamable)
344+
except Exception as e:
345+
await writer.error(str(e))
346+
347+
task = self._rpc._loop.create_task(_send())
348+
try:
349+
await self._rpc.call("writeFile", params)
350+
finally:
351+
await task
339352

340353
async def read_text_file(
341354
self,

tests/test_stream.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import io
23
import os
34
import tempfile
@@ -165,6 +166,26 @@ def large_chunks():
165166
assert content == b"x" * 102400
166167

167168

169+
@pytest.mark.asyncio(loop_scope="session")
170+
async def test_write_file_slow_async_generator(async_shared_sandbox):
171+
"""Test write_file with a slow async generator that yields chunks with delays.
172+
173+
This verifies that stream data is sent concurrently with the RPC call.
174+
With incorrect ordering (all data sent before RPC), this would deadlock
175+
because the server waits for stream data before responding to writeFile.
176+
"""
177+
sb = async_shared_sandbox
178+
179+
async def slow_chunks():
180+
for i in range(5):
181+
await asyncio.sleep(0.05)
182+
yield f"chunk{i} ".encode()
183+
184+
await sb.fs.write_file("test_slow_async.txt", slow_chunks())
185+
content = await sb.fs.read_file("test_slow_async.txt")
186+
assert content == b"chunk0 chunk1 chunk2 chunk3 chunk4 "
187+
188+
168189
# stdin streaming tests for spawn
169190

170191

0 commit comments

Comments
 (0)