feat: add primary key and record count validation to readiness report and evals #136
Open: aaronsteers wants to merge 22 commits into main from devin/1760145011-add-primary-keys-check
Changes from all commits (22):
33f6c27  feat: add primary keys validation to readiness report and evals (devin-ai-integration[bot])
5d6bd94  refactor: consolidate YAML structure and add expected_records validation (devin-ai-integration[bot])
a03f701  refactor: extract common helper functions in evaluators (devin-ai-integration[bot])
0717df0  refactor: simplify evaluators by eliminating intermediate dicts (devin-ai-integration[bot])
595705b  fix: move import re to top of file (devin-ai-integration[bot])
ce1703a  refactor: remove unnecessary early returns from evaluators (devin-ai-integration[bot])
519914d  refactor: remove expected_stream_names variable from streams_eval (devin-ai-integration[bot])
4467160  Apply suggestion from @aaronsteers (aaronsteers)
4765645  refactor: rename streams_eval to stream_names_eval and total_expected… (devin-ai-integration[bot])
9e247ce  refactor: simplify evaluators with having parameter in _parse_expecte… (devin-ai-integration[bot])
791d6da  add generic evaluator fn (aaronsteers)
3be1c8a  try implementation of generic compare (aaronsteers)
b3c2b21  fix: correct indentation and complete generic evaluator pattern (devin-ai-integration[bot])
0817fdb  Apply suggestion from @aaronsteers (aaronsteers)
caf5b7c  refactor: rename records_eval to stream_record_counts_eval and simpli… (devin-ai-integration[bot])
bfa5cb1  fix: correct stream_names_eval to compare stream name keys instead of… (devin-ai-integration[bot])
5c7f6e2  fix: update session_id format to use 'conv-' prefix for OpenAI API co… (devin-ai-integration[bot])
3db608b  fix: use conv_ prefix and replace hyphens with underscores in session… (devin-ai-integration[bot])
aecd00e  fix: let OpenAI auto-create conversation IDs instead of passing custo… (devin-ai-integration[bot])
5feef3a  Merge branch 'main' into devin/1760145011-add-primary-keys-check (devin-ai-integration[bot])
8fc6261  feat: add primary key normalization to handle string vs list comparison (devin-ai-integration[bot])
1f4282d  Merge branch 'main' into devin/1760145011-add-primary-keys-check (aaronsteers)
```diff
@@ -3,6 +3,9 @@
 import json
 import logging
+import re
+from collections.abc import Callable
+from typing import Any
 
 import pandas as pd
 import yaml
@@ -36,14 +39,53 @@
 """
 
 
-def readiness_eval(output: dict) -> int:
-    """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
+def _parse_expected_streams_dict(expected: dict, having: str | None = None) -> dict:
+    """Parse and return expected streams as a dict mapping stream_name -> stream_config.
+
+    Args:
+        expected: The expected dictionary containing stream configurations
+        having: Optional key name to filter streams - only returns streams where this key exists
+    """
+    expected_obj = json.loads(expected.get("expected", "{}"))
+    expected_streams = expected_obj.get("expected_streams", [])
+
+    result = {}
+    for stream_obj in expected_streams:
+        if isinstance(stream_obj, dict):
+            result.update(stream_obj)
+        elif isinstance(stream_obj, str):
+            result[stream_obj] = {}
+
+    if having is not None:
+        result = {name: config for name, config in result.items() if config.get(having) is not None}
+
+    return result
```
Comment on lines +42 to +62: Devin, spin out this refactor into a new PR.
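
For context, a minimal sketch of the payload shape this parser handles. The stream names and values here are invented, but the mixed list of bare names and single-key dicts follows the two branches of the loop above, assuming the helper is in scope:

```python
import json

# Hypothetical "expected" payload: the "expected" key holds a JSON string
# whose "expected_streams" list mixes bare stream names with single-key
# dicts of per-stream config.
expected = {
    "expected": json.dumps({
        "expected_streams": [
            "users",  # bare string -> parsed as {"users": {}}
            {"orders": {"primary_key": "order_id", "expected_records": ">100"}},
        ]
    })
}

result = _parse_expected_streams_dict(expected)
assert result == {
    "users": {},
    "orders": {"primary_key": "order_id", "expected_records": ">100"},
}

# With having="primary_key", streams lacking that key are filtered out.
assert list(_parse_expected_streams_dict(expected, having="primary_key")) == ["orders"]
```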
```diff
+
+
+def _get_manifest_streams(output: dict) -> list | None:
+    """Extract and parse the manifest streams from output artifacts."""
     if output is None:
         logger.warning("Output is None, cannot evaluate readiness")
-        return 0
+        return None
 
-    readiness_report = output.get("artifacts", {}).get("readiness_report", None)
+    manifest_str = output.get("artifacts", {}).get("manifest", None)
+    if manifest_str is None:
+        return None
+
+    manifest = yaml.safe_load(manifest_str)
+    return manifest.get("streams", [])
+
+
+def _get_readiness_report(output: dict) -> str | None:
+    """Extract the readiness report from output artifacts."""
+    if output is None:
+        return None
+
+    return output.get("artifacts", {}).get("readiness_report", None)
+
+
+def readiness_eval(output: dict) -> int:
+    """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
+    readiness_report = _get_readiness_report(output)
     if readiness_report is None:
         logger.warning("No readiness report found")
         return 0
@@ -66,39 +108,181 @@ def readiness_eval(output: dict) -> int:
     return score
 
 
-def streams_eval(expected: dict, output: dict) -> float:
+def stream_names_eval(expected: dict, output: dict) -> float:
     """Evaluate if all expected streams were built. Return the percentage of expected streams that are present in available streams."""
+    expected_streams = _parse_expected_streams_dict(expected)
+    output_streams = {
+        stream.get("name", "(undeclared)"): stream for stream in _get_manifest_streams(output) or []
+    }
 
-    if output is None:
-        logger.warning("Output is None, cannot evaluate streams")
-        return 0.0
-
-    manifest_str = output.get("artifacts", {}).get("manifest", None)
-    if manifest_str is None:
-        logger.warning("No manifest found")
-        return 0
+    expected_stream_names = set(expected_streams.keys())
+    output_stream_names = set(output_streams.keys())
 
-    manifest = yaml.safe_load(manifest_str)
-    available_streams = manifest.get("streams", [])
-    available_stream_names = [stream.get("name", "") for stream in available_streams]
-    logger.info(f"Available stream names: {available_stream_names}")
+    if not expected_stream_names:
+        logger.warning("No expected streams found")
+        return 1.0
 
-    expected_obj = json.loads(expected.get("expected", "{}"))
-    expected_stream_names = expected_obj.get("expected_streams", [])
-    logger.info(f"Expected stream names: {expected_stream_names}")
+    matched_streams = expected_stream_names & output_stream_names
 
     # Set attributes on span for visibility
     span = get_current_span()
-    span.set_attribute("available_stream_names", available_stream_names)
-    span.set_attribute("expected_stream_names", expected_stream_names)
+    span.set_attribute("name_matched_count", len(matched_streams))
+    span.set_attribute("name_evaluated_streams", len(expected_stream_names))
 
-    if not expected_stream_names:
-        logger.warning("No expected streams found")
-        return 0.0
+    for stream_name in expected_stream_names:
+        if stream_name in output_stream_names:
+            logger.info(f"✓ {stream_name}: name matches")
+        else:
+            logger.warning(f"✗ {stream_name}: name not found in output")
 
-    # Calculate the percentage of expected streams that are present in available streams
-    matched_streams = set(available_stream_names) & set(expected_stream_names)
-    logger.info(f"Matched streams: {matched_streams}")
     percent_matched = len(matched_streams) / len(expected_stream_names)
-    logger.info(f"Percent matched: {percent_matched}")
+    logger.info(f"Name percent matched: {percent_matched}")
     return float(percent_matched)
```
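
To make the name score concrete, a small sketch of the set arithmetic the rewritten evaluator performs (names invented):

```python
# Two of three expected stream names appear in the built manifest,
# so stream_names_eval would score 2/3 for this pair.
expected_names = {"users", "orders", "invoices"}
output_names = {"users", "orders", "payments"}

matched = expected_names & output_names  # {"users", "orders"}
assert len(matched) / len(expected_names) == 2 / 3
```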
```diff
+
+
+def _eval_expected_stream_props(
+    *,
+    expected_stream_props: dict[str, Any],
+    output_stream_props: dict[str, Any],
+    prop: str,
+    eval_fn: Callable[[Any, Any], bool] = lambda expected, actual: expected == actual,
+) -> float:
+    """Generic evaluator for expected stream properties."""
+    matched_count = 0
+    total_count = len(expected_stream_props)
+
+    for stream_name, expected_props in expected_stream_props.items():
+        expected_value = expected_props.get(prop)
+        actual_value = output_stream_props.get(stream_name, {}).get(prop)
+
+        if expected_value is None:
+            continue
+
+        if eval_fn(expected_value, actual_value):
+            matched_count += 1
+            logger.info(f"✓ {stream_name}: {prop} matches {expected_value}")
+        else:
+            logger.warning(
+                f"✗ {stream_name}: {prop} mismatch - expected {expected_value}, got {actual_value}"
+            )
+
+    span = get_current_span()
+    span.set_attribute(f"{prop}_matched_count", matched_count)
+    span.set_attribute(f"{prop}_evaluated_streams", total_count)
+
+    percent_matched = (matched_count * 1.0) / (total_count * 1.0) if total_count > 0 else 1.0
+    logger.info(f"{prop.capitalize()} percent matched: {percent_matched}")
+    return float(percent_matched)
```
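
As a usage sketch (inputs invented, helper assumed in scope): with the default equality eval_fn, one matching stream out of two evaluated yields 0.5:

```python
score = _eval_expected_stream_props(
    expected_stream_props={
        "users": {"primary_key": ["id"]},
        "orders": {"primary_key": ["order_id"]},
    },
    output_stream_props={
        "users": {"primary_key": ["id"]},   # matches
        "orders": {"primary_key": ["id"]},  # mismatch, not counted
    },
    prop="primary_key",
)
assert score == 0.5
```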
```diff
+
+
+def _normalize_primary_key(pk: Any) -> list[str]:
+    """Normalize primary key to a list of strings.
+
+    Handles:
+    - str -> [str] (e.g., "id" -> ["id"])
+    - [str] -> [str] (already normalized)
+    - [[str]] -> [str] (flatten nested lists)
+    """
+    if pk is None:
+        return []
+
+    if isinstance(pk, str):
+        return [pk]
+
+    if isinstance(pk, list):
+        if not pk:
+            return []
+
+        if all(isinstance(item, str) for item in pk):
+            return pk
+
+        if all(isinstance(item, list) for item in pk):
+            flattened = []
+            for sublist in pk:
+                if isinstance(sublist, list):
+                    flattened.extend(sublist)
+            return flattened
+
+    return [str(pk)]
+
+
+def primary_keys_eval(expected: dict, output: dict) -> float:
+    """Evaluate if primary keys match expected values for each stream.
+
+    Returns the percentage of streams with correct primary keys.
+    Normalizes primary keys before comparison (str -> [str], [[str]] -> [str]).
+    """
+    return _eval_expected_stream_props(
+        expected_stream_props=_parse_expected_streams_dict(expected, having="primary_key"),
+        output_stream_props={
+            stream.get("name", "(undeclared)"): stream
+            for stream in _get_manifest_streams(output) or []
+        },
+        prop="primary_key",
+        eval_fn=lambda expected, actual: _normalize_primary_key(expected)
+        == _normalize_primary_key(actual),
+    )
```
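
The normalization step means equivalent declarations compare equal. A few illustrative checks, assuming _normalize_primary_key is in scope:

```python
# "id", ["id"], and [["id"]] all normalize to ["id"], so a manifest using
# the string form still matches an expectation written as a list.
assert _normalize_primary_key("id") == ["id"]
assert _normalize_primary_key(["id"]) == ["id"]
assert _normalize_primary_key([["id"]]) == ["id"]
# Composite keys expressed as nested lists are flattened.
assert _normalize_primary_key([["a"], ["b"]]) == ["a", "b"]
```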
```diff
+
+
+def stream_record_counts_eval(expected: dict, output: dict) -> float:
+    """Evaluate if record counts match expected values for each stream.
+
+    Returns the percentage of streams with correct record counts.
+    Supports both integer values and constraint strings like ">100", "<999", ">100,<999".
+    """
+    readiness_report = _get_readiness_report(output)
+    if readiness_report is None:
+        logger.warning("No readiness report found")
+        return 0.0
+
+    expected_streams = _parse_expected_streams_dict(expected, having="expected_records")
+
+    output_stream_props = {}
+    for stream_name in expected_streams.keys():
+        record_count = _extract_record_count(readiness_report, stream_name)
+        output_stream_props[stream_name] = {"expected_records": record_count}
+
+    return _eval_expected_stream_props(
+        expected_stream_props=expected_streams,
+        output_stream_props=output_stream_props,
+        prop="expected_records",
+        eval_fn=lambda expected, actual: actual is not None
+        and _validate_record_count(actual, expected),
+    )
+
+
+def _extract_record_count(readiness_report: str, stream_name: str) -> int | None:
+    """Extract record count for a stream from the readiness report."""
+    lines = readiness_report.split("\n")
+    for i, line in enumerate(lines):
+        if f"**{stream_name}**" in line or f"`{stream_name}`" in line:
+            for j in range(i, min(i + 10, len(lines))):
+                if "records" in lines[j].lower():
+                    match = re.search(r"(\d+)\s+records?", lines[j], re.IGNORECASE)
+                    if match:
+                        return int(match.group(1))
+    return None
```
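
For intuition, a made-up readiness report fragment; the exact report layout is an assumption, but any line mentioning "N records" within ten lines of the stream heading is picked up:

```python
report = """\
## Stream: **users**
Status: PASSED
Read 128 records in 3.2s
"""
# The scan finds the "**users**" heading, then matches r"(\d+)\s+records?"
# on the "Read 128 records" line two lines below.
assert _extract_record_count(report, "users") == 128
```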
```diff
+
+
+def _validate_record_count(actual_count: int, expected_value: int | str) -> bool:
+    """Validate record count against expected value or constraint string."""
+    if isinstance(expected_value, int):
+        return actual_count == expected_value
+
+    if not isinstance(expected_value, str):
+        return False
+
+    constraints = [c.strip() for c in expected_value.split(",")]
+    for constraint in constraints:
+        if constraint.startswith(">"):
+            threshold = int(constraint[1:])
+            if actual_count <= threshold:
+                return False
+        elif constraint.startswith("<"):
+            threshold = int(constraint[1:])
+            if actual_count >= threshold:
+                return False
+        elif constraint.isdigit():
+            if actual_count != int(constraint):
+                return False
+
+    return True
```
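
A few illustrative checks of the constraint grammar (comma-separated clauses, all of which must hold), assuming _validate_record_count is in scope:

```python
assert _validate_record_count(150, ">100")            # above the lower bound
assert _validate_record_count(150, ">100,<999")       # inside the range
assert not _validate_record_count(1000, ">100,<999")  # fails the "<999" clause
assert _validate_record_count(42, "42")               # bare digits: exact match
assert _validate_record_count(42, 42)                 # plain int: direct equality
```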