Skip to content

Commit b72827b

Browse files
yogeshmpandey (Yogesh) and others authored
[Metro AI suite] Live video Captioning : New Features for LVC application (open-edge-platform#2082)
Co-authored-by: Yogesh <yogeshpandey@intel.com>
1 parent 61b42d8 commit b72827b

30 files changed

+1910
-467
lines changed

metro-ai-suite/live-video-analysis/live-video-captioning/AGENTS.md

Lines changed: 458 additions & 0 deletions
Large diffs are not rendered by default.

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@
44
import os
55
from pathlib import Path
66

7+
8+
def _read_non_negative_int(var_name: str, default: int) -> int:
9+
raw = os.environ.get(var_name)
10+
if raw is None:
11+
return default
12+
try:
13+
return max(0, int(raw))
14+
except (TypeError, ValueError):
15+
return default
16+
717
APP_PORT = int(os.environ.get("DASHBOARD_PORT", "4173"))
818
PEER_ID = os.environ.get("WEBRTC_PEER_ID", "genai_pipeline")
919
SIGNALING_URL = os.environ.get("SIGNALING_URL", "http://localhost:8889")
@@ -13,6 +23,7 @@
1323
ENABLE_DETECTION_PIPELINE = os.environ.get(
1424
"ENABLE_DETECTION_PIPELINE", "false"
1525
).lower() in ("true", "1", "yes")
26+
CAPTION_HISTORY = _read_non_negative_int("CAPTION_HISTORY", 3)
1627

1728
# Metrics Service Configuration
1829
METRICS_SERVICE_PORT = os.environ.get("METRICS_SERVICE_PORT", "9090")

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/models/requests.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class StartRunRequest(BaseModel):
2525
maxNewTokens: int = Field(default=70, ge=1, le=4096)
2626
pipelineName: Optional[str] = Field(default=None)
2727
runName: Optional[str] = Field(default=None)
28+
frameRate: Optional[int] = Field(default=None, ge=0)
29+
chunkSize: Optional[int] = Field(default=None, ge=1)
30+
frameWidth: Optional[int] = Field(default=None, ge=1)
31+
frameHeight: Optional[int] = Field(default=None, ge=1)
2832

2933
@field_validator("rtspUrl")
3034
@classmethod

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/models/responses.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ class RunInfo(BaseModel):
1616
prompt: Optional[str] = None
1717
maxTokens: Optional[int] = None
1818
rtspUrl: Optional[str] = None
19+
frameRate: Optional[int] = None
20+
chunkSize: Optional[int] = None
21+
frameWidth: Optional[int] = None
22+
frameHeight: Optional[int] = None
1923

2024

2125
class ModelList(BaseModel):

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/routes/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from fastapi import APIRouter, Response
66
from ..config import (
77
ALERT_MODE,
8+
CAPTION_HISTORY,
89
DEFAULT_RTSP_URL,
910
PEER_ID,
1011
SIGNALING_URL,
@@ -28,6 +29,7 @@ async def runtime_config() -> Response:
2829
"defaultPrompt": DEFAULT_PROMPT,
2930
"defaultRtspUrl": DEFAULT_RTSP_URL,
3031
"enableDetectionPipeline": ENABLE_DETECTION_PIPELINE,
32+
"captionHistory": CAPTION_HISTORY,
3133
"metricsServicePort": METRICS_SERVICE_PORT,
3234
}
3335
body = f"window.RUNTIME_CONFIG = {json.dumps(payload)};"

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/routes/runs.py

Lines changed: 105 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import logging
77
import re
88
import uuid
9-
from typing import AsyncGenerator
9+
from typing import AsyncGenerator, Optional
1010
from fastapi import APIRouter, HTTPException
1111
from fastapi.responses import StreamingResponse
1212
from ..config import (
@@ -22,69 +22,126 @@
2222

2323
router = APIRouter(prefix="/api", tags=["runs"])
2424
logger = logging.getLogger("app.runs")
25+
WEBRTC_PEER_ID_MAX_LENGTH = 8
26+
WEBRTC_PEER_ID_PREFIX = "s"
27+
28+
29+
def _sanitize_run_name(run_name: str) -> str:
30+
"""Normalize a user-supplied run name into a safe run identifier."""
31+
sanitized = re.sub(r"\s+", "_", run_name.strip())
32+
return re.sub(r"[^a-zA-Z0-9_-]", "", sanitized)
33+
34+
35+
def _build_unique_run_name(requested_name: Optional[str]) -> Optional[str]:
36+
"""Return a sanitized, unique run name or None when no valid name was provided."""
37+
if not requested_name or not requested_name.strip():
38+
return None
39+
40+
sanitized = _sanitize_run_name(requested_name)
41+
if not sanitized:
42+
return None
43+
44+
run_name = sanitized
45+
counter = 1
46+
while run_name in RUNS:
47+
run_name = f"{sanitized}_{counter}"
48+
counter += 1
49+
50+
return run_name
51+
52+
53+
def _generate_peer_id() -> str:
54+
"""Generate a short, unique WebRTC peer ID accepted by the pipeline server."""
55+
existing_peer_ids = {run.peerId for run in RUNS.values()}
56+
peer_body_length = WEBRTC_PEER_ID_MAX_LENGTH - len(WEBRTC_PEER_ID_PREFIX)
57+
if peer_body_length < 1:
58+
raise RuntimeError("Invalid WebRTC peer ID configuration")
59+
60+
while True:
61+
candidate = f"{WEBRTC_PEER_ID_PREFIX}{uuid.uuid4().hex[:peer_body_length]}"
62+
if candidate not in existing_peer_ids:
63+
return candidate
64+
65+
66+
def _build_pipeline_parameters(req: StartRunRequest, run_id: str) -> dict:
67+
parameters = {
68+
"captioner-prompt": (req.prompt or "").strip() or DEFAULT_PROMPT,
69+
"captioner_model_name": (req.modelName or "").strip()
70+
or "OpenGVLab/InternVL2-2B",
71+
"captioner_max_new_tokens": req.maxNewTokens,
72+
"detection_model_name": (req.detectionModelName or "").strip() or "yolov8s",
73+
"detection_threshold": req.detectionThreshold,
74+
"mqtt_publisher": {
75+
"topic": f"{MQTT_TOPIC_PREFIX}/{run_id}",
76+
"publish_frame": False,
77+
},
78+
}
79+
80+
optional_parameters = {
81+
"captioner_frame_rate": req.frameRate,
82+
"captioner_chunk_size": req.chunkSize,
83+
"frame_width": req.frameWidth,
84+
"frame_height": req.frameHeight,
85+
}
86+
parameters.update(
87+
{key: value for key, value in optional_parameters.items() if value is not None}
88+
)
89+
90+
if req.chunkSize is not None:
91+
parameters["captioner_queue_size"] = max(1, req.chunkSize)
92+
93+
return parameters
94+
95+
96+
def _build_start_payload(req: StartRunRequest, run_id: str, peer_id: str) -> dict:
97+
return {
98+
"source": {"uri": req.rtspUrl, "type": "uri"},
99+
"destination": {
100+
"frame": {"type": "webrtc", "peer-id": peer_id, "bitrate": WEBRTC_BITRATE},
101+
},
102+
"parameters": _build_pipeline_parameters(req, run_id),
103+
}
104+
105+
106+
def _extract_pipeline_id(raw: str) -> str:
107+
pipeline_id = raw.replace('"', "").strip()
108+
if not pipeline_id:
109+
raise HTTPException(
110+
status_code=502,
111+
detail={
112+
"message": "Pipeline server returned empty pipeline id",
113+
"body": raw,
114+
},
115+
)
116+
return pipeline_id
25117

26118

27119
@router.post("/runs")
28120
async def start_run(req: StartRunRequest) -> RunInfo:
29121
"""Start a new video captioning run."""
30-
# Process optional runName - use it for run_id if provided
31-
run_name = None
32-
if req.runName and req.runName.strip():
33-
# Sanitize: replace spaces with underscores, remove special chars
34-
sanitized = re.sub(r"\s+", "_", req.runName.strip())
35-
sanitized = re.sub(r"[^a-zA-Z0-9_-]", "", sanitized)
36-
if sanitized:
37-
run_name = sanitized
38-
# Check for duplicates and append suffix if needed
39-
base_name = sanitized
40-
counter = 1
41-
while run_name in RUNS:
42-
run_name = f"{base_name}_{counter}"
43-
counter += 1
122+
run_name = _build_unique_run_name(req.runName)
44123

45124
# Use runName for run_id if provided, otherwise generate UUID
46125
if run_name:
47126
run_id = run_name
48127
else:
49128
run_id = uuid.uuid4().hex[:10]
50129

51-
peer_id = f"stream-{run_id[:10] if len(run_id) > 10 else run_id}"
130+
peer_id = _generate_peer_id()
52131

53132
# MQTT topic for this run's metadata
54133
mqtt_topic = f"{MQTT_TOPIC_PREFIX}"
55134

56135
pipeline_name = (req.pipelineName or PIPELINE_NAME).strip() or PIPELINE_NAME
57136

58137
start_url = f"{PIPELINE_SERVER_URL.rstrip('/')}/pipelines/user_defined_pipelines/{pipeline_name}"
59-
payload = {
60-
"source": {"uri": req.rtspUrl, "type": "uri"},
61-
"destination": {
62-
"frame": {"type": "webrtc", "peer-id": peer_id, "bitrate": WEBRTC_BITRATE},
63-
},
64-
"parameters": {
65-
"captioner-prompt": (req.prompt or "").strip() or DEFAULT_PROMPT,
66-
"captioner_model_name": (req.modelName or "").strip()
67-
or "OpenGVLab/InternVL2-2B",
68-
"captioner_max_new_tokens": req.maxNewTokens,
69-
"detection_model_name": (req.detectionModelName or "").strip() or "yolov8s",
70-
"detection_threshold": req.detectionThreshold,
71-
"mqtt_publisher": {
72-
"topic": f"{MQTT_TOPIC_PREFIX}/{run_id}",
73-
"publish_frame": False,
74-
},
75-
},
76-
}
138+
payload = _build_start_payload(req, run_id, peer_id)
139+
140+
logger.debug(f"Starting pipeline {pipeline_name} with URL: {start_url}")
141+
logger.debug(f"Pipeline payload: {json.dumps(payload, indent=2)}")
77142

78143
raw = http_json("POST", start_url, payload=payload)
79-
pipeline_id = raw.replace('"', "").strip()
80-
if not pipeline_id:
81-
raise HTTPException(
82-
status_code=502,
83-
detail={
84-
"message": "Pipeline server returned empty pipeline id",
85-
"body": raw,
86-
},
87-
)
144+
pipeline_id = _extract_pipeline_id(raw)
88145

89146
model_name = (req.modelName or "").strip() or "InternVL2-2B"
90147
# Use full run_id for custom names, truncated for UUID-based
@@ -100,6 +157,10 @@ async def start_run(req: StartRunRequest) -> RunInfo:
100157
prompt=(req.prompt or "").strip() or DEFAULT_PROMPT,
101158
maxTokens=req.maxNewTokens,
102159
rtspUrl=req.rtspUrl,
160+
frameRate=req.frameRate,
161+
chunkSize=req.chunkSize,
162+
frameWidth=req.frameWidth,
163+
frameHeight=req.frameHeight,
103164
)
104165
RUNS[info.runId] = info
105166
return info
@@ -216,7 +277,7 @@ async def stop_run(run_id: str) -> dict[str, str]:
216277
# A failure (502) usually means the pipeline is already stopped
217278
try:
218279
http_json("DELETE", stop_url)
219-
except HTTPException:
280+
except Exception:
220281
# Pipeline may already be stopped or unreachable - continue cleanup
221282
pass
222283

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/services/discovery.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
from pathlib import Path
33
from typing import List, Dict
4+
from fastapi import HTTPException
45
from ..config import PIPELINE_NAME, PIPELINE_SERVER_URL, ENABLE_DETECTION_PIPELINE
56
from .http_client import http_json
67

@@ -125,12 +126,17 @@ def discover_pipelines_remote() -> List[Dict[str, str]]:
125126
if not ENABLE_DETECTION_PIPELINE:
126127
results = [r for r in results if r["pipeline_type"] != "detection"]
127128

129+
# Filter out proxy pipelines (hidden from UI, used internally for default resolution)
130+
results = [r for r in results if not r["pipeline_name"].endswith("_Default_Resolution")]
131+
128132
# Fallback if nothing usable left
129133
if not results:
130134
return [{"pipeline_name": PIPELINE_NAME, "pipeline_type": "non-detection"}]
131135

132136
return results
133137

138+
except HTTPException:
139+
raise
134140
except Exception:
135-
# Conservative fallback
141+
# Conservative fallback for parse / unexpected errors
136142
return [{"pipeline_name": PIPELINE_NAME, "pipeline_type": "non-detection"}]

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/services/http_client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ def http_json(method: str, url: str, payload: Optional[dict[str, Any]] = None) -
3939
status_code=502,
4040
detail={"message": "Pipeline server unreachable", "error": str(err)},
4141
)
42+
except OSError as err:
43+
raise HTTPException(
44+
status_code=502,
45+
detail={"message": "Pipeline server connection failed", "error": str(err)},
46+
)

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/services/mqtt_subscriber.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ async def process_messages(self):
179179
else:
180180
data = raw_data
181181

