Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions viseron/components/ffmpeg/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import json
import logging
import os
import socket
import subprocess as sp
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

from viseron.const import (
CAMERA_SEGMENT_DURATION,
Expand Down Expand Up @@ -720,6 +722,7 @@ def run_ffprobe(
stream_config: dict[str, Any],
) -> dict[str, Any]:
"""Run FFprobe command."""
self._preflight_tcp_connection(stream_url)
ffprobe_command = (
[
"ffprobe",
Expand Down Expand Up @@ -780,3 +783,35 @@ def run_ffprobe(
)

return output

def _preflight_tcp_connection(self, stream_url: str) -> None:
"""Check TCP reachability before starting ffprobe."""
parsed = urlparse(stream_url)
if parsed.scheme not in ("http", "https", "rtmp", "rtsp", "rtsps"):
return
if not parsed.hostname:
return

default_ports = {
"http": 80,
"https": 443,
"rtmp": 1935,
"rtsp": 554,
"rtsps": 322,
}
port = parsed.port or default_ports.get(parsed.scheme)
if port is None:
return

try:
with socket.create_connection((parsed.hostname, port), timeout=2):
return
except OSError as error:
raise FFprobeError(
{
"error": {
"code": getattr(error, "errno", None),
"string": f"TCP preflight failed: {error}",
}
}
) from error
11 changes: 9 additions & 2 deletions viseron/components/webserver/api/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ def validate_auth_header(self) -> bool:
auth_header = self.request.headers.get("Authorization", None)

if auth_header is None:
if self.browser_request and self.validate_cookie_session():
return True
LOGGER.debug("Auth header is missing")
return False

Expand All @@ -278,9 +280,14 @@ def validate_auth_header(self) -> bool:
LOGGER.debug(f"Auth type not Bearer: {auth_type}")
return False

return self.validate_access_token(
if self.validate_access_token(
auth_val, check_refresh_token=self.browser_request
)
):
return True

if self.browser_request and self.validate_cookie_session():
return True
return False

def _allow_token_parameter(self, schema: Schema, route: Route) -> Schema:
"""Allow token parameter in schema."""
Expand Down
42 changes: 42 additions & 0 deletions viseron/components/webserver/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,48 @@ def validate_access_token(

return True

def validate_cookie_session(self) -> bool:
"""Validate the browser cookie session.

Browser API requests normally authenticate with the JWT header/payload from
localStorage plus the signature cookie. If localStorage is stale or missing
while the httpOnly session cookies are still valid, fall back to the cookie
session instead of leaving the UI in an unauthenticated limbo.
"""
if not self._webserver.auth:
raise RuntimeError("Auth is not set up, cannot validate cookie session.")

refresh_token_cookie = self.get_secure_cookie("refresh_token")
static_asset_key = self.get_secure_cookie("static_asset_key")
if refresh_token_cookie is None or static_asset_key is None:
LOGGER.debug("Cookie session is missing refresh token or static asset key")
return False

refresh_token = self._webserver.auth.get_refresh_token_from_token(
refresh_token_cookie.decode()
)
if refresh_token is None:
LOGGER.debug("Refresh token cookie is not valid")
return False

if not hmac.compare_digest(
refresh_token.static_asset_key, static_asset_key.decode()
):
LOGGER.debug("Static asset key does not belong to the refresh token")
return False

user = self._webserver.auth.get_user(refresh_token.user_id)
if user is None or not user.enabled:
LOGGER.debug("Cookie session user not found or disabled")
return False

if self.current_user is not None and self.current_user != user:
LOGGER.debug("Cookie session user mismatch")
return False

self.current_user = user
return True

def _get_cameras(self) -> None | dict[str, AbstractCamera]:
"""Get all registered camera instances."""
try:
Expand Down
67 changes: 63 additions & 4 deletions viseron/components/webserver/stream_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import TYPE_CHECKING, ClassVar

import cv2
import httpx
import imutils
import tornado.ioloop
import tornado.web
Expand All @@ -17,6 +18,15 @@
from viseron.components.nvr.const import EVENT_PROCESSED_FRAME_TOPIC
from viseron.const import TOPIC_STATIC_MJPEG_STREAMS
from viseron.domains.camera.config import MJPEG_STREAM_SCHEMA
from viseron.domains.camera.const import (
AUTHENTICATION_BASIC,
AUTHENTICATION_DIGEST,
CONFIG_AUTHENTICATION,
CONFIG_PASSWORD,
CONFIG_REFRESH_INTERVAL,
CONFIG_URL,
CONFIG_USERNAME,
)
from viseron.domains.motion_detector import AbstractMotionDetectorScanner
from viseron.events import EventData
from viseron.exceptions import DomainNotRegisteredError
Expand Down Expand Up @@ -90,14 +100,51 @@ def _set_stream_headers(self) -> None:
)
self.set_header("Pragma", "no-cache")

async def write_jpg(self, jpg: np.ndarray) -> None:
async def write_jpg(self, jpg: np.ndarray | bytes) -> None:
"""Set the headers and write the jpg data."""
jpg_bytes = jpg if isinstance(jpg, bytes) else jpg.tobytes()
self.write(f"{BOUNDARY}\r\n")
self.write("Content-type: image/jpeg\r\n")
self.write(f"Content-length: {len(jpg)}\r\n\r\n")
self.write(jpg.tobytes())
self.write(f"Content-length: {len(jpg_bytes)}\r\n\r\n")
self.write(jpg_bytes)
await self.flush()

@staticmethod
def _get_still_image_auth(camera):
"""Return auth for a configured still image."""
if (
camera.still_image
and camera.still_image[CONFIG_USERNAME]
and camera.still_image[CONFIG_PASSWORD]
):
if camera.still_image[CONFIG_AUTHENTICATION] == AUTHENTICATION_DIGEST:
return httpx.DigestAuth(
camera.still_image[CONFIG_USERNAME],
camera.still_image[CONFIG_PASSWORD],
)
if camera.still_image[CONFIG_AUTHENTICATION] == AUTHENTICATION_BASIC:
return httpx.BasicAuth(
camera.still_image[CONFIG_USERNAME],
camera.still_image[CONFIG_PASSWORD],
)
return None

def still_image_jpg(self, camera) -> bytes | None:
"""Fetch a configured still image and return JPEG bytes."""
response = httpx.get(
camera.still_image[CONFIG_URL],
auth=self._get_still_image_auth(camera),
timeout=10,
)
if response.status_code != HTTPStatus.OK.value:
LOGGER.warning(
"Failed to fetch still image for MJPEG stream %s: %s",
camera.identifier,
response.status_code,
)
return None
return response.content

@staticmethod
def process_frame(
nvr: NVR, processed_frame: EventProcessedFrame, mjpeg_stream_config: dict
Expand Down Expand Up @@ -200,15 +247,27 @@ async def get(self, camera: str) -> None:

self._set_stream_headers()

wait_timeout = 1 if nvr.camera.still_image_configured else None

while True:
try:
processed_frame = await frame_queue.get()
processed_frame = (
await asyncio.wait_for(frame_queue.get(), timeout=wait_timeout)
if wait_timeout
else await frame_queue.get()
)
ret, jpg = await self.run_in_executor(
self.process_frame, nvr, processed_frame.data, mjpeg_stream_config
)

if ret:
await self.write_jpg(jpg)
except asyncio.TimeoutError:
if not nvr.camera.still_image_configured:
continue
jpg = await self.run_in_executor(self.still_image_jpg, nvr.camera)
if jpg:
await self.write_jpg(jpg)
except (
tornado.iostream.StreamClosedError,
asyncio.exceptions.CancelledError,
Expand Down
39 changes: 37 additions & 2 deletions viseron/domains/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from viseron.helpers.named_timer import NamedTimer

SETUP_WITH_TRIES_PARAM_COUNT = 4
OFFLINE_CAMERA_RETRY_BACKOFF = (30, 120, 300, 900, 1800)

if TYPE_CHECKING:
from viseron import Viseron
Expand Down Expand Up @@ -117,6 +118,23 @@ def _wait_for_dependencies(
def _slow_warning(futures: list[Future]) -> None:
running = [f for f in futures if f.running()]
if running:
retrying_dependencies = []
for dependency in entry.require_domains + entry.optional_domains:
dependency_entry = registry.get(
dependency.domain, dependency.identifier
)
if dependency_entry and dependency_entry.state == DomainState.RETRYING:
retrying_dependencies.append(dependency_entry)
if retrying_dependencies:
LOGGER.debug(
f"Domain {entry.domain} with identifier {entry.identifier} "
"is waiting for retrying dependencies: "
+ ", ".join(
f"{dependency.domain}/{dependency.identifier}"
for dependency in retrying_dependencies
)
)
return
LOGGER.warning(
f"Domain {entry.domain} with identifier {entry.identifier} "
"still waiting for dependencies",
Expand Down Expand Up @@ -176,6 +194,19 @@ def _slow_warning(futures: list[Future]) -> None:
return True


def _is_ffmpeg_camera(entry: DomainEntry) -> bool:
"""Return if the entry is an ffmpeg camera domain."""
return entry.component_name == "ffmpeg" and entry.domain == "camera"


def _retry_wait_time(entry: DomainEntry, tries: int) -> int:
"""Return retry wait time for a failed domain setup."""
if _is_ffmpeg_camera(entry):
backoff_index = min(tries - 1, len(OFFLINE_CAMERA_RETRY_BACKOFF) - 1)
return OFFLINE_CAMERA_RETRY_BACKOFF[backoff_index]
return min(tries * DOMAIN_RETRY_INTERVAL, DOMAIN_RETRY_INTERVAL_MAX)


def _setup_single_domain(vis: Viseron, entry: DomainEntry, tries: int = 1) -> bool:
"""Set up a single domain."""
registry = vis.domain_registry
Expand Down Expand Up @@ -282,14 +313,18 @@ def _setup_single_domain(vis: Viseron, entry: DomainEntry, tries: int = 1) -> bo
_handle_failed_domain(vis, entry, DomainState.RETRYING, error=str(error))
slow_setup_warning.cancel()

wait_time = min(tries * DOMAIN_RETRY_INTERVAL, DOMAIN_RETRY_INTERVAL_MAX)
LOGGER.error(
wait_time = _retry_wait_time(entry, tries)
log_message = (
f"Domain {entry.domain} "
f"with identifier {entry.identifier} "
f"for component {entry.component_name} is not ready. "
f"Retrying in {wait_time} seconds. "
f"Error: {error!s}"
)
if _is_ffmpeg_camera(entry) and tries >= 3:
LOGGER.warning("%s Camera is in offline backoff.", log_message)
else:
LOGGER.error(log_message)

# Block until wait_time elapses or the domain is cancelled/shutdown.
# cancel_event.wait() returns True if the event was set (cancelled),
Expand Down