Changes from 1 commit

22 commits
33f6c27 feat: add primary keys validation to readiness report and evals (devin-ai-integration[bot], Oct 11, 2025)
5d6bd94 refactor: consolidate YAML structure and add expected_records validation (devin-ai-integration[bot], Oct 11, 2025)
a03f701 refactor: extract common helper functions in evaluators (devin-ai-integration[bot], Oct 11, 2025)
0717df0 refactor: simplify evaluators by eliminating intermediate dicts (devin-ai-integration[bot], Oct 11, 2025)
595705b fix: move import re to top of file (devin-ai-integration[bot], Oct 11, 2025)
ce1703a refactor: remove unnecessary early returns from evaluators (devin-ai-integration[bot], Oct 11, 2025)
519914d refactor: remove expected_stream_names variable from streams_eval (devin-ai-integration[bot], Oct 11, 2025)
4467160 Apply suggestion from @aaronsteers (aaronsteers, Oct 11, 2025)
4765645 refactor: rename streams_eval to stream_names_eval and total_expected… (devin-ai-integration[bot], Oct 11, 2025)
9e247ce refactor: simplify evaluators with having parameter in _parse_expecte… (devin-ai-integration[bot], Oct 11, 2025)
791d6da add generic evaluator fn (aaronsteers, Oct 11, 2025)
3be1c8a try implementation of generic compare (aaronsteers, Oct 11, 2025)
b3c2b21 fix: correct indentation and complete generic evaluator pattern (devin-ai-integration[bot], Oct 11, 2025)
0817fdb Apply suggestion from @aaronsteers (aaronsteers, Oct 11, 2025)
caf5b7c refactor: rename records_eval to stream_record_counts_eval and simpli… (devin-ai-integration[bot], Oct 11, 2025)
bfa5cb1 fix: correct stream_names_eval to compare stream name keys instead of… (devin-ai-integration[bot], Oct 11, 2025)
5c7f6e2 fix: update session_id format to use 'conv-' prefix for OpenAI API co… (devin-ai-integration[bot], Oct 11, 2025)
3db608b fix: use conv_ prefix and replace hyphens with underscores in session… (devin-ai-integration[bot], Oct 11, 2025)
aecd00e fix: let OpenAI auto-create conversation IDs instead of passing custo… (devin-ai-integration[bot], Oct 11, 2025)
5feef3a Merge branch 'main' into devin/1760145011-add-primary-keys-check (devin-ai-integration[bot], Oct 11, 2025)
8fc6261 feat: add primary key normalization to handle string vs list comparison (devin-ai-integration[bot], Oct 11, 2025) (sketched below)
1f4282d Merge branch 'main' into devin/1760145011-add-primary-keys-check (aaronsteers, Oct 12, 2025)
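
The string-vs-list primary key normalization referenced in commit 8fc6261 is not part of the single-commit diff shown below. As a rough illustration of the idea only, here is a minimal sketch; the helper name _normalize_primary_key and the exact flattening rules are assumptions, not taken from the PR.

def _normalize_primary_key(primary_key: str | list | None) -> list:
    """Normalize a primary key so string and list declarations compare equally.

    A manifest may declare a primary key as "id", ["id"], or [["id"]], while the
    expected fixture may use another of those forms. Normalizing both sides to a
    flat list of field names makes an equality check safe.
    Illustrative sketch only, not the PR's actual implementation.
    """
    if primary_key is None:
        return []
    if isinstance(primary_key, str):
        return [primary_key]
    # Flatten one level of nesting, e.g. [["id"]] -> ["id"]
    flattened = []
    for part in primary_key:
        if isinstance(part, list):
            flattened.extend(part)
        else:
            flattened.append(part)
    return flattened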
67 changes: 33 additions & 34 deletions connector_builder_agents/src/evals/evaluators.py
@@ -36,10 +36,19 @@
 """
 
 
-def _parse_expected_streams(expected: dict) -> list:
-    """Parse and return the expected_streams list from the expected dict."""
+def _parse_expected_streams_dict(expected: dict) -> dict:
+    """Parse and return expected streams as a dict mapping stream_name -> stream_config."""
     expected_obj = json.loads(expected.get("expected", "{}"))
-    return expected_obj.get("expected_streams", [])
+    expected_streams = expected_obj.get("expected_streams", [])
+
+    result = {}
+    for stream_obj in expected_streams:
+        if isinstance(stream_obj, dict):
+            result.update(stream_obj)
+        elif isinstance(stream_obj, str):
+            result[stream_obj] = {}
+
+    return result
 
 
 def _get_manifest_streams(output: dict) -> list | None:
@@ -98,14 +107,8 @@ def streams_eval(expected: dict, output: dict) -> float:
     available_stream_names = [stream.get("name", "") for stream in available_streams]
     logger.info(f"Available stream names: {available_stream_names}")
 
-    expected_streams = _parse_expected_streams(expected)
-    expected_stream_names = []
-    for stream_obj in expected_streams:
-        if isinstance(stream_obj, dict):
-            expected_stream_names.extend(stream_obj.keys())
-        elif isinstance(stream_obj, str):
-            expected_stream_names.append(stream_obj)
-
+    expected_streams = _parse_expected_streams_dict(expected)
+    expected_stream_names = list(expected_streams.keys())
     logger.info(f"Expected stream names: {expected_stream_names}")
 
     # Set attributes on span for visibility
@@ -135,30 +138,28 @@ def primary_keys_eval(expected: dict, output: dict) -> float:
         logger.warning("No manifest found")
         return 0.0
 
-    expected_streams = _parse_expected_streams(expected)
-    expected_primary_keys = {}
-    for stream_obj in expected_streams:
-        if isinstance(stream_obj, dict):
-            for stream_name, stream_config in stream_obj.items():
-                if isinstance(stream_config, dict) and "primary_key" in stream_config:
-                    expected_primary_keys[stream_name] = stream_config["primary_key"]
+    expected_streams = _parse_expected_streams_dict(expected)
 
-    logger.info(f"Expected primary keys: {expected_primary_keys}")
+    total_expected_streams = sum(
+        1 for config in expected_streams.values() if config.get("primary_key") is not None
+    )
 
-    if not expected_primary_keys:
+    if total_expected_streams == 0:
         logger.warning("No expected primary keys found")
         return 0.0
 
     matched_count = 0
-    total_expected_streams = len(expected_primary_keys)
 
     for stream in available_streams:
         stream_name = stream.get("name", "")
-        if stream_name not in expected_primary_keys:
+        if stream_name not in expected_streams:
             continue
 
+        expected_pk = expected_streams[stream_name].get("primary_key")
+        if expected_pk is None:
+            continue
+
         actual_pk = stream.get("primary_key", [])
-        expected_pk = expected_primary_keys[stream_name]
 
         if actual_pk == expected_pk:
             matched_count += 1
@@ -188,24 +189,22 @@ def records_eval(expected: dict, output: dict) -> float:
         logger.warning("No readiness report found")
         return 0.0
 
-    expected_streams = _parse_expected_streams(expected)
-    expected_records = {}
-    for stream_obj in expected_streams:
-        if isinstance(stream_obj, dict):
-            for stream_name, stream_config in stream_obj.items():
-                if isinstance(stream_config, dict) and "expected_records" in stream_config:
-                    expected_records[stream_name] = stream_config["expected_records"]
+    expected_streams = _parse_expected_streams_dict(expected)
 
-    logger.info(f"Expected records: {expected_records}")
+    total_expected_streams = sum(
+        1 for config in expected_streams.values() if config.get("expected_records") is not None
+    )
 
-    if not expected_records:
+    if total_expected_streams == 0:
        logger.warning("No expected records found")
        return 1.0
 
     matched_count = 0
-    total_expected_streams = len(expected_records)
 
-    for stream_name, expected_value in expected_records.items():
+    for stream_name, stream_config in expected_streams.items():
+        expected_value = stream_config.get("expected_records")
+        if expected_value is None:
+            continue
         actual_count = _extract_record_count(readiness_report, stream_name)
 
         if actual_count is None:
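
To make the new helper's behavior concrete, here is a small usage sketch of _parse_expected_streams_dict as defined in this diff. The payload ("users", "orders", the counts) is illustrative rather than taken from the repository's eval fixtures, and the import path is assumed from the file location shown above.

import json

# Assumed import path, based on connector_builder_agents/src/evals/evaluators.py;
# adjust to your package layout.
from connector_builder_agents.src.evals.evaluators import _parse_expected_streams_dict

# Illustrative expected payload: expected_streams may mix bare stream names
# with dicts mapping a stream name to its config.
expected = {
    "expected": json.dumps(
        {
            "expected_streams": [
                "users",  # bare name -> empty config
                {"orders": {"primary_key": ["id"], "expected_records": 100}},
            ]
        }
    )
}

streams = _parse_expected_streams_dict(expected)
# streams == {"users": {}, "orders": {"primary_key": ["id"], "expected_records": 100}}

# primary_keys_eval then counts only the streams that declare a primary_key:
total_with_pk = sum(1 for cfg in streams.values() if cfg.get("primary_key") is not None)
# total_with_pk == 1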