Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions .claude/plans/cost-tracking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Cost Tracking -- Implementation Plan

## Overview

Add per-run cost and duration tracking to Orchestra. Every LLM invocation records token counts, cost (USD), duration, and phase metadata. Aggregated metrics are surfaced per thread, with configurable budget caps that integrate with the existing HITL interrupt system. The frontend displays real-time cost badges and a detailed breakdown panel.

Spec: [`.claude/specs/cost-tracking.md`](../specs/cost-tracking.md)

## User Stories

1. **As a user**, I want to see the running cost of my current thread in the chat header so I can monitor spend in real time.
2. **As a user**, I want to set a budget cap on a thread or assistant so that execution pauses before exceeding my limit.
3. **As a user**, I want to view a per-turn cost breakdown (model, tokens, cost, duration) so I can identify expensive steps.
4. **As an admin**, I want to configure model pricing tables via API so costs stay accurate when providers change rates.
5. **As a team lead**, I want historical cost data per thread so I can analyze spending trends across projects.

## Implementation Phases

### Phase 1: Backend Data Layer (Priority: Critical)

**Goal**: Define schemas, database tables, and pricing constants.

**Tasks**:
1. Create `backend/src/schemas/entities/metrics.py` with `TurnMetrics`, `ThreadCostSummary`, and `RunBudget` Pydantic models.
2. Create `backend/src/constants/pricing.py` with hardcoded pricing dictionary for OpenAI, Anthropic, Google models.
3. Create Alembic migration for `turn_metrics` table (columns: turn_id, thread_id, assistant_id, user_id, model, input_tokens, output_tokens, total_tokens, input_cost, output_cost, total_cost, duration_ms, phase, created_at).
4. Create Alembic migration for `run_budgets` table (columns: id, thread_id, assistant_id, user_id, max_cost_usd, max_tokens, action_on_exceed, is_active, created_at, updated_at).
5. Create Alembic migration for `pricing_overrides` table (columns: id, model, input_per_1k, output_per_1k, currency, created_at, updated_at).

**Acceptance Criteria**:
- Migrations run without errors on a clean database.
- Pydantic schemas validate correctly with sample data.
- Pricing dictionary covers at least 10 commonly used models.

### Phase 2: Metrics Capture Middleware (Priority: Critical)

**Goal**: Instrument LLM calls to capture token usage, cost, and duration.

**Tasks**:
1. Create `backend/src/utils/metrics.py` with `capture_turn_metrics()` wrapper.
2. Hook into the LangChain callback system (or wrap the LLM service layer) to intercept response metadata.
3. Extract `usage` (input_tokens, output_tokens) from LLM response.
4. Compute cost using pricing lookup (DB override -> hardcoded fallback).
5. Measure wall-clock duration with `time.perf_counter()`.
6. Persist `TurnMetrics` asynchronously (background task or fire-and-forget DB write).
7. Emit a `cost_update` event on the SSE/streaming channel for real-time frontend updates.

**Acceptance Criteria**:
- Every LLM call produces a `TurnMetrics` record in the database.
- Cost calculation matches expected values for known token counts and pricing.
- Metrics capture adds < 5ms overhead to request latency.

### Phase 3: Budget Enforcement (Priority: High)

**Goal**: Enforce spending limits with HITL integration.

**Tasks**:
1. Create `backend/src/services/budget.py` with budget check logic.
2. Before each LLM call, query cumulative thread cost (cached, refresh per turn).
3. Compare against active `RunBudget` for the thread/assistant.
4. Implement three actions: `pause` (HITL interrupt), `warn` (log + UI event), `stop` (raise error).
5. Integrate with existing HITL interrupt flow for the `pause` action.
6. Add budget status to the `cost_update` SSE event payload.

**Acceptance Criteria**:
- Thread pauses when cost exceeds budget with `pause` action.
- User can approve continuation after HITL interrupt.
- `stop` action halts execution and returns a clear error message.
- `warn` action logs and sends notification but does not block.

### Phase 4: API Endpoints (Priority: High)

**Goal**: Expose metrics, pricing, and budget management via REST API.

**Tasks**:
1. Create `backend/src/routes/v0/metrics.py` with thread metrics endpoints.
2. Create `backend/src/routes/v0/pricing.py` with pricing CRUD endpoints.
3. Create `backend/src/routes/v0/budgets.py` with budget CRUD endpoints.
4. Create corresponding controller and service layers.
5. Add OpenAPI documentation for all new endpoints.
6. Register routes in the v0 router.

