Skip to content

Commit 6aae02d

Browse files
authored
Merge pull request #247 from amosproj/56-video-router-dispatching-service
56 video router dispatching service Signed-off-by: Felix Hilgers <felix.hilgers@fau.de>
2 parents 15ac3d4 + 066cabb commit 6aae02d

File tree

24 files changed

+1112
-23
lines changed

24 files changed

+1112
-23
lines changed

Makefile

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
format-check format-check-frontend format-check-backend \
1010
test test-frontend test-backend \
1111
sbom sbom-check \
12-
run-backend-local run-frontend-local run-streamer-webcam run-streamer-file run-analyzer-local \
12+
run-backend-local run-frontend-local run-streamer-webcam run-streamer-file run-analyzer-local run-orchestrator-local \
1313
docker-build docker-build-frontend docker-build-backend docker-build-streamer \
1414
docker-build-analyzer docker-build-analyzer-cuda docker-build-analyzer-rocm \
1515
docker-compose-up docker-compose-down \
@@ -56,6 +56,8 @@ help:
5656
@echo " generates SBOM (sbom.json) and dependency CSV"
5757
@echo " sbom-check"
5858
@echo " checks if SBOM is up-to-date with dependencies"
59+
@echo " run-orchestrator-local"
60+
@echo " runs orchestrator service with dynamic port (starting from 8002)"
5961
@echo " run-backend-local"
6062
@echo " runs backend locally with uvicorn"
6163
@echo " run-frontend-local"
@@ -138,24 +140,28 @@ test-backend:
138140
cd src/backend && uv run pytest -s
139141

140142
run-streamer-webcam:
141-
@echo "Starting video source service (webcam) on port 8000..."
142-
cd src/backend && VIDEO_SOURCE_TYPE=webcam uv run uvicorn streamer.main:app --host 0.0.0.0 --port 8000 --reload
143+
@echo "Starting video source service (webcam) with dynamic port..."
144+
cd src/backend && VIDEO_SOURCE_TYPE=webcam uv run python -m streamer
143145

144146
run-streamer-file:
145-
@echo "Starting video source service (file) on port 8000..."
147+
@echo "Starting video source service (file) with dynamic port..."
146148
@echo "Set VIDEO_FILE_PATH env var to specify file (default: video.mp4)"
147-
cd src/backend && VIDEO_SOURCE_TYPE=file uv run uvicorn streamer.main:app --host 0.0.0.0 --port 8000 --reload
149+
cd src/backend && VIDEO_SOURCE_TYPE=file uv run python -m streamer
148150

149151
run-analyzer-local:
150-
@echo "Starting analyzer service on port 8001..."
151-
cd src/backend && uv run uvicorn analyzer.main:app --host 0.0.0.0 --port 8001 --reload
152+
@echo "Starting analyzer service with dynamic port..."
153+
cd src/backend && uv run python -m analyzer
154+
155+
run-orchestrator-local:
156+
@echo "Starting orchestrator service with dynamic port..."
157+
cd src/backend && uv run python -m orchestrator
152158

153159
run-backend-local: run-streamer-webcam
154160
@echo "Note: To run analyzer, use 'make run-analyzer-local' in another terminal"
155161
@echo "Note: To use file source instead, run 'make run-streamer-file'"
156162

157163
run-frontend-local:
158-
cd src/frontend && VITE_BACKEND_URL=http://localhost:8001 npm run dev
164+
cd src/frontend && VITE_ORCHESTRATOR_URL=http://localhost:8002 npm run dev
159165

160166
docker-build: docker-build-frontend docker-build-backend
161167

docker-compose.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,19 @@
33
# SPDX-License-Identifier: MIT
44

55
services:
6+
orchestrator:
7+
build:
8+
context: ./src/backend
9+
dockerfile: Dockerfile.orchestrator
10+
image: robot-orchestrator:latest
11+
container_name: robot-orchestrator
12+
ports:
13+
- "8002:8002"
14+
environment:
15+
- CORS_ORIGINS=*
16+
networks:
17+
- robot-net
18+
619
# Note: Camera device access only works on Linux.
720
# On macOS/Windows, run things locally
821
streamer:
@@ -19,12 +32,16 @@ services:
1932
- CAMERA_INDEX=0
2033
- STUN_SERVER=stun:stun.l.google.com:19302
2134
- CORS_ORIGINS=*
35+
- ORCHESTRATOR_URL=http://orchestrator:8002
36+
- STREAMER_PUBLIC_URL=http://streamer:8000
2237
devices:
2338
- /dev/video0:/dev/video0 # Only works on Linux (ignored for file mode)
2439
volumes:
2540
- ./videos:/app/videos # Mount videos directory for file mode
2641
networks:
2742
- robot-net
43+
depends_on:
44+
- orchestrator
2845

