Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion custom_components/frigate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
valid_entity_id,
)
from homeassistant.exceptions import ConfigEntryNotReady, ServiceValidationError
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers import device_registry as dr, entity_registry as er, llm
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import ConfigType
Expand All @@ -57,6 +57,7 @@
ATTR_CONFIG,
ATTR_COORDINATOR,
ATTR_END_TIME,
ATTR_LLM_UNREGISTER,
ATTR_START_TIME,
ATTR_WS_EVENT_PROXY,
ATTR_WS_REVIEW_PROXY,
Expand All @@ -73,6 +74,7 @@
STATUS_RUNNING,
STATUS_STARTING,
)
from .llm_functions import FrigateServiceAPI
from .views import async_setup as views_async_setup
from .ws_api import async_setup as ws_api_async_setup
from .ws_proxy import WSEventProxy, WSReviewProxy
Expand Down Expand Up @@ -424,6 +426,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(_async_entry_updated))

# Register LLM API if Frigate 0.18+ and not already registered
if (
verify_frigate_version(config, "0.18")
and ATTR_LLM_UNREGISTER not in hass.data[DOMAIN]
):
hass.data[DOMAIN][ATTR_LLM_UNREGISTER] = llm.async_register_api(
hass, FrigateServiceAPI(hass=hass)
)

# Register review summarize service if Frigate version is 0.17+
if verify_frigate_version(config, "0.17"):
hass.services.async_register(
Expand Down Expand Up @@ -508,6 +519,15 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
)
hass.data[DOMAIN].pop(config_entry.entry_id)

# Unregister LLM API if no more Frigate entries remain
remaining = {
k
for k, v in hass.data[DOMAIN].items()
if isinstance(v, dict) and ATTR_CLIENT in v
}
if not remaining and ATTR_LLM_UNREGISTER in hass.data[DOMAIN]:
hass.data[DOMAIN].pop(ATTR_LLM_UNREGISTER)()

return unload_ok


Expand Down
24 changes: 24 additions & 0 deletions custom_components/frigate/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,30 @@ async def async_review_summarize(
)
return cast(dict[str, Any], result) if decode_json else result

async def async_chat_completion(
self,
query: str,
camera_name: str | None = None,
) -> dict[str, Any]:
"""Send a chat completion request to Frigate."""
data: dict[str, Any] = {
"messages": [{"role": "user", "content": query}],
"max_tool_iterations": 5,
"stream": False,
}
if camera_name:
data["include_live_image"] = camera_name

return cast(
dict[str, Any],
await self.api_wrapper(
"post",
str(URL(self._host) / "api/chat/completion"),
data=data,
timeout=REVIEW_SUMMARIZE_TIMEOUT,
),
)

async def _get_token(self) -> None:
"""
Obtain a new JWT token using the provided username and password.
Expand Down
1 change: 1 addition & 0 deletions custom_components/frigate/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
ATTR_START_TIME = "start_time"
ATTR_WS_EVENT_PROXY = "ws_event_proxy"
ATTR_WS_REVIEW_PROXY = "ws_review_proxy"
ATTR_LLM_UNREGISTER = "llm_unregister"
ATTR_LABEL = "label"
ATTR_SUB_LABEL = "sub_label"
ATTR_DURATION = "duration"
Expand Down
133 changes: 133 additions & 0 deletions custom_components/frigate/llm_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""LLM API for Frigate integration."""

from __future__ import annotations

import logging
from typing import Any

import voluptuous as vol

from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv, llm
from homeassistant.util.json import JsonObjectType

from .api import FrigateApiClientError
from .const import ATTR_CLIENT, ATTR_CONFIG, DOMAIN

_LOGGER = logging.getLogger(__name__)

FRIGATE_SERVICES_API_ID = "frigate_services"


class FrigateQueryTool(llm.Tool):
"""Tool that queries the Frigate NVR chat API."""

name = "frigate_query"
description = (
"Ask Frigate NVR a question about your security cameras, recent events, "
"detected objects, or what is currently visible on a camera. Use this tool "
"when the user asks about their security cameras, surveillance footage, "
"who or what was detected, or wants to know what a camera sees right now. "
"You can optionally specify a camera name to include a live image from "
"that camera for visual analysis."
)

def __init__(self, camera_names: list[str]) -> None:
"""Initialize the tool with available camera names."""
schema: dict[vol.Marker, Any] = {
vol.Required(
"query",
description="The user's question about their security cameras or surveillance system",
): cv.string,
}
if camera_names:
schema[
vol.Optional(
"camera_name",
description=(
"The name of a specific camera to include a live image "
"from for visual context. Use when the user asks about "
"what a specific camera sees right now."
),
)
] = vol.In(camera_names)
self.parameters = vol.Schema(schema)

