From 33f6c2786b6b123384d7d2fd2142a9d42c8e32cf Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 01:10:23 +0000 Subject: [PATCH 01/20] feat: add primary keys validation to readiness report and evals - Add expected_primary_keys field to connectors.yaml for all test connectors - Create primary_keys_eval() evaluator to validate AI-generated primary keys - Register primary_keys_eval in phoenix_run.py evaluators list - Add primary key validation warnings to readiness report - Validates both presence of primary keys in manifest and in record data Closes #93 Co-Authored-By: AJ Steers --- .../src/evals/data/connectors.yaml | 18 +++++++ .../src/evals/evaluators.py | 53 +++++++++++++++++++ .../src/evals/phoenix_run.py | 4 +- connector_builder_mcp/validation_testing.py | 16 ++++++ 4 files changed, 89 insertions(+), 2 deletions(-) diff --git a/connector_builder_agents/src/evals/data/connectors.yaml b/connector_builder_agents/src/evals/data/connectors.yaml index cf4d237..e5f1334 100644 --- a/connector_builder_agents/src/evals/data/connectors.yaml +++ b/connector_builder_agents/src/evals/data/connectors.yaml @@ -10,6 +10,13 @@ connectors: - "photos" - "todos" - "users" + expected_primary_keys: + posts: ["id"] + comments: ["id"] + albums: ["id"] + photos: ["id"] + todos: ["id"] + users: ["id"] - input: name: "source-starwars" prompt_name: "StarWars API (https://swapi.info/)" @@ -21,6 +28,13 @@ connectors: - "species" - "starships" - "vehicles" + expected_primary_keys: + people: ["url"] + films: ["url"] + planets: ["url"] + species: ["url"] + starships: ["url"] + vehicles: ["url"] - input: name: "source-rickandmorty" prompt_name: "Rick and Morty API" @@ -29,3 +43,7 @@ connectors: - "characters" - "episodes" - "locations" + expected_primary_keys: + characters: ["id"] + episodes: ["id"] + locations: ["id"] diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 9a4fc82..a8d6282 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -102,3 +102,56 @@ def streams_eval(expected: dict, output: dict) -> float: percent_matched = len(matched_streams) / len(expected_stream_names) logger.info(f"Percent matched: {percent_matched}") return float(percent_matched) + + +def primary_keys_eval(expected: dict, output: dict) -> float: + """Evaluate if primary keys match expected values for each stream. + + Returns the percentage of streams with correct primary keys. 
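+
+    A stream is counted as matched only when its manifest primary_key is
+    exactly equal to the expected list (the comparison is order-sensitive).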
+ """ + if output is None: + logger.warning("Output is None, cannot evaluate primary keys") + return 0.0 + + manifest_str = output.get("artifacts", {}).get("manifest", None) + if manifest_str is None: + logger.warning("No manifest found") + return 0.0 + + manifest = yaml.safe_load(manifest_str) + available_streams = manifest.get("streams", []) + + expected_obj = json.loads(expected.get("expected", "{}")) + expected_primary_keys = expected_obj.get("expected_primary_keys", {}) + logger.info(f"Expected primary keys: {expected_primary_keys}") + + if not expected_primary_keys: + logger.warning("No expected primary keys found") + return 0.0 + + matched_count = 0 + total_expected_streams = len(expected_primary_keys) + + for stream in available_streams: + stream_name = stream.get("name", "") + if stream_name not in expected_primary_keys: + continue + + actual_pk = stream.get("primary_key", []) + expected_pk = expected_primary_keys[stream_name] + + if actual_pk == expected_pk: + matched_count += 1 + logger.info(f"✓ {stream_name}: primary key matches {expected_pk}") + else: + logger.warning( + f"✗ {stream_name}: primary key mismatch - expected {expected_pk}, got {actual_pk}" + ) + + span = get_current_span() + span.set_attribute("matched_primary_keys_count", matched_count) + span.set_attribute("total_expected_streams", total_expected_streams) + + percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 0.0 + logger.info(f"Primary keys percent matched: {percent_matched}") + return float(percent_matched) diff --git a/connector_builder_agents/src/evals/phoenix_run.py b/connector_builder_agents/src/evals/phoenix_run.py index 020c307..4e7963b 100644 --- a/connector_builder_agents/src/evals/phoenix_run.py +++ b/connector_builder_agents/src/evals/phoenix_run.py @@ -24,7 +24,7 @@ from phoenix.otel import register from .dataset import get_or_create_phoenix_dataset -from .evaluators import READINESS_EVAL_MODEL, readiness_eval, streams_eval +from .evaluators import READINESS_EVAL_MODEL, primary_keys_eval, readiness_eval, streams_eval from .summary import generate_markdown_summary from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task @@ -51,7 +51,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str): experiment_id = str(uuid.uuid4())[:5] experiment_name = f"builder-evals-{experiment_id}" - evaluators = [readiness_eval, streams_eval] + evaluators = [readiness_eval, streams_eval, primary_keys_eval] logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}") diff --git a/connector_builder_mcp/validation_testing.py b/connector_builder_mcp/validation_testing.py index 97bc80a..0937af5 100644 --- a/connector_builder_mcp/validation_testing.py +++ b/connector_builder_mcp/validation_testing.py @@ -612,6 +612,22 @@ def run_connector_readiness_test_report( # noqa: PLR0912, PLR0914, PLR0915 (too f"Records have only {result.record_stats.get('num_properties', 0)} field(s), expected at least 2" ) + stream_config = next( + (s for s in available_streams if s.get("name") == stream_name), + None, + ) + if stream_config: + primary_key = stream_config.get("primary_key", []) + if not primary_key: + field_count_warnings.append("No primary key defined in manifest") + elif result.record_stats: + properties = result.record_stats.get("properties", {}) + missing_pk_fields = [pk for pk in primary_key if pk not in properties] + if missing_pk_fields: + field_count_warnings.append( + f"Primary key field(s) missing from records: {', 
'.join(missing_pk_fields)}" + ) + smoke_test_result = StreamSmokeTest( stream_name=stream_name, success=True, From 5d6bd94f771305dcb92216a88bb9d0133a221962 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 01:48:36 +0000 Subject: [PATCH 02/20] refactor: consolidate YAML structure and add expected_records validation - Refactored expected_streams to use list of stream objects with nested primary_key - Added optional expected_records field supporting integer or constraint strings ('>100', '<999', '>100,<999') - Updated streams_eval() and primary_keys_eval() to parse new consolidated structure - Added records_eval() evaluator to validate record counts against expectations - Maintains backward compatibility with string-based stream names Co-Authored-By: AJ Steers --- .../src/evals/data/connectors.yaml | 78 ++++++----- .../src/evals/evaluators.py | 122 +++++++++++++++++- .../src/evals/phoenix_run.py | 10 +- 3 files changed, 173 insertions(+), 37 deletions(-) diff --git a/connector_builder_agents/src/evals/data/connectors.yaml b/connector_builder_agents/src/evals/data/connectors.yaml index e5f1334..adab3e4 100644 --- a/connector_builder_agents/src/evals/data/connectors.yaml +++ b/connector_builder_agents/src/evals/data/connectors.yaml @@ -4,46 +4,58 @@ connectors: prompt_name: "JSONPlaceholder" expected: expected_streams: - - "posts" - - "comments" - - "albums" - - "photos" - - "todos" - - "users" - expected_primary_keys: - posts: ["id"] - comments: ["id"] - albums: ["id"] - photos: ["id"] - todos: ["id"] - users: ["id"] + - posts: + primary_key: ["id"] + expected_records: 100 + - comments: + primary_key: ["id"] + expected_records: ">400" + - albums: + primary_key: ["id"] + expected_records: 100 + - photos: + primary_key: ["id"] + expected_records: ">1000" + - todos: + primary_key: ["id"] + expected_records: ">100,<300" + - users: + primary_key: ["id"] + expected_records: 10 - input: name: "source-starwars" prompt_name: "StarWars API (https://swapi.info/)" expected: expected_streams: - - "people" - - "films" - - "planets" - - "species" - - "starships" - - "vehicles" - expected_primary_keys: - people: ["url"] - films: ["url"] - planets: ["url"] - species: ["url"] - starships: ["url"] - vehicles: ["url"] + - people: + primary_key: ["url"] + expected_records: ">80" + - films: + primary_key: ["url"] + expected_records: 6 + - planets: + primary_key: ["url"] + expected_records: ">50,<70" + - species: + primary_key: ["url"] + expected_records: ">30" + - starships: + primary_key: ["url"] + expected_records: ">30" + - vehicles: + primary_key: ["url"] + expected_records: ">30" - input: name: "source-rickandmorty" prompt_name: "Rick and Morty API" expected: expected_streams: - - "characters" - - "episodes" - - "locations" - expected_primary_keys: - characters: ["id"] - episodes: ["id"] - locations: ["id"] + - characters: + primary_key: ["id"] + expected_records: ">100" + - episodes: + primary_key: ["id"] + expected_records: ">50" + - locations: + primary_key: ["id"] + expected_records: ">100" diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index a8d6282..f77b3c8 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -84,7 +84,16 @@ def streams_eval(expected: dict, output: dict) -> float: logger.info(f"Available stream names: {available_stream_names}") expected_obj = json.loads(expected.get("expected", 
"{}")) - expected_stream_names = expected_obj.get("expected_streams", []) + expected_streams = expected_obj.get("expected_streams", []) + + # expected_streams is now a list of dicts like [{"posts": {"primary_key": ["id"]}}, ...] + expected_stream_names = [] + for stream_obj in expected_streams: + if isinstance(stream_obj, dict): + expected_stream_names.extend(stream_obj.keys()) + elif isinstance(stream_obj, str): + expected_stream_names.append(stream_obj) + logger.info(f"Expected stream names: {expected_stream_names}") # Set attributes on span for visibility @@ -122,7 +131,16 @@ def primary_keys_eval(expected: dict, output: dict) -> float: available_streams = manifest.get("streams", []) expected_obj = json.loads(expected.get("expected", "{}")) - expected_primary_keys = expected_obj.get("expected_primary_keys", {}) + expected_streams = expected_obj.get("expected_streams", []) + + # expected_streams is now a list of dicts like [{"posts": {"primary_key": ["id"]}}, ...] + expected_primary_keys = {} + for stream_obj in expected_streams: + if isinstance(stream_obj, dict): + for stream_name, stream_config in stream_obj.items(): + if isinstance(stream_config, dict) and "primary_key" in stream_config: + expected_primary_keys[stream_name] = stream_config["primary_key"] + logger.info(f"Expected primary keys: {expected_primary_keys}") if not expected_primary_keys: @@ -155,3 +173,103 @@ def primary_keys_eval(expected: dict, output: dict) -> float: percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 0.0 logger.info(f"Primary keys percent matched: {percent_matched}") return float(percent_matched) + + +def records_eval(expected: dict, output: dict) -> float: + """Evaluate if record counts match expected values for each stream. + + Returns the percentage of streams with correct record counts. + Supports both integer values and constraint strings like ">100", "<999", ">100,<999". 
+ """ + if output is None: + logger.warning("Output is None, cannot evaluate records") + return 0.0 + + readiness_report = output.get("artifacts", {}).get("readiness_report", None) + if readiness_report is None: + logger.warning("No readiness report found") + return 0.0 + + expected_obj = json.loads(expected.get("expected", "{}")) + expected_streams = expected_obj.get("expected_streams", []) + + expected_records = {} + for stream_obj in expected_streams: + if isinstance(stream_obj, dict): + for stream_name, stream_config in stream_obj.items(): + if isinstance(stream_config, dict) and "expected_records" in stream_config: + expected_records[stream_name] = stream_config["expected_records"] + + logger.info(f"Expected records: {expected_records}") + + if not expected_records: + logger.warning("No expected records found") + return 1.0 + + matched_count = 0 + total_expected_streams = len(expected_records) + + for stream_name, expected_value in expected_records.items(): + actual_count = _extract_record_count(readiness_report, stream_name) + + if actual_count is None: + logger.warning(f"✗ {stream_name}: could not extract record count from report") + continue + + if _validate_record_count(actual_count, expected_value): + matched_count += 1 + logger.info( + f"✓ {stream_name}: record count {actual_count} meets expectation {expected_value}" + ) + else: + logger.warning( + f"✗ {stream_name}: record count {actual_count} does not meet expectation {expected_value}" + ) + + span = get_current_span() + span.set_attribute("matched_records_count", matched_count) + span.set_attribute("total_expected_streams", total_expected_streams) + + percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 0.0 + logger.info(f"Records percent matched: {percent_matched}") + return float(percent_matched) + + +def _extract_record_count(readiness_report: str, stream_name: str) -> int | None: + """Extract record count for a stream from the readiness report.""" + lines = readiness_report.split("\n") + for i, line in enumerate(lines): + if f"**{stream_name}**" in line or f"`{stream_name}`" in line: + for j in range(i, min(i + 10, len(lines))): + if "records" in lines[j].lower(): + import re + + match = re.search(r"(\d+)\s+records?", lines[j], re.IGNORECASE) + if match: + return int(match.group(1)) + return None + + +def _validate_record_count(actual_count: int, expected_value: int | str) -> bool: + """Validate record count against expected value or constraint string.""" + if isinstance(expected_value, int): + return actual_count == expected_value + + if not isinstance(expected_value, str): + return False + + constraints = [c.strip() for c in expected_value.split(",")] + for constraint in constraints: + if constraint.startswith(">"): + threshold = int(constraint[1:]) + if actual_count <= threshold: + return False + elif constraint.startswith("<"): + threshold = int(constraint[1:]) + if actual_count >= threshold: + return False + elif constraint.isdigit(): + if actual_count != int(constraint): + return False + + return True diff --git a/connector_builder_agents/src/evals/phoenix_run.py b/connector_builder_agents/src/evals/phoenix_run.py index 4e7963b..c9c4cad 100644 --- a/connector_builder_agents/src/evals/phoenix_run.py +++ b/connector_builder_agents/src/evals/phoenix_run.py @@ -24,7 +24,13 @@ from phoenix.otel import register from .dataset import get_or_create_phoenix_dataset -from .evaluators import READINESS_EVAL_MODEL, primary_keys_eval, readiness_eval, streams_eval +from .evaluators import ( + 
READINESS_EVAL_MODEL, + primary_keys_eval, + readiness_eval, + records_eval, + streams_eval, +) from .summary import generate_markdown_summary from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task @@ -51,7 +57,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str): experiment_id = str(uuid.uuid4())[:5] experiment_name = f"builder-evals-{experiment_id}" - evaluators = [readiness_eval, streams_eval, primary_keys_eval] + evaluators = [readiness_eval, streams_eval, primary_keys_eval, records_eval] logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}") From a03f701e44acdc4b35ccf8ad998a0b0b7f91ba19 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:09:16 +0000 Subject: [PATCH 03/20] refactor: extract common helper functions in evaluators - Created _parse_expected_streams() to parse expected data from JSON - Created _get_manifest_streams() to extract manifest from artifacts - Created _get_readiness_report() to extract readiness report from artifacts - Updated all evaluator functions to use shared helpers - Reduces code duplication while maintaining exact same functionality Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 76 +++++++++---------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index f77b3c8..7ec89ee 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -36,14 +36,36 @@ """ -def readiness_eval(output: dict) -> int: - """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED.""" +def _parse_expected_streams(expected: dict) -> list: + """Parse and return the expected_streams list from the expected dict.""" + expected_obj = json.loads(expected.get("expected", "{}")) + return expected_obj.get("expected_streams", []) + +def _get_manifest_streams(output: dict) -> list | None: + """Extract and parse the manifest streams from output artifacts.""" if output is None: - logger.warning("Output is None, cannot evaluate readiness") - return 0 + return None + + manifest_str = output.get("artifacts", {}).get("manifest", None) + if manifest_str is None: + return None + + manifest = yaml.safe_load(manifest_str) + return manifest.get("streams", []) + - readiness_report = output.get("artifacts", {}).get("readiness_report", None) +def _get_readiness_report(output: dict) -> str | None: + """Extract the readiness report from output artifacts.""" + if output is None: + return None + + return output.get("artifacts", {}).get("readiness_report", None) + + +def readiness_eval(output: dict) -> int: + """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED.""" + readiness_report = _get_readiness_report(output) if readiness_report is None: logger.warning("No readiness report found") return 0 @@ -68,25 +90,15 @@ def readiness_eval(output: dict) -> int: def streams_eval(expected: dict, output: dict) -> float: """Evaluate if all expected streams were built. 
Return the percentage of expected streams that are present in available streams.""" - - if output is None: - logger.warning("Output is None, cannot evaluate streams") - return 0.0 - - manifest_str = output.get("artifacts", {}).get("manifest", None) - if manifest_str is None: + available_streams = _get_manifest_streams(output) + if available_streams is None: logger.warning("No manifest found") - return 0 + return 0.0 - manifest = yaml.safe_load(manifest_str) - available_streams = manifest.get("streams", []) available_stream_names = [stream.get("name", "") for stream in available_streams] logger.info(f"Available stream names: {available_stream_names}") - expected_obj = json.loads(expected.get("expected", "{}")) - expected_streams = expected_obj.get("expected_streams", []) - - # expected_streams is now a list of dicts like [{"posts": {"primary_key": ["id"]}}, ...] + expected_streams = _parse_expected_streams(expected) expected_stream_names = [] for stream_obj in expected_streams: if isinstance(stream_obj, dict): @@ -118,22 +130,12 @@ def primary_keys_eval(expected: dict, output: dict) -> float: Returns the percentage of streams with correct primary keys. """ - if output is None: - logger.warning("Output is None, cannot evaluate primary keys") - return 0.0 - - manifest_str = output.get("artifacts", {}).get("manifest", None) - if manifest_str is None: + available_streams = _get_manifest_streams(output) + if available_streams is None: logger.warning("No manifest found") return 0.0 - manifest = yaml.safe_load(manifest_str) - available_streams = manifest.get("streams", []) - - expected_obj = json.loads(expected.get("expected", "{}")) - expected_streams = expected_obj.get("expected_streams", []) - - # expected_streams is now a list of dicts like [{"posts": {"primary_key": ["id"]}}, ...] + expected_streams = _parse_expected_streams(expected) expected_primary_keys = {} for stream_obj in expected_streams: if isinstance(stream_obj, dict): @@ -181,18 +183,12 @@ def records_eval(expected: dict, output: dict) -> float: Returns the percentage of streams with correct record counts. Supports both integer values and constraint strings like ">100", "<999", ">100,<999". 
""" - if output is None: - logger.warning("Output is None, cannot evaluate records") - return 0.0 - - readiness_report = output.get("artifacts", {}).get("readiness_report", None) + readiness_report = _get_readiness_report(output) if readiness_report is None: logger.warning("No readiness report found") return 0.0 - expected_obj = json.loads(expected.get("expected", "{}")) - expected_streams = expected_obj.get("expected_streams", []) - + expected_streams = _parse_expected_streams(expected) expected_records = {} for stream_obj in expected_streams: if isinstance(stream_obj, dict): From 0717df011c6f5911fdccc2abea7f593f712de45b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:18:13 +0000 Subject: [PATCH 04/20] refactor: simplify evaluators by eliminating intermediate dicts - Remove streams_with_pk and streams_with_records dict comprehensions - Directly iterate over available_streams and access expected_streams dict - Use sum() with generator for counting expected streams - Cleaner, more direct code without unnecessary data transformations Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 7ec89ee..bd5d138 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -36,10 +36,19 @@ """ -def _parse_expected_streams(expected: dict) -> list: - """Parse and return the expected_streams list from the expected dict.""" +def _parse_expected_streams_dict(expected: dict) -> dict: + """Parse and return expected streams as a dict mapping stream_name -> stream_config.""" expected_obj = json.loads(expected.get("expected", "{}")) - return expected_obj.get("expected_streams", []) + expected_streams = expected_obj.get("expected_streams", []) + + result = {} + for stream_obj in expected_streams: + if isinstance(stream_obj, dict): + result.update(stream_obj) + elif isinstance(stream_obj, str): + result[stream_obj] = {} + + return result def _get_manifest_streams(output: dict) -> list | None: @@ -98,14 +107,8 @@ def streams_eval(expected: dict, output: dict) -> float: available_stream_names = [stream.get("name", "") for stream in available_streams] logger.info(f"Available stream names: {available_stream_names}") - expected_streams = _parse_expected_streams(expected) - expected_stream_names = [] - for stream_obj in expected_streams: - if isinstance(stream_obj, dict): - expected_stream_names.extend(stream_obj.keys()) - elif isinstance(stream_obj, str): - expected_stream_names.append(stream_obj) - + expected_streams = _parse_expected_streams_dict(expected) + expected_stream_names = list(expected_streams.keys()) logger.info(f"Expected stream names: {expected_stream_names}") # Set attributes on span for visibility @@ -135,30 +138,28 @@ def primary_keys_eval(expected: dict, output: dict) -> float: logger.warning("No manifest found") return 0.0 - expected_streams = _parse_expected_streams(expected) - expected_primary_keys = {} - for stream_obj in expected_streams: - if isinstance(stream_obj, dict): - for stream_name, stream_config in stream_obj.items(): - if isinstance(stream_config, dict) and "primary_key" in stream_config: - expected_primary_keys[stream_name] = stream_config["primary_key"] + expected_streams = _parse_expected_streams_dict(expected) - logger.info(f"Expected primary keys: 
{expected_primary_keys}") + total_expected_streams = sum( + 1 for config in expected_streams.values() if config.get("primary_key") is not None + ) - if not expected_primary_keys: + if total_expected_streams == 0: logger.warning("No expected primary keys found") return 0.0 matched_count = 0 - total_expected_streams = len(expected_primary_keys) for stream in available_streams: stream_name = stream.get("name", "") - if stream_name not in expected_primary_keys: + if stream_name not in expected_streams: + continue + + expected_pk = expected_streams[stream_name].get("primary_key") + if expected_pk is None: continue actual_pk = stream.get("primary_key", []) - expected_pk = expected_primary_keys[stream_name] if actual_pk == expected_pk: matched_count += 1 @@ -188,24 +189,22 @@ def records_eval(expected: dict, output: dict) -> float: logger.warning("No readiness report found") return 0.0 - expected_streams = _parse_expected_streams(expected) - expected_records = {} - for stream_obj in expected_streams: - if isinstance(stream_obj, dict): - for stream_name, stream_config in stream_obj.items(): - if isinstance(stream_config, dict) and "expected_records" in stream_config: - expected_records[stream_name] = stream_config["expected_records"] + expected_streams = _parse_expected_streams_dict(expected) - logger.info(f"Expected records: {expected_records}") + total_expected_streams = sum( + 1 for config in expected_streams.values() if config.get("expected_records") is not None + ) - if not expected_records: + if total_expected_streams == 0: logger.warning("No expected records found") return 1.0 matched_count = 0 - total_expected_streams = len(expected_records) - for stream_name, expected_value in expected_records.items(): + for stream_name, stream_config in expected_streams.items(): + expected_value = stream_config.get("expected_records") + if expected_value is None: + continue actual_count = _extract_record_count(readiness_report, stream_name) if actual_count is None: From 595705bc2e4bf81f24dc071227e229353638e5f4 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:20:12 +0000 Subject: [PATCH 05/20] fix: move import re to top of file Addresses GitHub comment - imports should be at the top of the file, not inside functions. Co-Authored-By: AJ Steers --- connector_builder_agents/src/evals/evaluators.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index bd5d138..359e6fc 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -3,6 +3,7 @@ import json import logging +import re import pandas as pd import yaml @@ -237,8 +238,6 @@ def _extract_record_count(readiness_report: str, stream_name: str) -> int | None if f"**{stream_name}**" in line or f"`{stream_name}`" in line: for j in range(i, min(i + 10, len(lines))): if "records" in lines[j].lower(): - import re - match = re.search(r"(\d+)\s+records?", lines[j], re.IGNORECASE) if match: return int(match.group(1)) From ce1703a3977799645fce76f8e5a9111b1a42befe Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:31:27 +0000 Subject: [PATCH 06/20] refactor: remove unnecessary early returns from evaluators Removes the early return blocks from primary_keys_eval() and records_eval() when no expected values are defined. 
The ternary operators at the end of each function already handle this case correctly by returning 1.0 when total_expected_streams == 0. Addresses GitHub feedback from aaronsteers. Co-Authored-By: AJ Steers --- connector_builder_agents/src/evals/evaluators.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 359e6fc..86179e7 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -145,10 +145,6 @@ def primary_keys_eval(expected: dict, output: dict) -> float: 1 for config in expected_streams.values() if config.get("primary_key") is not None ) - if total_expected_streams == 0: - logger.warning("No expected primary keys found") - return 0.0 - matched_count = 0 for stream in available_streams: @@ -174,7 +170,7 @@ def primary_keys_eval(expected: dict, output: dict) -> float: span.set_attribute("matched_primary_keys_count", matched_count) span.set_attribute("total_expected_streams", total_expected_streams) - percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 0.0 + percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 1.0 logger.info(f"Primary keys percent matched: {percent_matched}") return float(percent_matched) @@ -196,10 +192,6 @@ def records_eval(expected: dict, output: dict) -> float: 1 for config in expected_streams.values() if config.get("expected_records") is not None ) - if total_expected_streams == 0: - logger.warning("No expected records found") - return 1.0 - matched_count = 0 for stream_name, stream_config in expected_streams.items(): @@ -226,7 +218,7 @@ def records_eval(expected: dict, output: dict) -> float: span.set_attribute("matched_records_count", matched_count) span.set_attribute("total_expected_streams", total_expected_streams) - percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 0.0 + percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 1.0 logger.info(f"Records percent matched: {percent_matched}") return float(percent_matched) From 519914d008202af12b3520a976e8e01bf3f34230 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:38:57 +0000 Subject: [PATCH 07/20] refactor: remove expected_stream_names variable from streams_eval Co-Authored-By: AJ Steers --- connector_builder_agents/src/evals/evaluators.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 86179e7..5a11b8b 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -109,22 +109,21 @@ def streams_eval(expected: dict, output: dict) -> float: logger.info(f"Available stream names: {available_stream_names}") expected_streams = _parse_expected_streams_dict(expected) - expected_stream_names = list(expected_streams.keys()) - logger.info(f"Expected stream names: {expected_stream_names}") + logger.info(f"Expected stream names: {list(expected_streams.keys())}") # Set attributes on span for visibility span = get_current_span() span.set_attribute("available_stream_names", available_stream_names) - span.set_attribute("expected_stream_names", expected_stream_names) + span.set_attribute("expected_stream_names", 
list(expected_streams.keys())) - if not expected_stream_names: + if not expected_streams: logger.warning("No expected streams found") return 0.0 # Calculate the percentage of expected streams that are present in available streams - matched_streams = set(available_stream_names) & set(expected_stream_names) + matched_streams = set(available_stream_names) & set(expected_streams.keys()) logger.info(f"Matched streams: {matched_streams}") - percent_matched = len(matched_streams) / len(expected_stream_names) + percent_matched = len(matched_streams) / len(expected_streams) logger.info(f"Percent matched: {percent_matched}") return float(percent_matched) From 4467160aeab6edba8dd045a9cbe2a35a002d4386 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 10 Oct 2025 19:45:38 -0700 Subject: [PATCH 08/20] Apply suggestion from @aaronsteers --- connector_builder_agents/src/evals/phoenix_run.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/connector_builder_agents/src/evals/phoenix_run.py b/connector_builder_agents/src/evals/phoenix_run.py index c9c4cad..6540853 100644 --- a/connector_builder_agents/src/evals/phoenix_run.py +++ b/connector_builder_agents/src/evals/phoenix_run.py @@ -57,7 +57,12 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str): experiment_id = str(uuid.uuid4())[:5] experiment_name = f"builder-evals-{experiment_id}" - evaluators = [readiness_eval, streams_eval, primary_keys_eval, records_eval] + evaluators = [ + readiness_eval, + streams_eval, + primary_keys_eval, + records_eval, + ] logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}") From 4765645e3d30b248ee67e9875645af4e9f992b9b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:51:53 +0000 Subject: [PATCH 09/20] refactor: rename streams_eval to stream_names_eval and total_expected_streams to total_evaluated_streams Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 18 +++++++++++------- .../src/evals/phoenix_run.py | 4 ++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 5a11b8b..67fc4c2 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -98,7 +98,7 @@ def readiness_eval(output: dict) -> int: return score -def streams_eval(expected: dict, output: dict) -> float: +def stream_names_eval(expected: dict, output: dict) -> float: """Evaluate if all expected streams were built. 
Return the percentage of expected streams that are present in available streams.""" available_streams = _get_manifest_streams(output) if available_streams is None: @@ -140,7 +140,7 @@ def primary_keys_eval(expected: dict, output: dict) -> float: expected_streams = _parse_expected_streams_dict(expected) - total_expected_streams = sum( + total_evaluated_streams = sum( 1 for config in expected_streams.values() if config.get("primary_key") is not None ) @@ -167,9 +167,11 @@ def primary_keys_eval(expected: dict, output: dict) -> float: span = get_current_span() span.set_attribute("matched_primary_keys_count", matched_count) - span.set_attribute("total_expected_streams", total_expected_streams) + span.set_attribute("total_evaluated_streams", total_evaluated_streams) - percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 1.0 + percent_matched = ( + matched_count / total_evaluated_streams if total_evaluated_streams > 0 else 1.0 + ) logger.info(f"Primary keys percent matched: {percent_matched}") return float(percent_matched) @@ -187,7 +189,7 @@ def records_eval(expected: dict, output: dict) -> float: expected_streams = _parse_expected_streams_dict(expected) - total_expected_streams = sum( + total_evaluated_streams = sum( 1 for config in expected_streams.values() if config.get("expected_records") is not None ) @@ -215,9 +217,11 @@ def records_eval(expected: dict, output: dict) -> float: span = get_current_span() span.set_attribute("matched_records_count", matched_count) - span.set_attribute("total_expected_streams", total_expected_streams) + span.set_attribute("total_evaluated_streams", total_evaluated_streams) - percent_matched = matched_count / total_expected_streams if total_expected_streams > 0 else 1.0 + percent_matched = ( + matched_count / total_evaluated_streams if total_evaluated_streams > 0 else 1.0 + ) logger.info(f"Records percent matched: {percent_matched}") return float(percent_matched) diff --git a/connector_builder_agents/src/evals/phoenix_run.py b/connector_builder_agents/src/evals/phoenix_run.py index 6540853..808db48 100644 --- a/connector_builder_agents/src/evals/phoenix_run.py +++ b/connector_builder_agents/src/evals/phoenix_run.py @@ -29,7 +29,7 @@ primary_keys_eval, readiness_eval, records_eval, - streams_eval, + stream_names_eval, ) from .summary import generate_markdown_summary from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task @@ -59,7 +59,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str): experiment_name = f"builder-evals-{experiment_id}" evaluators = [ readiness_eval, - streams_eval, + stream_names_eval, primary_keys_eval, records_eval, ] From 9e247ce9a467b91393c5322506b4f9e8d559e795 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:57:02 +0000 Subject: [PATCH 10/20] refactor: simplify evaluators with having parameter in _parse_expected_streams_dict Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 45 ++++++++----------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 67fc4c2..54a3759 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -37,8 +37,13 @@ """ -def _parse_expected_streams_dict(expected: dict) -> dict: - """Parse and return expected streams as a dict mapping stream_name -> 
stream_config.""" +def _parse_expected_streams_dict(expected: dict, having: str | None = None) -> dict: + """Parse and return expected streams as a dict mapping stream_name -> stream_config. + + Args: + expected: The expected dictionary containing stream configurations + having: Optional key name to filter streams - only returns streams where this key exists + """ expected_obj = json.loads(expected.get("expected", "{}")) expected_streams = expected_obj.get("expected_streams", []) @@ -49,6 +54,9 @@ def _parse_expected_streams_dict(expected: dict) -> dict: elif isinstance(stream_obj, str): result[stream_obj] = {} + if having is not None: + result = {name: config for name, config in result.items() if config.get(having) is not None} + return result @@ -138,11 +146,7 @@ def primary_keys_eval(expected: dict, output: dict) -> float: logger.warning("No manifest found") return 0.0 - expected_streams = _parse_expected_streams_dict(expected) - - total_evaluated_streams = sum( - 1 for config in expected_streams.values() if config.get("primary_key") is not None - ) + expected_streams = _parse_expected_streams_dict(expected, having="primary_key") matched_count = 0 @@ -151,10 +155,7 @@ def primary_keys_eval(expected: dict, output: dict) -> float: if stream_name not in expected_streams: continue - expected_pk = expected_streams[stream_name].get("primary_key") - if expected_pk is None: - continue - + expected_pk = expected_streams[stream_name]["primary_key"] actual_pk = stream.get("primary_key", []) if actual_pk == expected_pk: @@ -167,11 +168,9 @@ def primary_keys_eval(expected: dict, output: dict) -> float: span = get_current_span() span.set_attribute("matched_primary_keys_count", matched_count) - span.set_attribute("total_evaluated_streams", total_evaluated_streams) + span.set_attribute("total_evaluated_streams", len(expected_streams)) - percent_matched = ( - matched_count / total_evaluated_streams if total_evaluated_streams > 0 else 1.0 - ) + percent_matched = matched_count / len(expected_streams) if len(expected_streams) > 0 else 1.0 logger.info(f"Primary keys percent matched: {percent_matched}") return float(percent_matched) @@ -187,18 +186,12 @@ def records_eval(expected: dict, output: dict) -> float: logger.warning("No readiness report found") return 0.0 - expected_streams = _parse_expected_streams_dict(expected) - - total_evaluated_streams = sum( - 1 for config in expected_streams.values() if config.get("expected_records") is not None - ) + expected_streams = _parse_expected_streams_dict(expected, having="expected_records") matched_count = 0 for stream_name, stream_config in expected_streams.items(): - expected_value = stream_config.get("expected_records") - if expected_value is None: - continue + expected_value = stream_config["expected_records"] actual_count = _extract_record_count(readiness_report, stream_name) if actual_count is None: @@ -217,11 +210,9 @@ def records_eval(expected: dict, output: dict) -> float: span = get_current_span() span.set_attribute("matched_records_count", matched_count) - span.set_attribute("total_evaluated_streams", total_evaluated_streams) + span.set_attribute("total_evaluated_streams", len(expected_streams)) - percent_matched = ( - matched_count / total_evaluated_streams if total_evaluated_streams > 0 else 1.0 - ) + percent_matched = matched_count / len(expected_streams) if len(expected_streams) > 0 else 1.0 logger.info(f"Records percent matched: {percent_matched}") return float(percent_matched) From 791d6dabb56fc7bfdb2d126297077c6b575b42dd Mon Sep 17 00:00:00 2001 
From: "Aaron (\"AJ\") Steers" Date: Fri, 10 Oct 2025 20:12:36 -0700 Subject: [PATCH 11/20] add generic evaluator fn --- .../src/evals/evaluators.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 54a3759..83619e2 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -136,6 +136,41 @@ def stream_names_eval(expected: dict, output: dict) -> float: return float(percent_matched) +def _eval_expected_stream_props( + expected_stream_props: dict[str, Any], + output_stream_props: dict[str, Any], + prop: str, + eval_fn: Callable[[Any, Any], bool], + span: Any | None = None, # TODO: replace `Any` with proper type +) -> float: + """Generic evaluator for expected stream properties.""" + matched_count = 0 + total_count = len(expected_stream_props) + + for stream_name, expected_props in expected_stream_props.items(): + expected_value = expected_props.get(prop) + actual_value = output_stream_props.get(stream_name, {}).get(prop) + + if expected_value is None: + continue + + if eval_fn(expected_value, actual_value): + matched_count += 1 + logger.info(f"✓ {stream_name}: {prop} matches {expected_value}") + else: + logger.warning( + f"✗ {stream_name}: {prop} mismatch - expected {expected_value}, got {actual_value}" + ) + + span = get_current_span() + span.set_attribute(f"{prop}_matched_count", matched_count) + span.set_attribute(f"{prop}_evaluated_streams", total_count) + + percent_matched = (matched_count * 1.0) / (total_count * 1.0) if total_count > 0 else 1.0 + logger.info(f"{prop.capitalize()} percent matched: {percent_matched}") + return float(percent_matched) + + def primary_keys_eval(expected: dict, output: dict) -> float: """Evaluate if primary keys match expected values for each stream. From 3be1c8a9e27519d4e621d3df714efe961e124d2d Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 10 Oct 2025 20:20:40 -0700 Subject: [PATCH 12/20] try implementation of generic compare --- .../src/evals/evaluators.py | 36 +++++-------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 83619e2..0699d09 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -108,39 +108,21 @@ def readiness_eval(output: dict) -> int: def stream_names_eval(expected: dict, output: dict) -> float: """Evaluate if all expected streams were built. 
Return the percentage of expected streams that are present in available streams.""" - available_streams = _get_manifest_streams(output) - if available_streams is None: - logger.warning("No manifest found") - return 0.0 - - available_stream_names = [stream.get("name", "") for stream in available_streams] - logger.info(f"Available stream names: {available_stream_names}") - - expected_streams = _parse_expected_streams_dict(expected) - logger.info(f"Expected stream names: {list(expected_streams.keys())}") - - # Set attributes on span for visibility - span = get_current_span() - span.set_attribute("available_stream_names", available_stream_names) - span.set_attribute("expected_stream_names", list(expected_streams.keys())) - - if not expected_streams: - logger.warning("No expected streams found") - return 0.0 - - # Calculate the percentage of expected streams that are present in available streams - matched_streams = set(available_stream_names) & set(expected_streams.keys()) - logger.info(f"Matched streams: {matched_streams}") - percent_matched = len(matched_streams) / len(expected_streams) - logger.info(f"Percent matched: {percent_matched}") - return float(percent_matched) + return _eval_expected_stream_props( + expected_stream_props=_parse_expected_streams_dict(expected), + output_stream_props={ + stream.get("name", "(undeclared)"): stream for stream in _get_manifest_streams(output) or [] + }, + prop="name", + ) def _eval_expected_stream_props( + *, expected_stream_props: dict[str, Any], output_stream_props: dict[str, Any], prop: str, - eval_fn: Callable[[Any, Any], bool], + eval_fn: Callable[[Any, Any], bool] = lambda expected, actual: expected == actual, span: Any | None = None, # TODO: replace `Any` with proper type ) -> float: """Generic evaluator for expected stream properties.""" From b3c2b213a52a5ec3ef04fe1a7ca288412037611c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 03:31:43 +0000 Subject: [PATCH 13/20] fix: correct indentation and complete generic evaluator pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix indentation error in _eval_expected_stream_props docstring (5→4 spaces) - Add missing typing imports (Callable from collections.abc, Any from typing) - Refactor primary_keys_eval to use generic evaluator pattern - Reduces code duplication following Aaron's simplification approach - All lint, format, and tests passing locally Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 47 +++++-------------- 1 file changed, 13 insertions(+), 34 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 0699d09..0adb61d 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -4,6 +4,8 @@ import json import logging import re +from collections.abc import Callable +from typing import Any import pandas as pd import yaml @@ -111,7 +113,8 @@ def stream_names_eval(expected: dict, output: dict) -> float: return _eval_expected_stream_props( expected_stream_props=_parse_expected_streams_dict(expected), output_stream_props={ - stream.get("name", "(undeclared)"): stream for stream in _get_manifest_streams(output) or [] + stream.get("name", "(undeclared)"): stream + for stream in _get_manifest_streams(output) or [] }, prop="name", ) @@ -125,7 +128,7 @@ def _eval_expected_stream_props( eval_fn: Callable[[Any, Any], bool] = lambda 
expected, actual: expected == actual, span: Any | None = None, # TODO: replace `Any` with proper type ) -> float: - """Generic evaluator for expected stream properties.""" + """Generic evaluator for expected stream properties.""" matched_count = 0 total_count = len(expected_stream_props) @@ -158,38 +161,14 @@ def primary_keys_eval(expected: dict, output: dict) -> float: Returns the percentage of streams with correct primary keys. """ - available_streams = _get_manifest_streams(output) - if available_streams is None: - logger.warning("No manifest found") - return 0.0 - - expected_streams = _parse_expected_streams_dict(expected, having="primary_key") - - matched_count = 0 - - for stream in available_streams: - stream_name = stream.get("name", "") - if stream_name not in expected_streams: - continue - - expected_pk = expected_streams[stream_name]["primary_key"] - actual_pk = stream.get("primary_key", []) - - if actual_pk == expected_pk: - matched_count += 1 - logger.info(f"✓ {stream_name}: primary key matches {expected_pk}") - else: - logger.warning( - f"✗ {stream_name}: primary key mismatch - expected {expected_pk}, got {actual_pk}" - ) - - span = get_current_span() - span.set_attribute("matched_primary_keys_count", matched_count) - span.set_attribute("total_evaluated_streams", len(expected_streams)) - - percent_matched = matched_count / len(expected_streams) if len(expected_streams) > 0 else 1.0 - logger.info(f"Primary keys percent matched: {percent_matched}") - return float(percent_matched) + return _eval_expected_stream_props( + expected_stream_props=_parse_expected_streams_dict(expected, having="primary_key"), + output_stream_props={ + stream.get("name", "(undeclared)"): stream + for stream in _get_manifest_streams(output) or [] + }, + prop="primary_key", + ) def records_eval(expected: dict, output: dict) -> float: From 0817fdb4bdc2cbcf7f44ba3c212d8b05a82f2bd0 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Fri, 10 Oct 2025 22:12:05 -0700 Subject: [PATCH 14/20] Apply suggestion from @aaronsteers --- connector_builder_agents/src/evals/evaluators.py | 1 - 1 file changed, 1 deletion(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 0adb61d..d3ddbc6 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -126,7 +126,6 @@ def _eval_expected_stream_props( output_stream_props: dict[str, Any], prop: str, eval_fn: Callable[[Any, Any], bool] = lambda expected, actual: expected == actual, - span: Any | None = None, # TODO: replace `Any` with proper type ) -> float: """Generic evaluator for expected stream properties.""" matched_count = 0 From caf5b7c883bd22066536bc917449fced417973b8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 05:17:55 +0000 Subject: [PATCH 15/20] refactor: rename records_eval to stream_record_counts_eval and simplify using generic evaluator - Rename records_eval() to stream_record_counts_eval() for clarity - Refactor to use _eval_expected_stream_props() helper with custom eval_fn - Reduces ~27 lines of duplicated logic to ~8 lines - Maintains all existing constraint validation functionality ('>100', '<999', etc.) 
- Follows the same pattern as primary_keys_eval() and stream_names_eval() - Update imports in phoenix_run.py Addresses: https://github.com/airbytehq/connector-builder-mcp/pull/136#discussion_r1890699507 Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 39 ++++++------------- .../src/evals/phoenix_run.py | 4 +- 2 files changed, 14 insertions(+), 29 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index d3ddbc6..9cad8fc 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -170,7 +170,7 @@ def primary_keys_eval(expected: dict, output: dict) -> float: ) -def records_eval(expected: dict, output: dict) -> float: +def stream_record_counts_eval(expected: dict, output: dict) -> float: """Evaluate if record counts match expected values for each stream. Returns the percentage of streams with correct record counts. @@ -183,33 +183,18 @@ def records_eval(expected: dict, output: dict) -> float: expected_streams = _parse_expected_streams_dict(expected, having="expected_records") - matched_count = 0 - - for stream_name, stream_config in expected_streams.items(): - expected_value = stream_config["expected_records"] - actual_count = _extract_record_count(readiness_report, stream_name) - - if actual_count is None: - logger.warning(f"✗ {stream_name}: could not extract record count from report") - continue + output_stream_props = {} + for stream_name in expected_streams.keys(): + record_count = _extract_record_count(readiness_report, stream_name) + output_stream_props[stream_name] = {"expected_records": record_count} - if _validate_record_count(actual_count, expected_value): - matched_count += 1 - logger.info( - f"✓ {stream_name}: record count {actual_count} meets expectation {expected_value}" - ) - else: - logger.warning( - f"✗ {stream_name}: record count {actual_count} does not meet expectation {expected_value}" - ) - - span = get_current_span() - span.set_attribute("matched_records_count", matched_count) - span.set_attribute("total_evaluated_streams", len(expected_streams)) - - percent_matched = matched_count / len(expected_streams) if len(expected_streams) > 0 else 1.0 - logger.info(f"Records percent matched: {percent_matched}") - return float(percent_matched) + return _eval_expected_stream_props( + expected_stream_props=expected_streams, + output_stream_props=output_stream_props, + prop="expected_records", + eval_fn=lambda expected, actual: actual is not None + and _validate_record_count(actual, expected), + ) def _extract_record_count(readiness_report: str, stream_name: str) -> int | None: diff --git a/connector_builder_agents/src/evals/phoenix_run.py b/connector_builder_agents/src/evals/phoenix_run.py index 808db48..d2d5b6c 100644 --- a/connector_builder_agents/src/evals/phoenix_run.py +++ b/connector_builder_agents/src/evals/phoenix_run.py @@ -28,8 +28,8 @@ READINESS_EVAL_MODEL, primary_keys_eval, readiness_eval, - records_eval, stream_names_eval, + stream_record_counts_eval, ) from .summary import generate_markdown_summary from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task @@ -61,7 +61,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str): readiness_eval, stream_names_eval, primary_keys_eval, - records_eval, + stream_record_counts_eval, ] logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}") From bfa5cb14ae720934870a5b86a9be67b34f8fc8ce Mon Sep 17 00:00:00 2001 From: 
Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 05:59:45 +0000 Subject: [PATCH 16/20] fix: correct stream_names_eval to compare stream name keys instead of looking for 'name' property CodeRabbit identified that stream_names_eval was incorrectly passing prop='name' to the generic evaluator, but expected stream configs don't have a 'name' property inside them - stream names are the dict keys, not properties. This fix changes stream_names_eval to directly compare expected stream name keys against actual stream name keys from the manifest, matching the actual data structure. Fixes critical bug where all streams were being skipped during evaluation. Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index 9cad8fc..b520043 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -110,14 +110,33 @@ def readiness_eval(output: dict) -> int: def stream_names_eval(expected: dict, output: dict) -> float: """Evaluate if all expected streams were built. Return the percentage of expected streams that are present in available streams.""" - return _eval_expected_stream_props( - expected_stream_props=_parse_expected_streams_dict(expected), - output_stream_props={ - stream.get("name", "(undeclared)"): stream - for stream in _get_manifest_streams(output) or [] - }, - prop="name", - ) + expected_streams = _parse_expected_streams_dict(expected) + output_streams = { + stream.get("name", "(undeclared)"): stream for stream in _get_manifest_streams(output) or [] + } + + expected_stream_names = set(expected_streams.keys()) + output_stream_names = set(output_streams.keys()) + + if not expected_stream_names: + logger.warning("No expected streams found") + return 1.0 + + matched_streams = expected_stream_names & output_stream_names + + span = get_current_span() + span.set_attribute("name_matched_count", len(matched_streams)) + span.set_attribute("name_evaluated_streams", len(expected_stream_names)) + + for stream_name in expected_stream_names: + if stream_name in output_stream_names: + logger.info(f"✓ {stream_name}: name matches") + else: + logger.warning(f"✗ {stream_name}: name not found in output") + + percent_matched = len(matched_streams) / len(expected_stream_names) + logger.info(f"Name percent matched: {percent_matched}") + return float(percent_matched) def _eval_expected_stream_props( From 5c7f6e292a3343f686eb2e188497adf5c274486b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 12:01:31 +0000 Subject: [PATCH 17/20] fix: update session_id format to use 'conv-' prefix for OpenAI API compatibility OpenAI's Conversations API requires conversation IDs to begin with 'conv-'. Updated both eval sessions and interactive sessions to use compliant format. This fixes the error: openai.BadRequestError: Invalid 'conversation_id': Expected an ID that begins with 'conv'. 
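For illustration, a minimal sketch of the compliant format (the connector
name and timestamp are hypothetical; the real call sites are in the diff
below):

    import time

    connector_name = "source-starwars"  # example value from the eval set
    session_id = f"conv-eval-{connector_name}-{int(time.time())}"
    assert session_id.startswith("conv")  # prefix required by the error above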
Co-Authored-By: AJ Steers --- connector_builder_agents/src/evals/task.py | 2 +- connector_builder_agents/src/run.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/connector_builder_agents/src/evals/task.py b/connector_builder_agents/src/evals/task.py index 3778da9..b76f566 100644 --- a/connector_builder_agents/src/evals/task.py +++ b/connector_builder_agents/src/evals/task.py @@ -19,7 +19,7 @@ async def run_connector_build_task(dataset_row: dict) -> dict: input_obj = json.loads(dataset_row.get("input", "{}")) connector_name = input_obj.get("name", "unknown") prompt_name = input_obj.get("prompt_name", "unknown") - session_id = f"eval-{connector_name}-{int(time.time())}" + session_id = f"conv-eval-{connector_name}-{int(time.time())}" logger.info( f"Starting connector build task for '{connector_name}' with prompt '{prompt_name}' (session: {session_id})" diff --git a/connector_builder_agents/src/run.py b/connector_builder_agents/src/run.py index d39baf6..6b4bcc3 100644 --- a/connector_builder_agents/src/run.py +++ b/connector_builder_agents/src/run.py @@ -37,7 +37,7 @@ def generate_session_id() -> str: """Generate a unique session ID based on current timestamp.""" - return f"unified-mcp-session-{int(time.time())}" + return f"conv-unified-mcp-session-{int(time.time())}" def get_workspace_dir(session_id: str) -> Path: From 3db608b7ff6fa0530359397272d900529d6d0100 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 12:14:46 +0000 Subject: [PATCH 18/20] fix: use conv_ prefix and replace hyphens with underscores in session IDs for OpenAI API compatibility OpenAI's Conversations API requires IDs to use only letters, numbers, underscores, or dashes. Changed prefix from 'conv-' to 'conv_' and replaced hyphens in connector names with underscores to ensure compliance with OpenAI's character restrictions. Also fixed unrelated ruff style errors in summary.py (UP038) that were blocking verification. 
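As a hedged sketch of the naming convention this change adopts (the allowed
character set is taken from the commit message above; the helper name is
illustrative, not part of the codebase):

    import time

    def make_conv_session_id(connector_name: str) -> str:
        # Normalize hyphens in connector names to underscores so the whole
        # ID follows a single underscore-separated convention.
        safe_name = connector_name.replace("-", "_")
        return f"conv_eval_{safe_name}_{int(time.time())}"

    make_conv_session_id("source-pokeapi")  # -> "conv_eval_source_pokeapi_<timestamp>"
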
Co-Authored-By: AJ Steers --- connector_builder_agents/src/evals/summary.py | 4 ++-- connector_builder_agents/src/evals/task.py | 2 +- connector_builder_agents/src/run.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/connector_builder_agents/src/evals/summary.py b/connector_builder_agents/src/evals/summary.py index 359d910..e804566 100644 --- a/connector_builder_agents/src/evals/summary.py +++ b/connector_builder_agents/src/evals/summary.py @@ -210,7 +210,7 @@ def extract_scores_by_connector(experiment: dict, client) -> dict: score = None if result and isinstance(result, dict): score = result.get("score") - elif result and isinstance(result, (list, tuple)) and len(result) > 0: + elif result and isinstance(result, list | tuple) and len(result) > 0: first_result = result[0] if isinstance(first_result, dict): score = first_result.get("score") @@ -680,7 +680,7 @@ def generate_markdown_summary(experiment: dict, experiment_name: str) -> str | N score = None if result and isinstance(result, dict): score = result.get("score") - elif result and isinstance(result, (list, tuple)) and len(result) > 0: + elif result and isinstance(result, list | tuple) and len(result) > 0: # Handle list of results - take the first one first_result = result[0] if isinstance(first_result, dict): diff --git a/connector_builder_agents/src/evals/task.py b/connector_builder_agents/src/evals/task.py index b76f566..497fe86 100644 --- a/connector_builder_agents/src/evals/task.py +++ b/connector_builder_agents/src/evals/task.py @@ -19,7 +19,7 @@ async def run_connector_build_task(dataset_row: dict) -> dict: input_obj = json.loads(dataset_row.get("input", "{}")) connector_name = input_obj.get("name", "unknown") prompt_name = input_obj.get("prompt_name", "unknown") - session_id = f"conv-eval-{connector_name}-{int(time.time())}" + session_id = f"conv_eval_{connector_name.replace('-', '_')}_{int(time.time())}" logger.info( f"Starting connector build task for '{connector_name}' with prompt '{prompt_name}' (session: {session_id})" diff --git a/connector_builder_agents/src/run.py b/connector_builder_agents/src/run.py index 6b4bcc3..650db79 100644 --- a/connector_builder_agents/src/run.py +++ b/connector_builder_agents/src/run.py @@ -37,7 +37,7 @@ def generate_session_id() -> str: """Generate a unique session ID based on current timestamp.""" - return f"conv-unified-mcp-session-{int(time.time())}" + return f"conv_unified_mcp_session_{int(time.time())}" def get_workspace_dir(session_id: str) -> Path: From aecd00e56f7ffc4e6afd4a9f792f6ef34f0acb8e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 13:21:20 +0000 Subject: [PATCH 19/20] fix: let OpenAI auto-create conversation IDs instead of passing custom ones OpenAI's Conversations API was rejecting our custom conversation_id with 404 errors because those conversations don't exist in their system. By not passing conversation_id, OpenAI will auto-create and manage its own conversation IDs. The session_id is still used for workspace directory management and logging. This decouples two different concerns: - session_id: Our internal ID for workspace directory management (we control this) - conversation_id: OpenAI's ID for conversation tracking (they control this) The existing code already retrieves the auto-generated conversation_id via await session._get_session_id() at lines 170 and 282 for logging purposes. 
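A condensed sketch of the resulting split between the two IDs (the import
path and the OPENAI_SESSION_BACKEND handling are simplified from run.py and
may not match it exactly):

    import os

    from agents import OpenAIConversationsSession, SQLiteSession

    def create_session(session_id: str):
        backend = os.environ.get("OPENAI_SESSION_BACKEND", "openai")
        if backend == "sqlite":
            # session_id is ours: it keys the local store and the workspace dir.
            return SQLiteSession(session_id=session_id)
        # conversation_id is OpenAI's: omit it so the API mints its own.
        return OpenAIConversationsSession()
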
Co-Authored-By: AJ Steers --- connector_builder_agents/src/evals/task.py | 2 +- connector_builder_agents/src/run.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/connector_builder_agents/src/evals/task.py b/connector_builder_agents/src/evals/task.py index 497fe86..6a1a8c3 100644 --- a/connector_builder_agents/src/evals/task.py +++ b/connector_builder_agents/src/evals/task.py @@ -19,7 +19,7 @@ async def run_connector_build_task(dataset_row: dict) -> dict: input_obj = json.loads(dataset_row.get("input", "{}")) connector_name = input_obj.get("name", "unknown") prompt_name = input_obj.get("prompt_name", "unknown") - session_id = f"conv_eval_{connector_name.replace('-', '_')}_{int(time.time())}" + session_id = f"eval_{connector_name.replace('-', '_')}_{int(time.time())}" logger.info( f"Starting connector build task for '{connector_name}' with prompt '{prompt_name}' (session: {session_id})" diff --git a/connector_builder_agents/src/run.py b/connector_builder_agents/src/run.py index 650db79..09d9a0e 100644 --- a/connector_builder_agents/src/run.py +++ b/connector_builder_agents/src/run.py @@ -61,7 +61,7 @@ def create_session(session_id: str): if backend == "sqlite": return SQLiteSession(session_id=session_id) elif backend == "openai": - return OpenAIConversationsSession(conversation_id=session_id) + return OpenAIConversationsSession() else: raise ValueError( f"Invalid OPENAI_SESSION_BACKEND value: '{backend}'. Must be 'openai' or 'sqlite'" From 8fc6261ea8cc7a580f9f19490ab918f7a62bb620 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 11 Oct 2025 18:17:52 +0000 Subject: [PATCH 20/20] feat: add primary key normalization to handle string vs list comparison - Add _normalize_primary_key() helper to flatten and normalize primary keys - Handles str -> [str], [[str]] -> [str], and [str] -> [str] cases - Update primary_keys_eval() to use normalization via eval_fn parameter - This allows 'id' and ['id'] to be treated as equivalent for comparison Co-Authored-By: AJ Steers --- .../src/evals/evaluators.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py index b520043..0390ebd 100644 --- a/connector_builder_agents/src/evals/evaluators.py +++ b/connector_builder_agents/src/evals/evaluators.py @@ -174,10 +174,42 @@ def _eval_expected_stream_props( return float(percent_matched) +def _normalize_primary_key(pk: Any) -> list[str]: + """Normalize primary key to a list of strings. + + Handles: + - str -> [str] (e.g., "id" -> ["id"]) + - [str] -> [str] (already normalized) + - [[str]] -> [str] (flatten nested lists) + """ + if pk is None: + return [] + + if isinstance(pk, str): + return [pk] + + if isinstance(pk, list): + if not pk: + return [] + + if all(isinstance(item, str) for item in pk): + return pk + + if all(isinstance(item, list) for item in pk): + flattened = [] + for sublist in pk: + if isinstance(sublist, list): + flattened.extend(sublist) + return flattened + + return [str(pk)] + + def primary_keys_eval(expected: dict, output: dict) -> float: """Evaluate if primary keys match expected values for each stream. Returns the percentage of streams with correct primary keys. + Normalizes primary keys before comparison (str -> [str], [[str]] -> [str]). 
""" return _eval_expected_stream_props( expected_stream_props=_parse_expected_streams_dict(expected, having="primary_key"), @@ -186,6 +218,8 @@ def primary_keys_eval(expected: dict, output: dict) -> float: for stream in _get_manifest_streams(output) or [] }, prop="primary_key", + eval_fn=lambda expected, actual: _normalize_primary_key(expected) + == _normalize_primary_key(actual), )