Changes from 3 commits (22 commits total)
33f6c27
feat: add primary keys validation to readiness report and evals
devin-ai-integration[bot] Oct 11, 2025
5d6bd94
refactor: consolidate YAML structure and add expected_records validation
devin-ai-integration[bot] Oct 11, 2025
a03f701
refactor: extract common helper functions in evaluators
devin-ai-integration[bot] Oct 11, 2025
0717df0
refactor: simplify evaluators by eliminating intermediate dicts
devin-ai-integration[bot] Oct 11, 2025
595705b
fix: move import re to top of file
devin-ai-integration[bot] Oct 11, 2025
ce1703a
refactor: remove unnecessary early returns from evaluators
devin-ai-integration[bot] Oct 11, 2025
519914d
refactor: remove expected_stream_names variable from streams_eval
devin-ai-integration[bot] Oct 11, 2025
4467160
Apply suggestion from @aaronsteers
aaronsteers Oct 11, 2025
4765645
refactor: rename streams_eval to stream_names_eval and total_expected…
devin-ai-integration[bot] Oct 11, 2025
9e247ce
refactor: simplify evaluators with having parameter in _parse_expecte…
devin-ai-integration[bot] Oct 11, 2025
791d6da
add generic evaluator fn
aaronsteers Oct 11, 2025
3be1c8a
try implementation of generic compare
aaronsteers Oct 11, 2025
b3c2b21
fix: correct indentation and complete generic evaluator pattern
devin-ai-integration[bot] Oct 11, 2025
0817fdb
Apply suggestion from @aaronsteers
aaronsteers Oct 11, 2025
caf5b7c
refactor: rename records_eval to stream_record_counts_eval and simpli…
devin-ai-integration[bot] Oct 11, 2025
bfa5cb1
fix: correct stream_names_eval to compare stream name keys instead of…
devin-ai-integration[bot] Oct 11, 2025
5c7f6e2
fix: update session_id format to use 'conv-' prefix for OpenAI API co…
devin-ai-integration[bot] Oct 11, 2025
3db608b
fix: use conv_ prefix and replace hyphens with underscores in session…
devin-ai-integration[bot] Oct 11, 2025
aecd00e
fix: let OpenAI auto-create conversation IDs instead of passing custo…
devin-ai-integration[bot] Oct 11, 2025
5feef3a
Merge branch 'main' into devin/1760145011-add-primary-keys-check
devin-ai-integration[bot] Oct 11, 2025
8fc6261
feat: add primary key normalization to handle string vs list comparison
devin-ai-integration[bot] Oct 11, 2025
1f4282d
Merge branch 'main' into devin/1760145011-add-primary-keys-check
aaronsteers Oct 12, 2025
60 changes: 45 additions & 15 deletions connector_builder_agents/src/evals/data/connectors.yaml
@@ -4,28 +4,58 @@ connectors:
prompt_name: "JSONPlaceholder"
expected:
expected_streams:
- "posts"
- "comments"
- "albums"
- "photos"
- "todos"
- "users"
- posts:
primary_key: ["id"]
expected_records: 100
- comments:
primary_key: ["id"]
expected_records: ">400"
- albums:
primary_key: ["id"]
expected_records: 100
- photos:
primary_key: ["id"]
expected_records: ">1000"
- todos:
primary_key: ["id"]
expected_records: ">100,<300"
- users:
primary_key: ["id"]
expected_records: 10
- input:
name: "source-starwars"
prompt_name: "StarWars API (https://swapi.info/)"
expected:
expected_streams:
- "people"
- "films"
- "planets"
- "species"
- "starships"
- "vehicles"
- people:
primary_key: ["url"]
expected_records: ">80"
- films:
primary_key: ["url"]
expected_records: 6
- planets:
primary_key: ["url"]
expected_records: ">50,<70"
- species:
primary_key: ["url"]
expected_records: ">30"
- starships:
primary_key: ["url"]
expected_records: ">30"
- vehicles:
primary_key: ["url"]
expected_records: ">30"
- input:
name: "source-rickandmorty"
prompt_name: "Rick and Morty API"
expected:
expected_streams:
- "characters"
- "episodes"
- "locations"
- characters:
primary_key: ["id"]
expected_records: ">100"
- episodes:
primary_key: ["id"]
expected_records: ">50"
- locations:
primary_key: ["id"]
expected_records: ">100"
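
The consolidated YAML uses a list of single-key mappings per stream, with primary_key and expected_records nested under each stream name; expected_records accepts either an exact integer or a constraint string such as ">400" or ">100,<300". A minimal sketch (not part of this PR) of how that shape loads with PyYAML, assuming each list item nests its config under the stream name:

import yaml

snippet = """
expected_streams:
  - comments:
      primary_key: ["id"]
      expected_records: ">400"
  - todos:
      primary_key: ["id"]
      expected_records: ">100,<300"
"""

parsed = yaml.safe_load(snippet)["expected_streams"]
# Each list item is a single-key dict, e.g. {"comments": {"primary_key": ["id"], ...}}
by_name = {name: cfg for item in parsed for name, cfg in item.items()}
print(by_name["todos"]["expected_records"])  # -> ">100,<300"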
201 changes: 184 additions & 17 deletions connector_builder_agents/src/evals/evaluators.py
@@ -36,14 +36,36 @@
"""


def readiness_eval(output: dict) -> int:
"""Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
def _parse_expected_streams(expected: dict) -> list:
"""Parse and return the expected_streams list from the expected dict."""
expected_obj = json.loads(expected.get("expected", "{}"))
return expected_obj.get("expected_streams", [])
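
# Illustrative only (not part of the diff): the Phoenix dataset example passes
# `expected` as a dict whose "expected" value is a JSON-encoded string, so a
# hypothetical call might look like:
#
#     example = {"expected": '{"expected_streams": [{"posts": {"primary_key": ["id"]}}]}'}
#     _parse_expected_streams(example)
#     # -> [{"posts": {"primary_key": ["id"]}}]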


def _get_manifest_streams(output: dict) -> list | None:
"""Extract and parse the manifest streams from output artifacts."""
if output is None:
logger.warning("Output is None, cannot evaluate readiness")
return 0
return None

manifest_str = output.get("artifacts", {}).get("manifest", None)
if manifest_str is None:
return None

manifest = yaml.safe_load(manifest_str)
return manifest.get("streams", [])


def _get_readiness_report(output: dict) -> str | None:
"""Extract the readiness report from output artifacts."""
if output is None:
return None

return output.get("artifacts", {}).get("readiness_report", None)

readiness_report = output.get("artifacts", {}).get("readiness_report", None)

def readiness_eval(output: dict) -> int:
"""Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
readiness_report = _get_readiness_report(output)
if readiness_report is None:
logger.warning("No readiness report found")
return 0
@@ -68,23 +90,22 @@ def readiness_eval(output: dict) -> int:

def streams_eval(expected: dict, output: dict) -> float:
"""Evaluate if all expected streams were built. Return the percentage of expected streams that are present in available streams."""

if output is None:
logger.warning("Output is None, cannot evaluate streams")
return 0.0

manifest_str = output.get("artifacts", {}).get("manifest", None)
if manifest_str is None:
available_streams = _get_manifest_streams(output)
if available_streams is None:
logger.warning("No manifest found")
return 0
return 0.0

manifest = yaml.safe_load(manifest_str)
available_streams = manifest.get("streams", [])
available_stream_names = [stream.get("name", "") for stream in available_streams]
logger.info(f"Available stream names: {available_stream_names}")

expected_obj = json.loads(expected.get("expected", "{}"))
expected_stream_names = expected_obj.get("expected_streams", [])
expected_streams = _parse_expected_streams(expected)
expected_stream_names = []
for stream_obj in expected_streams:
if isinstance(stream_obj, dict):
expected_stream_names.extend(stream_obj.keys())
elif isinstance(stream_obj, str):
expected_stream_names.append(stream_obj)

logger.info(f"Expected stream names: {expected_stream_names}")

# Set attributes on span for visibility
@@ -102,3 +123,149 @@ def streams_eval(expected: dict, output: dict) -> float:
percent_matched = len(matched_streams) / len(expected_stream_names)
logger.info(f"Percent matched: {percent_matched}")
return float(percent_matched)


def primary_keys_eval(expected: dict, output: dict) -> float:
"""Evaluate if primary keys match expected values for each stream.

Returns the percentage of streams with correct primary keys.
"""
available_streams = _get_manifest_streams(output)
if available_streams is None:
logger.warning("No manifest found")
return 0.0

expected_streams = _parse_expected_streams(expected)
expected_primary_keys = {}
for stream_obj in expected_streams:
if isinstance(stream_obj, dict):
for stream_name, stream_config in stream_obj.items():
if isinstance(stream_config, dict) and "primary_key" in stream_config:
expected_primary_keys[stream_name] = stream_config["primary_key"]

logger.info(f"Expected primary keys: {expected_primary_keys}")

if not expected_primary_keys:
logger.warning("No expected primary keys found")
return 0.0

matched_count = 0
total_expected_streams = len(expected_primary_keys)

for stream in available_streams:
stream_name = stream.get("name", "")
if stream_name not in expected_primary_keys:
continue

actual_pk = stream.get("primary_key", [])
expected_pk = expected_primary_keys[stream_name]

if actual_pk == expected_pk:
matched_count += 1
logger.info(f"✓ {stream_name}: primary key matches {expected_pk}")
else:
logger.warning(
f"✗ {stream_name}: primary key mismatch - expected {expected_pk}, got {actual_pk}"
)

span = get_current_span()
span.set_attribute("matched_primary_keys_count", matched_count)
span.set_attribute("total_expected_streams", total_expected_streams)

percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 0.0
logger.info(f"Primary keys percent matched: {percent_matched}")
return float(percent_matched)
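
# Note: the comparison above is exact, so a manifest declaring primary_key as "id"
# would not match an expected ["id"]. A later commit in this PR ("feat: add primary
# key normalization to handle string vs list comparison", 8fc6261) is not included
# in this 3-commit view; a hedged sketch of what such a helper could look like
# (name and placement assumed):
#
# def _normalize_primary_key(pk: str | list | None) -> list:
#     """Treat a bare string key like "id" the same as ["id"]."""
#     if pk is None:
#         return []
#     if isinstance(pk, str):
#         return [pk]
#     return list(pk)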


def records_eval(expected: dict, output: dict) -> float:
"""Evaluate if record counts match expected values for each stream.

Returns the percentage of streams with correct record counts.
Supports both integer values and constraint strings like ">100", "<999", ">100,<999".
"""
readiness_report = _get_readiness_report(output)
if readiness_report is None:
logger.warning("No readiness report found")
return 0.0

expected_streams = _parse_expected_streams(expected)
expected_records = {}
for stream_obj in expected_streams:
if isinstance(stream_obj, dict):
for stream_name, stream_config in stream_obj.items():
if isinstance(stream_config, dict) and "expected_records" in stream_config:
expected_records[stream_name] = stream_config["expected_records"]

logger.info(f"Expected records: {expected_records}")

if not expected_records:
logger.warning("No expected records found")
return 1.0

matched_count = 0
total_expected_streams = len(expected_records)

for stream_name, expected_value in expected_records.items():
actual_count = _extract_record_count(readiness_report, stream_name)

if actual_count is None:
logger.warning(f"✗ {stream_name}: could not extract record count from report")
continue

if _validate_record_count(actual_count, expected_value):
matched_count += 1
logger.info(
f"✓ {stream_name}: record count {actual_count} meets expectation {expected_value}"
)
else:
logger.warning(
f"✗ {stream_name}: record count {actual_count} does not meet expectation {expected_value}"
)

span = get_current_span()
span.set_attribute("matched_records_count", matched_count)
span.set_attribute("total_expected_streams", total_expected_streams)

percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 0.0
logger.info(f"Records percent matched: {percent_matched}")
return float(percent_matched)


def _extract_record_count(readiness_report: str, stream_name: str) -> int | None:
"""Extract record count for a stream from the readiness report."""
lines = readiness_report.split("\n")
for i, line in enumerate(lines):
if f"**{stream_name}**" in line or f"`{stream_name}`" in line:
for j in range(i, min(i + 10, len(lines))):
if "records" in lines[j].lower():
import re

match = re.search(r"(\d+)\s+records?", lines[j], re.IGNORECASE)
if match:
return int(match.group(1))
return None


def _validate_record_count(actual_count: int, expected_value: int | str) -> bool:
"""Validate record count against expected value or constraint string."""
if isinstance(expected_value, int):
return actual_count == expected_value

if not isinstance(expected_value, str):
return False

constraints = [c.strip() for c in expected_value.split(",")]
for constraint in constraints:
if constraint.startswith(">"):
threshold = int(constraint[1:])
if actual_count <= threshold:
return False
elif constraint.startswith("<"):
threshold = int(constraint[1:])
if actual_count >= threshold:
return False
elif constraint.isdigit():
if actual_count != int(constraint):
return False

return True
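
As a quick illustration (not part of the PR), the constraint grammar accepted by _validate_record_count combines comma-separated ">"/"<" bounds and bare integers, with exclusive comparisons:

assert _validate_record_count(100, 100)               # exact integer match
assert _validate_record_count(450, ">400")            # lower bound only
assert _validate_record_count(250, ">100,<300")       # both bounds satisfied
assert not _validate_record_count(300, ">100,<300")   # bounds are exclusive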
10 changes: 8 additions & 2 deletions connector_builder_agents/src/evals/phoenix_run.py
@@ -24,7 +24,13 @@
from phoenix.otel import register

from .dataset import get_or_create_phoenix_dataset
from .evaluators import READINESS_EVAL_MODEL, readiness_eval, streams_eval
from .evaluators import (
READINESS_EVAL_MODEL,
primary_keys_eval,
readiness_eval,
records_eval,
streams_eval,
)
from .summary import generate_markdown_summary
from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task

@@ -51,7 +57,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):

experiment_id = str(uuid.uuid4())[:5]
experiment_name = f"builder-evals-{experiment_id}"
evaluators = [readiness_eval, streams_eval]
evaluators = [readiness_eval, streams_eval, primary_keys_eval, records_eval]

logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}")

16 changes: 16 additions & 0 deletions connector_builder_mcp/validation_testing.py
@@ -612,6 +612,22 @@ def run_connector_readiness_test_report( # noqa: PLR0912, PLR0914, PLR0915 (too
f"Records have only {result.record_stats.get('num_properties', 0)} field(s), expected at least 2"
)

stream_config = next(
(s for s in available_streams if s.get("name") == stream_name),
None,
)
if stream_config:
primary_key = stream_config.get("primary_key", [])
if not primary_key:
field_count_warnings.append("No primary key defined in manifest")
elif result.record_stats:
properties = result.record_stats.get("properties", {})
missing_pk_fields = [pk for pk in primary_key if pk not in properties]
if missing_pk_fields:
field_count_warnings.append(
f"Primary key field(s) missing from records: {', '.join(missing_pk_fields)}"
)
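
# Illustrative only (not part of the diff): if the manifest declares primary_key
# ["id"] but the sampled records only expose {"name": ..., "url": ...}, the check
# above appends "Primary key field(s) missing from records: id" to
# field_count_warnings. This assumes primary_key is a flat list of field names.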

smoke_test_result = StreamSmokeTest(
stream_name=stream_name,
success=True,