async def async_call(
self,
hass: HomeAssistant,
tool_input: llm.ToolInput,
llm_context: llm.LLMContext,
) -> JsonObjectType:
"""Call the Frigate chat completion API."""
query = tool_input.tool_args["query"]
camera_name = tool_input.tool_args.get("camera_name")

# Find the right client
client = None
for entry_id, entry_data in hass.data[DOMAIN].items():
if not isinstance(entry_data, dict) or ATTR_CLIENT not in entry_data:
continue
if camera_name:
config = entry_data.get(ATTR_CONFIG, {})
if camera_name in config.get("cameras", {}):
client = entry_data[ATTR_CLIENT]
break
else:
client = entry_data[ATTR_CLIENT]
break

if client is None:
return {"error": "No Frigate instance available"}

try:
result = await client.async_chat_completion(query, camera_name)
content = result.get("message", {}).get("content", "")
return {"response": content}
except FrigateApiClientError as exc:
_LOGGER.error("Frigate query failed: %s", exc)
return {"error": f"Frigate query failed: {exc}"}


class FrigateServiceAPI(llm.API):
"""LLM API exposing Frigate Services."""

def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the API."""
super().__init__(
hass=hass,
id=FRIGATE_SERVICES_API_ID,
name="Frigate Services",
)

async def async_get_api_instance(
self, llm_context: llm.LLMContext
) -> llm.APIInstance:
"""Return the instance of the API."""
# Collect camera names from all Frigate config entries
camera_names: list[str] = []
for entry_id, entry_data in self.hass.data.get(DOMAIN, {}).items():
if not isinstance(entry_data, dict) or ATTR_CONFIG not in entry_data:
continue
config = entry_data[ATTR_CONFIG]
for cam_name in config.get("cameras", {}).keys():
if cam_name not in camera_names:
camera_names.append(cam_name)

camera_list = ", ".join(camera_names) if camera_names else "none detected"
api_prompt = (
"Use Frigate Services to ask questions about security cameras, "
"detected events, and live camera feeds. Frigate is an NVR "
"(Network Video Recorder) that monitors security cameras, detects "
"objects like people, cars, and animals, and records events. "
f"Available cameras: {camera_list}."
)

tool = FrigateQueryTool(camera_names)

return llm.APIInstance(
api=self,
api_prompt=api_prompt,
llm_context=llm_context,
tools=[tool],
)
69 changes: 69 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,75 @@ async def test_async_review_summarize(
assert post_handler.called


async def test_async_chat_completion(
aiohttp_session: aiohttp.ClientSession, aiohttp_server: Any
) -> None:
"""Test async_chat_completion."""
chat_response = {
"message": {
"role": "assistant",
"content": "There is a person at the front door.",
},
"finish_reason": "stop",
"tool_iterations": 1,
"tool_calls": [],
}

async def chat_handler(request: web.Request) -> web.Response:
"""Chat completion handler."""
body = await request.json()
assert body["messages"] == [
{"role": "user", "content": "Is there anyone at the front door?"}
]
assert body["max_tool_iterations"] == 5
assert body["stream"] is False
assert "include_live_image" not in body
return web.json_response(chat_response)

server = await start_frigate_server(
aiohttp_server,
[web.post("/api/chat/completion", chat_handler)],
)

frigate_client = FrigateApiClient(str(server.make_url("/")), aiohttp_session)
result = await frigate_client.async_chat_completion(
"Is there anyone at the front door?"
)
assert result == chat_response


async def test_async_chat_completion_with_camera(
aiohttp_session: aiohttp.ClientSession, aiohttp_server: Any
) -> None:
"""Test async_chat_completion with camera name for live image."""
chat_response = {
"message": {
"role": "assistant",
"content": "I can see a car in the driveway.",
},
"finish_reason": "stop",
"tool_iterations": 1,
"tool_calls": [],
}

async def chat_handler(request: web.Request) -> web.Response:
"""Chat completion handler."""
body = await request.json()
assert body["include_live_image"] == "front_door"
return web.json_response(chat_response)

server = await start_frigate_server(
aiohttp_server,
[web.post("/api/chat/completion", chat_handler)],
)

frigate_client = FrigateApiClient(str(server.make_url("/")), aiohttp_session)
result = await frigate_client.async_chat_completion(
"What do you see?", camera_name="front_door"
)
assert result == chat_response


async def test_async_get_recordings_summary(
aiohttp_session: aiohttp.ClientSession, aiohttp_server: Any
) -> None:
Expand Down
Loading
Loading