Merged
25 changes: 25 additions & 0 deletions .github/workflows/pr-test.yml
@@ -0,0 +1,25 @@
+name: PR Test
+
+on:
+  pull_request:
+    branches: [main]
+
+jobs:
+  pr-test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.12"
+
+      - name: Install package and test deps
+        run: |
+          python -m pip install --upgrade pip
+          python -m pip install -e .
+          python -m pip install pytest
+
+      - name: Run CPU-only tests
+        run: pytest tests/unit -v
4 changes: 4 additions & 0 deletions development.md
@@ -10,7 +10,11 @@ Run tests:

```bash
 pip install pytest
+# CPU-only tests (unit + fake e2e)
 pytest tests/unit -v
+
+# Real E2E tests (GPU required, longer runtime)
+pytest tests/e2e/test_e2e_sglang.py -v -s
```

## Benchmark Scripts
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -33,4 +33,4 @@ package-dir = { "" = "src" }
 where = ["src"]

 [tool.pytest.ini_options]
-testpaths = ["tests/unit"]
+testpaths = ["tests/unit", "tests/e2e"]
10 changes: 0 additions & 10 deletions src/sglang_diffusion_routing/cli/main.py
@@ -4,7 +4,6 @@
 from __future__ import annotations

 import argparse
-import asyncio
 import sys

 from sglang_diffusion_routing import DiffusionRouter
@@ -26,18 +25,9 @@ def _run_router_server(
         worker_urls if worker_urls is not None else args.worker_urls or []
     )
     router = DiffusionRouter(args, verbose=args.verbose)
-    refresh_tasks = []
     for url in worker_urls:
         normalized_url = router.normalize_worker_url(url)
         router.register_worker(normalized_url)
Comment on lines 28 to 30
Contributor

medium

The removal of the refresh_tasks list and the _refresh_all_worker_video_support async function simplifies the router startup logic in _run_router_server. This logic has been correctly moved to the _start_background_health_check method in diffusion_router.py.

-        refresh_tasks.append(router.refresh_worker_video_support(normalized_url))
-
-    if refresh_tasks:
-
-        async def _refresh_all_worker_video_support() -> None:
-            await asyncio.gather(*refresh_tasks)
-
-        asyncio.run(_refresh_all_worker_video_support())

     print(f"{log_prefix} starting router on {args.host}:{args.port}", flush=True)
     print(
16 changes: 16 additions & 0 deletions src/sglang_diffusion_routing/router/diffusion_router.py
@@ -69,6 +69,16 @@ def _setup_routes(self) -> None:
         )

     async def _start_background_health_check(self) -> None:
+        # Probe video capability for pre-registered workers in the running event loop.
+        unknown_workers = [
+            url for url, support in self.worker_video_support.items() if support is None
+        ]
+        if unknown_workers:
+            await asyncio.gather(
+                *(self.refresh_worker_video_support(url) for url in unknown_workers),
+                return_exceptions=True,
+            )
Comment on lines +72 to +80
Contributor

medium

Moving the video capability probing for pre-registered workers into _start_background_health_check is a good architectural improvement. This ensures that the probing happens asynchronously within the running event loop, preventing blocking during router startup and making the process more robust.
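
A minimal sketch of the pattern this comment describes, under stated assumptions: `probe_worker`, the worker URLs, and the module-level `worker_video_support` dict are hypothetical stand-ins rather than the router's actual code. It shows how `asyncio.gather(..., return_exceptions=True)` lets the remaining probes finish when one worker is unreachable.

```python
from __future__ import annotations

import asyncio

# Hypothetical stand-in for the router's per-worker state: None means "not probed yet".
worker_video_support: dict[str, bool | None] = {
    "http://worker-a": None,
    "http://worker-b": None,
    "http://worker-c": True,  # already known, so it is skipped
}


async def probe_worker(url: str) -> None:
    """Pretend to probe a worker; worker-b simulates an unreachable host."""
    await asyncio.sleep(0)  # yield to the loop, as a real HTTP call would
    if url.endswith("worker-b"):
        raise ConnectionError(f"cannot reach {url}")
    worker_video_support[url] = True


async def probe_unknown_workers() -> list[BaseException | None]:
    unknown = [url for url, support in worker_video_support.items() if support is None]
    # return_exceptions=True collects failures as results instead of raising,
    # so the remaining probes still run to completion.
    return await asyncio.gather(
        *(probe_worker(url) for url in unknown),
        return_exceptions=True,
    )


results = asyncio.run(probe_unknown_workers())
```

In this sketch the failed probe shows up as a `ConnectionError` object in `results`, and the corresponding entry stays `None`, so a later health-check pass could retry it. Note that the coroutine still awaits all probes before returning; the calls run concurrently with each other, not in the background.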


         if self._health_task is None or self._health_task.done():
             self._health_task = asyncio.create_task(self._health_check_loop())

@@ -383,6 +393,12 @@ async def generate(self, request: Request):

     async def generate_video(self, request: Request):
         """Route video generation to /v1/videos."""
+        if not self.worker_request_counts:
+            return JSONResponse(
+                status_code=503,
+                content={"error": "No workers registered in the pool"},
+            )
Comment on lines +396 to +400
Contributor

high

Adding a check for worker_request_counts before attempting to route video generation requests is crucial. This prevents errors when no workers are registered and provides a clear 503 Service Unavailable response to the client.
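
The guard can be illustrated in isolation with a framework-free sketch. Everything here is an assumption for illustration: `route_video_request` is a hypothetical function that returns a `(status_code, body)` tuple instead of a `JSONResponse`, and the least-loaded selection policy is not taken from the PR.

```python
from __future__ import annotations

from http import HTTPStatus


def route_video_request(worker_request_counts: dict[str, int]) -> tuple[int, dict[str, str]]:
    """Return (status_code, body); a hypothetical simplification of a route handler."""
    if not worker_request_counts:
        # Empty pool: fail fast with 503 rather than erroring during worker selection.
        return (
            HTTPStatus.SERVICE_UNAVAILABLE.value,
            {"error": "No workers registered in the pool"},
        )
    # Pick the least-loaded worker (an assumed policy, for illustration only).
    chosen = min(worker_request_counts, key=worker_request_counts.get)
    return HTTPStatus.OK.value, {"worker": chosen}


empty_status, empty_body = route_video_request({})
busy_status, busy_body = route_video_request({"http://worker-a": 3, "http://worker-b": 1})
```

Checking the pool first keeps the "no workers at all" case distinct from the "workers exist but none supports video" case that the candidate filtering below has to handle.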


         candidate_workers = [
             worker_url
             for worker_url, support in self.worker_video_support.items()
11 changes: 11 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,11 @@
+"""Pytest configuration: force local src import precedence."""
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+src_str = str(Path(__file__).resolve().parent.parent / "src")
+while src_str in sys.path:
+    sys.path.remove(src_str)
+sys.path.insert(0, src_str)
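
The effect of this `conftest.py` can be sketched on a plain list. The helper `prepend_unique` and the example paths are hypothetical; unlike the file above, which mutates `sys.path` in place, this version returns a new list so the before and after states can be compared.

```python
from __future__ import annotations


def prepend_unique(paths: list[str], entry: str) -> list[str]:
    """Return a copy of paths with every occurrence of entry removed, then entry placed first."""
    remaining = [p for p in paths if p != entry]
    return [entry, *remaining]


# Hypothetical search path where the local src directory appears late and twice.
example_path = ["/usr/lib/python3", "/repo/src", "/site-packages", "/repo/src"]
reordered = prepend_unique(example_path, "/repo/src")
```

With the local `src` directory first and listed only once, an import of the package would resolve to the checkout under test before any copy found in a later path entry.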
Empty file added tests/e2e/__init__.py
Empty file.