2946
analyzer:
3047
build:
@@ -41,9 +58,12 @@ services:
4158
- MODEL_PATH=models/yolo11n.pt
4259
- STUN_SERVER=stun:stun.l.google.com:19302
4360
- CORS_ORIGINS=*
61+
- ORCHESTRATOR_URL=http://orchestrator:8002
62+
- ANALYZER_PUBLIC_URL=http://analyzer:8001
4463
volumes:
4564
- ./src/backend/models:/app/models
4665
depends_on:
66+
- orchestrator
4767
- streamer
4868
networks:
4969
- robot-net
@@ -54,6 +74,7 @@ services:
5474
dockerfile: Dockerfile
5575
args:
5676
- VITE_BACKEND_URL=http://localhost:8001
77+
- VITE_ORCHESTRATOR_URL=http://localhost:8002
5778
image: robot-frontend:latest
5879
container_name: robot-frontend
5980
ports:
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# SPDX-FileCopyrightText: 2025 robot-visual-perception
2+
#
3+
# SPDX-License-Identifier: MIT
4+
FROM python:3.11-slim

WORKDIR /app

# The uv binary is copied out of its published image.
# NOTE(review): ":latest" is unpinned; consider pinning a uv version so that
# image builds are reproducible.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

# Copy only the dependency manifests first so the install layer below stays
# cached until pyproject.toml or uv.lock change.
COPY pyproject.toml uv.lock ./
# only core dependencies
RUN uv sync --frozen --no-dev

COPY common/ ./common/
COPY orchestrator/ ./orchestrator/

EXPOSE 8002

# "uv run" syncs the environment before executing. Pass the same --frozen and
# --no-dev flags as the build-time sync so container start-up does not
# re-resolve the lockfile or pull in the dev dependency group.
CMD ["uv", "run", "--frozen", "--no-dev", "uvicorn", "orchestrator.main:app", "--host", "0.0.0.0", "--port", "8002"]

src/backend/analyzer/__main__.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# SPDX-FileCopyrightText: 2025 robot-visual-perception
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
"""CLI entry point for analyzer service with dynamic port allocation."""
6+
7+
import os
8+
import sys
9+
10+
11+
# main() must set env vars BEFORE analyzer.main is imported (that module does early initialization at import time)
12+
def main() -> None:
    """Launch the analyzer under uvicorn on a dynamically chosen port.

    Asks ``find_free_port`` for a port starting at 8001, exports the resulting
    address as ``ANALYZER_PUBLIC_URL`` and then hands ``analyzer.main:app`` to
    uvicorn. Exits the process with status 1 when no port is returned.
    """
    # Deferred import so that importing this module triggers no early
    # initialization.
    from common.port_utils import find_free_port

    chosen_port = find_free_port(start_port=8001)
    if chosen_port is None:
        print("ERROR: Could not find a free port in range 8001-8100", file=sys.stderr)
        sys.exit(1)

    # The public URL has to be in the environment before uvicorn imports
    # analyzer.main, because it is used for orchestrator registration.
    service_host = os.getenv("SERVICE_HOST", "localhost")
    advertised_url = f"http://{service_host}:{chosen_port}"
    os.environ["ANALYZER_PUBLIC_URL"] = advertised_url

    print(f"Starting analyzer service on {advertised_url}")

    # Auto-reload stays on unless RELOAD is set to something other than
    # true/1/yes (case-insensitive).
    reload_setting = os.getenv("RELOAD", "true").lower()
    auto_reload = reload_setting in ("true", "1", "yes")

    # Imported only now, after the environment has been prepared.
    import uvicorn

    uvicorn.run(
        "analyzer.main:app",
        host="0.0.0.0",
        port=chosen_port,
        reload=auto_reload,
    )
39+
40+
41+
if __name__ == "__main__":
42+
main()

src/backend/analyzer/main.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,15 @@
55
# Necessary for running stuff before other imports
66
# ruff: noqa: E402
77

8+
import os
9+
810
from common import __version__
911
from common.logging_config import configure_logging
1012
from common.metrics import configure_metrics
1113

14+
# Set service type for Prometheus metrics port allocation
15+
os.environ["SERVICE_TYPE"] = "analyzer"
16+
1217
# Initialize logging early
1318
configure_logging(service_name="analyzer", service_version=__version__)
1419

