Skip to content

Commit f3bf7ab

Browse files
yogeshmpandeyYogesh
andauthored
[Metro AI Suite] Live Video Captioning Helm Chart (#2351)
Co-authored-by: Yogesh <yogeshpandey@intel.com>
1 parent 6d2e041 commit f3bf7ab

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+4339
-250
lines changed

metro-ai-suite/live-video-analysis/live-video-captioning/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ ov_detection_models
66
shared/
77
*__pycache__*
88
app/live_video_captioning.egg-info/*
9+
charts/Chart.lock
10+
charts/charts/*.tgz

metro-ai-suite/live-video-analysis/live-video-captioning/AGENTS.md

Lines changed: 0 additions & 231 deletions
Original file line numberDiff line numberDiff line change
@@ -116,128 +116,6 @@ pytest tests/ --cov=backend --cov=main --cov-fail-under=80
116116
# Note: Install linting tools as needed for your style checks
117117
```
118118

119-
### Running the Application Locally
120-
121-
```bash
122-
# Development server (auto-reload on code changes)
123-
uvicorn main:app --host 127.0.0.1 --port 4173 --reload
124-
125-
# Production server
126-
uvicorn main:app --host 0.0.0.0 --port 4173
127-
```
128-
129-
### Docker & Compose
130-
131-
```bash
132-
# Build and start all services
133-
docker-compose -f compose.yaml up --build
134-
135-
# Stop all services
136-
docker-compose -f compose.yaml down
137-
138-
# View logs from all services
139-
docker-compose -f compose.yaml logs -f
140-
```
141-
142-
## Code Style & Conventions
143-
144-
### Python Style Requirements
145-
146-
- **Type Hints**: All functions must have type hints on parameters and return types
147-
- **Naming Conventions**:
148-
- Functions/variables: `snake_case` (e.g., `get_mqtt_subscriber()`, `models_dir`)
149-
- Classes: `PascalCase` (e.g., `APIResponse`, `MQTTSubscriber`)
150-
- Constants: `UPPER_SNAKE_CASE` (e.g., `APP_PORT`, `MQTT_BROKER_HOST`)
151-
- Private attributes/methods: Prefix with `_` (e.g., `_patch_config`, `_get()`)
152-
153-
- **Docstrings**: Use module-level and function docstrings in Google style:
154-
```python
155-
"""Brief description spanning one line.
156-
157-
Longer explanation with more detail if needed.
158-
159-
Args:
160-
param_name: Description of parameter
161-
162-
Returns:
163-
Description of return value
164-
165-
Raises:
166-
ExceptionType: When this occurs
167-
"""
168-
```
169-
170-
### FastAPI Patterns
171-
172-
**Good** - Clear route with proper async handling and types:
173-
```python
174-
from fastapi import APIRouter, HTTPException
175-
from typing import Optional
176-
177-
router = APIRouter(prefix="/runs", tags=["runs"])
178-
179-
@router.get("/{run_id}")
180-
async def get_run_by_id(run_id: str) -> dict:
181-
"""Retrieve a specific run by ID."""
182-
run = RUNS.get(run_id)
183-
if not run:
184-
raise HTTPException(status_code=404, detail="Run not found")
185-
return run
186-
```
187-
188-
**Bad** - Missing types, no error handling, sync function with async I/O:
189-
```python
190-
@router.get("/{run_id}")
191-
def get_run(x):
192-
return RUNS.get(x)
193-
```
194-
195-
### Testing Patterns
196-
197-
**Good** - Named fixtures, clear assertions, proper async handling:
198-
```python
199-
@pytest.mark.asyncio
200-
async def test_mqtt_subscriber_connects(mock_mqtt):
201-
"""Test MQTT subscriber establishes connection."""
202-
subscriber = await get_mqtt_subscriber()
203-
assert subscriber is not None
204-
subscriber.connect.assert_called_once()
205-
```
206-
207-
**Bad** - Generic test names, no setup isolation, missing assertions:
208-
```python
209-
def test_mqtt():
210-
sub = get_mqtt_subscriber()
211-
sub.connect()
212-
```
213-
214-
### Import Organization
215-
216-
1. Standard library imports first
217-
2. Third-party imports second
218-
3. Local imports last
219-
4. Blank line between each group
220-
221-
```python
222-
import os
223-
import logging
224-
from pathlib import Path
225-
from contextlib import asynccontextmanager
226-
227-
from fastapi import FastAPI
228-
from paho.mqtt.client import Client
229-
230-
from backend.config import APP_PORT
231-
from backend.routes import config_router
232-
```
233-
234-
### Configuration & Environment Variables
235-
236-
- All environment variables are read in `backend/config.py` with defaults
237-
- Use `os.environ.get("VARIABLE_NAME", "default_value")`
238-
- For integer values, always validate: use `int(os.environ.get(...))` with try-except
239-
- Configuration is immutable after import; never modify `backend.config` values at runtime
240-
241119
## Testing Guidelines
242120

243121
### Test Organization
@@ -323,115 +201,6 @@ The FastAPI application exposes these route groups:
323201
🚫 **Never hardcode configuration values** - all config must go through `backend/config.py`
324202
🚫 **Never commit model files** - only include config, tokenizers, and references; models are downloaded at build time
325203

326-
## Git Workflow
327-
328-
### Branch Naming
329-
- Feature branches: `feature/description` (e.g., `feature/add-caption-filtering`)
330-
- Bug fixes: `fix/description` (e.g., `fix/mqtt-reconnection-issue`)
331-
- Documentation: `docs/description` (e.g., `docs/api-reference`)
332-
333-
### Commit Message Format
334-
- Use descriptive, present-tense messages
335-
- Keep first line under 50 characters
336-
- Reference issue/task if applicable
337-
338-
**Good**:
339-
```
340-
feat: Add WebSocket caption streaming endpoint
341-
342-
- Implement /runs/{run_id}/captions WebSocket
343-
- Add real-time frame processing
344-
- Include unit tests for streaming
345-
- Closes #42
346-
```
347-
348-
**Bad**:
349-
```
350-
fixed stuff
351-
update routes
352-
asdf
353-
```
354-
355-
### Before Pushing
356-
357-
1. Run full test suite: `pytest -v --cov=backend --cov=main --cov-fail-under=80`
358-
2. Verify no hardcoded secrets or credentials
359-
3. Ensure all type hints are present
360-
4. Check that docstrings are updated on new functions
361-
362-
## Development Workflow for Agents
363-
364-
**When implementing a feature:**
365-
366-
1. **Understand the context**: Read the relevant route(`backend/routes/*.py`), service, and test files
367-
2. **Write tests first**: Create test cases in `app/tests/test_*.py` before implementing
368-
3. **Implement the feature**: Add code to the appropriate module (route, service, model)
369-
4. **Run tests immediately**: Use `pytest -v <test_file>` to verify your changes
370-
5. **Check coverage**: Run `pytest --cov=backend --cov-fail-under=80` to ensure 80%+ coverage
371-
6. **Review your code**: Verify type hints, docstrings, and naming conventions
372-
7. **Commit with clear message**: Reference the issue or task
373-
374-
**Example workflow for adding a new endpoint:**
375-
376-
```bash
377-
# 1. Create test file
378-
touch app/tests/test_routes_new_feature.py
379-
380-
# 2. Write test cases
381-
# (edit test file with test_* functions)
382-
383-
# 3. Run tests (they'll fail)
384-
pytest -v app/tests/test_routes_new_feature.py
385-
386-
# 4. Implement the feature
387-
# (edit backend/routes/new_feature.py or modify existing routes)
388-
389-
# 5. Run tests again
390-
pytest -v app/tests/test_routes_new_feature.py
391-
392-
# 6. Check full coverage
393-
pytest -v --cov=backend --cov=main --cov-fail-under=80
394-
395-
# 7. Commit
396-
git add app/tests/ app/backend/
397-
git commit -m "feat: Add new feature endpoint with tests"
398-
```
399-
400-
## Debugging Tips
401-
402-
### Enable Debug Logging
403-
```bash
404-
# Run app with DEBUG level logging
405-
LOGLEVEL=DEBUG uvicorn main:app --reload --log-level debug
406-
407-
# Run tests with logging
408-
pytest -v tests/ --log-cli-level=DEBUG
409-
```
410-
411-
### Inspect MQTT Messages
412-
```bash
413-
# Connect to MQTT broker and listen to topics
414-
mosquitto_sub -h localhost -p 1883 -t "live-video-captioning/#"
415-
```
416-
417-
### Test Individual Routes
418-
```bash
419-
# Start dev server
420-
uvicorn main:app --reload
421-
422-
# In another terminal, test endpoint
423-
curl -X GET http://localhost:4173/health
424-
curl -X GET http://localhost:4173/config
425-
```
426-
427-
### Common Issues
428-
429-
**Tests fail with config errors**: Make sure `conftest.py` has `@pytest.fixture(autouse=True)` for `_patch_config` - it should auto-patch environment variables before tests run
430-
431-
**MQTT connection times out in tests**: Verify `mock_mqtt` fixture is auto-used in `conftest.py` - it stubs out real MQTT connections
432-
433-
**Coverage threshold fails**: Run `pytest --cov-report=html` and open `htmlcov/index.html` to see which lines aren't covered
434-
435204
## Key Files Reference
436205

437206
| File | Purpose |

metro-ai-suite/live-video-analysis/live-video-captioning/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ For more information see [How it works](./docs/user-guide/how-it-works.md)
2525
- [Overview](./docs/user-guide/index.md)
2626
- [System Requirements](./docs/user-guide/get-started/system-requirements.md)
2727
- [Get Started](./docs/user-guide/get-started.md)
28+
- [Deploy with Helm](./docs/user-guide/deploy-with-helm.md)
2829
- [API Reference](./docs/user-guide/api-reference.md)
2930
- [How to Build Source](./docs/user-guide/get-started/build-from-source.md)
3031
- [Known Issues](./docs/user-guide/known-issues.md)

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ def _read_non_negative_int(var_name: str, default: int) -> int:
3737
"PIPELINE_SERVER_URL", "http://dlstreamer-pipeline-server:8080"
3838
)
3939
PIPELINE_NAME = os.environ.get("PIPELINE_NAME", "genai_pipeline")
40+
# How often (in seconds) to poll the pipeline server for run health. 0 disables polling.
41+
# Keep this low (≤10 s) so the UI reflects a crashed pipeline server quickly.
42+
PIPELINE_POLL_INTERVAL = _read_non_negative_int("PIPELINE_POLL_INTERVAL", 8)
4043

4144
BASE_DIR = Path(__file__).parent.parent
4245
MODELS_DIR = Path(os.environ.get("MODELS_DIR", str(BASE_DIR / "ov_models")))

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/models/responses.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class RunInfo(BaseModel):
1010
pipelineId: str
1111
peerId: str
1212
mqttTopic: str
13+
status: str = "running"
1314
modelName: Optional[str] = None
1415
pipelineName: Optional[str] = None
1516
runName: Optional[str] = None

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/routes/runs.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,12 @@ async def list_runs() -> list[RunInfo]:
173173

174174

175175
async def _multiplexed_metadata_generator() -> AsyncGenerator[str, None]:
176-
"""Generator that receives metadata from MQTT and multiplexes into a single SSE stream."""
176+
"""Generator that receives metadata from MQTT and multiplexes into a single SSE stream.
177+
178+
A status heartbeat is sent every second when no MQTT message arrives, carrying
179+
the current status of every active run so the frontend can react when a run
180+
transitions to ``"error"`` (detected by the background health monitor).
181+
"""
177182
message_queue: asyncio.Queue = asyncio.Queue()
178183
subscribed_runs: set[str] = set()
179184

@@ -186,8 +191,8 @@ def on_message(run_id: str, data: dict, received_at: float):
186191
except Exception as e:
187192
logger.error(f"Error queueing MQTT message: {e}")
188193

194+
mqtt_subscriber = await get_mqtt_subscriber()
189195
try:
190-
mqtt_subscriber = await get_mqtt_subscriber()
191196

192197
while True:
193198
try:
@@ -223,17 +228,22 @@ def on_message(run_id: str, data: dict, received_at: float):
223228
yield f"data: {json.dumps(envelope)}\n\n"
224229

225230
except asyncio.TimeoutError:
226-
# No message received, send heartbeat
227-
yield ": heartbeat\n\n"
231+
# No MQTT message – send a status heartbeat so the frontend
232+
# learns when a run transitions to "error".
233+
status_payload = {
234+
"type": "status",
235+
"runs": {rid: info.status for rid, info in RUNS.items()},
236+
}
237+
yield f"data: {json.dumps(status_payload)}\n\n"
228238

229239
except Exception as e:
230240
logger.error(f"Error in multiplexed metadata generator: {e}")
231241
yield f": error - {e}\n\n"
232242
await asyncio.sleep(1)
233243

234244
finally:
235-
# Cleanup subscriptions when generator is closed
236-
mqtt_subscriber = await get_mqtt_subscriber()
245+
# Reuse the already-resolved subscriber — avoids creating a new connection
246+
# during app shutdown when the global subscriber may already be torn down.
237247
for run_id in subscribed_runs:
238248
mqtt_subscriber.unsubscribe_from_run(run_id)
239249
logger.info("Cleaned up MQTT subscriptions")

metro-ai-suite/live-video-analysis/live-video-captioning/app/backend/services/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,28 @@
44
discover_detection_models,
55
discover_pipelines_remote,
66
)
7-
from .http_client import http_json
7+
from .http_client import http_json, try_get_json
88
from .mqtt_subscriber import (
99
MQTTSubscriber,
1010
get_mqtt_subscriber,
1111
shutdown_mqtt_subscriber,
1212
)
13+
from .pipeline_health import (
14+
check_pipeline_health,
15+
start_pipeline_health_monitor,
16+
stop_pipeline_health_monitor,
17+
)
1318

1419
__all__ = [
1520
"discover_models",
1621
"discover_detection_models",
1722
"discover_pipelines_remote",
1823
"http_json",
24+
"try_get_json",
1925
"MQTTSubscriber",
2026
"get_mqtt_subscriber",
2127
"shutdown_mqtt_subscriber",
28+
"check_pipeline_health",
29+
"start_pipeline_health_monitor",
30+
"stop_pipeline_health_monitor",
2231
]

0 commit comments

Comments
 (0)