213 changes: 158 additions & 55 deletions connector_builder_agents/src/evals/evaluators.py
@@ -1,11 +1,19 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Evaluators for connector builder agents."""
"""Unified LLM-based evaluator for connector builder agents.

This module provides a single LLM-based evaluator that assesses connector quality
across multiple criteria (readiness and stream presence) using GPT-4o. This approach
simplifies the evaluation system by replacing separate programmatic and LLM evaluators
with a unified prompt-based approach.

The evaluator is designed for use with the Phoenix evaluation framework and returns
structured scores that can be aggregated and reported in evaluation summaries.
"""

import json
import logging

import pandas as pd
import yaml
from dotenv import load_dotenv
from opentelemetry.trace import get_current_span
from phoenix.evals import OpenAIModel, llm_classify
@@ -15,8 +23,21 @@

logger = logging.getLogger(__name__)

READINESS_EVAL_MODEL = "gpt-4o"
READINESS_EVAL_TEMPLATE = """You are evaluating whether a connector readiness test passed or failed.
UNIFIED_EVAL_MODEL = "gpt-4o"
"""Model used for unified LLM-based evaluation."""

UNIFIED_EVAL_TEMPLATE = """You are evaluating the quality of a generated Airbyte connector.

You will evaluate based on two criteria and return scores for each.

**Artifacts Provided:**
1. **Readiness Report**: Markdown report showing test results for the connector
2. **Manifest**: YAML defining the connector's streams and configuration
3. **Expected Streams**: List of stream names that should be present in the manifest

**Evaluation Criteria:**

Evaluate whether the connector readiness test passed or failed.

A passing report should have all of the following:
- All streams tested successfully (marked with ✅)
@@ -30,75 +51,157 @@
- Zero records extracted from streams
- Error messages indicating failure

Based on the connector readiness report below, classify whether the test PASSED or FAILED. Your answer should be a single word, either "PASSED" or "FAILED".
**Score: 1.0 if PASSED, 0.0 if FAILED**

{readiness_report}
"""
Evaluate what percentage of expected streams are present in the manifest.

Instructions:
- Extract all stream names from the manifest YAML (look for `streams:` section, each with a `name:` field)
- Compare against the expected streams list
- Count only exact name matches (case-sensitive)
- Calculate: (number of expected streams found) / (total expected streams)

def readiness_eval(output: dict) -> int:
"""Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
Example:
- Expected: ["posts", "users", "comments"]
- Found in manifest: ["posts", "comments", "albums"]
- Matched: ["posts", "comments"]
- Score: 2/3 = 0.67

if output is None:
logger.warning("Output is None, cannot evaluate readiness")
return 0
**Score: float between 0.0 and 1.0**

readiness_report = output.get("artifacts", {}).get("readiness_report", None)
if readiness_report is None:
logger.warning("No readiness report found")
return 0
---

rails = ["PASSED", "FAILED"]
**Input Data:**

eval_df = llm_classify(
model=OpenAIModel(model=READINESS_EVAL_MODEL),
data=pd.DataFrame([{"readiness_report": readiness_report}]),
template=READINESS_EVAL_TEMPLATE,
rails=rails,
provide_explanation=True,
)
Readiness Report:
```
{readiness_report}
```

logger.info(f"Readiness evaluation result: {eval_df}")
Manifest:
```
{manifest}
```

label = eval_df["label"][0]
score = 1 if label.upper() == "PASSED" else 0
Expected Streams: {expected_streams}

return score
---

**Instructions:**
Carefully analyze the artifacts above and classify the readiness as either "PASSED" or "FAILED", and calculate the streams percentage.

Your response must be in this exact format (one word for readiness, one number for streams):
READINESS: <PASSED or FAILED>
STREAMS: <percentage as decimal, e.g., 0.67>
"""

def streams_eval(expected: dict, output: dict) -> float:
"""Evaluate if all expected streams were built. Return the percentage of expected streams that are present in available streams."""

def unified_eval(expected: dict, output: dict) -> dict:
"""Unified LLM-based evaluator for all connector quality criteria.

This evaluator replaces the previous hybrid approach (readiness_eval + streams_eval)
with a single LLM-based evaluation using GPT-4o. It evaluates both readiness
(pass/fail based on test results) and stream presence (percentage match against
expected streams) in a single LLM call.

The evaluator uses a structured prompt template that instructs the LLM to analyze
connector artifacts (readiness report and manifest) and return scores in a
standardized format that can be parsed programmatically.

Args:
expected: Dict containing expected evaluation criteria. Should include an
'expected' key with JSON string containing 'expected_streams' list.
output: Dict containing task output with 'artifacts' key containing:
- 'readiness_report': Markdown report of connector test results
- 'manifest': YAML manifest defining connector streams and config

Returns:
Dict with two float scores:
- 'readiness': 0.0 (failed) or 1.0 (passed) based on test results
- 'streams': 0.0-1.0 representing percentage of expected streams found

Example:
>>> expected = {"expected": '{"expected_streams": ["users", "posts"]}'}
>>> output = {"artifacts": {"readiness_report": "...", "manifest": "..."}}
>>> scores = unified_eval(expected, output)
>>> scores
{'readiness': 1.0, 'streams': 1.0}
"""
if output is None:
logger.warning("Output is None, cannot evaluate streams")
return 0.0
logger.warning("Output is None, cannot evaluate")
return {"readiness": 0.0, "streams": 0.0}

manifest_str = output.get("artifacts", {}).get("manifest", None)
if manifest_str is None:
logger.warning("No manifest found")
return 0
readiness_report = output.get("artifacts", {}).get("readiness_report", "Not available")
manifest = output.get("artifacts", {}).get("manifest", "Not available")

manifest = yaml.safe_load(manifest_str)
available_streams = manifest.get("streams", [])
available_stream_names = [stream.get("name", "") for stream in available_streams]
logger.info(f"Available stream names: {available_stream_names}")
if readiness_report == "Not available":
logger.warning("No readiness report found")

if manifest == "Not available":
logger.warning("No manifest found")

expected_obj = json.loads(expected.get("expected", "{}"))
expected_stream_names = expected_obj.get("expected_streams", [])
logger.info(f"Expected stream names: {expected_stream_names}")
expected_streams = expected_obj.get("expected_streams", [])

logger.info(f"Expected streams: {expected_streams}")

# Set attributes on span for visibility
span = get_current_span()
span.set_attribute("available_stream_names", available_stream_names)
span.set_attribute("expected_stream_names", expected_stream_names)

if not expected_stream_names:
logger.warning("No expected streams found")
return 0.0

# Calculate the percentage of expected streams that are present in available streams
matched_streams = set(available_stream_names) & set(expected_stream_names)
logger.info(f"Matched streams: {matched_streams}")
percent_matched = len(matched_streams) / len(expected_stream_names)
logger.info(f"Percent matched: {percent_matched}")
return float(percent_matched)
span.set_attribute("expected_streams", expected_streams)

if not expected_streams:
logger.warning("No expected streams provided")

try:
eval_df = llm_classify(
model=OpenAIModel(model=UNIFIED_EVAL_MODEL),
data=pd.DataFrame(
[
{
"readiness_report": readiness_report,
"manifest": manifest,
"expected_streams": json.dumps(expected_streams),
}
]
),
template=UNIFIED_EVAL_TEMPLATE,
rails=None,
provide_explanation=True,
)

logger.info(f"Unified evaluation result: {eval_df}")

response_text = eval_df["label"][0]

if response_text is None:
logger.error("LLM returned None response")
return {"readiness": 0.0, "streams": 0.0}

readiness_score = 0.0
streams_score = 0.0

for line in response_text.strip().split("\n"):
line = line.strip()
if line.startswith("READINESS:"):
readiness_value = line.split(":", 1)[1].strip().upper()
readiness_score = 1.0 if readiness_value == "PASSED" else 0.0
elif line.startswith("STREAMS:"):
streams_value = line.split(":", 1)[1].strip()
try:
streams_score = float(streams_value)
streams_score = max(0.0, min(1.0, streams_score))
except ValueError:
logger.warning(f"Could not parse streams score from: {streams_value}")
streams_score = 0.0

Comment on lines +187 to +200

⚠️ Potential issue | 🟠 Major

Make the READINESS/STREAMS parsing case-insensitive.

LLMs often capitalize only the first letter ("Readiness:", "Streams:") despite the prompt. With the current strict startswith("READINESS:")/"STREAMS:" checks, those valid responses will be ignored and both scores remain 0.0, corrupting the evaluation output. Please normalize the prefix (e.g., compare on line.upper().startswith("READINESS:")) before extracting the value.

-        for line in response_text.strip().split("\n"):
-            line = line.strip()
-            if line.startswith("READINESS:"):
+        for line in response_text.strip().split("\n"):
+            line = line.strip()
+            upper_line = line.upper()
+            if upper_line.startswith("READINESS:"):
                 readiness_value = line.split(":", 1)[1].strip().upper()
                 readiness_score = 1.0 if readiness_value == "PASSED" else 0.0
-            elif line.startswith("STREAMS:"):
+            elif upper_line.startswith("STREAMS:"):
                 streams_value = line.split(":", 1)[1].strip()

That makes the parser resilient to benign casing variations while still enforcing the format for scoring.
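For illustration, a minimal standalone sketch of the tolerant parsing described above, run against a mixed-case response (the helper name and sample text are hypothetical, not taken from the PR):

```python
def parse_scores(response_text: str) -> dict[str, float]:
    """Case-insensitive parse of the READINESS/STREAMS response format (illustrative only)."""
    readiness_score = 0.0
    streams_score = 0.0
    for line in response_text.strip().split("\n"):
        line = line.strip()
        upper_line = line.upper()
        if upper_line.startswith("READINESS:"):
            readiness_score = 1.0 if line.split(":", 1)[1].strip().upper() == "PASSED" else 0.0
        elif upper_line.startswith("STREAMS:"):
            try:
                streams_score = max(0.0, min(1.0, float(line.split(":", 1)[1].strip())))
            except ValueError:
                streams_score = 0.0
    return {"readiness": readiness_score, "streams": streams_score}


# A mixed-case response that the strict prefix check would score as 0.0/0.0:
print(parse_scores("Readiness: PASSED\nStreams: 0.67"))
# -> {'readiness': 1.0, 'streams': 0.67}
```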

🤖 Prompt for AI Agents
In connector_builder_agents/src/evals/evaluators.py around lines 187 to 200, the
parser only matches "READINESS:" and "STREAMS:" exactly which misses valid
variants like "Readiness:" or "streams:"; make the prefix checks
case-insensitive by normalizing the line (e.g., compute an uppercased copy or
use line.lower()) before checking startswith, then extract the value from the
original line (or split using the same normalized index) and proceed with the
existing parsing and error handling so both READINESS and STREAMS are detected
regardless of casing.

logger.info(f"Parsed readiness score: {readiness_score}")
logger.info(f"Parsed streams score: {streams_score}")

span.set_attribute("readiness_score", readiness_score)
span.set_attribute("streams_score", streams_score)

return {"readiness": readiness_score, "streams": streams_score}

except Exception as e:
logger.error(f"Error during unified evaluation: {e}", exc_info=True)
return {"readiness": 0.0, "streams": 0.0}
6 changes: 3 additions & 3 deletions connector_builder_agents/src/evals/phoenix_run.py
@@ -24,7 +24,7 @@
from phoenix.otel import register

from .dataset import get_or_create_phoenix_dataset
from .evaluators import READINESS_EVAL_MODEL, readiness_eval, streams_eval
from .evaluators import UNIFIED_EVAL_MODEL, unified_eval
from .summary import generate_markdown_summary
from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task

@@ -51,7 +51,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):

experiment_id = str(uuid.uuid4())[:5]
experiment_name = f"builder-evals-{experiment_id}"
evaluators = [readiness_eval, streams_eval]
evaluators = [unified_eval]

logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}")

@@ -66,7 +66,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):
experiment_metadata={
"developer_model": EVAL_DEVELOPER_MODEL,
"manager_model": EVAL_MANAGER_MODEL,
"readiness_eval_model": READINESS_EVAL_MODEL,
"unified_eval_model": UNIFIED_EVAL_MODEL,
},
timeout=1800,
)