Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/deno_sandbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
AsyncRevisions,
Revision,
RevisionListItem,
RevisionProgress,
ProgressStage,
ProgressStageStatus,
FileAsset,
SymlinkAsset,
Asset,
Expand Down Expand Up @@ -58,6 +61,9 @@
"RuntimeLogsResponse",
"Revision",
"RevisionListItem",
"RevisionProgress",
"ProgressStage",
"ProgressStageStatus",
"FileAsset",
"SymlinkAsset",
"Asset",
Expand Down
39 changes: 39 additions & 0 deletions src/deno_sandbox/console.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import json
from typing_extensions import NotRequired
from typing import (
Any,
AsyncIterator,
Generic,
Literal,
Optional,
Expand Down Expand Up @@ -226,6 +228,43 @@ async def get_paginated(

return AsyncPaginatedList(self, items, path, next_cursor, params)

async def stream_ndjson(self, path: str) -> AsyncIterator[dict]:
"""Stream NDJSON responses line by line.

Yields parsed JSON objects for each line in the response.
"""
req_url = self._options["console_url"].join(path)
headers = {
"Accept": "application/x-ndjson",
}
async with self.client.stream(
"GET", req_url, headers=headers, timeout=None
) as response:
if not response.is_success:
await response.aread()
code = "UNKNOWN_ERROR"
message = (
f"Request to {req_url} failed with status {response.status_code}"
)
trace_id = response.headers.get("x-deno-trace-id")
try:
body = response.json()
if (
isinstance(body, dict)
and isinstance(body.get("code"), str)
and isinstance(body.get("message"), str)
):
code = body["code"]
message = body["message"]
except Exception:
pass
raise HTTPStatusError(response.status_code, message, code, trace_id)

async for line in response.aiter_lines():
line = line.strip()
if line:
yield json.loads(line)

async def close(self) -> None:
await self.client.aclose()

Expand Down
83 changes: 82 additions & 1 deletion src/deno_sandbox/revisions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from __future__ import annotations

import warnings
from typing import Any, Dict, List, TypedDict, Union, cast, overload
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
TypedDict,
Union,
cast,
overload,
)
from typing_extensions import Literal, NotRequired, Optional

from deno_sandbox.apps import Config, EnvVar, LayerRef
Expand Down Expand Up @@ -89,6 +99,46 @@ class Revision(TypedDict):
"""ISO 8601 timestamp of deletion, or null if active."""


ProgressStageStatus = Literal[
"pending",
"running",
"succeeded",
"skipped",
"failed",
"timed_out",
"cancelled",
"errored",
]


class ProgressStage(TypedDict):
status: ProgressStageStatus
"""The current status of this stage."""

start: NotRequired[str | None]
"""ISO 8601 timestamp when the stage started, or null."""

end: NotRequired[str | None]
"""ISO 8601 timestamp when the stage ended, or null."""


class RevisionProgress(TypedDict):
queued: NotRequired[ProgressStage]
"""Queue stage status."""

preparing: NotRequired[ProgressStage]
"""Preparation stage status."""

installing: NotRequired[ProgressStage]
"""Dependency installation stage status."""

building: NotRequired[ProgressStage]
"""Build command execution stage status."""

deploying: NotRequired[ProgressStage]
"""Artifact upload and routing stage status."""


# Keep old name as alias for backward compatibility
RevisionWithoutTimelines = RevisionListItem

Expand Down Expand Up @@ -173,6 +223,21 @@ async def cancel(self, revision: str) -> Revision:
result = await self._client.post(f"/api/v2/revisions/{revision}/cancel", {})
return cast(Revision, convert_to_snake_case(result))

async def progress(self, revision: str) -> AsyncIterator[RevisionProgress]:
"""Stream revision build progress.

Yields RevisionProgress events as the revision progresses through
its build stages. The stream ends when the revision reaches a
terminal state (succeeded, failed, or skipped).

Args:
revision: The revision ID.
"""
async for event in self._client.stream_ndjson(
f"/api/v2/revisions/{revision}/progress"
):
yield cast(RevisionProgress, convert_to_snake_case(event))

async def deploy(
self,
app: str,
Expand Down Expand Up @@ -268,6 +333,22 @@ def cancel(self, revision: str) -> Revision:
"""
return self._bridge.run(self._async.cancel(revision))

def progress(self, revision: str) -> Iterator[RevisionProgress]:
"""Stream revision build progress.

Yields RevisionProgress events as the revision progresses through
its build stages. The stream ends when the revision reaches a
terminal state (succeeded, failed, or skipped).

Args:
revision: The revision ID.
"""

async def _collect() -> list[RevisionProgress]:
return [event async for event in self._async.progress(revision)]

return iter(self._bridge.run(_collect()))

def deploy(
self,
app: str,
Expand Down
92 changes: 92 additions & 0 deletions tests/test_revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,95 @@ async def test_revisions_deploy_preview_only_async():
assert len(preview) > 0, "should be on preview timeline"
finally:
await sdk.apps.delete(app["id"])


VALID_STAGE_STATUSES = {
"pending",
"running",
"succeeded",
"skipped",
"failed",
"timed_out",
"cancelled",
"errored",
}


def _assert_progress_events(events: list[dict]) -> None:
"""Shared assertions for progress event lists."""
assert len(events) > 0, "Expected at least one progress event"

for event in events:
assert isinstance(event, dict)
# Each event should have at least one known stage key
stage_keys = {"queued", "preparing", "installing", "building", "deploying"}
found_keys = stage_keys & event.keys()
assert len(found_keys) > 0, f"No known stage keys in event: {event}"

for key in found_keys:
stage = event[key]
assert "status" in stage, f"Stage {key} missing 'status'"
assert stage["status"] in VALID_STAGE_STATUSES, (
f"Stage {key} has unexpected status: {stage['status']}"
)

# The last event should have at least one stage in a terminal state
last = events[-1]
terminal_statuses = {"succeeded", "failed", "skipped"}
has_terminal = any(
last.get(k, {}).get("status") in terminal_statuses
for k in ("queued", "preparing", "installing", "building", "deploying")
if k in last
)
assert has_terminal, f"Last event has no terminal stage: {last}"


@pytest.mark.timeout(120)
@pytest.mark.asyncio(loop_scope="session")
async def test_revisions_progress_async():
"""Deploy a revision and stream progress until terminal state."""
sdk = AsyncDenoDeploy()
app = await sdk.apps.create()
try:
revision = await sdk.revisions.deploy(
app["id"],
assets={
"main.ts": {
"kind": "file",
"encoding": "utf-8",
"content": 'Deno.serve(() => new Response("Hello"))',
}
},
)

events = []
async for event in sdk.revisions.progress(revision["id"]):
events.append(event)

_assert_progress_events(events)
finally:
await sdk.apps.delete(app["id"])


@pytest.mark.timeout(120)
def test_revisions_progress_sync():
"""Deploy a revision and stream progress until terminal state (sync)."""
sdk = DenoDeploy()
app = sdk.apps.create()
try:
revision = sdk.revisions.deploy(
app["id"],
assets={
"main.ts": {
"kind": "file",
"encoding": "utf-8",
"content": 'Deno.serve(() => new Response("Hello"))',
}
},
)

events = list(sdk.revisions.progress(revision["id"]))

_assert_progress_events(events)
finally:
sdk.apps.delete(app["id"])