182+
# Only forward messages that contain inference results
183+
if not isinstance(data, dict) or "result" not in data:
184+
continue
185+
182186
# Extract run_id from topic
183187
# Topic format: {prefix}/{run_id}
184188
parts = topic.split("/")

metro-ai-suite/live-video-analysis/live-video-captioning/app/main.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright (C) 2025 Intel Corporation
22
# SPDX-License-Identifier: Apache-2.0
33

4+
import logging
45
from contextlib import asynccontextmanager
56
from fastapi import FastAPI
67
from fastapi.responses import FileResponse
@@ -16,6 +17,12 @@
1617
)
1718
from backend.services import get_mqtt_subscriber, shutdown_mqtt_subscriber
1819

20+
# Configure logging
21+
logging.basicConfig(
22+
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
23+
)
24+
logger = logging.getLogger("app")
25+
1926

2027
@asynccontextmanager
2128
async def lifespan(app: FastAPI):
@@ -24,9 +31,7 @@ async def lifespan(app: FastAPI):
2431
try:
2532
await get_mqtt_subscriber()
2633
except Exception as e:
27-
import logging
28-
29-
logging.getLogger("app").warning(f"Failed to initialize MQTT subscriber: {e}")
34+
logger.warning(f"Failed to initialize MQTT subscriber: {e}")
3035

3136
yield
3237

0 commit comments

Comments (0)