**Acceptance Criteria**:
- All endpoints return correct data with proper HTTP status codes.
- Pricing CRUD is restricted to admin users.
- Thread metrics are scoped to the requesting user's threads.
- Endpoints appear in `/docs` with full OpenAPI schemas.

### Phase 5: Frontend Integration (Priority: Medium)

**Goal**: Display cost data in the chat UI with budget configuration.

**Tasks**:
1. Add `CostBadge` component to chat header showing running thread cost.
2. Subscribe to `cost_update` SSE events for real-time updates.
3. Build `CostBreakdownPanel` (expandable side panel or modal) with per-turn table and per-phase summary.
4. Build `BudgetConfigDialog` for setting budget caps on threads/assistants.
5. Add color-coded warning indicators: green (< 75%), yellow (75-99%), red (>= 100%) of budget.
6. Add cost column to thread list view for historical comparison.

**Acceptance Criteria**:
- Cost badge updates within 1 second of each LLM response.
- Breakdown panel displays accurate per-turn data with model, tokens, cost, and duration.
- Budget dialog saves configuration and reflects active budget in the UI.
- Warning colors transition correctly at threshold boundaries.

### Phase 6: Testing & Documentation (Priority: High)

**Goal**: Comprehensive test coverage and developer documentation.

**Tasks**:
1. Unit tests for pricing calculation (`test_pricing.py`).
2. Unit tests for budget threshold logic (`test_budget.py`).
3. Unit tests for metrics aggregation (`test_metrics.py`).
4. Integration tests for metrics capture during LLM calls.
5. Integration tests for budget enforcement with HITL interrupt.
6. Integration tests for all API endpoints.
7. Frontend tests for `CostBadge`, `CostBreakdownPanel`, `BudgetConfigDialog`.
8. Update API documentation and `llm.txt` if public-facing.

**Acceptance Criteria**:
- >= 80% code coverage on new backend modules.
- All integration tests pass in CI.
- Frontend component tests cover rendering and user interaction flows.

## Dependencies

- **Existing HITL system**: Budget `pause` action depends on the interrupt flow in `backend/src/schemas/entities/hitl.py`.
- **LangChain callback system**: Metrics capture hooks into the existing LLM invocation layer.
- **SSE/streaming infrastructure**: Real-time cost updates use the existing event streaming channel.
- **Database**: Requires PostgreSQL with Alembic migrations.

## Risks & Mitigations

| Risk | Impact | Mitigation |
|------|--------|------------|
| Provider pricing changes frequently | Stale costs | DB-backed overrides + periodic review cadence |
| Metrics capture adds latency | Slower responses | Async DB writes, fire-and-forget pattern |
| Token count unavailable for some models | Missing cost data | Graceful fallback: log warning, record zero cost |
| Budget check race condition (concurrent turns) | Overspend | Optimistic check + post-turn reconciliation |
| Large turn_metrics table over time | DB performance | Indexed queries, future: partitioning or archival |

## Testing Strategy

- **Unit**: Pure function tests for pricing math, budget thresholds, aggregation logic. No DB or network needed.
- **Integration**: Full request cycle with test database. Verify metrics records created, budget enforcement triggers, API responses correct.
- **Frontend**: Vitest + Testing Library for component rendering, mock SSE events, form validation.
- **Manual QA**: End-to-end flow with real LLM calls in dev environment, verify cost badge accuracy.
108 changes: 108 additions & 0 deletions .claude/specs/cost-tracking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Per-Run Cost & Duration Tracking

## Summary

Per-turn metrics (model, tokens, cost, duration, phase), aggregate metrics per thread/phase, provider pricing tables (configurable), real-time cost display in frontend, budget cap per thread/assistant with pause-on-exceed, historical cost data for analytics.

## Motivation

Users and operators need visibility into the cost of every AI interaction. Without cost tracking, teams cannot forecast budgets, identify expensive workflows, or enforce spending limits. This feature adds full-stack cost observability -- from token-level capture to real-time UI display -- plus budget enforcement that integrates with the existing HITL (human-in-the-loop) interrupt system.

## Key Components

### Backend

#### Schemas (`backend/src/schemas/entities/metrics.py`)

