Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/deno_sandbox/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, transport: WebSocketTransport):
self._pending_processes: Dict[int, asyncio.StreamReader] = {}
self.__loop: asyncio.AbstractEventLoop | None = None
self._signal_id = 0
self._stream_id = 0

@property
def _loop(self) -> asyncio.AbstractEventLoop:
Expand All @@ -61,6 +62,16 @@ def _loop(self) -> asyncio.AbstractEventLoop:
async def close(self):
await self._transport.close()

async def send_notification(self, method: str, params: dict[str, Any]) -> None:
"""Send a notification (no response expected)."""
payload = {"method": method, "params": params, "jsonrpc": "2.0"}
await self._transport.send(json.dumps(payload))

def next_stream_id(self) -> int:
"""Get next stream ID."""
self._stream_id += 1
return self._stream_id

async def call(self, method: str, params: Mapping[str, Any]) -> Any:
if self._listen_task is None or self._listen_task.done():
self._listen_task = self._loop.create_task(self._listener())
Expand Down
78 changes: 78 additions & 0 deletions src/deno_sandbox/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@
import json
from typing import (
Any,
AsyncIterable,
AsyncIterator,
BinaryIO,
Iterable,
Optional,
TypedDict,
Union,
cast,
)
from typing_extensions import Literal, NotRequired, TypeAlias

from .stream import stream_data

from .api_generated import (
AsyncSandboxEnv,
AsyncSandboxFs as AsyncSandboxFsGenerated,
Expand All @@ -24,11 +30,13 @@
DenoRunOptions,
FsFileHandle,
FsOpenOptions,
ReadFileOptions,
SandboxListOptions,
SandboxCreateOptions,
SandboxConnectOptions,
SandboxMeta,
SpawnOptions,
WriteFileOptions,
)
from .bridge import AsyncBridge
from .console import (
Expand Down Expand Up @@ -575,6 +583,20 @@ def __exit__(self, exc_type, exc_val, exc_tb):
class AsyncSandboxFs(AsyncSandboxFsGenerated):
"""Filesystem operations inside the sandbox."""

async def read_file(
self, path: str, options: Optional[ReadFileOptions] = None
) -> bytes:
"""Reads the entire contents of a file as bytes."""

params: dict[str, Any] = {"path": path}
if options is not None:
params["options"] = convert_to_camel_case(options)

result = await self._rpc.call("readFile", params)

# Server returns base64-encoded data
return base64.b64decode(result)

async def create(self, path: str) -> AsyncFsFile:
"""Create a new, empty file at the specified path."""

Expand Down Expand Up @@ -602,10 +624,42 @@ async def open(

return AsyncFsFile(self._rpc, handle["file_handle_id"])

async def write_file(
self,
path: str,
data: Union[bytes, AsyncIterable[bytes], Iterable[bytes], BinaryIO],
options: Optional[WriteFileOptions] = None,
) -> None:
"""Write bytes to file. Accepts bytes, async/sync iterables, or file objects."""

if isinstance(data, bytes):
# Stream bytes as a single chunk
content_stream_id = await stream_data(self._rpc, iter([data]))
else:
# Stream data from iterable/file object
content_stream_id = await stream_data(self._rpc, data)

params: dict[str, Any] = {"path": path, "contentStreamId": content_stream_id}
if options is not None:
params["options"] = convert_to_camel_case(options)
await self._rpc.call("writeFile", params)


class SandboxFs(SandboxFsGenerated):
"""Filesystem operations inside the sandbox."""

def read_file(self, path: str, options: Optional[ReadFileOptions] = None) -> bytes:
"""Reads the entire contents of a file as bytes."""

params: dict[str, Any] = {"path": path}
if options is not None:
params["options"] = convert_to_camel_case(options)

result = self._rpc.call("readFile", params)

# Server returns base64-encoded data
return base64.b64decode(result)

def create(self, path: str) -> FsFile:
"""Create a new, empty file at the specified path."""

Expand All @@ -631,6 +685,30 @@ def open(self, path: str, options: Optional[FsOpenOptions] = None) -> FsFile:

return FsFile(self._rpc, handle["file_handle_id"])

def write_file(
self,
path: str,
data: Union[bytes, Iterable[bytes], BinaryIO],
options: Optional[WriteFileOptions] = None,
) -> None:
"""Write bytes to file. Accepts bytes, sync iterables, or file objects."""

if isinstance(data, bytes):
# Stream bytes as a single chunk
content_stream_id = self._client._bridge.run(
stream_data(self._rpc._async_client, iter([data]))
)
else:
# Stream data from iterable/file object
content_stream_id = self._client._bridge.run(
stream_data(self._rpc._async_client, data)
)

params: dict[str, Any] = {"path": path, "contentStreamId": content_stream_id}
if options is not None:
params["options"] = convert_to_camel_case(options)
self._rpc.call("writeFile", params)


class AsyncVsCode:
"""Experimental! A VSCode instance running inside the sandbox."""
Expand Down
93 changes: 93 additions & 0 deletions src/deno_sandbox/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from __future__ import annotations

import base64
from typing import TYPE_CHECKING, AsyncIterable, BinaryIO, Iterable, Union

if TYPE_CHECKING:
from .rpc import AsyncRpcClient

Streamable = Union[AsyncIterable[bytes], Iterable[bytes], BinaryIO]


class AsyncStreamWriter:
"""Manages writing a stream to the server."""

def __init__(self, rpc: AsyncRpcClient, stream_id: int):
self._rpc = rpc
self._stream_id = stream_id

async def start(self) -> None:
"""Send $sandbox.stream.start message."""
await self._rpc.send_notification(
"$sandbox.stream.start", {"streamId": self._stream_id}
)

async def enqueue(self, data: bytes) -> None:
"""Send $sandbox.stream.enqueue message with base64-encoded data."""
await self._rpc.send_notification(
"$sandbox.stream.enqueue",
{
"streamId": self._stream_id,
"data": base64.b64encode(data).decode("ascii"),
},
)

async def end(self) -> None:
"""Send $sandbox.stream.end message."""
await self._rpc.send_notification(
"$sandbox.stream.end", {"streamId": self._stream_id}
)

async def error(self, message: str) -> None:
"""Send $sandbox.stream.error message."""
await self._rpc.send_notification(
"$sandbox.stream.error", {"streamId": self._stream_id, "error": message}
)


async def stream_data(
rpc: AsyncRpcClient, data: Streamable, chunk_size: int = 64 * 1024
) -> int:
"""
Stream data to server. Returns stream_id.

Sends: start → enqueue(s) → end
On error: sends error message then re-raises
"""
stream_id = rpc.next_stream_id()
writer = AsyncStreamWriter(rpc, stream_id)

try:
await writer.start()

if hasattr(data, "read"):
# File-like object
while True:
chunk = data.read(chunk_size) # type: ignore[union-attr]
if not chunk:
break
await writer.enqueue(chunk)
elif hasattr(data, "__aiter__"):
# Async iterable
async for chunk in data: # type: ignore[union-attr]
await writer.enqueue(chunk)
else:
# Sync iterable
for chunk in data: # type: ignore[union-attr]
await writer.enqueue(chunk)

await writer.end()
return stream_id

except Exception as e:
await writer.error(str(e))
raise


def is_streamable(obj: object) -> bool:
"""Check if object is streamable (not bytes)."""
return (
hasattr(obj, "read")
or hasattr(obj, "__aiter__")
or (hasattr(obj, "__iter__") and not isinstance(obj, (bytes, str, dict, list)))
)
Loading