diff --git a/connector_builder_agents/src/cost_tracking.py b/connector_builder_agents/src/cost_tracking.py
new file mode 100644
index 0000000..65ab5b5
--- /dev/null
+++ b/connector_builder_agents/src/cost_tracking.py
@@ -0,0 +1,367 @@
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+"""Cost tracking module for multi-agent workflow execution.
+
+This module provides functionality to track token usage and costs during
+the execution of multi-agent workflows, with support for multiple models
+and real-time cost calculation.
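+
+Example (illustrative sketch; ``run_result`` is a ``RunResult`` returned by the
+agents SDK runner, and the trace id is hypothetical):
+
+    tracker = CostTracker(trace_id="trace-abc123")
+    run_cost = tracker.add_run_result(run_result)  # estimated cost for this run, in USD
+    print(tracker.cost_summary_report)
+    tracker.save_to_file("usage_summary.json")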
+"""
+
+import json
+import logging
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+from agents.result import RunResult
+
+
+logger = logging.getLogger(__name__)
+
+_THRESHOLDS = {
+    "max_tokens_warning": 1_000_000,  # Warn if tokens exceed 1M
+    "max_tokens_critical": 2_000_000,  # Critical if tokens exceed 2M
+    "min_efficiency_ratio": 0.7,  # Minimum output/input token ratio
+    "max_requests_warning": 100,  # Warn if requests exceed 100
+}
+
+# Pricing per 1M tokens in USD, current as of this writing.
+#
+# Each model maps to a tuple of (input_price_per_1M_tokens, output_price_per_1M_tokens).
+# Prices are based on official API documentation from OpenAI and other providers.
+#
+# Reference: https://platform.openai.com/docs/pricing (requires login)
+#
+# Example: "gpt-4o": (2.5, 10.0) means $2.50 per 1M input tokens, $10.00 per 1M output tokens.
+_MODEL_PRICING = {
+    "gpt-5": (1.25, 10.00),
+    "gpt-5-mini": (0.25, 2.00),
+    "gpt-5-nano": (0.05, 0.40),
+    "gpt-5-chat-latest": (1.25, 10.00),
+    "gpt-4.1": (2.00, 8.00),
+    "gpt-4.1-mini": (0.40, 1.60),
+    "gpt-4.1-nano": (0.10, 0.40),
+    "gpt-4o": (2.50, 10.00),
+    "gpt-4o-2024-05-13": (5.00, 15.00),
+    "gpt-4o-mini": (0.15, 0.60),
+    "gpt-realtime": (4.00, 16.00),
+    "gpt-4o-realtime-preview": (5.00, 20.00),
+    "gpt-4o-mini-realtime-preview": (0.60, 2.40),
+    "gpt-audio": (2.50, 10.00),
+    "gpt-4o-audio-preview": (2.50, 10.00),
+    "gpt-4o-mini-audio-preview": (0.15, 0.60),
+    "o1": (15.00, 60.00),
+    "o1-pro": (150.00, 600.00),
+    "o3-pro": (20.00, 80.00),
+    "o3": (2.00, 8.00),
+    "o3-deep-research": (10.00, 40.00),
+    "o4-mini": (1.10, 4.40),
+    "o4-mini-deep-research": (2.00, 8.00),
+    "o3-mini": (1.10, 4.40),
+    "o1-mini": (1.10, 4.40),
+    "codex-mini-latest": (1.50, 6.00),
+    "gpt-4o-mini-search-preview": (0.15, 0.60),
+    "gpt-4o-search-preview": (2.50, 10.00),
+    "computer-use-preview": (3.00, 12.00),
+    "gpt-image-1": (5.00, 0.00),  # Image model with no output tokens
+    "unknown-model": (1.25, 10.00),  # Assume gpt-5 pricing for unknown models
+}
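+
+# Illustrative arithmetic (hypothetical token counts): a gpt-4o call that consumes
+# 120,000 input tokens and 8,000 output tokens is estimated at
+# (120_000 / 1_000_000) * 2.50 + (8_000 / 1_000_000) * 10.00 = $0.30 + $0.08 = $0.38.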
+
+
+@dataclass
+class ModelUsage:
+    """Usage statistics for a specific model."""
+
+    model_name: str
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+    requests: int = 0
+    estimated_cost: float = 0.0
+
+
+@dataclass
+class CostTracker:
+    """Tracks costs and usage across multi-agent workflow execution."""
+
+    trace_id: str
+    model_usage: dict[str, ModelUsage] = field(default_factory=dict)
+    total_estimated_cost: float = 0.0
+    start_time: str | None = None
+    end_time: str | None = None
+
+    def add_run_result(self, run_result: RunResult) -> float:
+        """Extract usage from RunResult and add to tracking.
+
+        Args:
+            run_result: The result from a Runner.run() call
+
+        Returns:
+            The estimated cost for this run result
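+
+        Example (illustrative; trace id is hypothetical)::
+
+            tracker = CostTracker(trace_id="t-1")
+            run_cost = tracker.add_run_result(run_result)
+            # run_cost covers this run only; tracker.total_estimated_cost accumulates.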
+        """
+        run_cost = 0.0
+        run_tokens = 0
+
+        for response in run_result.raw_responses:
+            if not response.usage:
+                continue
+
+            model_name = self._extract_model_name(response)
+
+            if model_name not in self.model_usage:
+                self.model_usage[model_name] = ModelUsage(model_name=model_name)
+
+            usage_tracker = self.model_usage[model_name]
+
+            input_tokens = getattr(response.usage, "input_tokens", None) or getattr(
+                response.usage, "prompt_tokens", 0
+            )
+            output_tokens = getattr(response.usage, "output_tokens", None) or getattr(
+                response.usage, "completion_tokens", 0
+            )
+            # Fall back to input + output when total_tokens is missing or zero.
+            total_tokens = getattr(response.usage, "total_tokens", 0) or (
+                input_tokens + output_tokens
+            )
+            requests = getattr(response.usage, "requests", 1)  # Default to 1 request per response
+
+            usage_tracker.input_tokens += input_tokens
+            usage_tracker.output_tokens += output_tokens
+            usage_tracker.total_tokens += total_tokens
+            usage_tracker.requests += requests
+            run_tokens += total_tokens
+
+            response_cost = self._calculate_cost(model_name, response.usage)
+            usage_tracker.estimated_cost += response_cost
+            run_cost += response_cost
+
+        self.total_estimated_cost += run_cost
+
+        logger.info(
+            f"[{self.trace_id}] Run tokens: {run_tokens}, "
+            f"Total tokens: {sum(usage.total_tokens for usage in self.model_usage.values())}"
+        )
+
+        return run_cost
+
+    def _extract_model_name(self, response: Any) -> str:
+        """Extract model name from response object."""
+        for attr in ["model", "model_name", "engine"]:
+            if hasattr(response, attr):
+                model_value = getattr(response, attr)
+                if model_value:
+                    return str(model_value)
+
+        # Try nested raw_response
+        if hasattr(response, "raw_response"):
+            raw = response.raw_response
+            if hasattr(raw, "model"):
+                model_value = raw.model
+                if model_value:
+                    return str(model_value)
+
+        if hasattr(response, "response"):
+            resp = response.response
+            if hasattr(resp, "model"):
+                model_value = resp.model
+                if model_value:
+                    return str(model_value)
+
+        if hasattr(response, "__getitem__"):
+            try:
+                if "model" in response:
+                    return str(response["model"])
+            except (TypeError, KeyError):
+                pass
+
+        if hasattr(response, "choices") and response.choices:
+            choice = response.choices[0]
+            if hasattr(choice, "message") and hasattr(choice.message, "model"):
+                model_value = choice.message.model
+                if model_value:
+                    return str(model_value)
+
+        logger.debug(f"Could not extract model name from response. Response type: {type(response)}")
+        logger.debug(
+            f"Available attributes: {[attr for attr in dir(response) if not attr.startswith('_')]}"
+        )
+        if hasattr(response, "raw_response"):
+            logger.debug(f"Raw response type: {type(response.raw_response)}")
+            logger.debug(
+                f"Raw response attributes: {[attr for attr in dir(response.raw_response) if not attr.startswith('_')]}"
+            )
+
+        return "unknown-model"
+
+    def _calculate_cost(self, model_name: str, usage: Any) -> float:
+        """Calculate estimated cost based on model and usage.
+
+        Args:
+            model_name: Name of the model used
+            usage: Usage object with input_tokens/output_tokens or prompt_tokens/completion_tokens
+
+        Returns:
+            Estimated cost in USD
+        """
+        input_tokens = getattr(usage, "input_tokens", None) or getattr(usage, "prompt_tokens", 0)
+        output_tokens = getattr(usage, "output_tokens", None) or getattr(
+            usage, "completion_tokens", 0
+        )
+
+        if input_tokens == 0 and output_tokens == 0:
+            logger.warning(f"Usage object missing token counts for model {model_name}")
+            return 0.0
+
+        input_price_per_1m, output_price_per_1m = _MODEL_PRICING.get(
+            model_name, _MODEL_PRICING["unknown-model"]
+        )
+
+        input_cost = (input_tokens / 1_000_000) * input_price_per_1m
+        output_cost = (output_tokens / 1_000_000) * output_price_per_1m
+        total_cost = input_cost + output_cost
+
+        logger.debug(
+            f"Cost calculation for {model_name}: "
+            f"{input_tokens:,} input tokens (${input_cost:.6f}) + "
+            f"{output_tokens:,} output tokens (${output_cost:.6f}) = "
+            f"${total_cost:.6f}"
+        )
+
+        return total_cost
+
+    def get_summary(self) -> dict[str, Any]:
+        """Get a summary of all tracked usage and costs."""
+        return {
+            "trace_id": self.trace_id,
+            "total_estimated_cost": self.total_estimated_cost,
+            "total_tokens": sum(usage.total_tokens for usage in self.model_usage.values()),
+            "total_requests": sum(usage.requests for usage in self.model_usage.values()),
+            "models_used": list(self.model_usage.keys()),
+            "model_breakdown": {
+                name: {
+                    "input_tokens": usage.input_tokens,
+                    "output_tokens": usage.output_tokens,
+                    "total_tokens": usage.total_tokens,
+                    "requests": usage.requests,
+                    "estimated_cost": usage.estimated_cost,
+                }
+                for name, usage in self.model_usage.items()
+            },
+            "start_time": self.start_time,
+            "end_time": self.end_time,
+        }
+
+    @property
+    def cost_summary_report(self) -> str:
+        """Generate a formatted summary report string.
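+
+        Sample output (illustrative values)::
+
+            ============================================================
+            šŸ”¢ TOKEN USAGE TRACKING SUMMARY
+            ============================================================
+            Total Tokens: 60
+            Total Requests: 1
+            Total Estimated Cost: $0.0001
+            Models Used: gpt-4o-mini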
"""Functions to run connector builder agents in different modalities.""" +import datetime import sys from pathlib import Path @@ -25,7 +26,9 @@ DEFAULT_MANAGER_MODEL, MAX_CONNECTOR_BUILD_STEPS, SESSION_ID, + WORKSPACE_WRITE_DIR, ) +from .cost_tracking import CostTracker from .tools import ( ALL_MCP_SERVERS, DEVELOPER_AGENT_TOOLS, @@ -117,6 +120,10 @@ async def run_interactive_build( with trace(workflow_name="Interactive Connector Builder Session", trace_id=trace_id): trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}" + cost_tracker = CostTracker(trace_id=trace_id) + cost_tracker.start_time = datetime.datetime.utcnow().isoformat() + update_progress_log(f"šŸ”¢ Token usage tracking enabled for trace: {trace_id}") + input_prompt: str = prompt while True: update_progress_log("\nāš™ļø AI Agent is working...") @@ -147,6 +154,13 @@ async def run_interactive_build( # After streaming ends, get the final result update_progress_log(f"\nšŸ¤– AI Agent: {result_stream.final_output}") + if hasattr(result_stream, "final_result") and result_stream.final_result: + cost_tracker.add_run_result(result_stream.final_result) + total_tokens = sum( + usage.total_tokens for usage in cost_tracker.model_usage.values() + ) + update_progress_log(f"šŸ”¢ Session tokens: {total_tokens:,}") + input_prompt = input("\nšŸ‘¤ You: ") if input_prompt.lower() in {"exit", "quit"}: update_progress_log("ā˜‘ļø Ending conversation...") @@ -158,6 +172,15 @@ async def run_interactive_build( update_progress_log(f"🪵 Review trace logs at: {trace_url}") sys.exit(0) finally: + cost_tracker.end_time = datetime.datetime.utcnow().isoformat() + cost_summary = cost_tracker.get_summary() + + if cost_summary["total_tokens"] > 0: + update_progress_log( + f"\nšŸ”¢ Session Total Tokens: {cost_summary['total_tokens']:,}" + ) + update_progress_log(f"šŸ”¢ Total Requests: {cost_summary['total_requests']}") + for server in ALL_MCP_SERVERS: await server.cleanup() @@ -196,6 +219,9 @@ async def run_manager_developer_build( with trace(workflow_name="Manager-Developer Connector Build", trace_id=trace_id): trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}" + cost_tracker = CostTracker(trace_id=trace_id) + cost_tracker.start_time = datetime.datetime.utcnow().isoformat() + run_prompt = ( "You are working on a connector build task. " f"You are managing a connector build for the API: '{api_name or 'N/A'}'. " @@ -206,6 +232,7 @@ async def run_manager_developer_build( update_progress_log(f"API Name: {api_name or 'N/A'}") update_progress_log(f"Additional Instructions: {instructions or 'N/A'}") update_progress_log(f"šŸ”— Follow along at: {trace_url}") + update_progress_log(f"šŸ”¢ Token usage tracking enabled for trace: {trace_id}") open_if_browser_available(trace_url) try: @@ -219,9 +246,22 @@ async def run_manager_developer_build( session=session, # previous_response_id=prev_response_id, ) + + cost_tracker.add_run_result(run_result) + # prev_response_id = run_result.raw_responses[-1].response_id if run_result.raw_responses else None status_msg = f"\nšŸ¤– {run_result.last_agent.name}: {run_result.final_output}" update_progress_log(status_msg) + run_tokens = sum( + response.usage.total_tokens + for response in run_result.raw_responses + if response.usage + ) + total_tokens = sum( + usage.total_tokens for usage in cost_tracker.model_usage.values() + ) + update_progress_log(f"šŸ”¢ Run tokens: {run_tokens:,} | Total: {total_tokens:,}") + run_prompt = ( "You are still working on the connector build task. 
" "Continue to the next step or raise an issue if needed. " @@ -237,3 +277,29 @@ async def run_manager_developer_build( update_progress_log(f"\nāŒ Unexpected error during build: {ex}") update_progress_log(f"🪵 Review trace logs at: {trace_url}") sys.exit(1) + finally: + cost_tracker.end_time = datetime.datetime.utcnow().isoformat() + + update_progress_log(f"\n{cost_tracker.cost_summary_report}") + + try: + usage_dir = WORKSPACE_WRITE_DIR + manifest_files = list(WORKSPACE_WRITE_DIR.glob("**/manifest.yaml")) + if manifest_files: + usage_dir = manifest_files[0].parent + update_progress_log( + f"šŸ“ Found manifest at {manifest_files[0]}, saving usage data in same directory" + ) + else: + update_progress_log( + "šŸ“ No manifest.yaml found, saving usage data in workspace directory" + ) + + usage_file = usage_dir / "usage_summary.json" + cost_tracker.save_to_file(usage_file) + update_progress_log(f"šŸ“Š Detailed usage data saved to: {usage_file}") + except Exception as save_ex: + update_progress_log(f"āš ļø Could not save usage data: {save_ex}") + + for server in [*MANAGER_AGENT_TOOLS, *DEVELOPER_AGENT_TOOLS]: + await server.cleanup() diff --git a/debug_model_extraction.py b/debug_model_extraction.py new file mode 100644 index 0000000..fecf20f --- /dev/null +++ b/debug_model_extraction.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +"""Debug script to test model name extraction from OpenAI API responses.""" + +import logging +import os + +from openai import OpenAI + + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +def test_openai_response_structure(): + """Make a simple OpenAI API call and examine the response structure.""" + + api_key = os.getenv("OPENAI_APLKEY") + if not api_key: + print("ERROR: OPENAI_APLKEY environment variable not set") + return + + client = OpenAI(api_key=api_key) + + try: + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": "Hello! 
+            self.usage = MockUsage()
+
+    class MockRunResult:
+        def __init__(self):
+            self.raw_responses = [MockResponse()]
+
+    cost_tracker = CostTracker(trace_id="test-openai")
+    run_result = MockRunResult()
+
+    print("=== Testing OpenAI Response Structure ===")
+    try:
+        cost = cost_tracker.add_run_result(run_result)
+        print(f"āœ… OpenAI response test passed. Cost: ${cost:.6f}")
+        summary = cost_tracker.get_summary()
+        print(f"Models used: {summary['models_used']}")
+        print(f"Total cost: ${summary['total_estimated_cost']:.6f}")
+    except Exception as e:
+        print(f"āŒ OpenAI response test failed: {e}")
+        import traceback
+
+        traceback.print_exc()
+
+
+def test_expected_response_structure():
+    """Test with expected response structure."""
+
+    class MockUsage:
+        def __init__(self):
+            self.input_tokens = 50
+            self.output_tokens = 10
+            self.total_tokens = 60
+            self.requests = 1
+
+    class MockResponse:
+        def __init__(self):
+            self.model = "gpt-4o-mini"
+            self.usage = MockUsage()
+
+    class MockRunResult:
+        def __init__(self):
+            self.raw_responses = [MockResponse()]
+
+    cost_tracker = CostTracker(trace_id="test-expected")
+    run_result = MockRunResult()
+
+    print("\n=== Testing Expected Response Structure ===")
+    try:
+        cost = cost_tracker.add_run_result(run_result)
+        print(f"āœ… Expected response test passed. Cost: ${cost:.6f}")
+        summary = cost_tracker.get_summary()
+        print(f"Models used: {summary['models_used']}")
+        print(f"Total cost: ${summary['total_estimated_cost']:.6f}")
+    except Exception as e:
+        print(f"āŒ Expected response test failed: {e}")
+        import traceback
+
+        traceback.print_exc()
+
+
+def test_missing_attributes():
+    """Test with response missing some attributes."""
+
+    class MockUsage:
+        def __init__(self):
+            self.completion_tokens = 10
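+            # Only completion_tokens is present: the input side resolves to 0, so
+            # the estimated cost comes from output tokens alone.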
Cost: ${cost:.6f}") + summary = cost_tracker.get_summary() + print(f"Models used: {summary['models_used']}") + print(f"Total cost: ${summary['total_estimated_cost']:.6f}") + except Exception as e: + print(f"āŒ OpenAI response test failed: {e}") + import traceback + + traceback.print_exc() + + +def test_expected_response_structure(): + """Test with expected response structure.""" + + class MockUsage: + def __init__(self): + self.input_tokens = 50 + self.output_tokens = 10 + self.total_tokens = 60 + self.requests = 1 + + class MockResponse: + def __init__(self): + self.model = "gpt-4o-mini" + self.usage = MockUsage() + + class MockRunResult: + def __init__(self): + self.raw_responses = [MockResponse()] + + cost_tracker = CostTracker(trace_id="test-expected") + run_result = MockRunResult() + + print("\n=== Testing Expected Response Structure ===") + try: + cost = cost_tracker.add_run_result(run_result) + print(f"āœ… Expected response test passed. Cost: ${cost:.6f}") + summary = cost_tracker.get_summary() + print(f"Models used: {summary['models_used']}") + print(f"Total cost: ${summary['total_estimated_cost']:.6f}") + except Exception as e: + print(f"āŒ Expected response test failed: {e}") + import traceback + + traceback.print_exc() + + +def test_missing_attributes(): + """Test with response missing some attributes.""" + + class MockUsage: + def __init__(self): + self.completion_tokens = 10 + + class MockResponse: + def __init__(self): + self.model = "gpt-4o-mini" + self.usage = MockUsage() + + class MockRunResult: + def __init__(self): + self.raw_responses = [MockResponse()] + + cost_tracker = CostTracker(trace_id="test-missing") + run_result = MockRunResult() + + print("\n=== Testing Missing Attributes ===") + try: + cost = cost_tracker.add_run_result(run_result) + print(f"āœ… Missing attributes test passed. Cost: ${cost:.6f}") + summary = cost_tracker.get_summary() + print(f"Models used: {summary['models_used']}") + print(f"Total cost: ${summary['total_estimated_cost']:.6f}") + except Exception as e: + print(f"āŒ Missing attributes test failed: {e}") + import traceback + + traceback.print_exc() + + +if __name__ == "__main__": + test_openai_response_structure() + test_expected_response_structure() + test_missing_attributes()