- **TurnMetrics** -- Per-turn record capturing:
- `turn_id`, `thread_id`, `assistant_id`, `user_id`
- `model` (string, e.g. `gpt-4o`, `claude-sonnet-4-20250514`)
- `input_tokens`, `output_tokens`, `total_tokens`
- `input_cost`, `output_cost`, `total_cost` (Decimal, USD)
- `duration_ms` (int, wall-clock time of LLM call)
- `phase` (enum: `planning`, `execution`, `reflection`, `tool_call`)
- `created_at` timestamp

- **ThreadCostSummary** -- Aggregate view:
- `thread_id`
- `total_cost`, `total_tokens`, `total_duration_ms`
- `turn_count`
- Per-phase breakdown (dict keyed by phase enum)
- Per-model breakdown (dict keyed by model string)

- **RunBudget** -- Budget configuration:
- `thread_id` or `assistant_id` (nullable, at least one required)
- `max_cost_usd` (Decimal)
- `max_tokens` (int, optional)
- `action_on_exceed` (enum: `pause`, `warn`, `stop`)
- `is_active` (bool)

#### Pricing Tables (`backend/src/constants/pricing.py`)

- Dictionary of model -> `{ input_per_1k, output_per_1k, currency }` pricing.
- Covers OpenAI, Anthropic, Google, and open-source model families.
- Admin-configurable via API (CRUD endpoints) with DB-backed overrides.
- Fallback to hardcoded defaults when no DB override exists.

#### Metrics Capture Middleware (`backend/src/utils/metrics.py`)

- Wraps LLM invocations to capture:
- Token counts from response metadata (`usage` field)
- Wall-clock duration via `time.perf_counter()`
- Model identifier from the request config
- Computes cost = tokens * pricing rate
- Persists `TurnMetrics` record asynchronously (fire-and-forget DB write)
- Emits metrics event for real-time streaming to frontend

#### Budget Enforcement

- Before each LLM call, check cumulative thread cost against `RunBudget`.
- If `max_cost_usd` exceeded:
- `pause` -- Trigger HITL interrupt, let user approve continuation.
- `warn` -- Log warning and continue, send UI notification.
- `stop` -- Halt execution, return budget-exceeded error.
- Budget checks are lightweight (cached running total, refreshed on each turn).

#### API Endpoints

| Method | Path | Description |
|--------|------|-------------|
| GET | `/v0/threads/{id}/metrics` | Thread cost summary |
| GET | `/v0/threads/{id}/metrics/turns` | Per-turn breakdown |
| GET | `/v0/pricing` | List all pricing entries |
| POST | `/v0/pricing` | Create/update pricing entry |
| DELETE | `/v0/pricing/{model}` | Delete pricing override |
| GET | `/v0/budgets` | List budgets for user |
| POST | `/v0/budgets` | Create/update budget |
| DELETE | `/v0/budgets/{id}` | Delete budget |

### Frontend

- **Running cost badge** -- Small pill in the chat header showing cumulative thread cost (e.g. `$0.12`), updated in real-time via SSE/streaming events.
- **Cost breakdown panel** -- Expandable side panel or modal showing per-turn and per-phase cost tables with charts.
- **Budget configuration** -- Settings UI to set `max_cost_usd` per thread or per assistant, choose action-on-exceed behavior.
- **Warning indicators** -- Yellow/red badge color when approaching/exceeding budget thresholds (75%/100%).

### Database Migration

- New `turn_metrics` table (Alembic migration):
- Columns matching `TurnMetrics` schema
- Indexes on `thread_id`, `created_at`, `model`
- Foreign key to threads table
- New `run_budgets` table:
- Columns matching `RunBudget` schema
- Unique constraint on (`thread_id`, `assistant_id`)
- Optional `pricing_overrides` table for admin-managed pricing.

### Testing

- **Unit tests**: Pricing calculation, budget threshold logic, metrics aggregation.
- **Integration tests**: End-to-end LLM call with metrics capture, budget enforcement with HITL interrupt, API endpoint responses.
- **Frontend tests**: Cost badge rendering, budget config form validation.

## Non-Goals (v1)

- Multi-currency support (USD only for now)
- Organization-level billing aggregation
- Automated payment/invoicing integration
- Cost prediction/forecasting ML models
12 changes: 12 additions & 0 deletions backend/src/constants/phases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from enum import Enum


class AgentPhase(str, Enum):
"""Unified phase names across cost tracking, observability, and harness features."""

