186 changes: 131 additions & 55 deletions connector_builder_agents/src/evals/evaluators.py
@@ -1,11 +1,10 @@
 # Copyright (c) 2025 Airbyte, Inc., all rights reserved.
-"""Evaluators for connector builder agents."""
+"""Unified LLM-based evaluator for connector builder agents."""
 
 import json
 import logging
 
 import pandas as pd
-import yaml
 from dotenv import load_dotenv
 from opentelemetry.trace import get_current_span
 from phoenix.evals import OpenAIModel, llm_classify
@@ -15,8 +14,20 @@
 
 logger = logging.getLogger(__name__)
 
-READINESS_EVAL_MODEL = "gpt-4o"
-READINESS_EVAL_TEMPLATE = """You are evaluating whether a connector readiness test passed or failed.
+UNIFIED_EVAL_MODEL = "gpt-4o"
+
+UNIFIED_EVAL_TEMPLATE = """You are evaluating the quality of a generated Airbyte connector.
+
+You will evaluate based on two criteria and return scores for each.
+
+**Artifacts Provided:**
+1. **Readiness Report**: Markdown report showing test results for the connector
+2. **Manifest**: YAML defining the connector's streams and configuration
+3. **Expected Streams**: List of stream names that should be present in the manifest
+
+**Evaluation Criteria:**
+
+Evaluate whether the connector readiness test passed or failed.
 
 A passing report should have all of the following:
 - All streams tested successfully (marked with ✅)
@@ -30,75 +41,140 @@
 - Zero records extracted from streams
 - Error messages indicating failure
 
-Based on the connector readiness report below, classify whether the test PASSED or FAILED. Your answer should be a single word, either "PASSED" or "FAILED".
-
-{readiness_report}
-"""
+**Score: 1.0 if PASSED, 0.0 if FAILED**
+
+Evaluate what percentage of expected streams are present in the manifest.
+
+Instructions:
+- Extract all stream names from the manifest YAML (look for `streams:` section, each with a `name:` field)
+- Compare against the expected streams list
+- Count only exact name matches (case-sensitive)
+- Calculate: (number of expected streams found) / (total expected streams)
+
+Example:
+- Expected: ["posts", "users", "comments"]
+- Found in manifest: ["posts", "comments", "albums"]
+- Matched: ["posts", "comments"]
+- Score: 2/3 = 0.67
+
+**Score: float between 0.0 and 1.0**
+
+---
+
+**Input Data:**
+
+Readiness Report:
+```
+{readiness_report}
+```
+
+Manifest:
+```
+{manifest}
+```
+
+Expected Streams: {expected_streams}
+
+---
+
+**Instructions:**
+Carefully analyze the artifacts above and classify the readiness as either "PASSED" or "FAILED", and calculate the streams percentage.
+
+Your response must be in this exact format (one word for readiness, one number for streams):
+READINESS: <PASSED or FAILED>
+STREAMS: <percentage as decimal, e.g., 0.67>
+"""
 
 
-def readiness_eval(output: dict) -> int:
-    """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
-
-    if output is None:
-        logger.warning("Output is None, cannot evaluate readiness")
-        return 0
-
-    readiness_report = output.get("artifacts", {}).get("readiness_report", None)
-    if readiness_report is None:
-        logger.warning("No readiness report found")
-        return 0
-
-    rails = ["PASSED", "FAILED"]
-
-    eval_df = llm_classify(
-        model=OpenAIModel(model=READINESS_EVAL_MODEL),
-        data=pd.DataFrame([{"readiness_report": readiness_report}]),
-        template=READINESS_EVAL_TEMPLATE,
-        rails=rails,
-        provide_explanation=True,
-    )
-
-    logger.info(f"Readiness evaluation result: {eval_df}")
-
-    label = eval_df["label"][0]
-    score = 1 if label.upper() == "PASSED" else 0
-
-    return score
-
-
-def streams_eval(expected: dict, output: dict) -> float:
-    """Evaluate if all expected streams were built. Return the percentage of expected streams that are present in available streams."""
-    if output is None:
-        logger.warning("Output is None, cannot evaluate streams")
-        return 0.0
-
-    manifest_str = output.get("artifacts", {}).get("manifest", None)
-    if manifest_str is None:
-        logger.warning("No manifest found")
-        return 0
-
-    manifest = yaml.safe_load(manifest_str)
-    available_streams = manifest.get("streams", [])
-    available_stream_names = [stream.get("name", "") for stream in available_streams]
-    logger.info(f"Available stream names: {available_stream_names}")
-
-    expected_obj = json.loads(expected.get("expected", "{}"))
-    expected_stream_names = expected_obj.get("expected_streams", [])
-    logger.info(f"Expected stream names: {expected_stream_names}")
-
-    # Set attributes on span for visibility
-    span = get_current_span()
-    span.set_attribute("available_stream_names", available_stream_names)
-    span.set_attribute("expected_stream_names", expected_stream_names)
-
-    if not expected_stream_names:
-        logger.warning("No expected streams found")
-        return 0.0
-
-    # Calculate the percentage of expected streams that are present in available streams
-    matched_streams = set(available_stream_names) & set(expected_stream_names)
-    logger.info(f"Matched streams: {matched_streams}")
-    percent_matched = len(matched_streams) / len(expected_stream_names)
-    logger.info(f"Percent matched: {percent_matched}")
-    return float(percent_matched)
+def unified_eval(expected: dict, output: dict) -> dict:
+    """Unified LLM-based evaluator for all connector quality criteria.
+
+    Evaluates both readiness (pass/fail) and streams (percentage match) using a single LLM call.
+
+    Args:
+        expected: Dict containing expected criteria (e.g., expected_streams list)
+        output: Dict containing task output with artifacts (readiness_report, manifest)
+
+    Returns:
+        Dict with 'readiness' (0.0 or 1.0) and 'streams' (0.0-1.0) scores
+    """
+    if output is None:
+        logger.warning("Output is None, cannot evaluate")
+        return {"readiness": 0.0, "streams": 0.0}
+
+    readiness_report = output.get("artifacts", {}).get("readiness_report", "Not available")
+    manifest = output.get("artifacts", {}).get("manifest", "Not available")
+
+    if readiness_report == "Not available":
+        logger.warning("No readiness report found")
+
+    if manifest == "Not available":
+        logger.warning("No manifest found")
+
+    expected_obj = json.loads(expected.get("expected", "{}"))
+    expected_streams = expected_obj.get("expected_streams", [])
+
+    logger.info(f"Expected streams: {expected_streams}")
+
+    # Set attributes on span for visibility
+    span = get_current_span()
+    span.set_attribute("expected_streams", expected_streams)
+
+    if not expected_streams:
+        logger.warning("No expected streams provided")
+
+    prompt = UNIFIED_EVAL_TEMPLATE.format(
+        readiness_report=readiness_report,
+        manifest=manifest,
+        expected_streams=json.dumps(expected_streams),
+    )
+
+    try:
+        eval_df = llm_classify(
+            model=OpenAIModel(model=UNIFIED_EVAL_MODEL),
+            data=pd.DataFrame(
+                [
+                    {
+                        "readiness_report": readiness_report,
+                        "manifest": manifest,
+                        "expected_streams": json.dumps(expected_streams),
+                    }
+                ]
+            ),
+            template=prompt,
+            rails=None,
+            provide_explanation=True,
+        )
+
+        logger.info(f"Unified evaluation result: {eval_df}")
+
+        response_text = eval_df["label"][0]
+
+        readiness_score = 0.0
+        streams_score = 0.0
+
+        for line in response_text.strip().split("\n"):
+            line = line.strip()
+            if line.startswith("READINESS:"):
+                readiness_value = line.split(":", 1)[1].strip().upper()
+                readiness_score = 1.0 if readiness_value == "PASSED" else 0.0
+            elif line.startswith("STREAMS:"):
+                streams_value = line.split(":", 1)[1].strip()
+                try:
+                    streams_score = float(streams_value)
+                    streams_score = max(0.0, min(1.0, streams_score))
+                except ValueError:
+                    logger.warning(f"Could not parse streams score from: {streams_value}")
+                    streams_score = 0.0
+
Comment on lines +187 to +200
⚠️ Potential issue | 🟠 Major

Make the READINESS/STREAMS parsing case-insensitive.

LLMs often capitalize only the first letter ("Readiness:", "Streams:") despite the prompt. With the current strict startswith("READINESS:")/"STREAMS:" checks, those valid responses will be ignored and both scores remain 0.0, corrupting the evaluation output. Please normalize the prefix (e.g., compare on line.upper().startswith("READINESS:")) before extracting the value.

-        for line in response_text.strip().split("\n"):
-            line = line.strip()
-            if line.startswith("READINESS:"):
+        for line in response_text.strip().split("\n"):
+            line = line.strip()
+            upper_line = line.upper()
+            if upper_line.startswith("READINESS:"):
                 readiness_value = line.split(":", 1)[1].strip().upper()
                 readiness_score = 1.0 if readiness_value == "PASSED" else 0.0
-            elif line.startswith("STREAMS:"):
+            elif upper_line.startswith("STREAMS:"):
                 streams_value = line.split(":", 1)[1].strip()

That makes the parser resilient to benign casing variations while still enforcing the format for scoring.

📝 Committable suggestion (review carefully before committing — the complete replacement for the parsing loop above):

        for line in response_text.strip().split("\n"):
            line = line.strip()
            upper_line = line.upper()
            if upper_line.startswith("READINESS:"):
                readiness_value = line.split(":", 1)[1].strip().upper()
                readiness_score = 1.0 if readiness_value == "PASSED" else 0.0
            elif upper_line.startswith("STREAMS:"):
                streams_value = line.split(":", 1)[1].strip()
                try:
                    streams_score = float(streams_value)
                    streams_score = max(0.0, min(1.0, streams_score))
                except ValueError:
                    logger.warning(f"Could not parse streams score from: {streams_value}")
                    streams_score = 0.0
🤖 Prompt for AI Agents
In connector_builder_agents/src/evals/evaluators.py around lines 187 to 200, the
parser only matches "READINESS:" and "STREAMS:" exactly which misses valid
variants like "Readiness:" or "streams:"; make the prefix checks
case-insensitive by normalizing the line (e.g., compute an uppercased copy or
use line.lower()) before checking startswith, then extract the value from the
original line (or split using the same normalized index) and proceed with the
existing parsing and error handling so both READINESS and STREAMS are detected
regardless of casing.

logger.info(f"Parsed readiness score: {readiness_score}")
logger.info(f"Parsed streams score: {streams_score}")

span.set_attribute("readiness_score", readiness_score)
span.set_attribute("streams_score", streams_score)

return {"readiness": readiness_score, "streams": streams_score}

except Exception as e:
logger.error(f"Error during unified evaluation: {e}", exc_info=True)
return {"readiness": 0.0, "streams": 0.0}
6 changes: 3 additions & 3 deletions connector_builder_agents/src/evals/phoenix_run.py
@@ -24,7 +24,7 @@
 from phoenix.otel import register
 
 from .dataset import get_or_create_phoenix_dataset
-from .evaluators import READINESS_EVAL_MODEL, readiness_eval, streams_eval
+from .evaluators import UNIFIED_EVAL_MODEL, unified_eval
 from .summary import generate_markdown_summary
 from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task
 
@@ -51,7 +51,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):
 
     experiment_id = str(uuid.uuid4())[:5]
     experiment_name = f"builder-evals-{experiment_id}"
-    evaluators = [readiness_eval, streams_eval]
+    evaluators = [unified_eval]
 
     logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}")
 
@@ -66,7 +66,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):
         experiment_metadata={
             "developer_model": EVAL_DEVELOPER_MODEL,
             "manager_model": EVAL_MANAGER_MODEL,
-            "readiness_eval_model": READINESS_EVAL_MODEL,
+            "unified_eval_model": UNIFIED_EVAL_MODEL,
         },
         timeout=1800,
     )
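
For local spot-checking, the new evaluator can also be invoked directly with hand-built artifacts before running a full Phoenix experiment. A minimal sketch, assuming the import path below matches the repo layout and that an OpenAI key is configured (the report, manifest, and stream names are made up for illustration):

```python
import json

# Hypothetical import path; adjust to wherever evaluators.py resolves in your checkout.
from connector_builder_agents.src.evals.evaluators import unified_eval

# Fabricated artifacts shaped like the dicts unified_eval expects.
output = {
    "artifacts": {
        "readiness_report": "# Readiness Report\n\n- posts: ✅ 25 records\n- comments: ✅ 100 records\n",
        "manifest": "streams:\n  - name: posts\n  - name: comments\n",
    }
}
expected = {"expected": json.dumps({"expected_streams": ["posts", "comments", "users"]})}

# Makes a real LLM call via llm_classify, so OPENAI_API_KEY must be set.
scores = unified_eval(expected=expected, output=output)
print(scores)  # e.g. {'readiness': 1.0, 'streams': 0.67}
```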