Stream not sending video or image tracks to Vision Agent processor #147

@EmmanuelTheCoder

Description

Hi there,

Over the last few days, I've been experimenting with the Vision-Agents SDK (I love it, by the way), building a food detection and calorie estimation agent. However, I noticed that process_image / process_video is never called by the Processor: either Stream is not sending image or video tracks (as the case may be) to the processor, or the processor is skipping the process_image / process_video method. As a result, the LLM keeps saying that it can't see any food in the camera.

Perhaps I am the one doing something wrong. Here's my code:

food_processor.py

from PIL import Image
from vision_agents.core.processors import AudioVideoProcessor, ImageProcessorMixin, VideoProcessorMixin
from ultralytics import YOLO
import logging
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)

class FoodDetectionProcessor(AudioVideoProcessor, VideoProcessorMixin):
    """Detects food items in video frames using YOLO and stores detections for nutrition analysis."""

    def __init__(self, model_path: str = "yolov8n.pt", conf_threshold: float = 0.3, classes: Optional[List[int]] = None, interval: int = 1):
        """
        Initialize food detection processor.

        Args:
            model_path: Path to YOLO model (default: yolov8n.pt)
            conf_threshold: Confidence threshold for detections (default: 0.3)
            classes: List of class IDs to detect. For COCO dataset food items:
                     [46=banana, 47=apple, 48=sandwich, 49=orange, 50=broccoli,
                      51=carrot, 52=hot dog, 53=pizza, 54=donut, 55=cake]
            interval: Process every Nth frame (default: 1 = every frame)
        """
        super().__init__(
            interval=interval,
            receive_audio=False,
            receive_video=True,
            send_audio=False,
            send_video=False
        )
        logger.info(f"Initializing FoodDetectionProcessor with model={model_path}, conf={conf_threshold}, interval={interval}")
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.classes = classes
        self.latest_detections: List[Dict[str, Any]] = []
        self.frame_count = 0
        logger.info("✅ Food detection model loaded successfully")

    async def process_audio(self, track, user_id: str, shared_forwarder=None):
        """
        Entry point - framework calls this even though we don't use audio.
        Just ignore audio data - we only process video via process_image.
        """
        logger.debug(f"process_audio called with {type(track)}, ignoring (we only process video)")
        # Do nothing - audio is not used for food detection
        pass

    async def process_video(self, track, user_id: str, shared_forwarder=None):
        """
        Process video track - this should be called when video is received.
        """
        logger.info(f"🎥🎥🎥 process_video EXECUTING! Track type: {type(track)}, user: {user_id}")

        try:
            async for frame in track:
                if not self.should_process():
                    continue

                try:
                    img_array = frame.to_ndarray(format="rgb24")
                    if img_array is None or img_array.size == 0:
                        continue

                    pil_image = Image.fromarray(img_array)
                    await self.process_image(pil_image, user_id)

                except Exception as e:
                    logger.error(f"Error processing frame: {e}", exc_info=True)

        except Exception as e:
            logger.error(f"Fatal error in process_video: {e}", exc_info=True)


    def _extract_detections(self, results) -> List[Dict[str, Any]]:
        """Extract detection information from YOLO results."""
        detections = []
        if results and len(results) > 0:
            result = results[0]

            if result.boxes is not None and len(result.boxes) > 0:
                boxes = result.boxes

                for i in range(len(boxes)):
                    box = boxes.xyxy[i].cpu().numpy()
                    conf = float(boxes.conf[i].cpu().numpy())
                    cls_id = int(boxes.cls[i].cpu().numpy())
                    class_name = result.names.get(cls_id, f"class_{cls_id}")

                    detections.append({
                        "bbox": [float(x) for x in box],
                        "confidence": conf,
                        "class_id": cls_id,
                        "class_name": class_name
                    })

        return detections

    def get_latest_detections(self) -> List[Dict[str, Any]]:
        """Get the most recent food detections for nutrition analysis."""
        return self.latest_detections.copy()

    async def cleanup(self):
        """Clean up resources when processor is stopped."""
        logger.info("Cleaning up FoodDetectionProcessor resources...")
        self.latest_detections = []
        logger.info("FoodDetectionProcessor cleanup complete")

When I switch the code from process_video to process_image using ImageProcessorMixin, the issue persists:


    async def process_image(self, image: Image.Image, user_id: str, metadata: Optional[dict] = None):
        """
        Process individual video frames as PIL Images.

        Args:
            image: PIL Image extracted from video frame
            user_id: User ID
            metadata: Optional metadata about the frame

        Returns:
            None (we just store detections internally)
        """
        self.frame_count += 1

        if self.frame_count % 10 == 0:
            logger.info(f"📹 Processing frame {self.frame_count}, size: {image.size}")

        try:
            # Run YOLO detection
            results = self.model(
                image,
                conf=self.conf_threshold,
                classes=self.classes,
                verbose=False
            )

            # Extract and store detections
            self.latest_detections = self._extract_detections(results)

            # Log detections
            if self.frame_count % 10 == 0:
                if self.latest_detections:
                    detected_items = [f"{d['class_name']} ({d['confidence']:.2f})" for d in self.latest_detections]
                    logger.info(f"🎯 Food detected: {', '.join(detected_items)}")
                else:
                    logger.info("❌ No food items detected")

            return None

        except Exception as e:
            logger.error(f"Error in YOLO detection: {e}", exc_info=True)
            return None
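
For completeness, the class declaration for that variant is switched roughly like this (everything else unchanged):

class FoodDetectionProcessor(AudioVideoProcessor, ImageProcessorMixin):
    # same __init__ and detection helpers as above;
    # process_image replaces process_video as the entry point
    ...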

I tried to debug the issue by adding __getattribute__ to my code to see which methods the processor calls. It seems the framework only calls process_image / process_video at the start of the call, but once the call fully starts (i.e. when the agent joins), it doesn't call process_video / process_image anymore. It just calls other methods like process_audio and get_latest_detections.


    def __getattribute__(self, name):
        if not name.startswith('_'):
            logger.info(f"🔍 Method called: {name}")
        return super().__getattribute__(name)
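
(A caveat with this approach: __getattribute__ fires on every attribute lookup, not only method calls, so it also triggers on reads like self.latest_detections. A quieter alternative is a small decorator that logs only when the entry points are actually invoked — a sketch:)

import functools

def log_calls(func):
    """Log whenever the wrapped coroutine is actually invoked."""
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        logger.info(f"🔍 {func.__name__} invoked")
        return await func(self, *args, **kwargs)
    return wrapper

# then, inside the class:
#     @log_calls
#     async def process_video(self, track, user_id, shared_forwarder=None): ...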

Here's my complete main.py code:


import asyncio
import logging
import sys
import os
from uuid import uuid4
from typing import Dict, List, Any
import httpx

from dotenv import load_dotenv

from vision_agents.core.edge.types import User
from vision_agents.plugins import getstream, openai, ultralytics, deepgram, cartesia
from vision_agents.core import agents
from vision_agents.core.events import CallSessionParticipantJoinedEvent
from food_processor import FoodDetectionProcessor




logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

load_dotenv()
print("directories in YOLO", dir(ultralytics))

# USDA FoodData Central API configuration
USDA_API_KEY = os.getenv("USDA_API_KEY", "DEMO_KEY")  # Get your free key at https://fdc.nal.usda.gov/api-key-signup.html
USDA_API_BASE = "https://api.nal.usda.gov/fdc/v1"

# Store detected food items from YOLO
detected_foods: Dict[str, Any] = {}


async def search_food_calories(food_name: str, portion_grams: float = 100.0) -> Dict[str, Any]:
    """
    Search for food calorie information using USDA FoodData Central API.

    Args:
        food_name: Name of the food item
        portion_grams: Weight of the portion in grams (default 100g)

    Returns:
        Dictionary containing calorie and nutritional information
    """
    try:
        async with httpx.AsyncClient() as client:
            # Search for the food item
            search_url = f"{USDA_API_BASE}/foods/search"
            params = {
                "api_key": USDA_API_KEY,
                "query": food_name,
                "pageSize": 1,
                "dataType": ["Survey (FNDDS)", "Foundation", "SR Legacy"]
            }

            response = await client.get(search_url, params=params, timeout=10.0)
            response.raise_for_status()
            data = response.json()

            logger.info("xyz calorie information", data)

            if not data.get("foods"):
                logger.warning(f"No nutritional data found for: {food_name}")
                return {
                    "food_name": food_name,
                    "calories": "Unknown",
                    "error": "Food not found in database"
                }

            food_item = data["foods"][0]

            # Extract nutritional information
            nutrients = {n["nutrientName"]: n["value"] for n in food_item.get("foodNutrients", [])}

            # Calculate calories for the given portion
            calories_per_100g = nutrients.get("Energy", 0)
            calories_for_portion = (calories_per_100g * portion_grams) / 100
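            # e.g. 52 kcal per 100 g * a 150 g portion / 100 = 78 kcal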

            result = {
                "food_name": food_item.get("description", food_name),
                "portion_grams": portion_grams,
                "calories": round(calories_for_portion, 1),
                "protein_g": round(nutrients.get("Protein", 0) * portion_grams / 100, 1),
                "carbs_g": round(nutrients.get("Carbohydrate, by difference", 0) * portion_grams / 100, 1),
                "fat_g": round(nutrients.get("Total lipid (fat)", 0) * portion_grams / 100, 1),
            }

            logger.info(f"Found nutritional data for {food_name}: {result}")
            return result

    except Exception as e:
        logger.error(f"Error fetching calorie data for {food_name}: {e}")
        return {
            "food_name": food_name,
            "calories": "Unknown",
            "error": str(e)
        }


def estimate_portion_size(bbox_area: float, reference_object: str = "plate") -> Dict[str, Any]:
    """
    Estimate portion size based on bounding box area and reference objects.

    Args:
        bbox_area: Area of the detected food bounding box in pixels
        reference_object: Reference object for size comparison (e.g., "plate", "hand")

    Returns:
        Dictionary with estimated weight and volume
    """
    # These are rough estimates - can be calibrated based on your camera setup
    # Assuming standard plate (10 inches / 25cm diameter) takes ~50000 pixels at typical distance

    portion_multiplier = bbox_area / 50000  # Normalized to standard plate

    # Rough estimates for common portions
    estimated_weight_grams = portion_multiplier * 150  # Average meal portion
    estimated_volume_ml = portion_multiplier * 200
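    # Worked example (illustrative): a 100,000-pixel bbox gives portion_multiplier = 2.0,
    # i.e. ~300 g and ~400 ml, which the thresholds below label "large".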

    size_description = "small"
    if portion_multiplier > 1.5:
        size_description = "large"
    elif portion_multiplier > 0.8:
        size_description = "medium"

    result = {
        "estimated_weight_grams": round(estimated_weight_grams, 1),
        "estimated_volume_ml": round(estimated_volume_ml, 1),
        "size_description": size_description,
        "reference": reference_object
    }

    logger.info(f"Estimated portion size: {result}")
    return result


async def analyze_food_and_calories(food_detections: List[Dict[str, Any]]) -> str:
    """
    Analyze detected foods and calculate total calories.

    Args:
        food_detections: List of detected food items with bounding boxes

    Returns:
        Formatted string with food analysis and calorie information
    """
    if not food_detections:
        return "No food items detected in the frame."

    results = []
    total_calories = 0

    for detection in food_detections:
        food_name = detection.get("class_name", "unknown food")
        bbox = detection.get("bbox", [0, 0, 0, 0])
        confidence = detection.get("confidence", 0)

        # Calculate bounding box area
        bbox_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

        # Estimate portion size
        portion_info = estimate_portion_size(bbox_area)

        # Get calorie information
        calorie_info = await search_food_calories(
            food_name,
            portion_info["estimated_weight_grams"]
        )

        if isinstance(calorie_info.get("calories"), (int, float)):
            total_calories += calorie_info["calories"]

        results.append({
            "food": food_name,
            "confidence": f"{confidence * 100:.1f}%",
            "portion": portion_info,
            "nutrition": calorie_info
        })

    # Format response
    response_parts = [f"I detected {len(food_detections)} food item(s):\n"]

    for i, result in enumerate(results, 1):
        nutrition = result["nutrition"]
        portion = result["portion"]

        response_parts.append(
            f"{i}. {result['food'].title()} "
            f"({portion['size_description']} portion, ~{portion['estimated_weight_grams']}g): "
        )

        if isinstance(nutrition.get("calories"), (int, float)):
            response_parts.append(
                f"{nutrition['calories']} calories, "
                f"{nutrition.get('protein_g', 0)}g protein, "
                f"{nutrition.get('carbs_g', 0)}g carbs, "
                f"{nutrition.get('fat_g', 0)}g fat\n"
            )
        else:
            response_parts.append("Nutritional data unavailable\n")

    if total_calories > 0:
        response_parts.append(f"\nTotal estimated calories: {round(total_calories)} kcal")

    return "".join(response_parts)


async def start_agent() -> None:
    """Initialize and start the video AI agent with image-based food detection."""
    food_processor = None

    try:
        logger.info("Initializing agent with Food Detection processor...")

        # Initialize Food Detection
        food_processor = FoodDetectionProcessor(
            model_path="yolov8n.pt",  # Using YOLOv8 nano model (fastest, good for real-time)
            conf_threshold=0.3,  # Lower threshold for easier detection
            classes=[46, 47, 48, 49, 50, 51, 52, 53, 54, 55]  # COCO food classes: banana through cake
        )

        llm = openai.LLM(model="gpt-4o-mini")
        # Register custom tool for food analysis using decorator
        @llm.register_function(
            description="Analyzes food items currently visible in the video feed and provides detailed nutritional information including calories, portion size, and macronutrients"
        )
        async def analyze_food() -> str:
            """
            Analyzes detected food items from the video feed and provides calorie information.

            Returns:
                Formatted nutritional analysis of detected food items
            """
            try:
                # Get current detections from food processor
                detections = food_processor.get_latest_detections()

                if not detections:
                    return "I don't see any food items in the video right now. Please show me some food to analyze!"

                result = await analyze_food_and_calories(detections)
                logger.info("Analysis result:", result)
                return result
            except Exception as e:
                logger.error(f"Error in analyze_food tool: {e}", exc_info=True)
                return f"Sorry, I had trouble analyzing the food. Error: {str(e)}"

        
        agent = agents.Agent(
            edge=getstream.Edge(),
            agent_user=User(name="Food Nutrition Assistant", id="agent"),
            instructions=(
                "You're a friendly food portion and calorie estimator. "
                "When you detect food items in the video, use the analyze_food tool to get detailed "
                "nutritional information including portion size and calories. "
                "Present the information in a natural, conversational way. "
                "Be encouraging and helpful. Keep responses concise and easy to understand."
            ),
            llm=llm,
            stt=deepgram.STT(),
            tts=cartesia.TTS(),
            processors=[food_processor] 
        )

        logger.info(f"Video processors: {agent.video_processors}")
        logger.info(f"Image processors: {agent.image_processors}")
        call_id = str(uuid4())
        logger.info(f"Creating call with ID: {call_id}")

        await agent.create_user()
        call = agent.edge.client.video.call("default", call_id)

        # Register event handler for participant joins
        @agent.events.subscribe
        async def handle_participant_joined(event: CallSessionParticipantJoinedEvent):
            logger.info(f"👤 Participant joined: {event.participant.user_id}")
            if event.participant.user_id != "agent":
                logger.info(f"📹 Subscribing to video from {event.participant.user_id}")


        await agent.edge.open_demo(call)
        logger.info("Demo opened successfully - waiting for browser to connect...")

        # Give the browser time to request and grant permissions
        await asyncio.sleep(3)

        session = await agent.join(call)
        session_active = True

        try:
            with session:
                logger.info("Agent joined call with image-based food detection active")

                # Initial greeting
                await agent.llm.simple_response(
                    "Hello! I'm your food nutrition assistant. Point your camera at any food items, "
                    "and I'll automatically detect them and tell you about their nutrition!"
                )

                logger.info("Agent is now actively monitoring for food items...")

                # Monitor food detections and automatically analyze
                last_detection_time = asyncio.get_running_loop().time()
                detection_cooldown = 3.0  # Wait 3 seconds between analyses

                try:
                    while session_active:
                        await asyncio.sleep(1)  # Check every second

                        # Get latest detections from food processor
                        detections = food_processor.get_latest_detections()

                        # Debug logging - show what's detected
                        if detections:
                            detected_items = [f"{d['class_name']} ({d['confidence']:.2f})" for d in detections]
                            logger.info(f"Food detected: {', '.join(detected_items)}")
                        else:
                            logger.debug("No food items detected in current frame")

                        if detections:
                            current_time = asyncio.get_running_loop().time()

                            # Only analyze if cooldown period has passed
                            if current_time - last_detection_time >= detection_cooldown:
                                logger.info(f"Analyzing {len(detections)} detected food item(s)...")

                                # Analyze food and get nutrition information
                                try:
                                    analysis = await analyze_food_and_calories(detections)
                                    await agent.llm.simple_response(analysis)
                                    last_detection_time = current_time
                                except Exception as e:
                                    logger.warning(f"Error during analysis (connection may be closing): {e}")
                                    session_active = False
                                    break
                            else:
                                logger.debug(f"Cooldown active ({detection_cooldown - (current_time - last_detection_time):.1f}s remaining)")

                except asyncio.CancelledError:
                    logger.info("Call ended by user")
                    session_active = False
        finally:
            # Clean up resources
            session_active = False
            if food_processor:
                logger.info("Cleaning up food detection processor...")
                await food_processor.cleanup()
                logger.info("Cleanup complete")

    except Exception as e:
        logger.error(f"Error running agent: {e}", exc_info=True)
        raise


def main() -> None:
    """Main entry point for the application."""
    try:
        asyncio.run(start_agent())
    except KeyboardInterrupt:
        logger.info("Application interrupted by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Application failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()


What am I doing wrong, or what should I do to make Stream call the processor and let it detect food?
