diff --git a/connector_builder_agents/src/evals/data/connectors.yaml b/connector_builder_agents/src/evals/data/connectors.yaml
index cf4d237..adab3e4 100644
--- a/connector_builder_agents/src/evals/data/connectors.yaml
+++ b/connector_builder_agents/src/evals/data/connectors.yaml
@@ -4,28 +4,58 @@ connectors:
       prompt_name: "JSONPlaceholder"
     expected:
       expected_streams:
-        - "posts"
-        - "comments"
-        - "albums"
-        - "photos"
-        - "todos"
-        - "users"
+        - posts:
+            primary_key: ["id"]
+            expected_records: 100
+        - comments:
+            primary_key: ["id"]
+            expected_records: ">400"
+        - albums:
+            primary_key: ["id"]
+            expected_records: 100
+        - photos:
+            primary_key: ["id"]
+            expected_records: ">1000"
+        - todos:
+            primary_key: ["id"]
+            expected_records: ">100,<300"
+        - users:
+            primary_key: ["id"]
+            expected_records: 10
   - input:
       name: "source-starwars"
       prompt_name: "StarWars API (https://swapi.info/)"
     expected:
       expected_streams:
-        - "people"
-        - "films"
-        - "planets"
-        - "species"
-        - "starships"
-        - "vehicles"
+        - people:
+            primary_key: ["url"]
+            expected_records: ">80"
+        - films:
+            primary_key: ["url"]
+            expected_records: 6
+        - planets:
+            primary_key: ["url"]
+            expected_records: ">50,<70"
+        - species:
+            primary_key: ["url"]
+            expected_records: ">30"
+        - starships:
+            primary_key: ["url"]
+            expected_records: ">30"
+        - vehicles:
+            primary_key: ["url"]
+            expected_records: ">30"
   - input:
       name: "source-rickandmorty"
       prompt_name: "Rick and Morty API"
     expected:
       expected_streams:
-        - "characters"
-        - "episodes"
-        - "locations"
+        - characters:
+            primary_key: ["id"]
+            expected_records: ">100"
+        - episodes:
+            primary_key: ["id"]
+            expected_records: ">50"
+        - locations:
+            primary_key: ["id"]
+            expected_records: ">100"
diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py
index 9a4fc82..0390ebd 100644
--- a/connector_builder_agents/src/evals/evaluators.py
+++ b/connector_builder_agents/src/evals/evaluators.py
@@ -3,6 +3,9 @@

 import json
 import logging
+import re
+from collections.abc import Callable
+from typing import Any

 import pandas as pd
 import yaml
@@ -36,14 +39,53 @@
 """


-def readiness_eval(output: dict) -> int:
-    """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
+def _parse_expected_streams_dict(expected: dict, having: str | None = None) -> dict:
+    """Parse and return expected streams as a dict mapping stream_name -> stream_config.
+
+    Args:
+        expected: The expected dictionary containing stream configurations
+        having: Optional key name to filter streams - only returns streams where this key exists
+    """
+    expected_obj = json.loads(expected.get("expected", "{}"))
+    expected_streams = expected_obj.get("expected_streams", [])
+
+    result = {}
+    for stream_obj in expected_streams:
+        if isinstance(stream_obj, dict):
+            result.update(stream_obj)
+        elif isinstance(stream_obj, str):
+            result[stream_obj] = {}
+    if having is not None:
+        result = {name: config for name, config in result.items() if config.get(having) is not None}
+
+    return result
+
+
+def _get_manifest_streams(output: dict) -> list | None:
+    """Extract and parse the manifest streams from output artifacts."""
     if output is None:
-        logger.warning("Output is None, cannot evaluate readiness")
-        return 0
+        return None
+
+    manifest_str = output.get("artifacts", {}).get("manifest", None)
+    if manifest_str is None:
+        return None
+
+    manifest = yaml.safe_load(manifest_str)
+    return manifest.get("streams", [])
+
+
+def _get_readiness_report(output: dict) -> str | None:
+    """Extract the readiness report from output artifacts."""
+    if output is None:
+        return None
+
+    return output.get("artifacts", {}).get("readiness_report", None)

-    readiness_report = output.get("artifacts", {}).get("readiness_report", None)
+
+def readiness_eval(output: dict) -> int:
+    """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
+    readiness_report = _get_readiness_report(output)
     if readiness_report is None:
         logger.warning("No readiness report found")
         return 0
@@ -66,39 +108,181 @@ def readiness_eval(output: dict) -> int:
     return score


-def streams_eval(expected: dict, output: dict) -> float:
+def stream_names_eval(expected: dict, output: dict) -> float:
     """Evaluate if all expected streams were built.

     Return the percentage of expected streams that are present in available streams."""
+    expected_streams = _parse_expected_streams_dict(expected)
+    output_streams = {
+        stream.get("name", "(undeclared)"): stream for stream in _get_manifest_streams(output) or []
+    }

-    if output is None:
-        logger.warning("Output is None, cannot evaluate streams")
-        return 0.0
-
-    manifest_str = output.get("artifacts", {}).get("manifest", None)
-    if manifest_str is None:
-        logger.warning("No manifest found")
-        return 0
+    expected_stream_names = set(expected_streams.keys())
+    output_stream_names = set(output_streams.keys())

-    manifest = yaml.safe_load(manifest_str)
-    available_streams = manifest.get("streams", [])
-    available_stream_names = [stream.get("name", "") for stream in available_streams]
-    logger.info(f"Available stream names: {available_stream_names}")
+    if not expected_stream_names:
+        logger.warning("No expected streams found")
+        return 1.0

-    expected_obj = json.loads(expected.get("expected", "{}"))
-    expected_stream_names = expected_obj.get("expected_streams", [])
-    logger.info(f"Expected stream names: {expected_stream_names}")
+    matched_streams = expected_stream_names & output_stream_names

-    # Set attributes on span for visibility
     span = get_current_span()
-    span.set_attribute("available_stream_names", available_stream_names)
-    span.set_attribute("expected_stream_names", expected_stream_names)
+    span.set_attribute("name_matched_count", len(matched_streams))
+    span.set_attribute("name_evaluated_streams", len(expected_stream_names))

-    if not expected_stream_names:
-        logger.warning("No expected streams found")
-        return 0.0
+    for stream_name in expected_stream_names:
+        if stream_name in output_stream_names:
+            logger.info(f"✓ {stream_name}: name matches")
+        else:
+            logger.warning(f"✗ {stream_name}: name not found in output")

-    # Calculate the percentage of expected streams that are present in available streams
-    matched_streams = set(available_stream_names) & set(expected_stream_names)
-    logger.info(f"Matched streams: {matched_streams}")
     percent_matched = len(matched_streams) / len(expected_stream_names)
-    logger.info(f"Percent matched: {percent_matched}")
+    logger.info(f"Name percent matched: {percent_matched}")
+    return float(percent_matched)
+
+
+def _eval_expected_stream_props(
+    *,
+    expected_stream_props: dict[str, Any],
+    output_stream_props: dict[str, Any],
+    prop: str,
+    eval_fn: Callable[[Any, Any], bool] = lambda expected, actual: expected == actual,
+) -> float:
+    """Generic evaluator for expected stream properties."""
+    matched_count = 0
+    total_count = len(expected_stream_props)
+
+    for stream_name, expected_props in expected_stream_props.items():
+        expected_value = expected_props.get(prop)
+        actual_value = output_stream_props.get(stream_name, {}).get(prop)
+
+        if expected_value is None:
+            continue
+
+        if eval_fn(expected_value, actual_value):
+            matched_count += 1
+            logger.info(f"✓ {stream_name}: {prop} matches {expected_value}")
+        else:
+            logger.warning(
+                f"✗ {stream_name}: {prop} mismatch - expected {expected_value}, got {actual_value}"
+            )
+
+    span = get_current_span()
+    span.set_attribute(f"{prop}_matched_count", matched_count)
+    span.set_attribute(f"{prop}_evaluated_streams", total_count)
+
+    percent_matched = (matched_count * 1.0) / (total_count * 1.0) if total_count > 0 else 1.0
+    logger.info(f"{prop.capitalize()} percent matched: {percent_matched}")
     return float(percent_matched)
+
+
+def _normalize_primary_key(pk: Any) -> list[str]:
+    """Normalize primary key to a list of strings.
+
+    Handles:
+    - str -> [str] (e.g., "id" -> ["id"])
+    - [str] -> [str] (already normalized)
+    - [[str]] -> [str] (flatten nested lists)
+    """
+    if pk is None:
+        return []
+
+    if isinstance(pk, str):
+        return [pk]
+
+    if isinstance(pk, list):
+        if not pk:
+            return []
+
+        if all(isinstance(item, str) for item in pk):
+            return pk
+
+        if all(isinstance(item, list) for item in pk):
+            flattened = []
+            for sublist in pk:
+                if isinstance(sublist, list):
+                    flattened.extend(sublist)
+            return flattened
+
+    return [str(pk)]
+
+
+def primary_keys_eval(expected: dict, output: dict) -> float:
+    """Evaluate if primary keys match expected values for each stream.
+
+    Returns the percentage of streams with correct primary keys.
+    Normalizes primary keys before comparison (str -> [str], [[str]] -> [str]).
+    """
+    return _eval_expected_stream_props(
+        expected_stream_props=_parse_expected_streams_dict(expected, having="primary_key"),
+        output_stream_props={
+            stream.get("name", "(undeclared)"): stream
+            for stream in _get_manifest_streams(output) or []
+        },
+        prop="primary_key",
+        eval_fn=lambda expected, actual: _normalize_primary_key(expected)
+        == _normalize_primary_key(actual),
+    )
+
+
+def stream_record_counts_eval(expected: dict, output: dict) -> float:
+    """Evaluate if record counts match expected values for each stream.
+
+    Returns the percentage of streams with correct record counts.
+    Supports both integer values and constraint strings like ">100", "<999", ">100,<999".
+    """
+    readiness_report = _get_readiness_report(output)
+    if readiness_report is None:
+        logger.warning("No readiness report found")
+        return 0.0
+
+    expected_streams = _parse_expected_streams_dict(expected, having="expected_records")
+
+    output_stream_props = {}
+    for stream_name in expected_streams.keys():
+        record_count = _extract_record_count(readiness_report, stream_name)
+        output_stream_props[stream_name] = {"expected_records": record_count}
+
+    return _eval_expected_stream_props(
+        expected_stream_props=expected_streams,
+        output_stream_props=output_stream_props,
+        prop="expected_records",
+        eval_fn=lambda expected, actual: actual is not None
+        and _validate_record_count(actual, expected),
+    )
+
+
+def _extract_record_count(readiness_report: str, stream_name: str) -> int | None:
+    """Extract record count for a stream from the readiness report."""
+    lines = readiness_report.split("\n")
+    for i, line in enumerate(lines):
+        if f"**{stream_name}**" in line or f"`{stream_name}`" in line:
+            for j in range(i, min(i + 10, len(lines))):
+                if "records" in lines[j].lower():
+                    match = re.search(r"(\d+)\s+records?", lines[j], re.IGNORECASE)
+                    if match:
+                        return int(match.group(1))
+    return None
+
+
+def _validate_record_count(actual_count: int, expected_value: int | str) -> bool:
+    """Validate record count against expected value or constraint string."""
+    if isinstance(expected_value, int):
+        return actual_count == expected_value
+
+    if not isinstance(expected_value, str):
+        return False
+
+    constraints = [c.strip() for c in expected_value.split(",")]
+    for constraint in constraints:
+        if constraint.startswith(">"):
+            threshold = int(constraint[1:])
+            if actual_count <= threshold:
+                return False
+        elif constraint.startswith("<"):
+            threshold = int(constraint[1:])
+            if actual_count >= threshold:
+                return False
+        elif constraint.isdigit():
+            if actual_count != int(constraint):
+                return False
+
+    return True
diff --git a/connector_builder_agents/src/evals/phoenix_run.py b/connector_builder_agents/src/evals/phoenix_run.py
index 020c307..d2d5b6c 100644
--- a/connector_builder_agents/src/evals/phoenix_run.py
+++ b/connector_builder_agents/src/evals/phoenix_run.py
@@ -24,7 +24,13 @@
 from phoenix.otel import register

 from .dataset import get_or_create_phoenix_dataset
-from .evaluators import READINESS_EVAL_MODEL, readiness_eval, streams_eval
+from .evaluators import (
+    READINESS_EVAL_MODEL,
+    primary_keys_eval,
+    readiness_eval,
+    stream_names_eval,
+    stream_record_counts_eval,
+)
 from .summary import generate_markdown_summary
 from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task

@@ -51,7 +57,12 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):
     experiment_id = str(uuid.uuid4())[:5]
     experiment_name = f"builder-evals-{experiment_id}"

-    evaluators = [readiness_eval, streams_eval]
+    evaluators = [
+        readiness_eval,
+        stream_names_eval,
+        primary_keys_eval,
+        stream_record_counts_eval,
+    ]

     logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}")

diff --git a/connector_builder_agents/src/evals/summary.py b/connector_builder_agents/src/evals/summary.py
index 359d910..e804566 100644
--- a/connector_builder_agents/src/evals/summary.py
+++ b/connector_builder_agents/src/evals/summary.py
@@ -210,7 +210,7 @@ def extract_scores_by_connector(experiment: dict, client) -> dict:
             score = None
             if result and isinstance(result, dict):
                 score = result.get("score")
-            elif result and isinstance(result, (list, tuple)) and len(result) > 0:
+            elif result and isinstance(result, list | tuple) and len(result) > 0:
                 first_result = result[0]
                 if isinstance(first_result, dict):
                     score = first_result.get("score")
@@ -680,7 +680,7 @@ def generate_markdown_summary(experiment: dict, experiment_name: str) -> str | N
             score = None
             if result and isinstance(result, dict):
                 score = result.get("score")
-            elif result and isinstance(result, (list, tuple)) and len(result) > 0:
+            elif result and isinstance(result, list | tuple) and len(result) > 0:
                 # Handle list of results - take the first one
                 first_result = result[0]
                 if isinstance(first_result, dict):
diff --git a/connector_builder_mcp/validation_testing.py b/connector_builder_mcp/validation_testing.py
index 97bc80a..0937af5 100644
--- a/connector_builder_mcp/validation_testing.py
+++ b/connector_builder_mcp/validation_testing.py
@@ -612,6 +612,22 @@ def run_connector_readiness_test_report(  # noqa: PLR0912, PLR0914, PLR0915 (too
                     f"Records have only {result.record_stats.get('num_properties', 0)} field(s), expected at least 2"
                 )

+            stream_config = next(
+                (s for s in available_streams if s.get("name") == stream_name),
+                None,
+            )
+            if stream_config:
+                primary_key = stream_config.get("primary_key", [])
+                if not primary_key:
+                    field_count_warnings.append("No primary key defined in manifest")
+                elif result.record_stats:
+                    properties = result.record_stats.get("properties", {})
+                    missing_pk_fields = [pk for pk in primary_key if pk not in properties]
+                    if missing_pk_fields:
+                        field_count_warnings.append(
+                            f"Primary key field(s) missing from records: {', '.join(missing_pk_fields)}"
+                        )
+
             smoke_test_result = StreamSmokeTest(
                 stream_name=stream_name,
                 success=True,
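
Reviewer note: a minimal sketch of how the new `expected_streams` entries are parsed by `_parse_expected_streams_dict`. The import path is assumed from the repo layout, and the payload below is illustrative; the evaluators simply read a JSON string stored under the `"expected"` key.

```python
import json

from connector_builder_agents.src.evals.evaluators import _parse_expected_streams_dict

expected = {
    "expected": json.dumps(
        {
            "expected_streams": [
                {"posts": {"primary_key": ["id"], "expected_records": 100}},
                "users",  # legacy bare-string entries are still accepted
            ]
        }
    )
}

print(_parse_expected_streams_dict(expected))
# {'posts': {'primary_key': ['id'], 'expected_records': 100}, 'users': {}}
print(_parse_expected_streams_dict(expected, having="expected_records"))
# {'posts': {'primary_key': ['id'], 'expected_records': 100}}
```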
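A quick sanity check of the matching rules introduced above, using the helpers added in evaluators.py (import path assumed, values illustrative):

```python
from connector_builder_agents.src.evals.evaluators import (
    _normalize_primary_key,
    _validate_record_count,
)

# Exact integer expectations must match exactly.
assert _validate_record_count(100, 100)
# Constraint strings support ">N", "<N", or both joined with a comma.
assert _validate_record_count(150, ">100,<300")
assert not _validate_record_count(300, ">100,<300")  # bounds are exclusive
# Primary keys are normalized before comparison, so these spellings are equivalent.
assert _normalize_primary_key("id") == _normalize_primary_key(["id"]) == _normalize_primary_key([["id"]])
```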
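For the record-count evaluator, `_extract_record_count` only needs the stream name to appear bolded or backticked, with an "N records" phrase within the following ten lines. The report snippet below is hypothetical and only meant to illustrate the parsing; the real readiness report format may differ.

```python
from connector_builder_agents.src.evals.evaluators import _extract_record_count

# Hypothetical readiness report excerpt, not taken from an actual run.
report = """
## Stream Smoke Tests

- **posts**: PASSED
  - 100 records read
- **users**: PASSED
  - 10 records read
"""

assert _extract_record_count(report, "posts") == 100
assert _extract_record_count(report, "users") == 10
assert _extract_record_count(report, "comments") is None  # stream not mentioned
```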