@@ -29,6 +34,7 @@
2934
config.apply_settings_file(config.ANALYZER_SETTINGS_FILE)
3035
from common.core.detector import get_detector
3136
from common.core.depth import get_depth_estimator
37+
from common.orchestrator import register_with_orchestrator, deregister_from_orchestrator
3238
from analyzer.routes import router, on_shutdown
3339

3440

@@ -56,8 +62,20 @@ async def lifespan_context(app: FastAPI) -> AsyncIterator[None]:
5662
# Warm up detector and depth estimator so initial /offer handling is instant.
5763
get_detector(yolo_model_path)
5864
get_depth_estimator(midas_cache_directory)
65+
66+
# Register this analyzer instance with orchestrator (best-effort)
67+
await register_with_orchestrator(
68+
service_type="analyzer",
69+
service_url=config.ANALYZER_PUBLIC_URL,
70+
orchestrator_url=config.ORCHESTRATOR_URL,
71+
)
5972
yield
6073
with suppress(Exception):
74+
await deregister_from_orchestrator(
75+
service_type="analyzer",
76+
service_url=config.ANALYZER_PUBLIC_URL,
77+
orchestrator_url=config.ORCHESTRATOR_URL,
78+
)
6179
await on_shutdown()
6280

6381
return lifespan_context

src/backend/analyzer/manager.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ def __init__(self) -> None:
7979
self._inference_task: asyncio.Task[None] | None = None
8080
self._intrinsics_logged: bool = False
8181

82+
# Dynamic streamer configuration
83+
self._streamer_url: str | None = None
84+
self._streamer_url_event = asyncio.Event()
85+
8286
self.max_consecutive_errors = 5
8387
# adaptive downscaling parameters
8488
self.target_scale_init = config.TARGET_SCALE_INIT
@@ -116,6 +120,21 @@ async def connect(self, websocket: WebSocket) -> None:
116120
if len(self.active_connections) == 1:
117121
await self._start_processing()
118122

123+
async def set_streamer_url(self, streamer_url: str) -> None:
    """Store the streamer URL, signal waiters, and restart on a change.

    The URL event is always set so that anything awaiting it can proceed.
    Processing is stopped and started again only when a previously stored
    URL is replaced by a different one while connections are active.
    """
    previous_url = self._streamer_url
    # A first-time assignment (nothing stored yet) does not count as a change.
    url_changed = bool(previous_url) and previous_url != streamer_url

    self._streamer_url = streamer_url
    logger.info(f"Streamer URL configured: {streamer_url}")
    self._streamer_url_event.set()

    # Nothing to restart unless the URL really changed and clients are attached.
    if not url_changed or not self.active_connections:
        return

    logger.info("Streamer URL changed, restarting processing...")
    await self._stop_processing()
    await self._start_processing()
137+
119138
async def disconnect(self, websocket: WebSocket) -> None:
120139
"""Handle WebSocket disconnection."""
121140
self.active_connections.discard(websocket)
@@ -141,9 +160,16 @@ async def _start_processing(self) -> None:
141160
return # Already running
142161

143162
try:
163+
# Wait for streamer URL to be configured if not already set
164+
if not self._streamer_url:
165+
logger.info("Waiting for streamer URL configuration...")
166+
await asyncio.wait_for(self._streamer_url_event.wait(), timeout=30.0)
167+
168+
if not self._streamer_url:
169+
raise Exception("Streamer URL was not configured")
170+
144171
# Connect to the streamer service at the configured offer URL
145-
upstream_url = config.STREAMER_OFFER_URL
146-
self._webcam_session = WebcamSession(upstream_url)
172+
self._webcam_session = WebcamSession(self._streamer_url)
147173
source_track = await self._webcam_session.connect()
148174

149175
# Start processing task

src/backend/analyzer/routes.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,17 @@
66
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
77
from fastapi.responses import FileResponse, Response
88
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
9+
from pydantic import BaseModel
910

1011
from analyzer.manager import AnalyzerWebSocketManager
1112

1213

14+
class ConfigureAnalyzerRequest(BaseModel):
    """Body of ``POST /configure``: the streamer this analyzer should use."""

    # Base URL of the streamer service; the route appends "/offer" to it.
    streamer_url: str
18+
19+
1320
# Create a global instance of the WebSocket manager
1421
websocket_manager = AnalyzerWebSocketManager()
1522

@@ -23,6 +30,22 @@ def health() -> dict[str, str]:
2330
return {"status": "ok", "service": "analyzer"}
2431

2532

