Skip to content
Open
2 changes: 1 addition & 1 deletion metro-ai-suite/smart-traffic-intersection-agent/setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ print_all_service_host_endpoints() {
done
echo -e "${MAGENTA}=======================================================${NC}"
echo -e
}
}

# Build service images without starting containers
build_service() {
Expand Down
172 changes: 115 additions & 57 deletions metro-ai-suite/smart-traffic-intersection-agent/src/api/routes.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""API routes for Traffic Intersection Agent."""

from datetime import datetime, timedelta
from typing import Dict, Any
from typing import Annotated, Dict, Any

from fastapi import APIRouter, HTTPException, Depends, Query, Request
from fastapi.responses import JSONResponse
from fastapi import APIRouter, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
from fastapi.encoders import jsonable_encoder
import structlog

from services.data_aggregator import DataAggregatorService
from services.weather_service import WeatherService

logger = structlog.get_logger(__name__)

logger = structlog.get_logger(__name__)
router = APIRouter()


def get_data_aggregator(request):
"""Dependency to get data aggregator service from app state."""
return request.app.state.data_aggregator
Expand All @@ -26,10 +24,62 @@ def get_weather_service(request):
return request.app.state.weather_service


def _build_response_dict(traffic_response: Any, weather_data: Any, include_images: bool) -> Dict[str, Any]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactoring: Common method used for use with both REST API and Websocket endpoint.

"""Helper to build traffic intelligence response dictionary."""
response_dict = {
"timestamp": traffic_response.timestamp,
"response_age": traffic_response.response_age if traffic_response.response_age else None,
"intersection_id": traffic_response.intersection_id,
"data": {
"intersection_id": traffic_response.data.intersection_id,
"intersection_name": traffic_response.data.intersection_name,
"latitude": traffic_response.data.latitude,
"longitude": traffic_response.data.longitude,
"timestamp": traffic_response.data.timestamp.isoformat(),
"north_camera": traffic_response.data.north_camera,
"south_camera": traffic_response.data.south_camera,
"east_camera": traffic_response.data.east_camera,
"west_camera": traffic_response.data.west_camera,
"total_density": traffic_response.data.total_density,
"intersection_status": traffic_response.data.intersection_status,
"north_pedestrian": traffic_response.data.north_pedestrian,
"south_pedestrian": traffic_response.data.south_pedestrian,
"east_pedestrian": traffic_response.data.east_pedestrian,
"west_pedestrian": traffic_response.data.west_pedestrian,
"total_pedestrian_count": traffic_response.data.total_pedestrian_count,
"north_timestamp": traffic_response.data.north_timestamp.isoformat() if traffic_response.data.north_timestamp else None,
"south_timestamp": traffic_response.data.south_timestamp.isoformat() if traffic_response.data.south_timestamp else None,
"east_timestamp": traffic_response.data.east_timestamp.isoformat() if traffic_response.data.east_timestamp else None,
"west_timestamp": traffic_response.data.west_timestamp.isoformat() if traffic_response.data.west_timestamp else None,
},
"weather_data": weather_data.__dict__,
"vlm_analysis": {
"traffic_summary": traffic_response.vlm_analysis.traffic_summary,
"alerts": [
{
"alert_type": alert.alert_type.value,
"level": alert.level.value,
"description": alert.description,
"weather_related": alert.weather_related
}
for alert in traffic_response.vlm_analysis.alerts
],
"recommendations": traffic_response.vlm_analysis.recommendations or [],
"analysis_timestamp": traffic_response.vlm_analysis.analysis_timestamp.isoformat() if traffic_response.vlm_analysis.analysis_timestamp else None
}
}

# Add camera images only if requested
if include_images:
response_dict["camera_images"] = traffic_response.camera_images

return response_dict


@router.get("/traffic/current", response_model=Dict[str, Any])
async def get_current_traffic_intelligence(
request: Request,
images: bool = Query(default=True, description="Include camera images in response")
images: Annotated[bool, Query(description="Include camera images in response")] = True,
) -> Dict[str, Any]:
"""
Get current traffic intelligence data for the intersection.
Expand All @@ -49,62 +99,16 @@ async def get_current_traffic_intelligence(
raise HTTPException(status_code=404, detail="No traffic data available")

# Get current weather data
weather_service = get_weather_service(request)
weather_service: WeatherService = get_weather_service(request)
weather_data = await weather_service.get_current_weather()

# Convert to dict for JSON response
response_dict = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed reusable code - to shift to common method.

"timestamp": traffic_response.timestamp,
"response_age": traffic_response.response_age if traffic_response.response_age else None,
"intersection_id": traffic_response.intersection_id,
"data": {
"intersection_id": traffic_response.data.intersection_id,
"intersection_name": traffic_response.data.intersection_name,
"latitude": traffic_response.data.latitude,
"longitude": traffic_response.data.longitude,
"timestamp": traffic_response.data.timestamp.isoformat(),
"north_camera": traffic_response.data.north_camera,
"south_camera": traffic_response.data.south_camera,
"east_camera": traffic_response.data.east_camera,
"west_camera": traffic_response.data.west_camera,
"total_density": traffic_response.data.total_density,
"intersection_status": traffic_response.data.intersection_status,
"north_pedestrian": traffic_response.data.north_pedestrian,
"south_pedestrian": traffic_response.data.south_pedestrian,
"east_pedestrian": traffic_response.data.east_pedestrian,
"west_pedestrian": traffic_response.data.west_pedestrian,
"total_pedestrian_count": traffic_response.data.total_pedestrian_count,
"north_timestamp": traffic_response.data.north_timestamp,
"south_timestamp": traffic_response.data.south_timestamp,
"east_timestamp": traffic_response.data.east_timestamp,
"west_timestamp": traffic_response.data.west_timestamp,
},
"weather_data": weather_data.__dict__,
"vlm_analysis": {
"traffic_summary": traffic_response.vlm_analysis.traffic_summary,
"alerts": [
{
"alert_type": alert.alert_type.value,
"level": alert.level.value,
"description": alert.description,
"weather_related": alert.weather_related
}
for alert in traffic_response.vlm_analysis.alerts
],
"recommendations": traffic_response.vlm_analysis.recommendations or [],
"analysis_timestamp": traffic_response.vlm_analysis.analysis_timestamp.isoformat() if traffic_response.vlm_analysis.analysis_timestamp else None
}
}

# Add camera images only if requested
if images:
response_dict["camera_images"] = traffic_response.camera_images
response_dict = _build_response_dict(traffic_response, weather_data, images)

logger.info("Current traffic intelligence served",
intersection_id=traffic_response.intersection_id,
total_density=traffic_response.data.total_density,
total_pedestrian_count=traffic_response.data.total_pedestrian_count,
alerts_count=len(traffic_response.vlm_analysis.alerts))
total_pedestrian_count=traffic_response.data.total_pedestrian_count)