PLANNER = "planner"
GENERATOR = "generator"
EVALUATOR = "evaluator"
HANDOFF = "handoff"
NEGOTIATION = "negotiation"
SOLO = "solo"
60 changes: 60 additions & 0 deletions backend/src/constants/pricing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Per-model token pricing (USD per 1M tokens). Updated 2026-03."""

from typing import TypedDict

from src.utils.logger import logger


class ModelPricing(TypedDict):
input: float # USD per 1M input tokens
output: float # USD per 1M output tokens


# Pricing per 1M tokens
PRICING_TABLE: dict[str, ModelPricing] = {
# OpenAI
"openai:o3": {"input": 2.00, "output": 8.00},
"openai:o4-mini": {"input": 1.10, "output": 4.40},
"openai:gpt-4.1-nano": {"input": 0.10, "output": 0.40},
"openai:gpt-4.1-mini": {"input": 0.40, "output": 1.60},
"openai:gpt-5-nano": {"input": 0.15, "output": 0.60},
"openai:gpt-5-mini": {"input": 0.50, "output": 2.00},
"openai:gpt-5": {"input": 2.00, "output": 8.00},
"openai:gpt-5.1": {"input": 2.50, "output": 10.00},
"openai:gpt-5.2": {"input": 3.00, "output": 12.00},
"openai:gpt-5.2-chat-latest": {"input": 3.00, "output": 12.00},
"openai:gpt-5.2-pro": {"input": 15.00, "output": 60.00},
# Anthropic
"anthropic:claude-3-7-sonnet-latest": {"input": 3.00, "output": 15.00},
"anthropic:claude-sonnet-4": {"input": 3.00, "output": 15.00},
"anthropic:claude-opus-4-1": {"input": 15.00, "output": 75.00},
"anthropic:claude-haiku-4-5": {"input": 0.80, "output": 4.00},
"anthropic:claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
# xAI
"xai:grok-4-1-fast": {"input": 3.00, "output": 12.00},
"xai:grok-4-1-fast-non-reasoning": {"input": 3.00, "output": 12.00},
"xai:grok-4": {"input": 3.00, "output": 15.00},
"xai:grok-4-fast": {"input": 5.00, "output": 25.00},
"xai:grok-4-fast-non-reasoning": {"input": 5.00, "output": 25.00},
"xai:grok-code-fast-1": {"input": 3.00, "output": 12.00},
# Google
"google_genai:gemini-2.5-flash-lite": {"input": 0.02, "output": 0.10},
"google_genai:gemini-2.5-flash": {"input": 0.15, "output": 0.60},
"google_genai:gemini-2.5-pro": {"input": 1.25, "output": 10.00},
"google_genai:gemini-flash-lite-latest": {"input": 0.02, "output": 0.10},
"google_genai:gemini-3-flash-preview": {"input": 0.15, "output": 0.60},
# Groq
"groq:llama-3.3-70b-versatile": {"input": 0.59, "output": 0.79},
"groq:openai/gpt-oss-120b": {"input": 0.59, "output": 0.79},
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate estimated cost in USD for a model call."""
pricing = PRICING_TABLE.get(model)
if pricing is None:
logger.debug(f"pricing_fallback model={model} — not in pricing table, cost=$0.00")
return 0.0
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
return round(input_cost + output_cost, 6)
41 changes: 41 additions & 0 deletions backend/src/schemas/entities/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from datetime import datetime, timezone
from typing import Literal, Optional

from pydantic import BaseModel, Field
from src.constants.phases import AgentPhase


class TurnMetrics(BaseModel):
"""Per-LLM-call metrics captured by the cost tracking middleware."""

turn_id: str
thread_id: str
agent_phase: str = AgentPhase.SOLO
model: str
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_creation_tokens: int = 0
estimated_cost_usd: float = 0.0
duration_seconds: float = 0.0
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class ThreadCostSummary(BaseModel):
"""Aggregate cost data for a thread."""

thread_id: str
total_cost_usd: float = 0.0
total_duration_seconds: float = 0.0
total_input_tokens: int = 0
total_output_tokens: int = 0
cost_by_phase: dict[str, float] = Field(default_factory=dict)
turn_count: int = 0


class RunBudget(BaseModel):
"""Budget cap configuration for a thread or assistant."""

max_cost_usd: Optional[float] = None
max_duration_minutes: Optional[float] = None
action_on_exceed: Literal["pause", "stop", "warn"] = "warn"
Loading
Loading