33+
def _build_offer_url(streamer_url: str) -> str:
    """Return the streamer's ``/offer`` URL after validating the base URL.

    Raises:
        ValueError: if ``streamer_url`` is not an absolute http(s) URL with a
            host component.
    """
    from urllib.parse import urlsplit

    base_url = streamer_url.strip()
    try:
        parts = urlsplit(base_url)
    except ValueError as exc:  # e.g. malformed IPv6 literal
        raise ValueError(f"Invalid streamer_url: {exc}") from exc
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError("streamer_url must be an absolute http(s) URL")
    # Append /offer endpoint path to the streamer base URL
    return f"{base_url.rstrip('/')}/offer"


@router.post("/configure")
async def configure_analyzer(request: ConfigureAnalyzerRequest) -> dict[str, str]:
    """Configure analyzer to use a specific streamer.

    This endpoint is called by the orchestrator/frontend after analyzer
    assignment to tell this analyzer which streamer service to connect to.

    Args:
        request: Body carrying the streamer base URL.

    Returns:
        A dict with ``status`` set to ``"configured"`` and ``streamer_url``
        set to the full offer URL handed to the WebSocket manager.

    Raises:
        HTTPException: 422 when ``streamer_url`` is not an absolute http(s)
            URL. The value comes from a network caller and decides where this
            analyzer connects, so it is validated before being stored.
    """
    # Local import keeps this block self-contained with respect to the
    # module's existing fastapi import line.
    from fastapi import HTTPException

    try:
        streamer_offer_url = _build_offer_url(request.streamer_url)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=str(exc)) from exc

    await websocket_manager.set_streamer_url(streamer_offer_url)
    return {
        "status": "configured",
        "streamer_url": streamer_offer_url,
    }
47+
48+
2649
@router.get("/metrics")
2750
def metrics() -> Response:
2851
"""Prometheus metrics endpoint."""

src/backend/common/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ class Config:
130130
"yes",
131131
)
132132

133+
# Service discovery / orchestrator
134+
ORCHESTRATOR_URL: str = os.getenv("ORCHESTRATOR_URL", "http://localhost:8002")
135+
ANALYZER_PUBLIC_URL: str = os.getenv("ANALYZER_PUBLIC_URL", "http://localhost:8001")
136+
STREAMER_PUBLIC_URL: str = os.getenv("STREAMER_PUBLIC_URL", "http://localhost:8000")
137+
133138
# Tracking/interpolation settings
134139
# Minimum IoU to match detection to track
135140
TRACKING_IOU_THRESHOLD: float = float(os.getenv("TRACKING_IOU_THRESHOLD", "0.1"))

src/backend/common/metrics.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# SPDX-FileCopyrightText: 2025 robot-visual-perception
22
#
33
# SPDX-License-Identifier: MIT
4+
import logging
5+
import os
46
from typing import Optional
57

68
from prometheus_client import (
@@ -16,13 +18,22 @@
1618
_depth_estimation_duration: Optional[Histogram] = None
1719
_detections_count: Optional[Counter] = None
1820

21+
# Service-specific Prometheus ports to avoid conflicts
22+
PROMETHEUS_PORTS = {
23+
"analyzer": 9001,
24+
"streamer": 9002,
25+
"orchestrator": 9003,
26+
}
27+
1928

2029
def configure_metrics() -> None:
2130
"""
2231
Configure Prometheus metrics.
2332
"""
2433
global _detection_duration, _depth_estimation_duration, _detections_count
2534

35+
logger = logging.getLogger(__name__)
36+
2637
if _detection_duration is not None:
2738
return # Already configured
2839

@@ -44,7 +55,24 @@ def configure_metrics() -> None:
4455
["interpolated"],
4556
)
4657

47-
start_http_server(9000)
58+
# Determine service-specific port
59+
service_type = os.getenv("SERVICE_TYPE", "unknown")
60+
port = PROMETHEUS_PORTS.get(service_type, 9000)
61+
62+
# Start Prometheus HTTP server with error handling for --reload mode
63+
try:
64+
start_http_server(port)
65+
logger.info(
66+
f"Prometheus metrics server started on port {port} (service: {service_type})"
67+
)
68+
except OSError as e:
69+
if "Address already in use" in str(e):
70+
logger.warning(
71+
f"Port {port} already in use (likely uvicorn --reload worker). "
72+
"Metrics endpoint will be available on the main process only."
73+
)
74+
else:
75+
raise
4876

4977

5078
def get_detection_duration() -> Histogram:

0 commit comments

Comments
 (0)