return response_dict

Expand All @@ -113,3 +117,57 @@ async def get_current_traffic_intelligence(
except Exception as e:
logger.error("Failed to get current traffic intelligence", error=str(e))
raise HTTPException(status_code=500, detail="Internal server error")


@router.websocket("/traffic/current/ws")
async def ws_current_traffic_intelligence(
websocket: WebSocket,
images: Annotated[bool, Query()] = True,
):
"""
WebSocket endpoint for real-time traffic intersection data.

Pushes updated traffic intersection data to the client whenever
new VLM analysis results become available. This is a drop-in
alternative to the REST GET /traffic/current endpoint and returns
the same data in the same format.

Query Parameters:
images: If false, camera_images will be excluded from response (default: true)
"""
await websocket.accept()

try:
data_aggregator: DataAggregatorService = get_data_aggregator(websocket)
weather_service: WeatherService = get_weather_service(websocket)

# Run a loop to wait for new data events and push updates to the client
while True:
traffic_response = await data_aggregator.get_current_traffic_intelligence()
if traffic_response:
weather_data = await weather_service.get_current_weather()
response_dict = _build_response_dict(traffic_response, weather_data, images)

await websocket.send_json(jsonable_encoder(response_dict))

logger.debug("Traffic Intersection data pushed to client",
intersection_id=traffic_response.intersection_id,
total_density=traffic_response.data.total_density,
total_pedestrian_count=traffic_response.data.total_pedestrian_count)
else:
await websocket.send_json({
"status": "waiting",
"message": "VLM analysis not yet available."
})

event = data_aggregator.new_data_event
logger.debug("WebSocket waiting for new data event")
await event.wait()
except WebSocketDisconnect:
logger.info("WebSocket client disconnected")
except Exception as e:
logger.error("WebSocket error in Smart Traffic Intersection Agent", error=str(e))
try:
await websocket.close(code=1011, reason="Internal server error")
except Exception as e:
logger.error("Failed to close WebSocket after error", error=str(e))
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def main():
app = create_app()

# When running application on host we can override host and port via env variables
port = int(os.getenv("AGENT_BACKEND_HOSTPORT", "8081"))
port = int(os.getenv("AGENT_BACKEND_HOSTPORT") or 8081)
host = os.getenv("AGENT_BACKEND_HOST", "0.0.0.0")

logger.info("Starting Traffic Intersection Agent",
Expand All @@ -159,9 +159,11 @@ def main():
app,
host=host,
port=port,
ws_max_size=100_000_000, # Increase WebSocket max size to handle base64 images
log_level=log_level.lower(),
access_log=True
)
#TODO: Base64 images are not optimal, consider switching to binary


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ class TrafficIntersectionAgentResponse:
timestamp: str # ISO format timestamp
intersection_id: str # UUID of intersection
data: IntersectionData # Core intersection data
camera_images: Dict[str, Dict[str, Any]] # Camera images by direction
weather_data: WeatherData # Weather information
vlm_analysis: VLMAnalysisData # VLM analysis with alerts
camera_images: Dict[str, CameraImage] # Camera images by direction
weather_data: Optional[WeatherData] # Weather information
vlm_analysis: Optional[VLMAnalysisData] # VLM analysis with alerts
response_age: Optional[float] = None


Expand All @@ -86,7 +86,7 @@ class TrafficSnapshot:
intersection_id: str
directional_counts: Dict[str, int] # camera direction -> count
total_count: int
camera_images: Optional[Dict[str, CameraImage]] = None
camera_images: Dict[str, CameraImage]
weather_conditions: Optional[WeatherData] = None
intersection_data: Optional[IntersectionData] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ class VLMAnalysisData:
"""VLM analysis results with structured traffic and alert data."""
traffic_summary: str # General traffic conditions summary
alerts: List[VLMAlert] # Structured alerts list
recommendations: List[str] = None # Traffic management recommendations
recommendations: List[str] # Traffic management recommendations
analysis_timestamp: Optional[datetime] = None
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""Data aggregator service for Traffic Intersection Agent."""

import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from collections import deque
from typing import Dict, Optional, Any
import structlog

from models import (
Expand Down Expand Up @@ -50,8 +48,10 @@ def __init__(self, config_service: ConfigService, vlm_service: VLMService):

# Current state
self.current_vlm_analysis: Optional[VLMAnalysisData] = None
self.last_analysis_time: Optional[float] = 0.0
self.last_analysis_time: float = 0.0

# Event to notify WebSocket clients when new VLM-analyzed data is available
self.new_data_event: asyncio.Event = asyncio.Event()

logger.info("Data aggregator service initialized")

Expand Down Expand Up @@ -204,14 +204,16 @@ def _save_vlm_analyzed_data(self, vlm_analysis: VLMAnalysisData, traffic_snapsho
"""Save data that was used in VLM analysis as the current analyzed data."""

self.current_vlm_analysis = vlm_analysis

# Copy temporary camera data to VLM-analyzed storage
self.vlm_analyzed_camera_images = traffic_snapshot.camera_images
self.vlm_analyzed_intersection_data = traffic_snapshot.intersection_data
self.vlm_analyzed_weather_data = self.vlm_service.get_weather_details()


# Add to historical snapshots (only VLM-analyzed data)
# Notify WebSocket clients of new data
old_event: asyncio.Event = self.new_data_event
self.new_data_event = asyncio.Event()
old_event.set()
logger.debug("Event notification sent for new VLM-analyzed data")

logger.info("VLM-analyzed data saved",
total_density=traffic_snapshot.total_count,
Expand All @@ -228,7 +230,7 @@ async def _check_analysis_trigger(self) -> None:
# Get current threshold dynamically from config
high_density_threshold = self.config.get_high_density_threshold()

logger.info("Checking if VLM analysis should be triggered",
logger.debug("Checking if VLM analysis should be triggered",
total_density=self.temp_intersection_data.total_density,
threshold=high_density_threshold,
last_analysis_time=self.last_analysis_time)
Expand Down Expand Up @@ -280,7 +282,7 @@ async def _trigger_vlm_analysis(self) -> None:

# Trigger VLM analysis
try:
vlm_analysis: VLMAnalysisData = await self.vlm_service.analyze_traffic_non_blocking(
vlm_analysis: Optional[VLMAnalysisData] = await self.vlm_service.analyze_traffic_non_blocking(
traffic_snapshot=traffic_snapshot
)

Expand Down Expand Up @@ -321,13 +323,7 @@ async def get_current_traffic_intelligence(self) -> Optional[TrafficIntersection
# Prepare camera images for response (only VLM-analyzed images)
camera_images_dict = {}
for direction, camera_image in self.vlm_analyzed_camera_images.items():
camera_images_dict[f"{direction}_camera"] = {
'camera_id': camera_image.camera_id,
'direction': camera_image.direction,
'timestamp': camera_image.timestamp,
'image_base64': camera_image.image_base64, # Include full base64 image
'image_size_bytes': camera_image.image_size_bytes
}
camera_images_dict[f"{direction}_camera"] = camera_image

# Create response with VLM-analyzed data only
response = TrafficIntersectionAgentResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def _build_vlm_request(self, prompt: str, camera_images: List[CameraImage]) -> D
camera_images_count=len(camera_images))

# Prepare content with text prompt and images
content = [
content: list[dict[str, Any]] = [
{
"type": "text",
"text": prompt
Expand Down
Loading