Commits (20)
d8c812b
Add token usage tracking to multi-agent workflow
devin-ai-integration[bot] Sep 21, 2025
02f12ed
Fix code formatting issues
devin-ai-integration[bot] Sep 21, 2025
71defe8
Refactor cost tracking to use cost_summary_report property
devin-ai-integration[bot] Sep 21, 2025
5e5f745
Revert GitHub token validation changes - keep focus on core token tra…
devin-ai-integration[bot] Sep 21, 2025
5d702de
clean up
aaronsteers Sep 21, 2025
f098793
feat: Add dollar cost estimation with hard-coded pricing tables
devin-ai-integration[bot] Sep 21, 2025
f446628
docs: Add docstring to _MODEL_PRICING explaining tuple format
devin-ai-integration[bot] Sep 21, 2025
d260153
fix: Replace hallucinated pricing with accurate API pricing data
devin-ai-integration[bot] Sep 21, 2025
562dfdc
docs: Add OpenAI pricing documentation reference
devin-ai-integration[bot] Sep 21, 2025
f0d4642
docs: Confirm OpenAI pricing page requires login
devin-ai-integration[bot] Sep 21, 2025
9817be2
fix lint warnings
aaronsteers Sep 21, 2025
539659e
gpt-5 pricing for unknown-model
aaronsteers Sep 21, 2025
426183e
add missing copyright
aaronsteers Sep 21, 2025
8934076
fix: Resolve model extraction and usage attribute mapping issues
devin-ai-integration[bot] Sep 22, 2025
2c6b6c0
Merge branch 'main' into devin/1758422080-add-token-usage-tracking
aaronsteers Sep 22, 2025
1faa0c6
feat: Save usage files in manifest directory following readiness repo…
devin-ai-integration[bot] Sep 22, 2025
48d82ec
fix: Apply formatting and linting fixes for usage file path changes
devin-ai-integration[bot] Sep 22, 2025
22c184f
Merge branch 'main' into devin/1758422080-add-token-usage-tracking
aaronsteers Sep 24, 2025
6136af9
revert gh-specific token change
aaronsteers Sep 24, 2025
3d354ca
clean up logs dir
aaronsteers Sep 24, 2025
196 changes: 196 additions & 0 deletions connector_builder_agents/src/cost_tracking.py
@@ -0,0 +1,196 @@
"""Cost tracking module for multi-agent workflow execution.

This module provides functionality to track token usage and costs during
the execution of multi-agent workflows, with support for multiple models
and real-time cost calculation.
"""

import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from agents.result import RunResult


logger = logging.getLogger(__name__)


@dataclass
class ModelUsage:
    """Usage statistics for a specific model."""

    model_name: str
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    requests: int = 0
    estimated_cost: float = 0.0


@dataclass
class CostTracker:
    """Tracks costs and usage across multi-agent workflow execution."""

    trace_id: str
    model_usage: dict[str, ModelUsage] = field(default_factory=dict)
    total_estimated_cost: float = 0.0
    start_time: str | None = None
    end_time: str | None = None

    def add_run_result(self, run_result: RunResult) -> float:
        """Extract usage from a RunResult and add it to the tracker.

        Args:
            run_result: The result from a Runner.run() call

        Returns:
            The estimated cost for this run result
        """
        run_cost = 0.0

        for response in run_result.raw_responses:
            if not response.usage:
                continue

            model_name = self._extract_model_name(response)

            if model_name not in self.model_usage:
                self.model_usage[model_name] = ModelUsage(model_name=model_name)

            usage_tracker = self.model_usage[model_name]

            usage_tracker.input_tokens += response.usage.input_tokens
            usage_tracker.output_tokens += response.usage.output_tokens
            usage_tracker.total_tokens += response.usage.total_tokens
            usage_tracker.requests += response.usage.requests

            response_cost = self._calculate_cost(model_name, response.usage)
            usage_tracker.estimated_cost += response_cost
            run_cost += response_cost

        self.total_estimated_cost += run_cost

        logger.info(
            f"[{self.trace_id}] Run tokens: {sum(response.usage.total_tokens for response in run_result.raw_responses if response.usage)}, "
            f"Total tokens: {sum(usage.total_tokens for usage in self.model_usage.values())}"
        )

        return run_cost

    def _extract_model_name(self, response: Any) -> str:
        """Extract the model name from a response object."""
        for attr in ["model", "model_name", "engine"]:
            if hasattr(response, attr):
                model_value = getattr(response, attr)
                if model_value:
                    return str(model_value)

        if hasattr(response, "raw_response"):
            raw = response.raw_response
            if hasattr(raw, "model"):
                return str(raw.model)

        return "unknown-model"

    def _calculate_cost(self, model_name: str, usage: Any) -> float:
        """Calculate estimated cost based on model and usage.

        Returns 0.0 for now - cost calculation can be implemented later
        with configurable pricing or actual API cost data.
        """
        return 0.0

    def get_summary(self) -> dict[str, Any]:
        """Get a summary of all tracked usage and costs."""
        return {
            "trace_id": self.trace_id,
            "total_estimated_cost": self.total_estimated_cost,
            "total_tokens": sum(usage.total_tokens for usage in self.model_usage.values()),
            "total_requests": sum(usage.requests for usage in self.model_usage.values()),
            "models_used": list(self.model_usage.keys()),
            "model_breakdown": {
                name: {
                    "input_tokens": usage.input_tokens,
                    "output_tokens": usage.output_tokens,
                    "total_tokens": usage.total_tokens,
                    "requests": usage.requests,
                    "estimated_cost": usage.estimated_cost,
                }
                for name, usage in self.model_usage.items()
            },
            "start_time": self.start_time,
            "end_time": self.end_time,
        }

    def save_to_file(self, output_path: Path | str) -> None:
        """Save the cost tracking summary to a JSON file."""
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with output_path.open("w") as f:
            json.dump(self.get_summary(), f, indent=2)

        logger.info(f"Cost tracking summary saved to {output_path}")


class CostEvaluator:
    """Evaluates cost tracking results with business logic."""

    @staticmethod
    def evaluate_cost_efficiency(cost_tracker: CostTracker) -> dict[str, Any]:
        """Evaluate the cost efficiency of the workflow execution."""
        summary = cost_tracker.get_summary()

        thresholds = {
            "max_tokens_warning": 100000,  # Warn if tokens exceed 100K
            "max_tokens_critical": 500000,  # Critical if tokens exceed 500K
            "min_efficiency_ratio": 0.7,  # Minimum output/input token ratio
            "max_requests_warning": 100,  # Warn if requests exceed 100
        }

        evaluation = {
            "usage_status": "ok",
            "warnings": [],
            "recommendations": [],
            "efficiency_metrics": {},
        }

        total_tokens = summary["total_tokens"]
        total_requests = summary["total_requests"]

        if total_tokens > thresholds["max_tokens_critical"]:
            evaluation["usage_status"] = "critical"
            evaluation["warnings"].append(
                f"Token usage {total_tokens:,} exceeds critical threshold {thresholds['max_tokens_critical']:,}"
            )
        elif total_tokens > thresholds["max_tokens_warning"]:
            evaluation["usage_status"] = "warning"
            evaluation["warnings"].append(
                f"Token usage {total_tokens:,} exceeds warning threshold {thresholds['max_tokens_warning']:,}"
            )

        if total_requests > thresholds["max_requests_warning"]:
            evaluation["warnings"].append(
                f"Request count {total_requests} exceeds warning threshold {thresholds['max_requests_warning']}"
            )

        evaluation["efficiency_metrics"]["total_tokens"] = total_tokens
        evaluation["efficiency_metrics"]["total_requests"] = total_requests
        if total_requests > 0:
            tokens_per_request = total_tokens / total_requests
            evaluation["efficiency_metrics"]["tokens_per_request"] = tokens_per_request

        for model_name, model_data in summary["model_breakdown"].items():
            if model_data["input_tokens"] > 0:
                efficiency_ratio = model_data["output_tokens"] / model_data["input_tokens"]
                evaluation["efficiency_metrics"][f"{model_name}_efficiency"] = efficiency_ratio

                if efficiency_ratio < thresholds["min_efficiency_ratio"]:
                    evaluation["recommendations"].append(
                        f"{model_name}: Low output/input ratio {efficiency_ratio:.2f}, "
                        f"expected >{thresholds['min_efficiency_ratio']}"
                    )

        return evaluation
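
Note: per the commit history above ("Add dollar cost estimation with hard-coded pricing tables", "gpt-5 pricing for unknown-model"), later commits replace the _calculate_cost stub with a static pricing lookup. A minimal sketch of that approach, assuming a module-level _MODEL_PRICING dict keyed by model name holding (input, output) USD rates per million tokens; the rate values and fallback key below are illustrative assumptions, not the PR's actual table:

# Sketch only: model name -> (input_rate, output_rate), USD per 1M tokens.
# The rates below are illustrative placeholders, not the PR's pricing data.
_MODEL_PRICING: dict[str, tuple[float, float]] = {
    "gpt-5": (1.25, 10.00),  # placeholder rates
}
# Commit 539659e applies gpt-5 pricing when the model is "unknown-model".
_DEFAULT_PRICING = _MODEL_PRICING["gpt-5"]


def _calculate_cost(self, model_name: str, usage: Any) -> float:
    """Estimate USD cost for one response from its token counts."""
    input_rate, output_rate = _MODEL_PRICING.get(model_name, _DEFAULT_PRICING)
    return (usage.input_tokens * input_rate + usage.output_tokens * output_rate) / 1_000_000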
79 changes: 79 additions & 0 deletions connector_builder_agents/src/run.py
@@ -1,6 +1,7 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Functions to run connector builder agents in different modalities."""

import datetime
import sys
from pathlib import Path

@@ -24,6 +25,7 @@
    MAX_CONNECTOR_BUILD_STEPS,
    SESSION_ID,
)
from .cost_tracking import CostEvaluator, CostTracker
from .tools import (
    ALL_MCP_SERVERS,
    DEVELOPER_AGENT_TOOLS,
@@ -97,6 +99,10 @@ async def run_interactive_build(
with trace(workflow_name="Interactive Connector Builder Session", trace_id=trace_id):
trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}"

cost_tracker = CostTracker(trace_id=trace_id)
cost_tracker.start_time = datetime.datetime.utcnow().isoformat()
update_progress_log(f"🔢 Token usage tracking enabled for trace: {trace_id}")

input_prompt: str = prompt
while True:
update_progress_log("\n⚙️ AI Agent is working...")
@@ -127,6 +133,13 @@
            # After streaming ends, get the final result
            update_progress_log(f"\n🤖 AI Agent: {result_stream.final_output}")

            if hasattr(result_stream, "final_result") and result_stream.final_result:
                cost_tracker.add_run_result(result_stream.final_result)
                total_tokens = sum(
                    usage.total_tokens for usage in cost_tracker.model_usage.values()
                )
                update_progress_log(f"🔢 Session tokens: {total_tokens:,}")

            input_prompt = input("\n👤 You: ")
            if input_prompt.lower() in {"exit", "quit"}:
                update_progress_log("☑️ Ending conversation...")
@@ -138,6 +151,15 @@
update_progress_log(f"🪵 Review trace logs at: {trace_url}")
sys.exit(0)
finally:
cost_tracker.end_time = datetime.datetime.utcnow().isoformat()
cost_summary = cost_tracker.get_summary()

if cost_summary["total_tokens"] > 0:
update_progress_log(
f"\n🔢 Session Total Tokens: {cost_summary['total_tokens']:,}"
)
update_progress_log(f"🔢 Total Requests: {cost_summary['total_requests']}")

for server in ALL_MCP_SERVERS:
await server.cleanup()

@@ -169,6 +191,9 @@ async def run_manager_developer_build(
with trace(workflow_name="Manager-Developer Connector Build", trace_id=trace_id):
trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}"

cost_tracker = CostTracker(trace_id=trace_id)
cost_tracker.start_time = datetime.datetime.utcnow().isoformat()

run_prompt = (
"You are working on a connector build task. "
f"You are managing a connector build for the API: '{api_name or 'N/A'}'. "
@@ -177,6 +202,7 @@

update_progress_log("\n⚙️ Manager Agent is orchestrating the build...")
update_progress_log(f"🔗 Follow along at: {trace_url}")
update_progress_log(f"🔢 Token usage tracking enabled for trace: {trace_id}")
open_if_browser_available(trace_url)

try:
@@ -190,9 +216,22 @@
                session=session,
                # previous_response_id=prev_response_id,
            )

            cost_tracker.add_run_result(run_result)

            # prev_response_id = run_result.raw_responses[-1].response_id if run_result.raw_responses else None
            status_msg = f"\n🤖 {run_result.last_agent.name}: {run_result.final_output}"
            update_progress_log(status_msg)
            run_tokens = sum(
                response.usage.total_tokens
                for response in run_result.raw_responses
                if response.usage
            )
            total_tokens = sum(
                usage.total_tokens for usage in cost_tracker.model_usage.values()
            )
            update_progress_log(f"🔢 Run tokens: {run_tokens:,} | Total: {total_tokens:,}")

            run_prompt = (
                "You are still working on the connector build task. "
                "Continue to the next step or raise an issue if needed. "
@@ -208,3 +247,43 @@
update_progress_log(f"\n❌ Unexpected error during build: {ex}")
update_progress_log(f"🪵 Review trace logs at: {trace_url}")
sys.exit(1)
finally:
cost_tracker.end_time = datetime.datetime.utcnow().isoformat()

cost_summary = cost_tracker.get_summary()
cost_evaluation = CostEvaluator.evaluate_cost_efficiency(cost_tracker)

update_progress_log("\n" + "=" * 60)
update_progress_log("🔢 TOKEN USAGE TRACKING SUMMARY")
update_progress_log("=" * 60)
update_progress_log(f"Total Tokens: {cost_summary['total_tokens']:,}")
update_progress_log(f"Total Requests: {cost_summary['total_requests']}")
update_progress_log(f"Models Used: {', '.join(cost_summary['models_used'])}")

for model_name, model_data in cost_summary["model_breakdown"].items():
update_progress_log(f" {model_name}:")
update_progress_log(f" Input tokens: {model_data['input_tokens']:,}")
update_progress_log(f" Output tokens: {model_data['output_tokens']:,}")
update_progress_log(f" Requests: {model_data['requests']}")

update_progress_log(f"\nUsage Status: {cost_evaluation['usage_status'].upper()}")
if cost_evaluation["warnings"]:
for warning in cost_evaluation["warnings"]:
update_progress_log(f"⚠️ {warning}")
if cost_evaluation["recommendations"]:
for rec in cost_evaluation["recommendations"]:
update_progress_log(f"💡 {rec}")

try:
from pathlib import Path

usage_file = Path("usage_tracking_results") / f"{trace_id}_usage_summary.json"
cost_tracker.save_to_file(usage_file)
update_progress_log(f"📊 Detailed usage data saved to: {usage_file}")
except Exception as save_ex:
update_progress_log(f"⚠️ Could not save usage data: {save_ex}")

update_progress_log("=" * 60)

for server in [*MANAGER_AGENT_TOOLS, *DEVELOPER_AGENT_TOOLS]:
await server.cleanup()
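
For reference, both build modes follow the same tracking lifecycle: create a CostTracker per trace, feed each RunResult into add_run_result(), then evaluate and persist in the finally block. A self-contained sketch of that lifecycle, using duck-typed SimpleNamespace stand-ins for the SDK's response objects (the stand-ins, the token counts, and the import path are assumptions for illustration, not part of this PR):

import datetime
from types import SimpleNamespace

from connector_builder_agents.src.cost_tracking import CostEvaluator, CostTracker

# Fake RunResult-shaped object so the tracker can be exercised without the
# OpenAI Agents SDK; add_run_result only reads .raw_responses[*].usage/.model.
fake_usage = SimpleNamespace(input_tokens=1200, output_tokens=300, total_tokens=1500, requests=1)
fake_response = SimpleNamespace(model="gpt-5", usage=fake_usage)
fake_run_result = SimpleNamespace(raw_responses=[fake_response])

tracker = CostTracker(trace_id="trace_example")  # hypothetical trace id
tracker.start_time = datetime.datetime.utcnow().isoformat()
tracker.add_run_result(fake_run_result)  # accumulates per-model token counts
tracker.end_time = datetime.datetime.utcnow().isoformat()

evaluation = CostEvaluator.evaluate_cost_efficiency(tracker)
print(tracker.get_summary()["total_tokens"])  # 1500
print(evaluation["usage_status"])  # "ok" (under the 100K warning threshold)
print(evaluation["recommendations"])  # 0.25 output/input ratio -> low-efficiency note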