diff --git a/api/app.py b/api/app.py index b2b28111f..cea7a0604 100644 --- a/api/app.py +++ b/api/app.py @@ -26,6 +26,7 @@ from contextlib import asynccontextmanager +import asyncio from fastapi import APIRouter, FastAPI from fastapi.middleware.cors import CORSMiddleware @@ -68,6 +69,10 @@ async def lifespan(app: FastAPI): await sync_manager.start() set_worker_sync_manager(sync_manager) + # Start async background backfill of Paygent agents (non-blocking) + from api.services.pipecat.paygent_agent_sync import run_backfill_if_requested + asyncio.create_task(run_backfill_if_requested()) + yield # Run app # Shutdown sequence - this runs when FastAPI is shutting down diff --git a/api/routes/workflow.py b/api/routes/workflow.py index a15594953..6f3378c95 100644 --- a/api/routes/workflow.py +++ b/api/routes/workflow.py @@ -1,5 +1,6 @@ import json import re +import asyncio import uuid from datetime import datetime from typing import List, Literal, Optional @@ -399,6 +400,9 @@ async def create_workflow( }, ) + from api.services.pipecat.paygent_agent_sync import ensure_agent_async + asyncio.create_task(ensure_agent_async(workflow.id, workflow.name)) + if trigger_paths: await db_client.sync_triggers_for_workflow( workflow_id=workflow.id, @@ -499,6 +503,9 @@ async def create_workflow_from_template( }, ) + from api.services.pipecat.paygent_agent_sync import ensure_agent_async + asyncio.create_task(ensure_agent_async(workflow.id, workflow.name)) + if trigger_paths: await db_client.sync_triggers_for_workflow( workflow_id=workflow.id, diff --git a/api/services/pipecat/paygent.py b/api/services/pipecat/paygent.py new file mode 100644 index 000000000..f617c27e0 --- /dev/null +++ b/api/services/pipecat/paygent.py @@ -0,0 +1,767 @@ +""" +Paygent cost-tracking integration for Dograh pipelines. + +Design principles: + - Pure REST (no SDK dependency). All HTTP calls are fire-and-forget via a + ThreadPoolExecutor — the pipeline is NEVER blocked. + - Configuration is read exclusively from environment variables so no secrets + live in source code. + - Every code path is wrapped in try/except; a Paygent failure NEVER + propagates to the calling pipeline. + - Mode-aware: + • Standard (STT + LLM + TTS) pipelines: + - LLM usage tracked per turn via /api/v1/voice/llm + - TTS usage tracked per turn via /api/v1/voice/tts (with fallback + estimation for providers like Deepgram WS that don't emit metrics) + - STT approximated from wall-clock call duration via /api/v1/voice/stt + - Indicator + billing sent at session end via /api/v1/voice/indicator + • Realtime / STS (speech-to-speech) pipelines: + - Each LLM turn forwarded as a STS event via /api/v1/voice/sts with + the raw usageMetadata payload. No separate STT/TTS events are sent — + the STS endpoint handles the full multimodal cost. + - Indicator + billing sent at session end via /api/v1/voice/indicator + - Docker-aware: localhost/127.0.0.1 in PAYGENT_BASE_URL is automatically + rewritten to host.docker.internal when running inside a container. +""" + +import logging +import os +import time +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, Optional + +import requests + +from pipecat.frames.frames import CancelFrame, EndFrame, Frame, MetricsFrame, StartFrame +from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +logger = logging.getLogger("api.services.pipecat.paygent") + +# ── Constants ───────────────────────────────────────────────────────────────── + +_DEFAULT_BASE_URL = "http://localhost:8082" +_REQUEST_TIMEOUT_SECONDS = 10 + +# ── Provider detection ──────────────────────────────────────────────────────── + + +def _detect_provider(name: str, fallback: str = "unknown") -> str: + """Map a processor/model name to a canonical Paygent provider slug dynamically. + + Strips common Pipecat class/service name suffixes in a clean, iterative loop + to find the base provider. Special-cases Gemini to map to Google. + """ + if not name: + return fallback + + # 1. Lowercase and clean the input + clean_name = name.lower().strip() + + # 2. Handle known exceptions (e.g., Gemini maps to Google in Paygent) + if "gemini" in clean_name: + return "google" + + # 3. Strip common Pipecat class/service name suffixes + # Order matches suffix length/specificity to avoid partial matches + suffixes = [ + "service", "multimodallive", "realtime", + "vertex", "llm", "tts", "stt", "helper", "transport" + ] + + changed = True + while changed: + changed = False + for suffix in suffixes: + if clean_name.endswith(suffix): + clean_name = clean_name[:-len(suffix)].rstrip("_").rstrip("-") + changed = True + break + + if clean_name: + return clean_name + + return fallback + + +def _resolve_base_url(raw: str) -> str: + """Rewrite localhost/127.0.0.1 -> host.docker.internal when inside Docker.""" + if os.path.exists("/.dockerenv") or os.environ.get("RUNNING_IN_DOCKER", "").lower() == "true": + raw = raw.replace("127.0.0.1", "host.docker.internal") + raw = raw.replace("localhost", "host.docker.internal") + return raw.rstrip("/") + + +def _google_live_usage_to_sts_metadata(usage: Dict[str, Any]) -> Dict[str, Any]: + """ + Pure Python translation of Google GenAI Live usage_metadata to + Paygent's canonical speech-to-speech /api/v1/voice/speech-to-speech API schema. + """ + if not usage: + return {"schemaVersion": 1} + + def _get_val(obj, *keys): + if not obj: + return None + for k in keys: + if isinstance(obj, dict): + if k in obj: return obj[k] + else: + if hasattr(obj, k): return getattr(obj, k) + return None + + def _get_list(obj, *keys): + val = _get_val(obj, *keys) + if val is None: + return None + return list(val) if not isinstance(val, list) else val + + def _optional_int(obj, *keys): + val = _get_val(obj, *keys) + if val is not None: + try: + return int(val) + except (TypeError, ValueError): + return None + return None + + def _modality_token_count(details, modality_name): + if not details: + return 0 + want = modality_name.upper() + total = 0 + for d in details: + try: + mod = _get_val(d, "modality") + if mod is None: + continue + label = _get_val(mod, "name") or _get_val(mod, "value") or mod + if str(label).upper() != want: + continue + tc = _get_val(d, "token_count", "tokenCount") + total += int(tc or 0) + except Exception: + continue + return total + + prompt_details = _get_list(usage, "prompt_tokens_details", "promptTokensDetails") + response_details = _get_list(usage, "response_tokens_details", "responseTokensDetails") + tool_details = _get_list(usage, "tool_use_prompt_tokens_details", "toolUsePromptTokensDetails") + cache_details = _get_list(usage, "cache_tokens_details", "cacheTokensDetails") + + # input side: TEXT + DOCUMENT + AUDIO + IMAGE + VIDEO + text_in = _modality_token_count(prompt_details, "TEXT") + _modality_token_count(tool_details, "TEXT") + audio_in = _modality_token_count(prompt_details, "AUDIO") + _modality_token_count(tool_details, "AUDIO") + image_in = _modality_token_count(prompt_details, "IMAGE") + _modality_token_count(tool_details, "IMAGE") + video_in = _modality_token_count(prompt_details, "VIDEO") + _modality_token_count(tool_details, "VIDEO") + doc_as_text = _modality_token_count(prompt_details, "DOCUMENT") + _modality_token_count(tool_details, "DOCUMENT") + text_in += doc_as_text + + # fallback aggregate mapping + tutc = _optional_int(usage, "tool_use_prompt_token_count", "toolUsePromptTokenCount") + if tutc is not None and not tool_details: + text_in += int(tutc) + + ptc = _optional_int(usage, "prompt_token_count", "promptTokenCount") + if ptc is not None and not prompt_details and not tool_details: + text_in += int(ptc) + + # output side: TEXT + DOCUMENT + AUDIO + VIDEO + text_out = _modality_token_count(response_details, "TEXT") + _modality_token_count(response_details, "DOCUMENT") + audio_out = _modality_token_count(response_details, "AUDIO") + _modality_token_count(response_details, "VIDEO") + + rtc = _optional_int(usage, "response_token_count", "responseTokenCount") + if text_out == 0 and audio_out == 0 and rtc is not None: + # Default fallback to audio output for STS audio connection + audio_out = int(rtc) + + # Cache breakdowns + cached_text = _modality_token_count(cache_details, "TEXT") + _modality_token_count(cache_details, "DOCUMENT") + cached_audio = _modality_token_count(cache_details, "AUDIO") + _modality_token_count(cache_details, "VIDEO") + cached_image = _modality_token_count(cache_details, "IMAGE") + cached_legacy = _optional_int(usage, "cached_content_token_count", "cachedContentTokenCount") + + # Build response payload + out = {"schemaVersion": 1} + + # Input Side + inp = {} + if text_in > 0: inp["text"] = {"tokens": text_in} + if audio_in > 0: inp["audio"] = {"tokens": audio_in} + if image_in > 0: inp["image"] = {"tokens": image_in} + if video_in > 0: inp["video"] = {"tokens": video_in} + if inp: out["input"] = inp + + # Output Side + o = {} + if text_out > 0: o["text"] = {"tokens": text_out} + if audio_out > 0: o["audio"] = {"tokens": audio_out} + if o: out["output"] = o + + # Cached breakdown + has_split = bool(cached_text or cached_audio or cached_image) + if cached_legacy is not None and cached_legacy > 0 and not has_split: + out["cached"] = {"tokens": int(cached_legacy)} + elif has_split: + cd = {} + if cached_text > 0: cd["text"] = {"tokens": cached_text} + if cached_audio > 0: cd["audio"] = {"tokens": cached_audio} + if cached_image > 0: cd["image"] = {"tokens": cached_image} + if cd: out["cached"] = cd + + return out + + +def _openai_realtime_usage_to_sts_metadata(usage: Dict[str, Any]) -> Dict[str, Any]: + """ + Pure Python translation of OpenAI Realtime usage_metadata to + Paygent's canonical speech-to-speech /api/v1/voice/speech-to-speech API schema. + """ + if not usage: + return {"schemaVersion": 1} + + def _get_val(obj, *keys): + if not obj: + return None + for k in keys: + if isinstance(obj, dict): + if k in obj: return obj[k] + else: + if hasattr(obj, k): return getattr(obj, k) + return None + + total_in = int(_get_val(usage, "input_tokens", "inputTokens") or 0) + total_out = int(_get_val(usage, "output_tokens", "outputTokens") or 0) + + in_details = _get_val(usage, "input_token_details", "inputTokenDetails") or {} + out_details = _get_val(usage, "output_token_details", "outputTokenDetails") or {} + + audio_in = int(_get_val(in_details, "audio_tokens", "audioTokens") or 0) + text_in = int(_get_val(in_details, "text_tokens", "textTokens") or 0) + image_in = int(_get_val(in_details, "image_tokens", "imageTokens") or 0) + + cached_total = int(_get_val(usage, "cached_tokens", "cachedTokens") or _get_val(in_details, "cached_tokens", "cachedTokens") or 0) + + cached_details = _get_val(in_details, "cached_tokens_details", "cachedTokensDetails") or {} + cached_audio = int(_get_val(cached_details, "audio_tokens", "audioTokens") or 0) + cached_text = int(_get_val(cached_details, "text_tokens", "textTokens") or 0) + cached_image = int(_get_val(cached_details, "image_tokens", "imageTokens") or 0) + + if not (cached_audio or cached_text or cached_image): + cached_audio = int(_get_val(in_details, "cached_audio_tokens", "cachedAudioTokens") or 0) + cached_text = int(_get_val(in_details, "cached_text_tokens", "cachedTextTokens") or 0) + cached_image = int(_get_val(in_details, "cached_image_tokens", "cachedImageTokens") or 0) + + audio_out = int(_get_val(out_details, "audio_tokens", "audioTokens") or 0) + text_out = int(_get_val(out_details, "text_tokens", "textTokens") or 0) + + if not (text_in or audio_in or image_in) and total_in > 0: + text_in = total_in - cached_total + + out = {"schemaVersion": 1} + inp = {} + if text_in > 0: inp["text"] = {"tokens": text_in} + if audio_in > 0: inp["audio"] = {"tokens": audio_in} + if image_in > 0: inp["image"] = {"tokens": image_in} + if inp: out["input"] = inp + + o = {} + if text_out > 0: o["text"] = {"tokens": text_out} + if audio_out > 0: o["audio"] = {"tokens": audio_out} + if o: out["output"] = o + + has_split = bool(cached_text or cached_audio or cached_image) + if cached_total > 0 and not has_split: + out["cached"] = {"tokens": int(cached_total)} + elif has_split: + cd = {} + if cached_text > 0: cd["text"] = {"tokens": cached_text} + if cached_audio > 0: cd["audio"] = {"tokens": cached_audio} + if cached_image > 0: cd["image"] = {"tokens": cached_image} + if cd: out["cached"] = cd + + return out +# ── Main aggregator ─────────────────────────────────────────────────────────── + + +class PaygentPipelineMetricsAggregator(FrameProcessor): + """ + Pipecat FrameProcessor that intercepts MetricsFrames from the pipeline + and forwards usage events to the Paygent REST API. + + Operates in two distinct modes controlled by the ``is_realtime`` flag: + + Standard mode (STT + LLM + TTS): + - Fires /api/v1/voice/llm per LLM turn. + - Fires /api/v1/voice/tts per TTS turn (with fallback estimation for + providers like Deepgram WebSocket that do not emit usage metrics). + - Fires /api/v1/voice/stt at session end using call wall-clock duration + as the proxy for audio minutes (Pipecat does not expose raw STT mins). + - Fires /api/v1/voice/indicator at session end. + + Realtime / STS mode (speech-to-speech — e.g. OpenAI Realtime, Gemini Live): + - Fires /api/v1/voice/sts per LLM turn with the raw usageMetadata payload. + The STS endpoint handles the complete multimodal cost (audio-in, + audio-out, text tokens). No separate STT or TTS events are sent. + - Fires /api/v1/voice/indicator at session end. + + All HTTP calls are fire-and-forget (ThreadPoolExecutor). Pipeline latency + is NEVER impacted. All failures are silently logged. + """ + + def __init__( + self, + api_key: str, + agent_id: str, + customer_id: str, + session_id: str, + indicator: str = "per-minute", + base_url: str = _DEFAULT_BASE_URL, + is_realtime: bool = False, + stt_provider: str = "unknown", + stt_model: str = "default", + tts_provider: str = "unknown", + tts_model: str = "default", + llm_provider: str = "unknown", + llm_model: str = "default", + ) -> None: + super().__init__() + self._api_key = api_key + self._agent_id = agent_id + self._customer_id = customer_id + self._session_id = session_id + self._indicator = indicator + self._base_url = _resolve_base_url(base_url) + self._is_realtime = is_realtime + self._stt_provider = stt_provider + self._stt_model = stt_model + self._tts_provider = tts_provider + self._tts_model = tts_model + self._llm_provider = llm_provider + self._llm_model = llm_model + + self._start_time: Optional[float] = None + self._finalized: bool = False + + # Standard mode only — used for TTS fallback estimation + self._has_received_tts_metrics: bool = False + self._accumulated_completion_tokens: int = 0 + + # De-duplicate MetricsFrame objects that travel multiple pipeline branches + self._seen_frame_ids: set[int] = set() + + # Fire-and-forget thread pool — daemon threads die with the process + self._executor = ThreadPoolExecutor( + max_workers=4, + thread_name_prefix="paygent_worker", + ) + + logger.debug( + "[Paygent] Aggregator ready — session=%s agent=%s customer=%s " + "base_url=%s is_realtime=%s", + self._session_id, + self._agent_id, + self._customer_id, + self._base_url, + self._is_realtime, + ) + + # ── Frame processing ────────────────────────────────────────────────────── + + async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: + await super().process_frame(frame, direction) + + try: + # Guarantee session is registered even if StartFrame was dropped by another processor + if self._start_time is None and not isinstance(frame, (EndFrame, CancelFrame)): + self._start_time = time.monotonic() + self._fire_ensure_customer() + self._fire_initialize_session() + + if isinstance(frame, StartFrame): + pass # Already handled by the lazy init above + + elif isinstance(frame, MetricsFrame): + frame_id = id(frame) + if frame_id not in self._seen_frame_ids: + self._seen_frame_ids.add(frame_id) + for item in frame.data: + if isinstance(item, LLMUsageMetricsData): + if self._is_realtime: + # STS: forward as multimodal STS event + self._fire_sts_event(item) + else: + # Standard: forward as LLM token event + self._fire_llm_event(item) + elif isinstance(item, TTSUsageMetricsData): + # In STS mode the realtime model handles TTS internally — + # never send a separate TTS event. + if not self._is_realtime: + self._fire_tts_event(item) + + elif isinstance(frame, (EndFrame, CancelFrame)): + self._finalize_session() + + except Exception as exc: + # Should never reach here, but guarantee the pipeline is not blocked + logger.error("[Paygent] Unexpected error in process_frame: %s", exc) + + await self.push_frame(frame, direction) + + async def cleanup(self) -> None: + """Called by the pipeline task on shutdown.""" + self._finalize_session() + self._executor.shutdown(wait=False) + await super().cleanup() + + # ── Private helpers ─────────────────────────────────────────────────────── + + def _post(self, path: str, payload: Dict[str, Any]) -> None: + """Submit a fire-and-forget POST to the Paygent REST API.""" + url = f"{self._base_url}{path}" + headers = { + "Content-Type": "application/json", + "paygent-api-key": self._api_key, + } + + def _do_post() -> None: + for attempt in range(3): + try: + resp = requests.post( + url, + json=payload, + headers=headers, + timeout=_REQUEST_TIMEOUT_SECONDS, + ) + + # Retry on server errors or rate limits + if resp.status_code >= 500 or resp.status_code == 429: + if attempt < 2: + logger.debug("[Paygent] Retry %d posting to %s (HTTP %s)", attempt + 1, path, resp.status_code) + time.sleep(1) + continue + + if resp.status_code >= 400: + logger.error( + "[Paygent] HTTP %s posting to %s: %s", + resp.status_code, + path, + resp.text[:200], + ) + else: + logger.info("[Paygent] OK %s -> %s", path, resp.status_code) + + break # Success or non-retriable error + + except requests.exceptions.RequestException as exc: + if attempt < 2: + logger.debug("[Paygent] Retry %d posting to %s due to exception: %s", attempt + 1, path, exc) + time.sleep(1) + else: + logger.error("[Paygent] Exception posting to %s after 3 attempts: %s", path, exc) + except Exception as exc: + logger.error("[Paygent] Unhandled exception posting to %s: %s", path, exc) + break + + try: + self._executor.submit(_do_post) + except RuntimeError: + # Executor already shut down — happens during late cleanup calls + logger.debug("[Paygent] Executor shut down; skipping post to %s", path) + + def _fire_ensure_customer(self) -> None: + """Idempotently register the customer in the tracking service.""" + try: + self._post( + "/api/v1/customers/create-or-get", + { + "name": f"Customer {self._customer_id}", + "externalId": self._customer_id, + }, + ) + logger.info( + "[Paygent] Customer auto-registration queued — customer_id=%s", self._customer_id + ) + except Exception as exc: + logger.error("[Paygent] Error queuing customer registration: %s", exc) + + def _fire_initialize_session(self) -> None: + """Register the voice session on the Paygent service.""" + try: + self._post( + "/api/v1/voice/session", + { + "sessionId": self._session_id, + "agentId": self._agent_id, + "customerId": self._customer_id, + }, + ) + logger.info( + "[Paygent] Voice session init queued — session=%s", self._session_id + ) + except Exception as exc: + logger.error("[Paygent] Error queuing session init: %s", exc) + + def _fire_llm_event(self, data: LLMUsageMetricsData) -> None: + """Forward LLM token usage to Paygent (standard STT+LLM+TTS mode only).""" + try: + provider = _detect_provider(data.processor or "", fallback=self._llm_provider) + if provider == "unknown": + provider = self._llm_provider + model = data.model or self._llm_model + usage = data.value + + prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0 + completion_tokens = getattr(usage, "completion_tokens", 0) or 0 + cached_tokens = ( + getattr(usage, "cache_read_input_tokens", 0) + or getattr(usage, "cached_tokens", 0) + or 0 + ) + + if prompt_tokens <= 0 and completion_tokens <= 0: + return + + # Track completion tokens for TTS fallback estimation + self._accumulated_completion_tokens += completion_tokens + + payload: Dict[str, Any] = { + "sessionId": self._session_id, + "provider": provider, + "model": model, + "plan": "default", + "promptTokens": prompt_tokens, + "completionTokens": completion_tokens, + } + if cached_tokens > 0: + payload["cachedTokens"] = cached_tokens + + logger.debug( + "[Paygent] LLM event — model=%s provider=%s prompt=%d completion=%d cached=%d", + model, provider, prompt_tokens, completion_tokens, cached_tokens, + ) + self._post("/api/v1/voice/llm", payload) + except Exception as exc: + logger.error("[Paygent] Error building LLM payload: %s", exc) + + def _fire_sts_event(self, data: LLMUsageMetricsData) -> None: + """Forward realtime/STS usage to Paygent via /api/v1/voice/sts. + + In STS mode the realtime provider (e.g. OpenAI Realtime, Gemini Live) + handles STT + LLM + TTS internally. The usageMetadata payload forwarded + here contains all input/output modality details so the tracking service + can calculate the exact multimodal cost. No separate LLM/STT/TTS events + are sent in this mode. + """ + try: + provider = _detect_provider(data.processor or "", fallback=self._llm_provider) + if provider == "unknown": + provider = self._llm_provider + + # Grok and Ultravox are billed purely per-minute at the end of the session. + # Skip per-turn STS events for these providers; they are handled in _finalize_session. + if provider in ("grok", "ultravox", "grok_realtime", "ultravox_realtime"): + return + + model = data.model or self._llm_model + usage = data.value + + # If the LLMTokenUsage object contains raw_usage_metadata, translate it + # using our robust multimodal payload mappers. + raw_metadata = getattr(usage, "raw_usage_metadata", None) + if raw_metadata: + if provider in ("openai", "openai_realtime"): + usage_metadata = _openai_realtime_usage_to_sts_metadata(raw_metadata) + else: + usage_metadata = _google_live_usage_to_sts_metadata(raw_metadata) + else: + # Build usageMetadata dict from the pipecat metrics object. + # Pipecat currently strips modality information and emits only basic + # token fields. We map these to text tokens for STS tracking. + prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0 + completion_tokens = getattr(usage, "completion_tokens", 0) or 0 + cached_tokens = ( + getattr(usage, "cache_read_input_tokens", 0) + or getattr(usage, "cached_tokens", 0) + or 0 + ) + + usage_metadata = {"schemaVersion": 1} + if prompt_tokens > 0: + usage_metadata.setdefault("input", {})["text"] = {"tokens": prompt_tokens} + if completion_tokens > 0: + usage_metadata.setdefault("output", {})["text"] = {"tokens": completion_tokens} + if cached_tokens > 0: + usage_metadata["cached"] = {"tokens": cached_tokens} + + # Also include any additional attributes from the usage object + if hasattr(usage, "__dict__"): + for k, v in vars(usage).items(): + if not k.startswith("_") and v is not None and k not in usage_metadata: + usage_metadata[k] = v + + if not usage_metadata: + logger.debug( + "[Paygent] Skipping empty STS event — session=%s", self._session_id + ) + return + + logger.debug( + "[Paygent] STS event — model=%s provider=%s", model, provider + ) + self._post( + "/api/v1/voice/speech-to-speech", + { + "sessionId": self._session_id, + "provider": provider, + "model": model, + "plan": "default", + "usageMetadata": usage_metadata, + }, + ) + except Exception as exc: + logger.error("[Paygent] Error building STS payload: %s", exc) + + def _fire_tts_event(self, data: TTSUsageMetricsData) -> None: + """Forward TTS character usage to Paygent (standard mode only).""" + try: + provider = _detect_provider(data.processor or "", fallback=self._tts_provider) + if provider == "unknown": + provider = self._tts_provider + model = data.model or self._tts_model + char_count = int(data.value or 0) + + if char_count <= 0: + return + + self._has_received_tts_metrics = True + + logger.debug( + "[Paygent] TTS event — model=%s provider=%s chars=%d", + model, provider, char_count, + ) + self._post( + "/api/v1/voice/tts", + { + "sessionId": self._session_id, + "provider": provider, + "model": model, + "plan": "default", + "characters": char_count, + }, + ) + except Exception as exc: + logger.error("[Paygent] Error building TTS payload: %s", exc) + + def _finalize_session(self) -> None: + """ + Fire end-of-session tracking events and the billing indicator. + + Standard mode: + 1. /api/v1/voice/stt — call duration used as audio-minute proxy + 2. /api/v1/voice/tts — fallback estimation from completion tokens + (only when no per-turn TTS metrics were received, e.g. Deepgram WS) + 3. /api/v1/voice/indicator — billing indicator with total duration + + STS / realtime mode: + 1. /api/v1/voice/indicator — billing indicator with total duration + (STS per-turn events already contain the full multimodal cost; + no STT or TTS events should be sent) + """ + if self._finalized: + return + self._finalized = True + + if self._start_time is None: + logger.warning( + "[Paygent] Session never started; skipping finalization — session=%s", + self._session_id, + ) + return + + try: + duration_minutes = (time.monotonic() - self._start_time) / 60.0 + logger.info( + "[Paygent] Finalizing session=%s mode=%s indicator=%s duration=%.3f min", + self._session_id, + "sts" if self._is_realtime else "standard", + self._indicator, + duration_minutes, + ) + + if not self._is_realtime: + # 1. STT — approximate from call wall-clock duration + self._post( + "/api/v1/voice/stt", + { + "sessionId": self._session_id, + "provider": self._stt_provider, + "model": self._stt_model, + "plan": "default", + "audioMinutes": duration_minutes, + }, + ) + + # 2. TTS fallback — only when the TTS provider never emitted + # per-turn metrics (e.g. Deepgram WebSocket TTS). Estimate + # using ~4 characters per LLM output token as the proxy. + if ( + not self._has_received_tts_metrics + and self._accumulated_completion_tokens > 0 + ): + estimated_chars = self._accumulated_completion_tokens * 4 + logger.info( + "[Paygent] Fallback TTS — %d completion tokens -> ~%d estimated chars " + "provider=%s model=%s", + self._accumulated_completion_tokens, + estimated_chars, + self._tts_provider, + self._tts_model, + ) + self._post( + "/api/v1/voice/tts", + { + "sessionId": self._session_id, + "provider": self._tts_provider, + "model": self._tts_model, + "plan": "default", + "characters": estimated_chars, + }, + ) + else: + # STS / realtime mode + # Grok and Ultravox are billed purely on wall-clock minutes. + # Track the final session duration as 'connection.minutes'. + if self._llm_provider in ("grok", "ultravox", "grok_realtime", "ultravox_realtime"): + logger.info( + "[Paygent] Sending per-minute STS usage for provider=%s model=%s minutes=%.3f", + self._llm_provider, + self._llm_model, + duration_minutes, + ) + self._post( + "/api/v1/voice/speech-to-speech", + { + "sessionId": self._session_id, + "provider": self._llm_provider, + "model": self._llm_model, + "plan": "default", + "usageMetadata": { + "schemaVersion": 1, + "connection": {"minutes": duration_minutes}, + }, + }, + ) + + # Always send the billing indicator regardless of mode + self._post( + "/api/v1/voice/indicator", + { + "sessionId": self._session_id, + "indicator": self._indicator, + "totalDuration": duration_minutes, + }, + ) + + except Exception as exc: + logger.error("[Paygent] Error finalizing session: %s", exc) diff --git a/api/services/pipecat/paygent_agent_sync.py b/api/services/pipecat/paygent_agent_sync.py new file mode 100644 index 000000000..fa375ef0d --- /dev/null +++ b/api/services/pipecat/paygent_agent_sync.py @@ -0,0 +1,396 @@ +""" +paygent_agent_sync.py — Paygent agent auto-registration for Dograh workflows. + +Design contract (never break these): + - Pure REST (requests only). Zero external SDK dependencies. + - Completely asynchronous at the call site: + * ensure_agent_async() is a proper coroutine that submits the HTTP call to + a ThreadPoolExecutor — never blocks the FastAPI event loop. + - Idempotent: POST /api/v2/agents/ensure returns the existing record unchanged + if already registered — safe to call on every workflow create and at startup. + - Nil-safe: all public functions are safe to call when PAYGENT_API_KEY is unset + or empty — they become silent no-ops. + - Zero-impact on call pipeline: errors are logged at WARNING level and swallowed. + - Docker-aware: localhost/127.0.0.1 in PAYGENT_BASE_URL is automatically + rewritten to host.docker.internal when running inside a container. + +Environment variables consumed (all optional, all default-safe): + PAYGENT_API_KEY — API key (pk_…). If unset → all calls are no-ops. + PAYGENT_BASE_URL — CP tracking service URL. Default: http://localhost:8082 + PAYGENT_AGENT_PRICING_ENABLED — "true"/"false". Default: false + PAYGENT_AGENT_PRICE_PER_MINUTE — float. Default: 0.0 + PAYGENT_AGENT_INDICATOR_ID — indicator name. Default: "per-minute" + PAYGENT_BACKFILL — "true" to run backfill at startup. Default: false + +Backfill is triggered at startup by run_backfill_if_requested(): + - Reads PAYGENT_BACKFILL=true + - Uses the same SQLAlchemy async session factory already configured in the app + - Pages through all active workflows (100 per page) + - Calls ensure_agent_v2 via thread pool (to not block the event loop during HTTP) + - Logs a summary banner on completion + - Non-blocking: runs as asyncio.create_task() so the server starts immediately. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, Optional, Tuple + +import requests + +logger = logging.getLogger("api.services.pipecat.paygent_agent_sync") + +# ── Constants ───────────────────────────────────────────────────────────────── + +_DEFAULT_BASE_URL = "http://localhost:8082" +_REQUEST_TIMEOUT_SECONDS = 15 +_BACKFILL_PAGE_SIZE = 100 +_BACKFILL_ENV_VAR = "PAYGENT_BACKFILL" +_API_KEY_HEADER = "paygent-api-key" +_ENSURE_AGENT_PATH = "/api/v2/agents/ensure" +_AGENT_TYPE = "voice" + +# Shared fire-and-forget thread pool (daemon threads — die with the process). +_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="paygent_agent_sync") + + +# ── Configuration resolution ────────────────────────────────────────────────── + +def _resolve_base_url(raw: str) -> str: + """Rewrite localhost/127.0.0.1 → host.docker.internal when inside Docker.""" + if os.path.exists("/.dockerenv") or os.environ.get("RUNNING_IN_DOCKER", "").lower() == "true": + raw = raw.replace("127.0.0.1", "host.docker.internal") + raw = raw.replace("localhost", "host.docker.internal") + return raw.rstrip("/") + + +def _get_config() -> Tuple[Optional[str], str, bool, float, str]: + """ + Read all Paygent agent-sync config from environment variables. + + Returns: + (api_key, base_url, pricing_enabled, price_per_minute, indicator_id) + api_key is None when billing is disabled. + """ + api_key = os.environ.get("PAYGENT_API_KEY", "").strip() or None + raw_url = os.environ.get("PAYGENT_BASE_URL", _DEFAULT_BASE_URL).strip() + base_url = _resolve_base_url(raw_url) + pricing_enabled = ( + os.environ.get("PAYGENT_AGENT_PRICING_ENABLED", "false").strip().lower() == "true" + ) + try: + price_per_minute = float(os.environ.get("PAYGENT_AGENT_PRICE_PER_MINUTE", "0.0").strip()) + except (ValueError, TypeError): + price_per_minute = 0.0 + indicator_id = ( + os.environ.get("PAYGENT_AGENT_INDICATOR_ID", "per-minute").strip() or "per-minute" + ) + return api_key, base_url, pricing_enabled, price_per_minute, indicator_id + + +# ── Core HTTP call ───────────────────────────────────────────────────────────── + +def _build_ensure_agent_payload( + workflow_id: int, + workflow_name: str, + pricing_enabled: bool, + price_per_minute: float, + indicator_id: str, +) -> Dict[str, Any]: + """ + Build the POST /api/v2/agents/ensure request body. + + Dograh Workflow → Paygent agent mapping: + agent_external_id = str(workflow.id) — stable, DB primary key + agent_name = workflow.name + agent_type = "voice" + pricing = from env vars (optional, operator-configurable) + """ + payload: Dict[str, Any] = { + "agent_external_id": str(workflow_id), + "agent_name": workflow_name or f"Workflow {workflow_id}", + "agent_type": _AGENT_TYPE, + } + + if pricing_enabled: + payload["pricing"] = { + "activityBased": { + "enabled": True, + "indicators": { + indicator_id: { + "enabled": True, + "billingType": "FLAT", + "price": price_per_minute, + "billingFrequency": "Monthly", + } + }, + } + } + + return payload + + +def _post_ensure_agent_sync( + api_key: str, + base_url: str, + workflow_id: int, + workflow_name: str, + pricing_enabled: bool, + price_per_minute: float, + indicator_id: str, +) -> str: + """ + Execute POST /api/v2/agents/ensure synchronously. + + Always called from a background thread — never directly from the event loop. + + Returns: + The Paygent agent UUID from the response. + + Raises: + RuntimeError: On non-200/201 HTTP response. + requests.exceptions.*: On network errors (let caller classify and log). + """ + url = f"{base_url}{_ENSURE_AGENT_PATH}" + headers = { + "Content-Type": "application/json", + _API_KEY_HEADER: api_key, + } + payload = _build_ensure_agent_payload( + workflow_id, workflow_name, pricing_enabled, price_per_minute, indicator_id + ) + + resp = requests.post( + url, json=payload, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS + ) + + # 200 = already existed (idempotent), 201 = newly created + if resp.status_code not in (200, 201): + raise RuntimeError( + f"ensure-agent returned HTTP {resp.status_code}: {resp.text[:300]}" + ) + + body = resp.json() + return body.get("id", "") + + +# ── Async public API ─────────────────────────────────────────────────────────── + +async def ensure_agent_async(workflow_id: int, workflow_name: str) -> None: + """ + Idempotently register a dograh workflow as a Paygent agent. + + Completely non-blocking — submits the HTTP call to a thread pool and returns + immediately. Safe to call on every workflow create path; the Paygent + /api/v2/agents/ensure endpoint is idempotent (returns existing agent unchanged). + + No-op when PAYGENT_API_KEY is not set. + + Args: + workflow_id: Workflow primary key (used as agent_external_id). + workflow_name: Human-readable workflow name stored in Paygent dashboard. + """ + try: + api_key, base_url, pricing_enabled, price_per_minute, indicator_id = _get_config() + if not api_key: + return # Paygent not configured — silent no-op + + loop = asyncio.get_event_loop() + + def _task() -> None: + try: + uuid = _post_ensure_agent_sync( + api_key, base_url, workflow_id, workflow_name, + pricing_enabled, price_per_minute, indicator_id, + ) + logger.info( + "[Paygent AgentSync] ✓ registered workflow_id=%d name=%r → agent_uuid=%s " + "pricing_enabled=%s price_per_min=%.4f", + workflow_id, workflow_name, uuid, pricing_enabled, price_per_minute, + ) + except requests.exceptions.Timeout: + logger.warning( + "[Paygent AgentSync] Timeout registering workflow_id=%d — server slow", + workflow_id, + ) + except requests.exceptions.ConnectionError: + logger.warning( + "[Paygent AgentSync] Connection error registering workflow_id=%d " + "— server unreachable", + workflow_id, + ) + except Exception as exc: + logger.warning( + "[Paygent AgentSync] Failed to register workflow_id=%d name=%r: %s", + workflow_id, workflow_name, exc, + ) + + try: + loop.run_in_executor(_executor, _task) + except RuntimeError: + # Loop or executor already shut down during teardown + pass + + except Exception as exc: + # Guarantee this never propagates to the caller + logger.warning("[Paygent AgentSync] Unexpected error in ensure_agent_async: %s", exc) + + +# ── Startup backfill ────────────────────────────────────────────────────────── + +async def run_backfill_if_requested() -> None: + """ + Entry point called from app.py lifespan as asyncio.create_task(). + + Checks PAYGENT_BACKFILL=true; if set, iterates all active workflows using + the app's existing async SQLAlchemy session factory and registers each as + a Paygent agent. HTTP calls run in a thread pool; the event loop remains free. + + The server starts accepting requests immediately — backfill is fully + non-blocking from the perspective of the startup sequence. + """ + should_backfill = ( + os.environ.get(_BACKFILL_ENV_VAR, "").strip().lower() == "true" + ) + if not should_backfill: + return + + api_key, base_url, pricing_enabled, price_per_minute, indicator_id = _get_config() + if not api_key: + logger.warning( + "[Paygent Backfill] PAYGENT_BACKFILL=true but PAYGENT_API_KEY is not set — skipping." + ) + return + + logger.info("=== PAYGENT BACKFILL MODE ACTIVATED ===") + logger.info( + "[Paygent Backfill] Config: base_url=%s pricing_enabled=%s " + "price_per_min=%.4f indicator=%s page_size=%d", + base_url, pricing_enabled, price_per_minute, indicator_id, _BACKFILL_PAGE_SIZE, + ) + + # Import here to avoid circular imports at module load time + try: + from sqlalchemy import func + from sqlalchemy.future import select + from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker + from api.constants import DATABASE_URL + from api.db.models import WorkflowModel + except Exception as exc: + logger.error("[Paygent Backfill] Import error — cannot run backfill: %s", exc) + return + + loop = asyncio.get_event_loop() + total = 0 + synced = 0 + failed = 0 + page = 1 + consecutive_db_errors = 0 + + # Create a dedicated engine for the backfill (avoids sharing state with app pool) + try: + _engine = create_async_engine(DATABASE_URL, pool_size=2, max_overflow=0) + _session_factory = async_sessionmaker(bind=_engine, expire_on_commit=False) + except Exception as exc: + logger.error("[Paygent Backfill] Failed to create DB engine: %s", exc) + return + + try: + async with _session_factory() as session: + # Count active workflows for progress logging + try: + count_result = await session.execute( + select(func.count(WorkflowModel.id)).where( + WorkflowModel.status == "active" + ) + ) + total = count_result.scalar() or 0 + except Exception as exc: + logger.error("[Paygent Backfill] Cannot count active workflows: %s", exc) + return + + logger.info("[Paygent Backfill] Found %d active workflows to process.", total) + + while True: + offset = (page - 1) * _BACKFILL_PAGE_SIZE + try: + result = await session.execute( + select(WorkflowModel) + .where(WorkflowModel.status == "active") + .order_by(WorkflowModel.id) + .offset(offset) + .limit(_BACKFILL_PAGE_SIZE) + ) + workflows = result.scalars().all() + consecutive_db_errors = 0 + except Exception as exc: + logger.error( + "[Paygent Backfill] DB error fetching page %d: %s", page, exc + ) + consecutive_db_errors += 1 + if consecutive_db_errors > 3: + logger.error( + "[Paygent Backfill] Too many consecutive DB errors — aborting." + ) + break + page += 1 + continue + + if not workflows: + break # No more rows + + logger.info( + "[Paygent Backfill] Processing page %d (%d workflows).", + page, len(workflows), + ) + + # Process each workflow: HTTP call runs in thread pool to free the event loop + for wf in workflows: + wf_id = wf.id + wf_name = wf.name + try: + uuid = await loop.run_in_executor( + _executor, + _post_ensure_agent_sync, + api_key, base_url, wf_id, wf_name, + pricing_enabled, price_per_minute, indicator_id, + ) + logger.info( + "[Paygent Backfill] ✓ synced id=%-8d name=%r → %s", + wf_id, wf_name, uuid, + ) + synced += 1 + except Exception as exc: + logger.warning( + "[Paygent Backfill] ✗ FAILED id=%-8d name=%r error=%s", + wf_id, wf_name, exc, + ) + failed += 1 + + page += 1 + + except Exception as exc: + logger.error("[Paygent Backfill] Unhandled error during backfill: %s", exc) + finally: + try: + await _engine.dispose() + except Exception: + pass + + + # Summary banner (always printed) + logger.info("[Paygent Backfill] ──────────────────────────────────────────────") + logger.info("[Paygent Backfill] Total: %d", total) + logger.info("[Paygent Backfill] Synced: %d", synced) + logger.info("[Paygent Backfill] Failed: %d", failed) + logger.info("[Paygent Backfill] ──────────────────────────────────────────────") + if failed > 0: + logger.warning( + "=== PAYGENT BACKFILL COMPLETED WITH %d FAILURES — server running normally ===", + failed, + ) + else: + logger.info("=== PAYGENT BACKFILL COMPLETED SUCCESSFULLY ===") diff --git a/api/services/pipecat/pipeline_builder.py b/api/services/pipecat/pipeline_builder.py index de9d48c2e..0435769e9 100644 --- a/api/services/pipecat/pipeline_builder.py +++ b/api/services/pipecat/pipeline_builder.py @@ -37,6 +37,7 @@ def build_pipeline( pipeline_metrics_aggregator, voicemail_detector=None, recording_router=None, + paygent_aggregator=None, ): """Build the main pipeline with all components. @@ -91,6 +92,9 @@ def build_pipeline( ] ) + if paygent_aggregator: + processors.append(paygent_aggregator) + return Pipeline(processors) @@ -103,6 +107,7 @@ def build_realtime_pipeline( pipeline_engine_callback_processor, pipeline_metrics_aggregator, voicemail_detector=None, + paygent_aggregator=None, ): """Build a pipeline for realtime (speech-to-speech) LLM services. @@ -149,6 +154,9 @@ def build_realtime_pipeline( ] ) + if paygent_aggregator: + processors.append(paygent_aggregator) + return Pipeline(processors) diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 6cae498f1..78d34b3bf 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -437,6 +437,113 @@ async def _run_pipeline( workflow_run_id, initial_context=merged_call_context_vars ) + # ── Paygent cost tracking ───────────────────────────────────────────── + # All configuration is sourced from environment variables — no secrets in + # source code. Set these in your .env / .env.local / deployment secrets: + # + # PAYGENT_API_KEY — Paygent API key (required to enable tracking) + # PAYGENT_INDICATOR — billing indicator name (default: "per-minute") + # PAYGENT_BASE_URL — Paygent service URL (default: https://cp-api.withpaygent.com) + # + # agent_id is always the workflow integer ID (str) — matches what + # ensure_agent_async registers on workflow create. + # customer_id is always the organization_id (str). + # Do NOT override these per-deployment via PAYGENT_AGENT_ID/CUSTOMER_ID. + import os as _os + try: + from dotenv import load_dotenv as _load_dotenv + _api_dir = _os.path.dirname(_os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) + _env_path = _os.path.join(_api_dir, ".env") + if _os.path.exists(_env_path): + _load_dotenv(_env_path) + _env_local_path = _os.path.join(_api_dir, ".env.local") + if _os.path.exists(_env_local_path): + _load_dotenv(_env_local_path, override=True) + except Exception as _dotenv_err: + logger.warning(f"[Paygent] Could not load .env files: {_dotenv_err}") + + paygent_aggregator = None + _paygent_api_key = _os.environ.get("PAYGENT_API_KEY", "").strip() + + if _paygent_api_key: + try: + from api.services.pipecat.paygent import PaygentPipelineMetricsAggregator + + def _get_prov_model(cfg_obj): + if not cfg_obj: return "unknown", "default" + prov = getattr(getattr(cfg_obj, "provider", None), "value", None) or getattr(cfg_obj, "provider", "unknown") + mod = getattr(cfg_obj, "model", "default") + return str(prov), str(mod) + + stt_p, stt_m = _get_prov_model(getattr(user_config, "stt", None)) + tts_p, tts_m = _get_prov_model(getattr(user_config, "tts", None)) + llm_p, llm_m = _get_prov_model(getattr(user_config, "llm", None)) + if is_realtime and getattr(user_config, "realtime", None): + rt_p, rt_m = _get_prov_model(user_config.realtime) + llm_p, llm_m = rt_p, rt_m + stt_p, stt_m = rt_p, rt_m + tts_p, tts_m = rt_p, rt_m + + # ── customer_id derivation ──────────────────────────────────────── + # Use the most meaningful identifier available for the end customer: + # • Telephony outbound → the number being called (called_number) + # • Telephony inbound → the caller's number (caller_number) + # • WebRTC / web-call → "web-call" sentinel string + # This makes Paygent usage records map directly to real contacts + # rather than the opaque org integer. + _ctx = merged_call_context_vars or {} + _run_mode = getattr(workflow_run, "mode", "") or "" + _is_telephony = bool( + _ctx.get("provider") # telephony runs always stamp provider + or "twilio" in _run_mode.lower() + or "vonage" in _run_mode.lower() + or "telnyx" in _run_mode.lower() + or "vobiz" in _run_mode.lower() + or "plivo" in _run_mode.lower() + or "ari" in _run_mode.lower() + or "tel" in _run_mode.lower() + ) + if _is_telephony: + # Outbound: we dialed `called_number`; inbound: caller is `caller_number` + _direction = _ctx.get("direction", "") + if _direction == "inbound": + _customer_id = _ctx.get("caller_number") or str(workflow.organization_id) + else: + _customer_id = ( + _ctx.get("called_number") + or _ctx.get("phone_number") + or str(workflow.organization_id) + ) + else: + # WebRTC / web-call + _customer_id = "web-call" + + logger.info( + "[Paygent] customer_id=%s (mode=%s direction=%s)", + _customer_id, _run_mode, _ctx.get("direction", "n/a"), + ) + + paygent_aggregator = PaygentPipelineMetricsAggregator( + api_key=_paygent_api_key, + agent_id=str(workflow_id), + customer_id=_customer_id, + session_id=str(workflow_run_id), + indicator=_os.environ.get("PAYGENT_INDICATOR", "per-minute").strip(), + base_url=_os.environ.get("PAYGENT_BASE_URL", "https://cp-api.withpaygent.com").strip(), + is_realtime=is_realtime, + stt_provider=stt_p, + stt_model=stt_m, + tts_provider=tts_p, + tts_model=tts_m, + llm_provider=llm_p, + llm_model=llm_m, + ) + logger.info(f"[Paygent] Cost tracking enabled for run {workflow_run_id}") + except Exception as e: + logger.error(f"[Paygent] Failed to initialize aggregator: {e}") + else: + logger.info("[Paygent] PAYGENT_API_KEY not set — cost tracking disabled") + workflow_graph = WorkflowGraph(ReactFlowDTO.model_validate(run_workflow_json)) # Pre-call fetch: fire early so it runs concurrently with remaining setup @@ -724,6 +831,7 @@ async def _on_voicemail_detected(_processor): pipeline_engine_callback_processor, pipeline_metrics_aggregator, voicemail_detector=voicemail_detector, + paygent_aggregator=paygent_aggregator, ) else: pipeline = build_pipeline( @@ -738,6 +846,7 @@ async def _on_voicemail_detected(_processor): pipeline_metrics_aggregator, voicemail_detector=voicemail_detector, recording_router=recording_router, + paygent_aggregator=paygent_aggregator, ) # Create pipeline task with audio configuration diff --git a/docker-compose.override.yaml b/docker-compose.override.yaml new file mode 100644 index 000000000..a94442379 --- /dev/null +++ b/docker-compose.override.yaml @@ -0,0 +1,7 @@ +services: + api: + build: + context: . + dockerfile: api/Dockerfile + env_file: + - .env \ No newline at end of file diff --git a/pipecat b/pipecat index a8458879e..c6f3906fa 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit a8458879e6c950007b0753a33652f744fca5cdb5 +Subproject commit c6f3906faafe55da75e43ee510f5b9f658c90f92